## NLP Assignment 1
---

### Task 1

In [1]:
import re
import os
from collections import defaultdict

In [2]:
class Tokenizer:
  def __init__(self):
    self.vocab = defaultdict(int)
    self.all_tokens= defaultdict(int)  # all tokens from first to last
    self.merge_rules=[]
    self.final_tokens=[]         # only the final tokens



  def merge_vocabulary(self,pair, vocab_old):
    vocab_new = {}
    self.merge_rules.append(pair)
    bigram = re.escape(' '.join(pair))
    regex=re.compile(r'(?<!\S)' + bigram + r'(?!\S)')

   # merge the old vocab based on the new merging rule
    for word in vocab_old:
        w_out = regex.sub(''.join(pair), word) #find all words where bigram was found in the old vocab
        all_tokens=w_out.split()
        for a in all_tokens: self.all_tokens[a]+=1
        vocab_new[w_out] = vocab_old[word]
    return vocab_new



  def learn_vocabulary(self, corpus, num_merges):
      ### create a list of strings
      data = corpus.split('\n')

      ### updating dict vocab with words and frequencies
      for line in data:
          for word in line.split():
            new_word=' '.join(list(word)) + ' $'
            self.vocab[new_word] += 1

            all_tokens=new_word.split()
            for a in all_tokens: self.all_tokens[a]+=1


      ### making pairs and updating their frequencies
      for _ in range(num_merges):
          pairs = defaultdict(int)
          for word,freq in self.vocab.items():
              chars = word.split()
              for j in range(len(chars)-1):
                  pairs[chars[j],chars[j+1]] += freq


          best_pair = max(pairs, key=pairs.get)
          self.vocab = self.merge_vocabulary(best_pair, self.vocab)

      t = " ".join(list(self.vocab.keys()))
      d=defaultdict(int)
      l=t.split()
      for a in l: d[a]+=1
      self.final_tokens=list(d.keys())



  def tokenize(self,text_lists):
    # divide the text into individual letters

     ans_list=[]
     for a in text_lists:
          text=a
          data = text.split('\n')
          new_text=""
          for line in data:
              for word in line.split():
                  new_word=' '.join(list(word)) + ' $ '
                  new_text+=new_word

          # Tokenize the text based on merge rules
          merge_rules=self.merge_rules
          for rule in merge_rules:
              merged_token = "".join(rule)
              new_text = new_text.replace(" ".join(rule),merged_token)

          tokens = new_text.split()
          # print(tokens)
          # print()
          ans_list.append(tokens)


     return ans_list
  

  def write_to_file(self,root,text_list):
      token_path = os.path.join(root,"tokens.txt")
      rules_path = os.path.join(root,"merge_rules.txt")
      samples_path = os.path.join(root,"tokenized_samples.txt")

      file=open(token_path,"w+")
      for a in list(self.all_tokens.keys()):
          file.write(a+"\n")
      file.close()


      file=open(rules_path,"w+")
      for a in self.merge_rules:
          file.write(a[0]+","+a[1]+"\n")
      file.close()


      file=open(samples_path,"w")
      for a in text_list:
          s=",".join(a)+"\n"
          print(s)
          file.write(s)
      file.close()

In [3]:
if __name__ == "__main__":
    num_merges = 500

    file=open("./corpus/corpus.txt","r")
    corpus=file.read()
    # file.seek(0)
    # test_corpus=file.readlines()

    tokenizer = Tokenizer()
    tokenizer.learn_vocabulary(corpus, num_merges)


In [5]:
if __name__ == "__main__":
    test_corpus=['''Tokenization is the process of breaking down a sequence of text into smaller units called tokens, which can be words, phrases, or even individual characters.''',
    '''Tokenization is often the first step in natural languages processing tasks such as text classification, named entity recognition, and sentiment analysis.''',
    '''The resulting tokens are typically used as input to further processing steps, such as vectorization, where the tokens are converted into numerical representations for machine learning models to use.'''  ]
    
    text_list=tokenizer.tokenize(test_corpus)

    tokenizer.write_to_file("./merged_tokens/",text_list)

T,o,ken,i,z,ation$,is$,the$,proc,ess$,of$,brea,king$,down$,a$,se,qu,ence$,of$,tex,t$,into$,s,maller$,un,its$,called$,to,ken,s,,,$,which$,can$,be$,wor,d,s,,,$,p,h,rases,,,$,or$,even$,in,di,vi,du,al$,charac,ter,s,.,$

T,o,ken,i,z,ation$,is$,of,ten$,the$,fir,st$,step$,in$,n,atural$,l,angu,ages$,proc,es,sing$,tas,ks$,su,ch$,as$,tex,t$,cl,as,si,fication,,,$,n,amed$,en,tity$,re,co,g,n,ition,,,$,and$,sen,timent$,an,al,y,sis,.,$

T,he$,resul,ting$,to,ken,s$,are$,t,y,p,ically$,used$,as$,in,pu,t$,to$,fur,ther$,proc,es,sing$,step,s,,,$,su,ch$,as$,v,e,c,tori,z,ation,,,$,where$,the$,to,ken,s$,are$,con,ver,ted$,into$,n,u,merical$,represen,tations$,for$,machine$,lear,ning$,mo,del,s$,to$,use,.,$



In [1]:
from collections import defaultdict
import numpy as np
import utils

  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


<br><br><br>
<h3> Task 2: Bigram Model </h3>

In [2]:
class BigramLMWithEmotion:
    def __init__(self, start_token='<s>', end_token='<eos>'):
        self.vocab = set()
        self.data = None
        self.start_token = start_token
        self.end_token = end_token
        self.bigram_counts = {}
        self.n_bigrams = 0
        self.unigram_counts = defaultdict(int)
        self.bigram_probs = {}
        self.bigram_emotion_probs = {}
        self.smoothing_method = None
        self.emotion_classifier = utils.emotion_scores


    
    def learn_vocabulary(self, data, smoothing_method=None, with_emotion=False):
        self.data = data
        self.smoothing_method = smoothing_method
        self._count_bigrams()
        self._estimate_probabilities()
        if with_emotion:
            self._modify_bigram_probs_with_emotions()



    def _count_bigrams(self):
        self.n_bigrams = 0
        #compute bigram counts
        for sentence in self.data:
            tokens = sentence.split()
            tokens = [self.start_token] + tokens + [self.end_token]

            self.vocab.update(tokens)

            for i in range(len(tokens) - 1):
                current_bigram = (tokens[i],tokens[i+1])
                self.unigram_counts[tokens[i]]+=1
                self.bigram_counts[current_bigram] = self.bigram_counts.get(current_bigram, 0) + 1
                self.n_bigrams += 1

            self.unigram_counts[tokens[len(tokens)-1]]+=1


    
    def _estimate_probabilities(self):
        if self.smoothing_method == 'laplace':
            self._laplace_smoothing()
        elif self.smoothing_method == 'kneser-ney':
            self._kneser_ney_smoothing()
        else:
            for bigram in self.bigram_counts:
                first_word = bigram[0]
                count=self.bigram_counts[bigram]
                probability=count/ self.unigram_counts[first_word]
                self.bigram_probs[bigram]=probability



    def _laplace_smoothing(self):
        V = len(self.vocab)
        for bigram in self.bigram_counts:
            first_word = bigram[0]
            curr_bigram_count=self.bigram_counts[bigram]
            smoothed_prob=(curr_bigram_count+1)/(self.unigram_counts[first_word]+V)
            self.bigram_probs[bigram]=smoothed_prob


    def _kneser_ney_smoothing(self):
        discount = 0.5
        n_bigrams = self.n_bigrams

        for bigram in self.bigram_counts:
            first_word,second_word = bigram
            cnt_bigram = self.bigram_counts[bigram]
            cnt_unigram = self.unigram_counts[first_word]

            cont_cnt = len(set([w1 for (w1,w2) in self.bigram_counts.keys() if w2==second_word]))
            alpha_cnt = len(set([w2 for (w1,w2) in self.bigram_counts.keys() if w1==first_word]))
            p_cont = cont_cnt/n_bigrams
            alpha = (discount/cnt_unigram)*(alpha_cnt)

            prob = (max(0,cnt_bigram - discount))/cnt_unigram + alpha*p_cont
            self.bigram_probs[bigram] = prob



    def generate_text(self,emotion,max_length=20):
        generated_text = [self.start_token]

        for _ in range(max_length):
            next_word = self.generate_next_word(generated_text[-1],emotion)
            if next_word == self.end_token:
                break
            generated_text.append(next_word)

        return ' '.join(generated_text)


    def generate_next_word(self, prev_word, emotion):
        suggestions = [(next_word,self.bigram_emotion_probs[(prev_word,next_word)][emotion]) for (prev_word,next_word) in self.bigram_probs.keys()]
        
        if(len(suggestions)==0):
            return self.end_token
        
        next_words,probabilities = zip(*suggestions)
        total = sum(probabilities)
        probabilities = list(probabilities)
        for i in range(len(probabilities)):
            probabilities[i] /= total
        next_word = np.random.choice(next_words,p=probabilities)
        return next_word



    def _modify_bigram_probs_with_emotions(self):
        for bigram in self.bigram_probs:
            first_word, second_word = bigram
            bigram_emotion_score = self.emotion_classifier(first_word+" "+second_word)
            unigram_emotion_score = self.emotion_classifier(first_word)

            total = 0.0

            self.bigram_emotion_probs[bigram] = defaultdict(float)
            for i,item in enumerate(bigram_emotion_score):
                self.bigram_emotion_probs[bigram][item['label']] = self.bigram_probs[bigram] + item['score']/unigram_emotion_score[i]['score']
                total += self.bigram_emotion_probs[bigram][item['label']]
            for key in self.bigram_emotion_probs[bigram].keys():
                self.bigram_emotion_probs[bigram][key] /= total

In [3]:
file=open("./corpus/corpus.txt","r")
corpus=file.readlines()

