## Setup

In [None]:
# Install the third-party packages imported below: gradio, sentence-transformers
# (from the main branch of its GitHub repository) and tensorflow-text.
# NOTE(review): no versions are pinned, so a fresh run may install different
# releases than the ones this notebook was developed with.
!pip install gradio
!pip install git+https://github.com/UKPLab/sentence-transformers.git
!pip install tensorflow-text

In [2]:
# import gradio as gr
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow_text

In [3]:
# Item metadata table. `get_metadata` looks rows up by position, and `predict`
# reads the title, author_name, description, average_rating and image_url columns.
# NOTE(review): the path is an absolute Google Drive location; presumably Drive is
# mounted before this cell runs (the mount step is not part of this notebook) — confirm.
meta_data = pd.read_csv("/content/drive/MyDrive/DS310_Final/meta_final.csv")
# meta_data.head()

In [4]:
from typing import List, Union
from sentence_transformers import SentenceTransformer
import torch
import pickle

class SentenceTransformerBackend():
    """Thin wrapper around a SentenceTransformer model that returns torch tensors.

    Args:
        embedding_model: Either an already constructed ``SentenceTransformer``
            instance, or a string that is passed to ``SentenceTransformer(...)``
            to build one.

    Raises:
        ValueError: If ``embedding_model`` is neither a ``SentenceTransformer``
            nor a string.
    """

    def __init__(self, embedding_model: Union[str, SentenceTransformer]):
        super().__init__()

        if isinstance(embedding_model, SentenceTransformer):
            self.embedding_model = embedding_model
        elif isinstance(embedding_model, str):
            self.embedding_model = SentenceTransformer(embedding_model)
        else:
            raise ValueError("Please select a correct SentenceTransformers model: \n"
                             "`from sentence_transformers import SentenceTransformer` \n"
                             "`model = SentenceTransformer('all-MiniLM-L6-v2')`")

    def embed(self,
              documents: List[str],
              verbose: bool = False) -> torch.Tensor:
        """Encode ``documents`` and return the embeddings as a torch tensor.

        Args:
            documents: Texts to encode.
            verbose: Forwarded to ``encode`` as ``show_progress_bar``.

        Returns:
            The result of ``encode`` converted with ``torch.from_numpy``. The same
            tensor is also stored on ``self.embed_matrix`` (overwriting whatever a
            previous call stored there).
        """
        encoded = self.embedding_model.encode(documents, show_progress_bar=verbose)
        embeddings = torch.from_numpy(encoded)
        self.embed_matrix = embeddings
        return embeddings

In [5]:
from tqdm import tqdm
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import re

from gensim.models import KeyedVectors
from gensim.models.word2vec import Word2Vec

# from sklearn.feature_extraction.text import TfidfVectorizer


# Fetch the NLTK resources used below: the stop-word corpus, the punkt data for
# `word_tokenize`, and the WordNet data for `WordNetLemmatizer`.
for _resource in ('stopwords', 'punkt', 'wordnet'):
    nltk.download(_resource)

# Defaults shared by the tokenizer methods defined in this notebook.
STOPWORDS = set(stopwords.words('english'))
MIN_WORDS, MAX_WORDS = 4, 200


class Word2Vec_SenEmbed():
    """Sentence embeddings computed as the mean of pretrained word vectors.

    Args:
        stopwords: Stop-word collection; stored on the instance.
        min_df, max_df, max_features: Stored on the instance; none of this
            class's own methods read them.
        word2vec_model: Optional, already loaded word-vector model. It must
            support ``word in model`` and ``model[word]`` and expose
            ``vector_size``. When given, nothing is read from disk.
        path: Optional path of a binary word2vec file to load when
            ``word2vec_model`` is not given. Defaults to
            ``DEFAULT_EMBEDDING_FILE``.
    """

    # Location used when neither `word2vec_model` nor `path` is supplied.
    DEFAULT_EMBEDDING_FILE = '/content/drive/MyDrive/DS310_Final/GoogleNews-vectors-negative300.bin.gz'

    def __init__(self, stopwords=STOPWORDS, min_df=5, max_df=0.95, max_features=8000, word2vec_model=None, path=None):
        self.min_df = min_df
        self.max_df = max_df
        self.max_features = max_features
        self.stopwords = stopwords
        if word2vec_model is not None:
            # Reuse the caller's model instead of loading the large file again.
            self.google_word2vec = word2vec_model
        else:
            embedding_file = path if path is not None else self.DEFAULT_EMBEDDING_FILE
            self.google_word2vec = KeyedVectors.load_word2vec_format(embedding_file, binary=True)

    def embed(self, corpus, verbose=False):
        """Embed every sentence of ``corpus`` and return the stacked result.

        Args:
            corpus: Iterable of sentence strings.
            verbose: Accepted for signature parity with the other embedders; unused.

        Returns:
            A torch tensor with one row per sentence. The same tensor is also
            stored on ``self.embed_matrix``.
        """
        tokenized_corpus = [self.tokenizer(sentence) for sentence in corpus]
        sentence_vectors = [self.sentence_embedding(tokens, self.google_word2vec)
                            for tokens in tokenized_corpus]

        embedding_matrices = np.array(sentence_vectors)
        embed_matrix_tensor = torch.from_numpy(embedding_matrices)
        self.embed_matrix = embed_matrix_tensor
        return embed_matrix_tensor

    # def load_embeddings_matrix(self, path):
    #     with open(path, 'rb') as f:
    #         embed_matrix = pickle.load(f)
    #         self.embed_matrix = torch.tensor(embed_matrix, dtype=float)

    def tokenizer(self, sentence, min_words=MIN_WORDS, max_words=MAX_WORDS, stopwords=STOPWORDS, lemmatize=True):
        """Split ``sentence`` into word tokens, lemmatizing them unless disabled.

        NOTE(review): ``min_words``, ``max_words`` and ``stopwords`` are kept in
        the signature but are not applied. Earlier code built a length/stop-word
        filtered list here and then returned the unfiltered tokens. The
        unfiltered result is preserved on purpose: turning the filter on would
        change every embedding produced by this class, so it should only be done
        together with regenerating any embeddings that were stored from it.
        """
        words = word_tokenize(sentence)
        if lemmatize:
            lemmatizer = WordNetLemmatizer()
            return [lemmatizer.lemmatize(w) for w in words]
        return list(words)

    def sentence_embedding(self, sentence, word2vec_model):
        """Return the mean vector of the tokens of ``sentence`` known to the model.

        Args:
            sentence: Iterable of tokens.
            word2vec_model: Model supporting ``in``, ``[]`` and ``vector_size``.

        Returns:
            The element-wise mean of the vectors of all in-vocabulary tokens. If
            no token is in the vocabulary (including an empty sentence), a zero
            vector of length ``word2vec_model.vector_size`` is returned instead
            of NaN, so that ``embed`` can still stack the rows into one array.
        """
        known_vectors = [word2vec_model[word] for word in sentence if word in word2vec_model]
        if not known_vectors:
            # float32 is the dtype gensim uses by default when loading word2vec files.
            return np.zeros(word2vec_model.vector_size, dtype=np.float32)
        return np.mean(known_vectors, axis=0)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [6]:
from sklearn.feature_extraction.text import TfidfVectorizer


# NOTE(review): these downloads and constants repeat, verbatim, the ones already
# executed in the Word2Vec cell above. They are re-declared here right before the
# TF-IDF class uses STOPWORDS / MIN_WORDS / MAX_WORDS as default argument values.
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')

STOPWORDS = set(stopwords.words('english'))
MIN_WORDS = 4
MAX_WORDS = 200


class TFIDF_SenEmbed():
    """TF-IDF sentence embedder built on scikit-learn's ``TfidfVectorizer``.

    Args:
        stopwords: Stop-word collection handed (after tokenization) to the
            vectorizer as ``stop_words``.
        min_df, max_df, max_features: Forwarded unchanged to ``TfidfVectorizer``.
    """

    def __init__(self, stopwords=STOPWORDS, min_df=5, max_df=0.95, max_features=8000):
        self.min_df = min_df
        self.max_df = max_df
        self.max_features = max_features
        self.stopwords = stopwords

    def embed(self, corpus, verbose=False):
        """Fit a new vectorizer on ``corpus`` and return its dense TF-IDF matrix.

        Args:
            corpus: Iterable of document strings.
            verbose: Accepted for signature parity with the other embedders; unused.

        Returns:
            A torch tensor made from the dense TF-IDF matrix. The tensor is also
            stored on ``self.embed_matrix`` and the fitted vectorizer on
            ``self.vectorizer``.
        """
        # Use the stop words given to the constructor (previously the global
        # STOPWORDS was used and the constructor argument was ignored). They are
        # run through the same tokenizer so they match the tokens it produces.
        token_stop = self.tokenizer(' '.join(self.stopwords), lemmatize=False)
        self.vectorizer = TfidfVectorizer(stop_words=token_stop,
                                          tokenizer=self.tokenizer,
                                          min_df=self.min_df,
                                          max_df=self.max_df,
                                          max_features=self.max_features)

        sparse_matrix = self.vectorizer.fit_transform(corpus)
        embed_matrix_tensor = torch.from_numpy(sparse_matrix.toarray())
        self.embed_matrix = embed_matrix_tensor
        return embed_matrix_tensor

    def tfidf_transform(self):
        """Return the vectorizer created by ``embed``.

        ``self.vectorizer`` only exists after ``embed`` has been called.
        """
        return self.vectorizer

    # def load_embeddings_matrix(self, path):
    #     with open(path, 'rb') as f:
    #         embed_matrix = pickle.load(f)
    #         self.embed_matrix = torch.tensor(embed_matrix, dtype=float)

    def tokenizer(self, sentence, min_words=MIN_WORDS, max_words=MAX_WORDS, stopwords=STOPWORDS, lemmatize=True):
        """Split ``sentence`` into word tokens, lemmatizing them unless disabled.

        NOTE(review): ``min_words``, ``max_words`` and ``stopwords`` are kept in
        the signature but are not applied. Earlier code built a length/stop-word
        filtered list here and then returned the unfiltered tokens. The
        unfiltered result is preserved on purpose: a vectorizer that was fitted
        with this tokenizer expects exactly these tokens, so enabling the filter
        requires re-fitting (and re-saving) the vectorizer as well.
        """
        words = word_tokenize(sentence)
        if lemmatize:
            lemmatizer = WordNetLemmatizer()
            return [lemmatizer.lemmatize(w) for w in words]
        return list(words)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [7]:
# Load the saved TF-IDF vectorizer; `embed_query` later calls `.transform()` on it.
# NOTE(review): pickle.load can execute arbitrary code — only load files you produced yourself.
# NOTE(review): presumably the pickled vectorizer references TFIDF_SenEmbed.tokenizer,
# in which case that class must already be defined when this cell runs — confirm.
with open('/content/drive/MyDrive/DS310_Final/embeddings/tfidf_vectorizer.pkl', 'rb') as f:
    vectorizer_tfidf = pickle.load(f)
# Despite its name this is a Word2Vec_SenEmbed wrapper, not a raw word2vec model;
# constructing it loads the word vectors from disk.
word2vec_model = Word2Vec_SenEmbed()

## Build Gradio app

In [8]:
import gradio as gr
import time

from PIL import Image
from io import BytesIO
import requests
import os
import base64

# Helper functions for the Gradio app. They rely on objects created in earlier cells:
# `meta_data`, `vectorizer_tfidf`, `word2vec_model` and `SentenceTransformerBackend`.

def embed_query(query, model):
    """Lower-case the query text(s) and embed them with ``model``.

    Args:
        query: A single query or a list of queries; anything that is not a list
            is wrapped into a one-element list.
        model: Either an object whose class is named ``TfidfVectorizer`` (its
            ``transform`` output is densified and converted to a torch tensor)
            or any object exposing ``embed(list_of_str)``.

    Returns:
        Whatever the selected branch produces: a torch tensor for the TF-IDF
        case, otherwise the return value of ``model.embed``.
    """
    queries = query if isinstance(query, list) else [query]
    lowered = [text.lower() for text in queries]

    # Every non-TF-IDF backend shares the same `embed` interface.
    if model.__class__.__name__ != 'TfidfVectorizer':
        return model.embed(lowered)

    dense_rows = model.transform(lowered).toarray()
    return torch.from_numpy(dense_rows)


