In [1]:
import re
import numpy as np
from collections import defaultdict
import random
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from gensim.models import KeyedVectors
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import nltk
from tqdm import trange
nltk.download('punkt') # for text tokenization
nltk.download('stopwords')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\ACER\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\ACER\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\ACER\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [31]:
class Vocabulary:
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.vocab_size = 0
        self.total_words = 0
        self.word_count = defaultdict(int)

    # Crawling through a sentence and counting each word, creating an index for each word and vice versa
    def build_vocab(self, sentence, min_count=2):
        for word in sentence:
            self.word_count[word] += 1
        idx = 0
        for word, count in self.word_count.items():
            if count >= min_count:
                self.word2idx.update({word: idx})
                idx += 1

        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)
        self.total_words = sum(
            [count for word, count in self.word_count.items() if count >= min_count]
        )

    def word_to_index(self, word):
        return self.word2idx.get(word, -1)

    def index_to_word(self, index):
        return self.idx2word.get(index, None)

In [32]:
# generating (context_word,center_word) in a sentence which context_word is neighbor and center_word is center word in a window
def generate_training_data(vocab, sentence, window_size=2):
    training_data = []
    sentence_indices = [
        vocab.word_to_index(word)
        for word in sentence
        if vocab.word_to_index(word) != -1
    ]

    for center_idx, center_word in enumerate(sentence_indices):
        context_start = max(0, center_idx + window_size)
        context_end = min(len(sentence_indices), center_idx + window_size + 1)

        for context_idx in range(context_start, context_end):
            if center_idx != center_word:
                context_word = sentence_indices[context_idx]
                training_data.append((center_word, context_word))

    return np.array(training_data)

In [33]:
class Word2vec:
    def __init__(self, vocab_size, embed_size=100, learning_rate=0.001):
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.learning_rate = learning_rate
        # from input to hidden layer
        self.W = np.random.uniform(-0.5, 0.5, (vocab_size, embed_size))
        # from hidden to output
        self.W_prime = np.random.uniform(-0.5, 0.5, (embed_size, vocab_size))

    def softmax(self, x):
        x_exp = np.exp(x - np.max(x))
        return x_exp / np.sum(x_exp)

    def train(self, training_data, epochs=1000):
        for epoch in range(epochs):
            loss = 0
            for center_word, context_word in training_data:
                # input to hidden(input isnt written because its one-hot for center word)
                h = self.W[center_word]
                u = np.dot(h, self.W_prime)
                y_pred = self.softmax(u)

                y_true = np.zeros(self.vocab_size)
                y_true[context_word] = 1

                # dloss/dy_pred=y_true*dy_pred/y_pred, dpred/du=dsoftmax(u)
                error = y_pred - y_true
                # dloss/dw_prime=dloss/dy_pred*dypred/du*du/dw_prime
                self.W_prime -= self.learning_rate * np.outer(h, error)
                # dloss/dw=dloss/dy_pred*dy_pred/du*du/dh*dh/dw
                self.W[center_word] -= self.learning_rate * np.dot(
                    error, self.W_prime.T
                )

                # y_true isnt written because its one-hot for context word
                loss -= np.sum(np.log(y_pred[context_word]))

            if epoch % 100 == 0:
                print(f"Epoch :{epoch}, Loss:{loss}")

In [34]:
# preprocessing text and filtering stopwords
with open("persian.txt", "r", encoding="utf-8") as f:
    stop_words = set(f.read().splitlines())


text = "ملکه و زن ها در کنار همسران و خانواده خود یعنی شاه و مرد ها در یک سرزمین پهناور زندگی می‌کردند شاه همیشه به مرد ها تذکر میداد که قدرت در اتحاد مرد ها و شاه نهفته است و در این قلمرو ملکه به زن ها یادآوری می‌کرد که همبستگی زن ها و ملکه مهم است و در این داستان هر مرد که نزد شاه یا زن که نزد ملکه می‌آمد از آنها حکم می‌گرفت تا به دیگران کمک کنند شاه عادل و قادر بود و ملکه خردمند و زیبا و هر مرد که از حکمت شاه یا زن که از عدالت ملکه راضی نبود نزد آنها می‌رفت تا شکایت خود را مطرح کند شاه و مرد و ملکه و زن در کنار هم بودند و هیچ کس از شاه یا ملکه نمی‌ترسید شاه همیشه به زبان میاورد که مرد ها باید به یکدیگر کمک کنند و ملکه تأکید داشتند زن ها هم باید متحد باشند"

