Implementation of word2vec model based on https://rguigoures.github.io/word2vec_pytorch/ tutorial

In [None]:
import itertools
import nltk
nltk.download('brown')
from nltk.corpus import brown
import re
from numpy.random import multinomial

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Package brown is already up-to-date!


In [None]:
corpus = []

# Tokenize the Brown "news" category: each document becomes one list of
# lowercase, letters-only tokens.
for cat in ["news"]:
    for text_id in brown.fileids(cat):
        raw_text = list(itertools.chain.from_iterable(brown.sents(text_id)))
        text = ' '.join(raw_text)
        text = text.lower()
        # str.replace returns a new string; the result has to be assigned,
        # otherwise newlines would be deleted by the regex below instead of
        # being turned into token separators.
        text = text.replace('\n', ' ')
        # Drop everything that is not a lowercase letter or a space.
        text = re.sub('[^a-z ]+', '', text)
        # split() without arguments never produces empty strings.
        corpus.append(text.split())

In [None]:
from collections import Counter
import random, math

Agora vamos contar as ocorrências das palavras no corpus e, depois, calcular a probabilidade de manter cada palavra no corpus, que é definida por:
$$P(w_i) = \frac{10^{-3}}{p_i}\left(\sqrt{10^3 p_i} + 1\right)$$
onde $p_i$ é a frequência relativa da palavra $w_i$ no corpus.

In [None]:
def subsample_frequent_words(corpus):
  """Randomly drop occurrences of frequent words from a tokenized corpus.

  Each occurrence of a word is kept with probability
  ``(sqrt(1e3 * p) + 1) * 1e-3 / p``, where ``p`` is the word's relative
  frequency in the whole corpus. Values above 1 mean the word is always kept.

  Args:
    corpus: iterable of documents, each a list of word tokens.

  Returns:
    A new list with one (possibly shorter) token list per input document,
    in the same order. The input is not modified.
  """
  word_counts = Counter(itertools.chain.from_iterable(corpus))
  total = sum(word_counts.values())

  # Compute the keep probability once per distinct word instead of once per
  # token. The parentheses matter: the "+ 1" belongs inside the factor that is
  # scaled by 1e-3 / p, otherwise the threshold is always above 1 and no word
  # is ever dropped.
  keep_probability = {}
  for word, count in word_counts.items():
    proportion = count / float(total)
    keep_probability[word] = (math.sqrt(proportion * 1e3) + 1) * 1e-3 / proportion

  return [
      [word for word in text if random.random() < keep_probability[word]]
      for text in corpus
  ]

In [None]:
# NOTE: this rebinds `corpus`, so re-running the cell subsamples the already
# subsampled corpus again.
corpus = subsample_frequent_words(corpus)
vocabulary = set(itertools.chain.from_iterable(corpus))

# Set iteration order for strings changes between interpreter runs (hash
# randomization), so sort the vocabulary to get reproducible indices.
index_to_word = dict(enumerate(sorted(vocabulary)))
# Built from index_to_word so the two mappings are exact inverses.
word_to_index = {word: index for index, word in index_to_word.items()}

Agora construindo o Bag of Words

In [None]:
import numpy as np

In [None]:
import random
def get_batches(context, batch_size = 100, vocab_index = None):
  """Shuffle training tuples and pack them into mini-batches of index tensors.

  Args:
    context: list of ``(target_word, context_word, negative_words)`` tuples,
      where ``negative_words`` is a list of words. All lists of negatives
      must have the same length so they can be stacked.
    batch_size: maximum number of tuples per batch (must be >= 1).
    vocab_index: optional mapping from word to integer index. Defaults to the
      notebook-level ``word_to_index``.

  Returns:
    A list of ``(targets, contexts, negatives)`` LongTensor triples. Every
    batch holds ``batch_size`` tuples except possibly the last one. An empty
    ``context`` yields an empty list.

  Raises:
    ValueError: if ``batch_size`` is smaller than 1.
  """
  # Local import: this cell is defined before the notebook's torch import cell.
  import torch

  if batch_size < 1:
    raise ValueError("batch_size must be >= 1")

  index = word_to_index if vocab_index is None else vocab_index

  # Shuffle a copy so the caller's list keeps its order.
  shuffled = list(context)
  random.shuffle(shuffled)

  batches = []
  # Step through the data in strides of batch_size; the final slice is simply
  # shorter when the data does not divide evenly.
  for start in range(0, len(shuffled), batch_size):
    chunk = shuffled[start:start + batch_size]
    tensor_target = torch.LongTensor([index[target] for target, _, _ in chunk])
    tensor_meaning = torch.LongTensor([index[meaning] for _, meaning, _ in chunk])
    tensor_negative = torch.LongTensor(
        [[index[w] for w in negatives] for _, _, negatives in chunk])
    batches.append((tensor_target, tensor_meaning, tensor_negative))
  return batches


