In [None]:
!pip install -U pip setuptools wheel
!pip install -U 'spacy[cuda11x,transformers,lookups]'
!python -m spacy download ru_core_news_lg
!pip install transformers

In [None]:
# Download the English transformer pipeline that a later cell loads with
# spacy.load("en_core_web_trf").
!python -m spacy download en_core_web_trf

In [None]:
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
import spacy

# Ask spaCy to run on a GPU when one is available.
spacy.prefer_gpu()
# Load the large Russian pipeline.
# NOTE(review): `nlp` is rebound to "en_core_web_trf" in a later cell and is not
# used in between, so this instance is effectively unused in the visible cells.
nlp = spacy.load("ru_core_news_lg")

In [None]:
def custom_annotation_setter(docs, trf_data):
    """Store each document's transformer output on ``doc._.custom_attr``.

    ``trf_data.doc_data`` is materialised into a list and paired positionally
    with ``docs``; pairing stops at the shorter of the two sequences.
    ``custom_attr`` has to be a registered ``Doc`` extension
    (``Doc.set_extension``) before this is called on real spaCy documents.
    """
    per_doc_outputs = list(trf_data.doc_data)
    for target_doc, output in zip(docs, per_doc_outputs):
        target_doc._.custom_attr = output

# Imported here so the cell runs on its own: `Doc` is needed to register the
# extension, `TransformerData` for the type check at the bottom.
from spacy.tokens import Doc
from spacy_transformers.data_classes import TransformerData

# spaCy rejects writes to `doc._.<name>` for unregistered extensions, so the
# attribute written by custom_annotation_setter must be declared first.
# The guard keeps the cell safe to re-run.
if not Doc.has_extension("custom_attr"):
    Doc.set_extension("custom_attr", default=None)

nlp = spacy.load("en_core_web_trf")
# Replace the transformer component's hook so every processed doc receives its
# transformer output through custom_annotation_setter.
# NOTE(review): this hook belongs to the spacy-transformers `Transformer`
# component — confirm the installed en_core_web_trf release still ships it.
nlp.get_pipe("transformer").set_extra_annotations = custom_annotation_setter
doc = nlp("This is a text")
assert isinstance(doc._.custom_attr, TransformerData)
print(doc._.custom_attr.tensors)

In [None]:



#Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sentence, skipping masked positions.

    model_output: model result whose first element holds the token embeddings.
    attention_mask: mask aligned with the tokens; positions set to 0 do not
        contribute to the average.
    Returns one pooled vector per row of the batch.
    """
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    # Clamp so a row whose mask is entirely zero does not divide by zero.
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask


#Load AutoModel from huggingface model repository
tokenizer = AutoTokenizer.from_pretrained("ai-forever/sbert_large_nlu_ru")
model = AutoModel.from_pretrained("ai-forever/sbert_large_nlu_ru")

sentences = ['Привет! Как твои дела?',
             'А правда, что 42 твое любимое число?']

#Tokenize sentences
encoded_input = tokenizer(sentences, padding=True, truncation=True, max_length=24, return_tensors='pt')

#Compute token embeddings
with torch.no_grad():
    model_output = model(**encoded_input)

#Perform pooling. In this case, mean pooling
sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])


In [None]:
# Shape of the token-embedding tensor (first element of the model output)
# computed for the example sentences.
model_output[0].shape

In [None]:
def _mean_pooling(self, model_output, attention_mask):
  token_embeddings = model_output[0] #First element of model_output contains all token embeddings
  input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
  sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
  sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
  return sum_embeddings / sum_mask

In [None]:
# Inspect the attention mask the tokenizer produced for the padded example batch.
encoded_input['attention_mask']

In [None]:
# Maps the model names used in this notebook ('sbert', 'spacy') to the
# checkpoint / pipeline identifier behind each of them.
# NOTE(review): 'en_core_web_sm' is not one of the pipelines downloaded by the
# setup cells (ru_core_news_lg, en_core_web_trf) — confirm it is available.
model2core = dict(
    sbert='ai-forever/sbert_large_nlu_ru',
    spacy='en_core_web_sm',
)

In [None]:
def return_vector(name_model, text):
  """Unfinished dispatcher over the embedding back-ends; always returns None.

  name_model: 'sbert' loads the SBERT tokenizer and model and then discards
              them; 'spacy' is a placeholder; any other value is ignored.
  text: accepted but not used yet.
  """
  sbert_checkpoint = "ai-forever/sbert_large_nlu_ru"
  if name_model == 'sbert':
    # Loaded but not used yet: nothing is computed from them.
    tokenizer = AutoTokenizer.from_pretrained(sbert_checkpoint)
    sbert_model = AutoModel.from_pretrained(sbert_checkpoint)
  elif name_model == 'spacy':
    pass  # TODO: spaCy branch is not implemented
  return None


In [None]:
class Place:
  def __init__(self,  tags, vec_model, text_tokenizer):
    '''
    Hold the tags of a place together with the model used to embed them.

    tags = tags for each place or text prompt (handed to the tokenizer as is)
    vec_model = Language Model for getting tags embeddings
    text_tokenizer = callable that turns tags into the keyword inputs of
                     vec_model; its result must contain 'attention_mask'
    '''
    self.vec_model = vec_model
    self.text_tokenizer = text_tokenizer
    self.tags = tags
    # Empty until compute_embeddings() is called.
    self.embs = torch.tensor([])


  def compute_embeddings(self, is_return=False):
    '''
    Embed self.tags and cache the result in self.embs.

    is_return = when True the embeddings are also returned, otherwise the
                method returns None.

    squeeze() drops size-1 dimensions, so a single tag yields a 1-D vector
    while several tags yield one row per tag.
    '''
    tensor_tags = self.text_tokenizer(self.tags, padding=True, truncation=True, max_length=24, return_tensors='pt')
    with torch.no_grad():
      model_out = self.vec_model(**tensor_tags)
    # The attention mask is required by _mean_pooling so that padded positions
    # are left out of the average.
    self.embs = self._mean_pooling(model_out, tensor_tags['attention_mask']).squeeze()
    if is_return:
      return self.embs



  def _mean_pooling(self, model_output, attention_mask):
    '''
    Mask-aware mean of the token embeddings along the token axis.

    model_output = sequence whose first element is the token-embedding tensor
    attention_mask = mask with one entry per token; zeros are excluded
    '''
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    # Clamp so a fully masked row does not divide by zero.
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask

In [None]:
def get_cos_sim(self, idx, return_max=True, k=5):
    '''
    Rank every other row of self.all_embeddings against row idx.

    idx = row of self.all_embeddings used as the query
    return_max = True  -> indices of the k most similar rows
                 False -> indices of the k least similar rows
    k = how many indices to return; capped at the number of other rows

    Returns a tensor of row indices into self.all_embeddings. Row idx itself
    is never part of the result; the result is empty when there is no other
    row.

    NOTE(review): the similarity is computed by a callable named `cos` taken
    from the surrounding namespace; it is not defined inside this function.
    '''
    man = self.all_embeddings[idx]
    indices = torch.arange(self.all_embeddings.shape[0], device=self.all_embeddings.device)
    indices = indices[indices != idx]
    if indices.numel() == 0:
      return indices
    other_embeddings = torch.index_select(self.all_embeddings, 0, indices)
    cosine_arr = cos(man, other_embeddings)
    # topk raises when k exceeds the number of candidates.
    k = min(k, cosine_arr.shape[-1])
    # largest=False selects the smallest similarities directly. Inverting the
    # values instead mis-ranks negative similarities and divides by zero for
    # orthogonal vectors.
    picked = torch.topk(cosine_arr, k, largest=bool(return_max)).indices
    return indices[picked]


In [None]:
class User:
  def __init__(self,  tags, prompt, vec_model, text_tokenizer, liked_history=None):
    '''
    Build the user's embedding from their tags and free-text prompt.

    tags = tags describing the user (handed to the tokenizer as is)
    prompt = free-text request of the user
    vec_model = Language Model used to embed tags and prompt
    text_tokenizer = callable that turns text into the keyword inputs of
                     vec_model; its result must contain 'attention_mask'
    liked_history = optional, stored unchanged on the instance

    self.user_embs is the average of the pooled tag and prompt embeddings.
    When several tags are given, the tag embeddings keep one row per tag and
    the prompt vector is broadcast over those rows.
    '''
    self.vec_model = vec_model
    self.text_tokenizer = text_tokenizer
    self.prompt = prompt
    self.tags = tags
    # Compare along the feature (last) axis. For two 1-D vectors this is the
    # same as dim=0, and it also works for a batch of place vectors.
    self.cos = torch.nn.CosineSimilarity(dim=-1)
    self.liked_history = liked_history
    tensor_tags = self.text_tokenizer(tags, padding=True, truncation=True, max_length=24, return_tensors='pt')
    tensor_prompt = self.text_tokenizer(prompt, padding=True, truncation=True, max_length=50, return_tensors='pt')
    with torch.no_grad():
      model_out_tags = self.vec_model(**tensor_tags)
      model_out_prompt = self.vec_model(**tensor_prompt)
    # _mean_pooling needs the attention mask to leave padding out of the mean.
    embs_tags = self._mean_pooling(model_out_tags, tensor_tags['attention_mask']).squeeze()
    embs_prompt = self._mean_pooling(model_out_prompt, tensor_prompt['attention_mask']).squeeze()
    self.user_embs = (embs_tags + embs_prompt) / 2


  def get_topk_rec(self, idx, place_embeddings, k=5):
    '''
    Return the indices of the k places most similar to the user.

    idx = row of self.user_embs used as the query when the user has one
          embedding row per tag; ignored when self.user_embs is one vector
    place_embeddings = tensor with one row per place (a single 1-D vector is
                       treated as one place)
    k = number of recommendations; capped at the number of places

    Returns a tensor of row indices into place_embeddings, most similar first.
    '''
    query = self.user_embs
    if query.dim() > 1:
      query = query[idx]
    if place_embeddings.dim() == 1:
      place_embeddings = place_embeddings.unsqueeze(0)
    # Give the query the same shape as the places so the similarity is taken
    # row by row along the feature axis.
    cosine_arr = self.cos(query.unsqueeze(0).expand_as(place_embeddings), place_embeddings)
    # topk raises when k exceeds the number of candidates.
    k = min(k, cosine_arr.shape[0])
    return torch.topk(cosine_arr, k).indices





  def _mean_pooling(self, model_output, attention_mask):
    '''
    Mask-aware mean of the token embeddings along the token axis.

    model_output = sequence whose first element is the token-embedding tensor
    attention_mask = mask with one entry per token; zeros are excluded
    '''
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    # Clamp so a fully masked row does not divide by zero.
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask