In [1]:
!pip install datasets --quiet

In [2]:
### importing libraries
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random

from tqdm.notebook import tqdm
from tensorflow.keras.preprocessing.text import Tokenizer
from datasets import load_dataset

In [3]:
### load dataset from Hugging Face
dataset_id = "rahular/simple-wikipedia"

def tokenize_dataset(dataset_id, fraction=0.1):
  """Load a Hugging Face dataset and tokenize a leading fraction of it with a Keras Tokenizer.

  The tokenizer vocabulary is fitted on *all* documents of the "train" split,
  but only the first ``fraction`` of the documents are converted to id
  sequences and returned.

  Args:
    dataset_id: Hugging Face dataset identifier passed to ``load_dataset``.
    fraction: share of the documents (counted from the start) to tokenize.
      Defaults to 0.1, the value that used to be hard-coded.

  Returns:
    ``(tokenizer, tokenized_text)`` where ``tokenized_text`` is a list with one
    list of integer token ids per kept document.
  """

  dataset = load_dataset(dataset_id, split="train", streaming=True)
  # The streamed split is materialised in memory: the tokenizer needs the full corpus.
  data_list = [data["text"] for data in dataset]

  tokenizer = Tokenizer(oov_token="<UNK>")
  tokenizer.fit_on_texts(data_list)

  # Slice BEFORE converting: texts_to_sequences handles each text independently,
  # so this yields the same sequences without tokenizing documents that are dropped.
  keep = int(fraction * len(data_list))
  tokenized_text = tokenizer.texts_to_sequences(data_list[:keep])
  return tokenizer, tokenized_text


def get_data(tokenized_text, bucket_size):
  """Build (target, context) training pairs from tokenized documents.

  All documents are concatenated into one token stream. For every position the
  target is the token at that position and the context is the ``bucket_size``
  tokens on each side. Where the window runs off either end of the stream the
  missing slots are filled with ``0`` on that side, so every context has
  exactly ``2 * bucket_size`` entries and position ``k`` always refers to the
  same relative offset.

  Args:
    tokenized_text: iterable of token-id lists (one list per document).
    bucket_size: number of context tokens taken on each side of the target.

  Returns:
    ``[targets, contexts]`` - two parallel lists; ``contexts[i]`` is the padded
    context window of ``targets[i]``.
  """

  flat_tokens = [word for sublist in tokenized_text for word in sublist]
  print(len(flat_tokens))
  n_tokens = len(flat_tokens)

  targets = []
  contexts = []

  for i, target in enumerate(flat_tokens):
    context_start = max(0, i - bucket_size)
    context_end = min(n_tokens, i + bucket_size + 1)

    left = flat_tokens[context_start:i]
    right = flat_tokens[i + 1:context_end]

    # Pad each side independently. Padding only one side (as before) misplaces
    # the tokens whenever the window is clipped at both ends of a short stream.
    left_pad = [0] * (bucket_size - len(left))
    right_pad = [0] * (bucket_size - len(right))

    targets.append(target)
    contexts.append(left_pad + left + right + right_pad)

  return [targets, contexts]


class Datasets(Dataset):
  """Torch dataset over a pair of parallel sequences: ``dataset[0]`` (targets) and ``dataset[1]`` (contexts)."""

  def __init__(self, dataset):
    self.dataset = dataset

  def __len__(self):
    # The number of samples is the number of targets.
    targets = self.dataset[0]
    return len(targets)

  def __getitem__(self, idx):
    """Return ``(target, context)`` for sample ``idx`` as NumPy arrays."""
    target = np.array(self.dataset[0][idx])
    context = np.array(self.dataset[1][idx])
    return target, context

# Tokenize the corpus, build (target, context) pairs with 10 context tokens per side,
# and wrap them in a shuffled DataLoader of 128 pairs per batch.
tokenizer, tokenized_data = tokenize_dataset(dataset_id)
print(len(tokenized_data))
prepared_data = Datasets(get_data(tokenized_data, bucket_size=10))
dataloader = DataLoader(dataset=prepared_data, shuffle=True, batch_size=128)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


