In [4]:
#Word2Vec from scratch (skip-gram)	Negative sampling, embeddings	Wikipedia dump or custom

In [5]:
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict,Counter
import random
import pickle
from typing import List, Tuple,Dict,Set
import re
from tqdm import tqdm


In [14]:
class Word2Vec:
    def __init__(self,vector_size=100,window_size=5,negative_samples=5,learning_rate=0.01,min_count=1,epochs=5):

        self.vector_size = vector_size
        self.window_size = window_size
        self.negative_samples = negative_samples
        self.min_count = min_count
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.window=window_size

        # Vocabulary and mapping

        self.vocab = {}
        self. index2word = {}
        self.word_counts=Counter()
        self.vocab_size=0

        #model parameters
        self.W1 = None  # Input to hidden layer weights  
        self.W2 = None  # Hidden to output layer weights

        #for negative sampling
        self.unigram_table =None
        self.table_size=1e8

    def preprocess_text(self,text:str)->List[str]:
        text=re.sub(r'[^a-zA-Z\s]',"",text.lower())
        #split into words
        words=text.split()
        return words
    def build_vocabulary(self,sentences:List[List[str]]):
        print("Building vocabulary")
        #count word frequencies
        for sentence in sentences:
            for word in sentence:
                self.word_counts[word]+=1
        #filter words below min_count
        filtered_words={word:count for word,count in self.word_count.items() if count>=self.min_count}
        #create vocabulary mappings
        self.vocab={word: i for i ,word in enumerate(filtered_words.keys())}
        self.index2word={i:word for word,i in self.vocab.items()}
        self.vocab_size=len(self.vocab)

        print(f"Vocabulary size:{self.vocab_size}")
        #build uniform table for negative sampling
        self._build_unigram_table()

    def _build_unigram_table(self):
        print("Building unigram table for negative sampling")
        #calculate probabilities
        total_count=sum(self.word_counts[word] for word in self.vocab.keys())
        word_probs={}
        for word in self.vocab.keys():
            word_probs[word]=(self.word_counts[word]/total_count)**0.75

        #Normalize Probabilities
        total_probs=sum(word_probs.values())
        for word in word_probs:
            word_probs[word]!=total_probs
        #create unigram table
        self.unigram_table=[]
        table_size=int(self.table_size)

        for word,prob in word_probs.items():
            count=int(prob*table_size)
            self.unigram_table.extend([self.vocab[word]]*count)
        self.unigram_table=np.array(self.unigram_table)
        print(f"Unigram table size:{len(self.unigram_table)}")
    
    def initialize_weights(self):
        print("Initializing weights...")
        #initialize input embeddings(small random values)
        self.w1=np.random.uniform(-0.5,0.5,(self.vocab_size,self.vector_size))
        self.w1=self.w1/self.vector_size
        #initialize output embeddings(Zeros)
        self.w2=np.zeros((self.vocab_size,self.vector_size))
    def generate_training_data(self,sentences:List[List[str]]):
        #generate skip gram training pairs
        training_data=[]
        for sentence in sentences:
            word_indices=[self.vocab[word] for word in sentence if word in self.vocab]
            for i,target_word in enumerate (word_indices):
                start=max(0,i-self.window)
                end=min(len(word_indices),i+self.window+1)
                for j in range (start,end):
                    if i!=j:
                        context_word=word_indices[j]
                        training_data.append((target_word,context_word))
        return training_data

    def get_negative_samples(self,target_word:int,content_word:int)->List[int]:
        negatives=[]
        while len(negatives)<self.negatives_samples:
            neg_sample=np.random.choice(self.unigram_table)
            if neg_sample!=target_word and neg_sample!=content_word:
                negatives.append(neg_sample)
        return negatives
    
    def sigmoid(self,x):
        x=np.clip(x,-500,500)
        return 1/(1+np.exp(-x))
    
    def train_pair(self,target_word:int,context_word:int):
        #get embeddingd
        target_embed=self.w1[target_word]
        #positive sample
        context_embed=self.w2[context_word]
        #calculate positive sample loss and gradients
        pos_score=np.dot(target_embed,context_embed)
        pos_prob=self.sigmoid(pos_score)
        pos_error=pos_prob-1#target is 1 for positive samples
        #update gradients for positive samples
        grad_target=pos_error*context_embed
        grad_context=pos_error*target_embed
        #Apply gradients
        self.w1[target_word]-=self.learning_rate*grad_target
        self.w2[context_word]-=self.learning_rate*grad_context
        #Negative samples
        negative_samples=self.get_negative_samples(target_word,context_word)

        for neg_word in negative_samples:
            neg_embed=self.w2[neg_word]
            #Calculate negative sample loss and gradients
            neg_score=np.dot(target_embed,neg_embed)
            neg_prob=self.sigmoid(neg_score)
            neg_error=neg_prob-0#target is 0 for negative samples
            #update gradients for negative sample
            grad_target+=neg_error*neg_embed
            grad_neg=neg_error*target_embed
            #Apply gradients
            self.w2[neg_word]-=self.learning_rate*grad_neg
        #Update target embedding with acccumulated gradients
        self.w1[target_word]-=self.learning_rate*grad_target
    
    def train(self,sentences:List[List[str]]):
        print("Starting Training....")
        #build vocabulary
        self.build_vocabulary(sentences)
        #Initialize weights
        self.initialize_weights()
        #generate Trainng data
        training_data=self.generate_training_data(sentences)
        print(f"Generated {len(training_data)} training pairs")
        #Trainning Loop
        for epoch in range(self.epochs):
            print(f"\n Epoch{epoch+1/self.epochs}")
            #shuffle training data
            random.shuffle(training_data)
            #Train on each pair
            total_loss=0
            for target_word,context_word in tqdm(training_data,desc=f"Epoch{epoch+1}"):
                self.train_pair(target_word,context_word)
            #decay learning rate
            self.learning_rate*=0.95
            print(f"Learning rate:{self.learning_rate}")
    def get_word_vector(self,word:str)->np.ndarray:
        if word in self.vocab:
            return self.w1[self.vocab[word]]
        else:
            raise KeyError(f"{word} not in vocabulary") 
    def most_similar(self,word:str,topn=10)->List[Tuple[str,float]]:
        if word not in self.vocab:
            raise KeyError(f"{word} not in vocabulary")
        word_vector=self.get_word_vector(word)
        #calculate cosine similarities
        similarities=[]
        for other_word in self.vocab:
            if other_word!=word:
                other_vector=self.get_word_vector(other_word)
                #cosine similarity
                cos_sim=np.dot(word_vector,other_vector)/(np.linalg.norm(word_vector)*np.Linalg.norm(other_vector))
                similarities.append((other_word,cos_sim))
        #sort by similarity
        similarities.sort(key=lambda x:x[1],reverse=True)
        return similarities[:topn]
    
    def save_model(self,filepath:str):
        model_data={
            'w1':self.w1,
            'w2':self.w2,
            'vocab':self.vocab,
            'index2word':self.index2word,
            'word_counts':self.word_counts,
            'vector_size':self.vector_size,
            'vocab_size':self.vocab_size
            }
        with open(filepath,'wb') as f:
            pickle.dump(model_data,f)
        print(f"Model saved to {filepath}")      
    
    def load_model(self,filepath:str):
        with open(filepath,"rb") as f:
            model_data=pickle.load(f)
        self.w1=model_data['w1']
        self.w2=model_data['w2']
        self.vocab=model_data['vocab']
        self.index2word=model_data['index2word']
        self.word_counts=model_data['word_counts']
        self.vector_size=model_data['vector_size']
        self.vocab_size=model_data['vocab_size']
        print(f"Model loaded from {filepath}")
    
       