def get_metadata(idx):
    """Return the row of the global ``meta_data`` table at integer position ``idx``."""
    row = meta_data.iloc[idx]
    return row

def extract_best_indices(m, topk, mask=None):
    """Return the ``topk`` best-scoring item indices together with their scores.

    Args:
        m: Similarity scores as a NumPy array. A 1-D array is used as is; an
            array with more dimensions is averaged over axis 0 so that one score
            per item remains.
        topk: Maximum number of results to return.
        mask: Optional boolean array with one entry per item (same shape as the
            per-item score vector). Items whose entry is False are excluded.

    Returns:
        A list of ``(index, score)`` tuples ordered from highest to lowest score.
        Items with a score of exactly 0 are never returned, so the list can be
        shorter than ``topk``.

    Raises:
        AssertionError: If ``mask`` does not have one entry per item.
    """
    # Collapse (n_queries, n_items) to a single score per item.
    cos_sim = np.mean(m, axis=0) if len(m.shape) > 1 else m
    index = np.argsort(cos_sim)[::-1]  # item indices, best score first

    if mask is not None:
        mask = np.asarray(mask, dtype=bool)
        # The mask is indexed per item, so it must match the per-item scores
        # (not the possibly 2-D input `m`).
        assert mask.shape == cos_sim.shape, "mask must have one entry per item"
        mask = mask[index]
    else:
        mask = np.ones(len(cos_sim), dtype=bool)

    # Keep an item only if it is allowed by the mask AND has a non-zero score.
    # (An OR here would keep everything whenever the default all-True mask is used.)
    keep = np.logical_and(cos_sim[index] != 0, mask)
    best_index = index[keep][:topk]

    return [(idx, cos_sim[idx]) for idx in best_index]

def choice_model(name_model):
    """Return the embedder and the stored embedding matrix for ``name_model``.

    Args:
        name_model: Model identifier; converted with ``str``. ``'tfidf'`` and
            ``'word2vec'`` select the globals ``vectorizer_tfidf`` and
            ``word2vec_model``; any other value is passed to
            ``SentenceTransformerBackend``.

    Returns:
        ``(model, embed_matrix)`` where ``embed_matrix`` is whatever object was
        unpickled from the model's file in the embeddings folder.
    """
    name_model = str(name_model)

    # Models whose pickle file name differs from "<identifier>.pkl".
    special_file_names = {
        'sentence-transformers/sentence-t5-base': "sentence_t5_base.pkl",
        'Muennighoff/SGPT-125M-weightedmean-nli-bitfit': "sgpt.pkl",
    }
    file_name = special_file_names.get(name_model, name_model + '.pkl')
    file_path = "/content/drive/MyDrive/DS310_Final/embeddings/" + file_name

    # NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
    with open(file_path, 'rb') as handle:
        embed_matrix = pickle.load(handle)

    if name_model == 'tfidf':
        model = vectorizer_tfidf
    elif name_model == 'word2vec':
        model = word2vec_model
    else:
        model = SentenceTransformerBackend(name_model)
        model.embedding_model.max_seq_length = 512

    return model, embed_matrix


def image_to_markdown(image):
    """Encode ``image`` as PNG and return it as an inline Markdown image.

    Args:
        image: Object with a ``mode`` attribute and ``convert``/``save`` methods
            (a PIL image in this notebook). Anything not already in ``'RGB'``
            mode is converted to RGB first.

    Returns:
        A Markdown image tag whose source is a base64 ``data:image/png`` URI.
    """
    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')

    with BytesIO() as png_buffer:
        rgb_image.save(png_buffer, format="PNG")
        png_base64 = base64.b64encode(png_buffer.getvalue()).decode("utf-8")

    return f"![Image](data:image/png;base64,{png_base64})"