In [None]:
def sample_negative(size, texts = None):
  """Endless generator of negative-sample word lists.

  Words are drawn with probability proportional to ``count ** 0.75``, where
  ``count`` is the number of occurrences of the word in the tokenized texts.

  Args:
    size: number of words in every yielded list.
    texts: optional iterable of token lists used to compute the word counts.
      Defaults to the notebook-level ``corpus`` (read when the generator is
      first advanced).

  Yields:
    A list of ``size`` words (repeats possible), grouped by vocabulary order
    rather than shuffled.
  """
  word_counts = Counter(itertools.chain.from_iterable(corpus if texts is None else texts))
  normalizing_factor = sum(count ** 0.75 for count in word_counts.values())

  words = np.array(list(word_counts.keys()))
  # The distribution never changes, so build it once outside the loop.
  probabilities = [count ** 0.75 / normalizing_factor for count in word_counts.values()]

  while True:
    # counts[i] is how many times words[i] was drawn in this sample.
    counts = multinomial(size, probabilities)
    yield [word for word, count in zip(words, counts) for _ in range(count)]

In [None]:
# Training tuples of the form (target word, context word, negative words).
context = []
# Context window radius used when pairing target and context words.
window = 4
# Number of negative words drawn for every (target, context) pair.
n_negative_samples = 8
negative_samples = sample_negative(n_negative_samples)

In [None]:
# Display the generator object; nothing is sampled until next() is called on it.
negative_samples

<generator object sample_negative at 0x78e9536428f0>

In [None]:
# Build the (target word, context word, negative words) training tuples.
# Start from an empty list so re-running this cell does not duplicate pairs.
context = []
for text in corpus:
  for i, word in enumerate(text):
    start = max(0, i - window)
    # range() excludes its upper bound, so add 1 to include text[i + window]
    # and make the window symmetric around the target word.
    end = min(i + window + 1, len(text))
    for j in range(start, end):
      if i != j:
        context.append((word, text[j], next(negative_samples)))

print(f"Tem-se um total de {len(context)} pares de palavras alvo e palavras contextuais")

# Criando de fato o word2vec

In [None]:
import torch
import torch.nn as nn
import torch.autograd as autograd
import torch.optim as optim
import torch.nn.functional as F

In [None]:
class Word2Vec(nn.Module):
  """Skip-gram model trained with negative sampling.

  Holds two embedding tables of shape ``(vocab_size, embedding_size)``: one
  for target words and one for context (and negative) words. ``forward``
  returns the loss directly, so no separate criterion is needed.
  """

  def __init__(self, embedding_size, vocab_size):
    """Create the target and context embedding tables.

    Args:
      embedding_size: dimensionality of each word vector.
      vocab_size: number of distinct words (rows of each table).
    """
    super(Word2Vec, self).__init__()
    self.target_embeddings = nn.Embedding(vocab_size, embedding_size)
    self.context_embeddings = nn.Embedding(vocab_size, embedding_size)

  def forward(self, target_word, context_word, negative_example):
    """Compute the negative-sampling loss summed over the batch.

    Args:
      target_word: LongTensor of target word indices, shape ``(B,)``.
      context_word: LongTensor of true context word indices, shape ``(B,)``.
      negative_example: LongTensor of negative word indices, shape ``(B, K)``.

    Returns:
      Scalar tensor: minus the sum over the batch of
      ``logsigmoid(t . c) + sum_k logsigmoid(-t . n_k)``.
    """
    emb_target = self.target_embeddings(target_word)        # (B, D)
    emb_context = self.context_embeddings(context_word)     # (B, D)

    # Dot product between each target and its true context word.
    positive_score = torch.sum(torch.mul(emb_target, emb_context), dim=1)  # (B,)
    output = torch.sum(F.logsigmoid(positive_score))

    emb_negative = self.context_embeddings(negative_example)  # (B, K, D)
    # One dot product per negative word. logsigmoid is applied to each score
    # individually before summing; summing the K scores first would collapse
    # the K negatives into a single term.
    negative_score = torch.bmm(emb_negative, emb_target.unsqueeze(2)).squeeze(2)  # (B, K)
    output = output + torch.sum(F.logsigmoid(-negative_score))
    return -output