words = text.split()
filtered_text = [word for word in words if word not in stop_words]

cleand_text = " ".join(filtered_text)
print(cleand_text)

ملکه زن همسران خانواده شاه مرد سرزمین پهناور زندگی می‌کردند شاه مرد تذکر میداد قدرت اتحاد مرد شاه نهفته قلمرو ملکه زن یادآوری می‌کرد همبستگی زن ملکه مهم داستان مرد شاه زن ملکه می‌آمد حکم می‌گرفت کمک شاه عادل قادر ملکه خردمند زیبا مرد حکمت شاه زن عدالت ملکه راضی می‌رفت شکایت مطرح شاه مرد ملکه زن شاه ملکه نمی‌ترسید شاه زبان میاورد مرد کمک ملکه تأکید زن متحد


In [35]:
vocab = Vocabulary()
vocab.build_vocab(cleand_text.split(" "))
train_data = generate_training_data(vocab, cleand_text.split(" "))
word2vec_model = Word2vec(vocab.vocab_size)
word2vec_model.train(train_data, epochs=1000)

Epoch :0, Loss:53.356789408085106
Epoch :100, Loss:35.75096563064102
Epoch :200, Loss:34.276498804407936
Epoch :300, Loss:33.79475420045126
Epoch :400, Loss:33.56259002051476
Epoch :500, Loss:33.427073104511926
Epoch :600, Loss:33.338642132957084
Epoch :700, Loss:33.276584117281274
Epoch :800, Loss:33.23074489200006
Epoch :900, Loss:33.19557221139599


In [36]:
def get_word_embedding(word, vocab, model):
    word_idx = vocab.word_to_index(word)
    if word_idx != -1:
        return model.W[word_idx]
    else:
        return None


embedding = get_word_embedding("مرد", vocab, word2vec_model)
print(embedding)