In [15]:
#example usage and testing
def create_sample_corpus():
    corpus=[
        "Monika Chaulagain is filthy rich"
        "Monika Chaulagain is very beautiful"
        "Monuu is very intelligent."
        "Everybody loves Monuu."
        "Monika is filled with love ,grace and happieness."
        "She has done some great jobss which will be remembered in the manknind history"
        "Monika Chaulagain is an amazing person"
        "Monika Chaulagain is a great friend"
    ]*100 #replicate to increase corpus size
    return corpus

In [16]:
def main():
    #Example of Word2vec implementatiion
    print("Word2vec skip gram implementation")
    print("="*40)
    
    print("Creating sample corpus...")
    raw_corpus=create_sample_corpus()

    model = Word2Vec()  # Use all defaults first
    print("Default initialization works!")

    #Initialize model
    model=Word2Vec(
        vector_size=50,
        window=3,
        negative_samples=5,
        learning_rate=0.025,
        min_count=2,
        epochs=10
    )
    #Preprocess corpus
    print("Preprocessing corpus...")
    sentences=[model.preprocess_text(text) for text in raw_corpus]
    #Train model
    model.train(sentences)
    #Test model
    print("\n"+"="*40)
    print("Testing the model:")
    print("="*40)
    test_words=["monika","beautiful","intelligent","friend"]
    for word in test_words:
        if word in model.vocab:
            print(f"\nMOst similar to '{word}':")
            similar=model.most_similar(word,topn=5)
            for sim_word,sim_score in similar:
                print(f"{sim_word}:{sim_score:.4f}")
    #save model
    print("\nSaving model...")
    model.save_model("word2vec_model.pkl")
    print("\n Training completed!")

if __name__=="__main__":
    main()

Word2vec skip gram implementation
Creating sample corpus...
Default initialization works!


TypeError: Word2Vec.__init__() got an unexpected keyword argument 'window'