In [4]:
def print_top_5_bigrams(bigram_model):
    top_5_bigrams = sorted(bigram_model.bigram_probs.items(), key=lambda x: x[1], reverse=True)[:5]
    
    print("Top 5 Bigrams and Probabilities:")
    print("\n{:<20} {:<20}".format("Bigram", "Probability"))
    print("="*40)
    for bigram, probability in top_5_bigrams:
        print("{:<20} {:.4f}".format(str(bigram), probability))

In [8]:
bigram_nosmooth = BigramLMWithEmotion()
bigram_nosmooth.learn_vocabulary(data=corpus)
print_top_5_bigrams(bigram_nosmooth)

Top 5 Bigrams and Probabilities:

Bigram               Probability         
('href', 'http')     1.0000
('mooshilu', '<eos>') 1.0000
('tychelle', 'to')   1.0000
('hang', 'out')      1.0000
('nonexistent', 'social') 1.0000


In [5]:
bigram_laplace = BigramLMWithEmotion()
bigram_laplace.learn_vocabulary(data=corpus,smoothing_method='laplace')
print_top_5_bigrams(bigram_laplace)

Top 5 Bigrams and Probabilities:

Bigram               Probability         
('<s>', 'i')         0.2693
('i', 'feel')        0.1104
('feel', 'like')     0.0351
('i', 'am')          0.0319
('<s>', 'im')        0.0272


In [7]:
bigram_kneserney = BigramLMWithEmotion()
bigram_kneserney.learn_vocabulary(data=corpus,smoothing_method='kneser-ney')
print_top_5_bigrams(bigram_kneserney)

Top 5 Bigrams and Probabilities:

Bigram               Probability         
('href', 'http')     0.9800
('don', 't')         0.9746
('didn', 't')        0.9722
('sort', 'of')       0.9708
('supposed', 'to')   0.9450


In [9]:
bigram_model = BigramLMWithEmotion()
bigram_model.learn_vocabulary(data=corpus,smoothing_method='kneser-ney',with_emotion=True)

In [10]:
import pickle

with open('bigram_model.pkl','wb') as file:
    pickle.dump(bigram_model,file)
with open('bigram_model.pkl','rb') as file:
    loaded_model = pickle.load(file)

### Sample Generation

In [11]:
class SentenceGenerator:
    def __init__(self,emotions,bigram_model):
        self.emotions = emotions
        self.bigram_model = bigram_model
    
    def generate(self,n_sentences,min_len):
        out_text = {}
        for emotion in self.emotions:
            out_text[emotion] = []
            while(len(out_text[emotion])<n_sentences):
                sentence = self.bigram_model.generate_text(emotion)[4:]
                if(len(sentence.split(' '))<min_len):
                    continue
                out_text[emotion].append(sentence)
                # print(sentence)
        return out_text

In [12]:
emotions = ['sadness','joy','fear','love','anger','surprise']
n_sentences = 50
min_len = 7

generated_samples = SentenceGenerator(emotions,bigram_model).generate(n_sentences,min_len)
test_data = []
test_labels = []

!rm -rf generated_samples
!mkdir generated_samples

for emotion in emotions:
    path = "./generated_samples/gen_"+emotion+".txt"
    file = open(path,"w+")
    for sent in generated_samples[emotion]:
        file.write(sent+"\n")
        test_data.append(sent)
        test_labels.append(emotion)
    file.close()
    print(f"{emotion}: Generated!")

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


sadness: Generated!
joy: Generated!
fear: Generated!
love: Generated!
anger: Generated!
surprise: Generated!


In [13]:
train_labels = []
for sent in corpus:
    scores = utils.emotion_scores(sent)
    mx = 0.0
    emot = ""
    for item in scores:
        if item['score']>mx:
            emot = item['label']
            mx = item['score']
    train_labels.append(emot)

### Extrinsic Evaluation

In [19]:
import warnings
warnings.filterwarnings('ignore')

In [27]:
from sklearn.svm import SVC
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report,accuracy_score


vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(corpus)
X_test = vectorizer.transform(test_data)

svc = SVC()
param_grid = {
    'kernel':['rbf','linear','sigmoid'],
    'C':[0.9,0.85,0.8,0.7],
    'class_weight':['balanced',None],
    'max_iter':[-1,700,500]
}

grid_search = GridSearchCV(estimator=svc,param_grid=param_grid,cv=5,scoring='accuracy')
grid_search.fit(X,train_labels)

best_params = (grid_search.best_params_)
print(best_params)

svc_best = grid_search.best_estimator_
y_pred = svc_best.predict(X_test)

print(classification_report(test_labels,y_pred))
print(accuracy_score(test_labels,y_pred))

{'C': 0.9, 'class_weight': None, 'kernel': 'linear', 'max_iter': -1}
              precision    recall  f1-score   support

       anger       0.68      0.68      0.68        50
        fear       0.73      0.66      0.69        50
         joy       0.61      0.82      0.70        50
        love       0.80      0.72      0.76        50
     sadness       0.66      0.80      0.72        50
    surprise       0.94      0.60      0.73        50

    accuracy                           0.71       300
   macro avg       0.74      0.71      0.71       300
weighted avg       0.74      0.71      0.71       300

0.7133333333333334


In [26]:
print(vectorizer.vocabulary_)