[-0.01514983  0.32530012 -0.14619844  0.20274366  0.3263011   0.09942745
 -0.41776853  0.3118245  -0.30856868  0.43939278 -0.31279673  0.42955828
 -0.43436397  0.14516801 -0.09908249 -0.51064728  0.52586765 -0.48288213
  0.3252349   0.36155198  0.24463679  0.49802222  0.2449491   0.30149437
  0.21963121 -0.26395015 -0.36538522 -0.54492915 -0.2408588  -0.36645249
  0.48578376 -0.06634267  0.34094661  0.44863162 -0.43166904 -0.09979197
  0.35877761  0.33496308 -0.3725752   0.07430546 -0.31158981  0.37156995
 -0.02653476  0.01624794 -0.51013922  0.27518936 -0.13202072 -0.36648907
  0.52247758 -0.28531512 -0.17128328  0.4558974  -0.52916916  0.21066672
  0.20695822 -0.35715205 -0.49090298  0.17846657  0.30772896 -0.25442862
  0.36552792 -0.5797372   0.24104936 -0.42488975  0.51584712 -0.18441626
  0.3243224  -0.13941809 -0.36663684  0.50584705  0.23580769  0.34287
  0.08187537 -0.08127052  0.23602389  0.16436381  0.01783608  0.02897745
 -0.40595076  0.40958983 -0.22148578  0.46927259 -0.31

In [37]:
# Finding the similarity between a word and other words in a sentence, based solely on neighboring
def cosine_similarity(word1, word2):
    norm_1 = np.linalg.norm(word1)
    norm_2 = np.linalg.norm(word2)
    return np.dot(word1, word2) / (norm_1 * norm_2)


def find_similar_words(word, vocab, model, k=5):
    similarity = []
    word_idx = vocab.word_to_index(word)

    if word_idx == -1:
        return f"{word} does not exist in vocabulary"

    word_vec1 = model.W[word_idx]

    for idx in range(vocab.vocab_size):
        word_vec2 = model.W[idx]
        if idx != word_idx:
            embedding = cosine_similarity(word_vec1, word_vec2)
            similarity.append((vocab.index_to_word(idx), embedding))
            similarity = sorted(similarity, key=lambda x: x[1], reverse=True)
    return similarity[:k]


similarity = find_similar_words("مرد", vocab, word2vec_model)
similarity

[('کمک', 0.08309825786618957),
 ('ملکه', 0.06986142746230715),
 ('شاه', -0.007272829870618061),
 ('زن', -0.13480019839779123)]

In [43]:
# "Finding a similarity between word_a and word_b that corresponds to the analogy between word_c and analogy(word_a, word_b, word_c)
def find_analogy(word_a, word_b, word_c, vocab, model):
    vec_a = (
        model.W[vocab.word_to_index(word_a)]
        if vocab.word_to_index(word_a) != -1
        else None
    )
    vec_b = (
        model.W[vocab.word_to_index(word_b)]
        if vocab.word_to_index(word_b) != -1
        else None
    )
    vec_c = (
        model.W[vocab.word_to_index(word_c)]
        if vocab.word_to_index(word_c) != -1
        else None
    )
    if vec_a is None or vec_b is None or vec_c is None:
        return f"one of {word_a} or {word_b} or {word_c} is not in vocabulary"

    target_vec = vec_b - vec_a + vec_c
    similarity = []
    for idx in range(vocab.vocab_size):
        word_vec = model.W[idx]
        if (
            idx != vocab.word_to_index(word_a)
            and idx != vocab.word_to_index(word_b)
            and idx != vocab.word_to_index(word_c)
        ):
            embedding = cosine_similarity(word_vec, target_vec)
            similarity.append((vocab.index_to_word(idx), embedding))
    similarity = sorted(similarity, key=lambda x: x[1], reverse=True)
    return similarity[0]


find_analogy("زن", "شاه", "مرد", vocab, word2vec_model)

('ملکه', 0.04607401566984035)

In [10]:
def preprocess_text(
    text,
    minimum_length=1,
    stopword_removal=True,
    stopwords_domain=[],
    lower_case=True,
    punctuation_removal=True,
):
    if lower_case:
        text = text.lower()

    if punctuation_removal:
        text = re.sub(r"[^\w\s]", "", text)

    tokens = word_tokenize(text)

    if stopword_removal:
        stop_words = set(stopwords.words("english"))
        stop_words.update(stopwords_domain)
        tokens = [word for word in tokens if word not in stop_words]

    tokens = [word for word in tokens if len(word) >= minimum_length]

    processed_text = " ".join(tokens)

    return processed_text

In [28]:
class PreTrainedWord2Vec:
    def __init__(self, preprocessor=None, model=None):
        self.preprocessor = preprocessor
        self.model = model

    def get_query_embedding(self, sentence):
        if self.preprocessor:
            sentence = self.preprocessor(sentence)

        word_vector = []
        for word in sentence.split(" "):
            word_vector.append(model[word])

        sentence_vector = np.mean(word_vector, axis=0)

        return sentence_vector

    def analogy(self, word1, word2, word3):
        new_word1 = self.preprocessor(word1)
        new_word2 = self.preprocessor(word2)
        new_word3 = self.preprocessor(word3)

        word_vec1 = model[new_word1]
        word_vec2 = model[new_word2]
        word_vec3 = model[new_word3]

        word_vector = word_vec2 - word_vec1 + word_vec3

        vocab_vectors = {word: model[word] for word in list(model.index_to_key)}

        possible_results = {
            word: vec for word, vec in vocab_vectors.items() if word not in word_vector
        }

        nearest_word = max(
            possible_results,
            key=lambda word: np.dot(possible_results[word], word_vector)
            / (np.linalg.norm(possible_results[word]) * np.linalg.norm(word_vector)),
        )

        return nearest_word

In [12]:
# import gensim.downloader as api

# # Load a pre-trained skip-gram Word2Vec model (Google News model)
# model = api.load('word2vec-google-news-300')
model = KeyedVectors.load_word2vec_format("./GoogleNews-vectors-negative300.bin", binary=True)

In [29]:
w2v = PreTrainedWord2Vec(preprocess_text, model)

In [30]:
word = "queen"
neighboars = w2v.model.most_similar(word)
for neighboar in neighboars:
    print(f"word: {neighboar[1]}, similarity: {neighboar[0]} ")

word1 = "man"
word2 = "woman"
word3 = "boy"
print(f"Similarity between {word1} and {word2} is like similarity between {word3} and {w2v.analogy(word1, word2, word3)}")

word: 0.739944338798523, similarity: queens 
word: 0.7070532441139221, similarity: princess 
word: 0.6510957479476929, similarity: king 
word: 0.6383602023124695, similarity: monarch 
word: 0.6357026100158691, similarity: very_pampered_McElhatton 
word: 0.6163408160209656, similarity: Queen 
word: 0.606067955493927, similarity: NYC_anglophiles_aflutter 
word: 0.5923796892166138, similarity: Queen_Consort 
word: 0.5908074975013733, similarity: princesses 
word: 0.5637185573577881, similarity: royal 
Similarity between man and woman is like similarity between boy and girl


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
import numpy as np

# DATASET: Positive and Negative Examples
positive_examples = [
    "I absolutely loved the movie!",
    "The food was delicious and the service was excellent.",
    "This product exceeded my expectations.",
    "I had a fantastic time at the concert.",
    "The weather was beautiful during our vacation.",
    "The customer support team was incredibly helpful.",
    "I would definitely recommend this restaurant.",
    "The book was a joy to read from start to finish.",
    "I'm very happy with my purchase.",
    "The experience was truly unforgettable."
]

negative_examples = [
    "The movie was terrible and a complete waste of time.",
    "The food was cold and the service was slow.",
    "I was very disappointed with the quality of this product.",
    "The concert was boring and not worth the money.",
    "The weather ruined our entire trip.",
    "The customer support team was unhelpful and rude.",
    "I will never visit this restaurant again.",
    "The book was poorly written and hard to follow.",
    "I'm extremely dissatisfied with my purchase.",
    "The experience was awful and I regret it."
]

positive_labels=[1]*len(positive_examples)
positive_sentence=list(zip(positive_examples,positive_labels))
negative_labels=[0]*len(negative_examples)
negative_sentence=list(zip(negative_examples,negative_labels))
dataset=positive_sentence+negative_sentence

random.shuffle(dataset)
train_data,val_data=train_test_split(dataset,test_size=0.2,random_state=42)

In [16]:
# In the Dataset class, the __len__ and __getitem__ methods are abstract and need to be implemented by subclasses.
class SentimentDataset(Dataset): 
    def __init__(self, data, word2vec_model):
        self.data = data
        self.word2vec_model = word2vec_model

    def __len__(self):
        return len(self.data)

    def get_embedding(self, sentence):
        embedding = self.word2vec_model.get_query_embedding(sentence)
        return torch.tensor(embedding).float()

    def __getitem__(self, idx):
        sentence, label = self.data[idx]
        embedding = self.get_embedding(sentence)
        return embedding, torch.tensor(label, dtype=torch.float)
        

class SentimentClassifier(nn.Module):
    def __init__(self, embedding_dim):
        super().__init__()
        self.embedding_dim = embedding_dim

        self.stack_layers = nn.Sequential(
            nn.Linear(self.embedding_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        out = self.stack_layers(x)
        return out

In [17]:
embedding_dim = 300

train_dataset = SentimentDataset(train_data, w2v)
val_dataset = SentimentDataset(val_data, w2v)

# When a DataLoader is created from an object of the SentimentDataset class,some of its functions (like __len__ and __getitem__)are called automatically
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)

nn_model = SentimentClassifier(embedding_dim)
nn_optimizer = optim.Adam(nn_model.parameters(), lr=1e-4)
nn_loss = nn.BCELoss()

batch_size = 4


def train(dataloader, model, optimizer, loss_nn):
    model.train()
    running_loss = 0
    for batch_idx, (data, label) in enumerate(dataloader):
        y_pred = model(data)
        optimizer.zero_grad()
        loss = loss_nn(y_pred.squeeze(), label)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
    return running_loss


def test(dataloader, model, loss_nn):
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch_idx, (data, label) in enumerate(dataloader):
            y_pred = model(data)
            running_loss += loss_nn(y_pred.squeeze(), label).item()
    return running_loss

In [18]:
num_epochs = 100

for t in (pbar := trange(num_epochs)):
    loss = train(train_loader, nn_model, nn_optimizer, nn_loss)
    val_loss = test(val_loader, nn_model, nn_loss)
    pbar.set_description(f"Train loss: {loss:.4f}| Val loss: {val_loss:.4f}")

Train loss: 2.0355| Val loss: 0.8368: 100%|██████████████████████████████████████████| 100/100 [00:04<00:00, 23.41it/s]


In [19]:
def predict_sentiment(model, sentence):
    model.eval()
    with torch.no_grad():
        test_vector = w2v.get_query_embedding(query_sentence)
        test_vector = torch.tensor(test_vector)
        test_pred = model(test_vector)
        sentiment = "positive" if test_pred.item() >= 0.5 else "negative"
    return sentiment

In [20]:
query_sentence = "The movie was terrible and a i hated it"
sentiment=predict_sentiment(nn_model, query_sentence)
sentiment

'negative'

In [21]:
query_sentence = "The movie was amazing and i loved it"
sentiment=predict_sentiment(nn_model, query_sentence)
sentiment

'positive'