Definição de uma classe que interrompe o aprendizado assim que a função de custo para de decrescer significativamente

In [None]:
class EarlyStopping():
  """Signal when the training loss has stopped improving.

  Keeps a sliding window with the last ``patience`` loss values and reports
  that training should stop when the relative spread of that window,
  ``(max - min) / max``, falls below ``min_percent_gain`` percent.
  """

  def __init__(self, patience=5, min_percent_gain=0.1):
    """
    Args:
      patience: number of most recent losses kept in the window.
      min_percent_gain: minimum gain, in percent, required to keep training.
    """
    self.patience = patience
    self.loss = []
    # Stored as a fraction so it can be compared directly with the gain.
    self.min_percent_gain = min_percent_gain/100.

  def update_loss(self, loss):
    """Append a loss value and discard the oldest one once the window is full."""
    self.loss.append(loss)
    if len(self.loss) > self.patience:
      del self.loss[0]

  def stop_training(self):
    """Return True when the gain over the window is below the threshold.

    Always returns False while fewer than two losses have been recorded.
    Prints the current gain as a percentage.
    """
    if len(self.loss) < 2:
      return False

    highest = max(self.loss)
    lowest = min(self.loss)
    # Relative gain: the whole difference is divided by the maximum. A window
    # whose maximum is zero has nothing left to gain.
    gain = (highest - lowest) / highest if highest else 0.0
    print(f"Ganho de custo: {round(100*gain,2)}")
    return gain < self.min_percent_gain

Etapa de aprendizado

In [None]:
# Training state that does not depend on the model.
context_tensor = []
early_stopping = EarlyStopping()
# NOTE(review): Word2Vec.forward already returns a loss value, so this
# criterion looks unnecessary for that model — confirm before relying on it.
loss_function = nn.CrossEntropyLoss()

# Model with 200-dimensional embeddings, one row per vocabulary word,
# optimized with Adam using its default settings.
vocab_size = len(vocabulary)
w2v = Word2Vec(vocab_size=vocab_size, embedding_size=200)
optimizer = optim.Adam(w2v.parameters())

In [None]:
# Convert every training tuple into index tensors with a batch dimension of
# one, in the (target, context, negatives) order taken by Word2Vec.forward.
# Start from an empty list so re-running this cell does not duplicate entries.
context_tensor = []
# Each element of `context` has three parts: target word, context word and
# the list of negative words.
for target, meaning, negatives in context:
  target_tensor = torch.LongTensor([word_to_index[target]])
  meaning_tensor = torch.LongTensor([word_to_index[meaning]])
  negative_tensor = torch.LongTensor([[word_to_index[w] for w in negatives]])
  context_tensor.append((target_tensor, meaning_tensor, negative_tensor))

In [None]:
# Train epoch by epoch until EarlyStopping reports that the loss has stopped
# improving. Word2Vec.forward takes (target, context, negatives) and returns
# the loss itself, so no external criterion is applied.
BATCH_SIZE = 2000

while True:
  total_loss = 0.0
  n_pairs = 0
  # get_batches reshuffles the training tuples, giving a new order each epoch.
  for target_tensor, meaning_tensor, negative_tensor in get_batches(context, batch_size=BATCH_SIZE):
    w2v.zero_grad()
    loss = w2v(target_tensor, meaning_tensor, negative_tensor)
    loss.backward()
    optimizer.step()
    total_loss += loss.item()
    n_pairs += len(target_tensor)

  # The model returns a loss summed over the batch; report it per training
  # pair so the value does not depend on how the data was batched.
  epoch_loss = total_loss / max(n_pairs, 1)
  print("loss: ", epoch_loss)
  early_stopping.update_loss(epoch_loss)
  if early_stopping.stop_training():
    break


loss:  11.132483
loss:  11.2609005
Ganho de custo: 1027.23
loss:  11.245563
Ganho de custo: 1027.23
loss:  11.225306
Ganho de custo: 1027.23