76976
3167640


In [4]:
class SkipGram:
  """Skip-gram word-embedding model trained with negative sampling, in plain NumPy.

  Attributes:
    vocab_size: number of rows in both weight matrices.
    embedding_dim: length of each embedding vector.
    word_embedding: ``(vocab_size, embedding_dim)`` input (target-word) vectors.
    output_weights: ``(vocab_size, embedding_dim)`` output (context-word) vectors.
  """

  def __init__(self, vocab_size, embedding_dim):
    self.vocab_size = vocab_size
    self.embedding_dim = embedding_dim
    # Gaussian init scaled by 1/sqrt(dim) so initial dot products stay small.
    self.word_embedding = np.random.randn(vocab_size, embedding_dim) / np.sqrt(embedding_dim)
    self.output_weights = np.random.randn(vocab_size, embedding_dim) / np.sqrt(embedding_dim)

  def softmax(self, x):
    """Softmax probabilities along axis 1 (or along the only axis of a 1-D input)."""
    x = np.asarray(x)
    # Inputs with two or more dimensions keep the original axis=1 behaviour;
    # 1-D logits (a single word) used to raise an AxisError.
    axis = 1 if x.ndim > 1 else 0
    exp = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return exp / np.sum(exp, axis=axis, keepdims=True)

  def sigmoid(self, x):
    """Numerically stable logistic sigmoid.

    ``np.where`` evaluates both of its branches, so the exponent is always
    taken of ``-|x|``; that value can underflow to 0 but can never overflow.
    """
    x = np.asarray(x)
    z = np.exp(-np.abs(x))
    return np.where(x >= 0, 1 / (1 + z), z / (1 + z))

  def forward_pass(self, input_word):
    """Return the softmax distribution over the vocabulary for ``input_word``.

    Args:
      input_word: a single word index (result shape ``(vocab_size,)``) or a
        1-D array of indices (result shape ``(batch, vocab_size)``).
    """

    word_vec = self.word_embedding[input_word]
    # (..., dim) @ (dim, vocab) -> (..., vocab); valid for one word or a batch.
    logits = np.dot(word_vec, self.output_weights.T)
    prob = self.softmax(logits)
    return prob

  def _excluded_words(self, input_word, context_words):
    """Set of word ids that must not be drawn as negatives: the target and its contexts."""
    excluded = {int(word) for word in context_words}
    excluded.add(int(input_word))
    return excluded

  def _sample_negative(self, excluded):
    """Draw one uniform random word id that is not in ``excluded``."""
    negative_word = np.random.randint(0, self.vocab_size)
    if len(excluded) >= self.vocab_size:
      # Nothing left to choose from; accept the draw rather than loop forever.
      return negative_word
    while negative_word in excluded:
      negative_word = np.random.randint(0, self.vocab_size)
    return negative_word

  def negative_sampling_loss(self, input_word, context_words, num_negative_samples=5):
    """Negative-sampling loss of one target word against its context window.

    Sums ``-log sigmoid(v.u_pos)`` over the context words and
    ``-log sigmoid(-v.u_neg)`` over ``len(context_words) * num_negative_samples``
    freshly drawn negatives. The negatives are random, so the value is a
    stochastic estimate and uses different negatives than ``update_weights``.

    Args:
      input_word: index of the target word.
      context_words: iterable of context word indices.
      num_negative_samples: negatives drawn per context word.

    Returns:
      The summed loss as a float.
    """

    input_id = int(input_word)
    context_ids = [int(word) for word in context_words]
    excluded = self._excluded_words(input_id, context_ids)
    input_vec = self.word_embedding[input_id]

    loss = 0.0
    for context_id in context_ids:
      pos_score = np.dot(input_vec, self.output_weights[context_id])
      # -log(sigmoid(s)) == log(1 + exp(-s)); logaddexp never returns inf for finite s.
      loss += np.logaddexp(0.0, -pos_score)

    for _ in range(len(context_ids) * num_negative_samples):
      negative_id = self._sample_negative(excluded)
      neg_score = np.dot(input_vec, self.output_weights[negative_id])
      # -log(sigmoid(-s)) == log(1 + exp(s))
      loss += np.logaddexp(0.0, neg_score)

    return loss


  def update_weights(self, input_word, context_words, learning_rate=0.01, num_negative_samples=5):
    """Apply one SGD step of the negative-sampling objective.

    Every gradient is evaluated against the target vector as it was on entry.
    Output vectors are updated immediately; the target vector's gradient is
    accumulated and applied once at the end.

    Args:
      input_word: index of the target word.
      context_words: iterable of context word indices (positive examples).
      learning_rate: SGD step size.
      num_negative_samples: negatives drawn per context word.
    """

    input_id = int(input_word)
    context_ids = [int(word) for word in context_words]
    excluded = self._excluded_words(input_id, context_ids)

    # Copy: an integer index returns a view, which the updates below would
    # otherwise modify while it is still being used to compute gradients.
    target_vec = self.word_embedding[input_id].copy()
    target_grad = np.zeros_like(target_vec)

    for context_id in context_ids:
      context_vec = self.output_weights[context_id]
      # d/ds of -log(sigmoid(s)) is sigmoid(s) - 1
      grad = self.sigmoid(np.dot(target_vec, context_vec)) - 1

      target_grad += grad * context_vec
      self.output_weights[context_id] -= learning_rate * grad * target_vec

    for _ in range(len(context_ids) * num_negative_samples):
      negative_id = self._sample_negative(excluded)
      negative_vec = self.output_weights[negative_id]
      # d/ds of -log(sigmoid(-s)) is sigmoid(s); descending pushes the pair apart.
      grad = self.sigmoid(np.dot(target_vec, negative_vec))

      target_grad += grad * negative_vec
      self.output_weights[negative_id] -= learning_rate * grad * target_vec

    self.word_embedding[input_id] -= learning_rate * target_grad


  def train(self, dataloader, epochs=10, window_size=5, learning_rate=0.001):
    """Train the model on batches of ``(targets, contexts)``.

    Args:
      dataloader: iterable yielding ``(input_word, context_words)`` batches,
        where ``input_word[i]`` is one word index and ``context_words[i]`` its
        sequence of context indices.
      epochs: number of passes over ``dataloader``.
      window_size: unused; kept so existing calls keep working. The window is
        determined by the data in ``dataloader``.
      learning_rate: SGD step size forwarded to ``update_weights``.
    """
    for epoch in range(epochs):
      total_loss = 0

      train_loader = tqdm(dataloader, desc=f"Epoch {epoch} Training")
      for batch, (input_word, context_words) in enumerate(train_loader):
        for i in range(len(input_word)):
          # Convert to plain ints once so indexing and membership tests do not
          # depend on the container type the loader yields.
          target = int(input_word[i])
          context = [int(word) for word in context_words[i]]

          loss = self.negative_sampling_loss(target, context)
          total_loss += loss

          self.update_weights(target, context, learning_rate)

        # Running mean of the summed per-batch loss.
        train_loader.set_postfix({"loss": total_loss / (batch + 1)})


  def get_word_embeeding(self, word_index):
    """get the word embedding (misspelled name kept for existing callers)"""

    return self.word_embedding[word_index]

  def get_word_embedding(self, word_index):
    """Return the input embedding vector(s) for ``word_index``."""

    return self.get_word_embeeding(word_index)

In [5]:
# Keras assigns word ids starting at 1 (0 is reserved and is used here as padding),
# so the largest id equals len(word_index) and the embedding table needs one more row.
vocab_size = len(tokenizer.word_index) + 1

In [6]:
# 100-dimensional embeddings, one row per token id.
skip_gram = SkipGram(vocab_size=vocab_size, embedding_dim=100)

In [8]:
# Train with the defaults declared on SkipGram.train (epochs=10, learning_rate=0.001).
skip_gram.train(dataloader)

Epoch 0 Training:   0%|          | 0/24748 [00:00<?, ?it/s]

KeyboardInterrupt: 