def _cover_markdown(image_url, timeout=10):
    """Download ``image_url`` and return it as inline Markdown, or ``""`` on failure.

    A missing or broken image must not abort the whole search, so network errors,
    HTTP error statuses, invalid URLs and undecodable image data all fall back to
    an empty cell.
    """
    try:
        response = requests.get(image_url, timeout=timeout)
        response.raise_for_status()
        return image_to_markdown(Image.open(BytesIO(response.content)))
    except (requests.RequestException, OSError, ValueError):
        return ""


def predict(*args):
    """Run a similarity search for the Gradio click handler.

    Args:
        *args: The values of the query textboxes, followed by the slider value
            (how many textboxes are in use) and the selected model name.

    Returns:
        ``(results, run_time)``. ``results`` is a list of rows
        ``[image_markdown, title, author_name, description, rating, score]``;
        ``run_time`` is the time in seconds, rounded to two decimals, spent on
        embedding the query and ranking (model loading and image downloads are
        not included). If no non-blank query is available, ``([], 0.0)`` is
        returned without loading any model.
    """
    slider = args[-2]
    name_model = str(args[-1])

    # Only the first `slider` textboxes count; blank entries carry no information
    # and would otherwise distort (or break) the averaged query embedding.
    active_texts = list(args[:-2])[:int(slider)]
    query = [text for text in active_texts if isinstance(text, str) and text.strip()]
    if not query:
        return [], 0.0

    model, embed_matrix = choice_model(name_model)

    start_time = time.time()

    query_embeddings = embed_query(query, model)
    # Average all query embeddings into a single (1, dim) search vector.
    query_embeddings = torch.mean(query_embeddings, dim=0).reshape(1, -1)
    cos_sim = cosine_similarity(query_embeddings, embed_matrix)
    topk_indices = extract_best_indices(cos_sim, topk=30)

    run_time = round(time.time() - start_time, 2)

    # Prepare output
    results = []
    for index, score in topk_indices:
        metadata = get_metadata(index)
        results.append([
            _cover_markdown(metadata['image_url']),
            metadata['title'],
            metadata['author_name'],
            metadata['description'],
            metadata['average_rating'],
            score,
        ])

    return results, run_time

  # Gradio Interface

def variable_inputs(k):
    """Build the visibility updates for the ten query textboxes.

    Args:
        k: Number of textboxes to show; converted with ``int``.

    Returns:
        A list of ten ``gr.Textbox`` objects: ``k`` entries with
        ``visible=True`` followed by ``10 - k`` entries with ``visible=False``.
    """
    k = int(k)
    shown = [gr.Textbox(visible=True)] * k
    hidden = [gr.Textbox(visible=False)] * (10 - k)
    return shown + hidden

In [None]:
# Embedding backends offered in the dropdown; each value is handed to `choice_model`.
MODEL_CHOICES = [
    'tfidf',
    'word2vec',
    'bert-base-uncased',
    'bert-base-multilingual-uncased',
    'sentence-transformers/sentence-t5-base',
    'Muennighoff/SGPT-125M-weightedmean-nli-bitfit',
]

with gr.Blocks() as iface:
    # Controls: model selection, search button, item-count slider and ten
    # initially hidden query textboxes that the slider reveals.
    with gr.Row():
        models = gr.Dropdown(MODEL_CHOICES, label="Embedding Model")
        button = gr.Button("Search")
        slider = gr.Slider(minimum=0, maximum=10, value=0, step=1, label="user items")
        inputs = [gr.Textbox(label=f"Query/Item {i+1}", visible=False) for i in range(10)]
        slider.change(variable_inputs, inputs=slider, outputs=inputs)

    with gr.Row():
        run_time = gr.Number(label="Run Time")

    # Result table; the first column renders the image as Markdown.
    with gr.Row():
        output = gr.Dataframe(
            headers=['Image', "Title", "Author", "Description", "Rating", "Score"],
            datatype=['markdown', "str", "str", "str", "number", "number"],
            wrap=True,
            interactive=True,
        )

    # `predict` receives the textbox values first, then the slider and the model name.
    button.click(fn=predict, inputs=[*inputs, slider, models], outputs=[output, run_time])

if __name__ == "__main__":
    iface.launch(show_api=False, debug=True)