In [1]:
# !pip install transformers
# !pip install numpy
# !pip install tensorflow
!pip install scikit-learn

Collecting scikit-learn
  Using cached scikit_learn-1.4.0-1-cp310-cp310-macosx_12_0_arm64.whl.metadata (11 kB)
Collecting scipy>=1.6.0 (from scikit-learn)
  Using cached scipy-1.12.0-cp310-cp310-macosx_12_0_arm64.whl.metadata (112 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Using cached joblib-1.3.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=2.0.0 (from scikit-learn)
  Using cached threadpoolctl-3.2.0-py3-none-any.whl.metadata (10.0 kB)
Using cached scikit_learn-1.4.0-1-cp310-cp310-macosx_12_0_arm64.whl (10.6 MB)
Using cached joblib-1.3.2-py3-none-any.whl (302 kB)
Using cached scipy-1.12.0-cp310-cp310-macosx_12_0_arm64.whl (31.4 MB)
Using cached threadpoolctl-3.2.0-py3-none-any.whl (15 kB)
Installing collected packages: threadpoolctl, scipy, joblib, scikit-learn
Successfully installed joblib-1.3.2 scikit-learn-1.4.0 scipy-1.12.0 threadpoolctl-3.2.0


In [2]:
import tensorflow
import numpy as np 
from transformers import pipeline
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report, accuracy_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
# Module-level emotion classifier shared by the language-model classes in this notebook.
# `return_all_scores=True` is kept on purpose: the callers read the result as
# `classifier(text)[0][emotion_id]['score']`, i.e. one score entry per label for the input.
classifier = pipeline("text-classification",model='bhadresh-savani/distilbert-base-uncased-emotion', return_all_scores=True,)

  from .autonotebook import tqdm as notebook_tqdm
All model checkpoint layers were used when initializing TFDistilBertForSequenceClassification.

All the layers of TFDistilBertForSequenceClassification were initialized from the model checkpoint at bhadresh-savani/distilbert-base-uncased-emotion.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDistilBertForSequenceClassification for predictions without further training.


In [3]:

class BigramLM:
    """Bigram language model over whitespace-separated tokens.

    The corpus is a UTF-8 text file with one sentence per line.  After
    :meth:`learn`, ``bigram_counts[i, j]`` is the number of times token ``j``
    directly follows token ``i`` inside a line and ``word_count[w]`` is the
    number of occurrences of ``w`` in the whole corpus.

    All conditional probabilities are normalised by ``word_count`` of the
    first word (its total occurrence count).
    """

    def __init__(self):
        self.vocab_size = 0               # number of distinct tokens
        self.vocabulary_index = {}        # token -> matrix index
        self.word_count = {}              # token -> occurrences in the corpus
        self.index_vocabulary = {}        # matrix index -> token
        self.bigram_counts = None         # (vocab_size, vocab_size) int matrix, set by learn()
        self.bigram_probabilities = None  # set by build_probability_matrix()
        self.dataset = None               # list of token lists, set by build_corpus()

    def build_corpus(self, file_path):
        """Read ``file_path`` and store each line as a list of whitespace-split tokens."""
        with open(file_path, 'r', encoding='utf-8') as file:
            self.dataset = [line.strip().split() for line in file]

    def build_vocab(self):
        """Assign an index to every new token in ``self.dataset`` and count occurrences.

        Counts are added on top of whatever is already stored; :meth:`learn`
        clears the state before calling this.
        """
        for line in self.dataset:
            for word in line:
                if word not in self.vocabulary_index:
                    self.vocabulary_index[word] = self.vocab_size
                    self.index_vocabulary[self.vocab_size] = word
                    self.word_count[word] = 0
                    self.vocab_size += 1
                self.word_count[word] += 1

    def learn(self, file_path):
        """Build the vocabulary and the bigram count matrix from ``file_path``.

        Any previously learned state is discarded first, so calling ``learn``
        again on the same instance does not double the word counts.
        """
        self.vocab_size = 0
        self.vocabulary_index = {}
        self.index_vocabulary = {}
        self.word_count = {}
        self.bigram_probabilities = None

        self.build_corpus(file_path)
        self.build_vocab()

        self.bigram_counts = np.zeros((self.vocab_size, self.vocab_size), dtype=int)

        for line in self.dataset:
            # Pair every token with its successor on the same line.
            for first_word, second_word in zip(line, line[1:]):
                first_word_index = self.vocabulary_index[first_word]
                second_word_index = self.vocabulary_index[second_word]
                self.bigram_counts[first_word_index, second_word_index] += 1

    def calculate_probability(self, word1, word2):
        """Unsmoothed estimate ``count(word1, word2) / count(word1)``.

        Raises ``KeyError`` if either word is not in the vocabulary.
        """
        return self.bigram_counts[self.vocabulary_index[word1], self.vocabulary_index[word2]]/self.word_count[word1]

    def laplace_smoothing(self, word1, word2):
        """Add-one estimate ``(count(word1, word2) + 1) / (count(word1) + vocab_size)``."""
        return (self.bigram_counts[self.vocabulary_index[word1], self.vocabulary_index[word2]] + 1)/(self.word_count[word1] + self.vocab_size)

    def kneser_ney_smoothing(self, word1, word2, discount = 0):
        """Interpolated Kneser-Ney style estimate of ``P(word2 | word1)``.

        The result is ``max(count - discount, 0) / count(word1)`` plus a
        back-off weight times the continuation probability of ``word2``
        (distinct predecessors of ``word2`` over distinct bigram types).
        Returns the discounted term alone when the corpus has no bigrams.
        """
        first_index = self.vocabulary_index[word1]
        second_index = self.vocabulary_index[word2]
        first_count = self.word_count[word1]

        discounted_prob = max(self.bigram_counts[first_index, second_index] - discount, 0)/first_count
        # Back-off mass: discount times the number of distinct successors of word1.
        alpha_word1 = (discount * np.sum(self.bigram_counts[first_index, :] > 0))/first_count

        total_bigram_types = np.sum(self.bigram_counts > 0)
        if total_bigram_types:
            cont_prob = np.sum(self.bigram_counts[:, second_index] > 0)/total_bigram_types
        else:
            # No bigram observed at all: avoid 0/0 producing NaN.
            cont_prob = 0.0

        return discounted_prob + alpha_word1*cont_prob

    def emotion_scores(self, sample):
        """Return the module-level ``classifier``'s per-label score entries for ``sample``."""
        emotion=classifier(sample)
        return emotion[0]

    def calculate_probability_emotion(self, word1, word2, emotion_id):
        """Unsmoothed bigram probability plus the classifier score of label ``emotion_id``
        for the text ``"word1 word2"``."""
        emo_prob = self.emotion_scores(word1 + " " +word2)
        return self.bigram_counts[self.vocabulary_index[word1], self.vocabulary_index[word2]]/self.word_count[word1] + emo_prob[emotion_id]['score']

    def _word_count_vector(self):
        """Occurrence counts as an array aligned with the matrix indices."""
        return np.array([self.word_count[self.index_vocabulary[i]] for i in range(self.vocab_size)])

    def build_probability_matrix(self, mode, discount = 0, emotion_id = 0):
        """Fill ``self.bigram_probabilities`` for every ordered pair of vocabulary words.

        mode 0: unsmoothed, mode 1: Laplace, mode 2: Kneser-Ney (uses ``discount``),
        any other value: emotion-biased scores (uses ``emotion_id`` and calls the
        classifier once per pair, i.e. ``vocab_size ** 2`` times).

        Modes 0-2 are computed with array broadcasting and give the same
        values as the per-pair methods.
        """
        if mode == 0:
            counts = self._word_count_vector()
            self.bigram_probabilities = self.bigram_counts / counts[:, np.newaxis]

        elif mode == 1:
            counts = self._word_count_vector()
            self.bigram_probabilities = (self.bigram_counts + 1) / (counts[:, np.newaxis] + self.vocab_size)

        elif mode == 2:
            counts = self._word_count_vector()
            seen = self.bigram_counts > 0
            discounted = np.maximum(self.bigram_counts - discount, 0) / counts[:, np.newaxis]
            alpha = (discount * np.sum(seen, axis=1)) / counts
            total_bigram_types = np.sum(seen)
            if total_bigram_types:
                continuation = np.sum(seen, axis=0) / total_bigram_types
            else:
                continuation = np.zeros(self.vocab_size)
            self.bigram_probabilities = discounted + alpha[:, np.newaxis] * continuation

        else:
            self.bigram_probabilities = np.zeros((self.vocab_size, self.vocab_size), dtype=float)
            for i in range(self.vocab_size):
                first_word = self.index_vocabulary[i]
                for j in range(self.vocab_size):
                    self.bigram_probabilities[i, j] = self.calculate_probability_emotion(first_word, self.index_vocabulary[j], emotion_id= emotion_id)
# Display the formed corpus


In [4]:
# Train the loop-based bigram model; "corpus.txt" is a relative path, so it is
# resolved against the notebook's working directory.
bigramLM = BigramLM()
bigramLM.learn("corpus.txt")


In [85]:
# bigramLM.emotion_scores('violent')
# Index of each label in the returned score list (this index is the `emotion_id`):
# 0 -> sadness
# 1 -> joy
# 2 -> love
# 3 -> anger
# 4 -> fear
# 5 -> surprise


[{'label': 'sadness', 'score': 0.0006333347409963608},
 {'label': 'joy', 'score': 0.00038153710193000734},
 {'label': 'love', 'score': 0.00023734646674711257},
 {'label': 'anger', 'score': 0.9974260926246643},
 {'label': 'fear', 'score': 0.0011390680447220802},
 {'label': 'surprise', 'score': 0.00018260569777339697}]

In [5]:
class BigramLM_efficient:
    """Bigram language model with array-based probability tables and emotion-biased sampling.

    The corpus is a UTF-8 text file with one sentence per line, tokenised by
    whitespace.  After :meth:`learn`, ``bigram_counts[i, j]`` counts how often
    token ``j`` directly follows token ``i`` and ``word_count_matrix[i]`` is the
    total number of occurrences of token ``i``.
    """

    def __init__(self):
        self.vocab_size = 0               # number of distinct tokens
        self.vocabulary_index = {}        # token -> matrix index
        self.word_count = {}              # token -> occurrences in the corpus
        self.index_vocabulary = {}        # matrix index -> token
        self.bigram_counts = None         # (vocab_size, vocab_size) int matrix, set by learn()
        self.word_count_matrix = None     # occurrence counts aligned with matrix indices
        self.bigram_probabilities = None  # set by build_probability_matrix()
        self.dataset = None               # list of token lists, set by build_corpus()
        self._emotion_cache = {}          # bigram text -> list of classifier scores

    def build_corpus(self, file_path):
        """Read ``file_path`` and store each line as a list of whitespace-split tokens."""
        with open(file_path, 'r', encoding='utf-8') as file:
            self.dataset = [line.strip().split() for line in file]

    def build_vocab(self):
        """Assign an index to every new token in ``self.dataset`` and count occurrences.

        Counts are added on top of whatever is already stored; :meth:`learn`
        clears the state before calling this.
        """
        for line in self.dataset:
            for word in line:
                if word not in self.vocabulary_index:
                    self.vocabulary_index[word] = self.vocab_size
                    self.index_vocabulary[self.vocab_size] = word
                    self.word_count[word] = 0
                    self.vocab_size += 1
                self.word_count[word] += 1

    def build_probability_matrix(self, mode, discount=0, emotion_id=0):
        """Fill ``self.bigram_probabilities``.

        mode 0: unsmoothed, mode 1: Laplace, mode 2: Kneser-Ney (uses ``discount``).
        Any other mode leaves an all-zero matrix.  ``emotion_id`` is accepted
        but not used by this method.
        """
        if mode == 0:
            self.bigram_probabilities = self.calculate_probability_matrix()
        elif mode == 1:
            self.bigram_probabilities = self.laplace_smoothing_matrix()
        elif mode == 2:
            self.bigram_probabilities = self.kneser_ney_smoothing_matrix(discount=discount)
        else:
            self.bigram_probabilities = np.zeros((self.vocab_size, self.vocab_size), dtype=float)

    def calculate_probability_matrix(self):
        """Matrix of ``count(w1, w2) / count(w1)``."""
        return self.bigram_counts / self.word_count_matrix[:, np.newaxis]

    def laplace_smoothing_matrix(self):
        """Matrix of ``(count(w1, w2) + 1) / (count(w1) + vocab_size)``."""
        return (self.bigram_counts + 1) / (self.word_count_matrix[:, np.newaxis] + self.vocab_size)

    def kneser_ney_smoothing_matrix(self, discount=0):
        """Matrix of interpolated Kneser-Ney style estimates.

        Each entry is the discounted relative count plus the row's back-off
        weight times the column's continuation probability.  When no bigram
        was observed the continuation term is zero instead of NaN.
        """
        seen = self.bigram_counts > 0
        discounted_probs = np.maximum(self.bigram_counts - discount, 0) / self.word_count_matrix[:, np.newaxis]
        alpha_word1 = (discount * np.sum(seen, axis=1)) / self.word_count_matrix
        total_bigram_types = np.sum(seen)
        if total_bigram_types:
            cont_probs = np.sum(seen, axis=0) / total_bigram_types
        else:
            cont_probs = np.zeros(self.vocab_size)
        return discounted_probs + alpha_word1[:, np.newaxis] * cont_probs

    def _cached_emotion_scores(self, text):
        """Return the list of label scores for ``text``, querying the classifier only once per text."""
        # setdefault keeps this working on instances created without __init__.
        cache = self.__dict__.setdefault('_emotion_cache', {})
        if text not in cache:
            cache[text] = [entry['score'] for entry in self.emotion_scores(text)]
        return cache[text]

    def calculate_probability_emotion_row(self, first_wrod, emotion_id=0):
        """Emotion-biased scores for every possible successor of ``first_wrod``.

        For each successor with a non-zero bigram count the result is the
        unsmoothed bigram probability plus the classifier score of label
        ``emotion_id`` for the two-word text; all other entries are 0.
        Only the row of ``first_wrod`` is computed, and classifier results
        are cached per bigram text.

        Returns a float array of length ``vocab_size``.
        Raises ``KeyError`` if ``first_wrod`` is not in the vocabulary.
        """
        row_index = self.vocabulary_index[first_wrod]
        counts_row = self.bigram_counts[row_index]

        emotion_bonus = np.zeros(self.vocab_size)
        for second_word_index in np.nonzero(counts_row)[0]:
            text = first_wrod + " " + self.index_vocabulary[second_word_index]
            emotion_bonus[second_word_index] = self._cached_emotion_scores(text)[emotion_id]

        return counts_row / self.word_count_matrix[row_index] + emotion_bonus

    def emotion_scores(self, sample):
        """Return the module-level ``classifier``'s per-label score entries for ``sample``."""
        emotion=classifier(sample)
        return emotion[0]

    def learn(self, file_path):
        """Build the vocabulary, the occurrence vector and the bigram count matrix.

        Any previously learned state is discarded first, so calling ``learn``
        again on the same instance does not double the word counts.
        """
        self.vocab_size = 0
        self.vocabulary_index = {}
        self.index_vocabulary = {}
        self.word_count = {}
        self.bigram_probabilities = None

        self.build_corpus(file_path)
        self.build_vocab()

        self.bigram_counts = np.zeros((self.vocab_size, self.vocab_size), dtype=int)
        # Built by index so the alignment with the matrix rows is explicit.
        self.word_count_matrix = np.array(
            [self.word_count[self.index_vocabulary[i]] for i in range(self.vocab_size)]
        )

        for line in self.dataset:
            # Pair every token with its successor on the same line.
            for first_word, second_word in zip(line, line[1:]):
                first_word_index = self.vocabulary_index[first_word]
                second_word_index = self.vocabulary_index[second_word]
                self.bigram_counts[first_word_index, second_word_index] += 1

    def generate_samples(self, emotion_id = 0, num_samples = 50):
        """Return a list of ``num_samples`` sentences from :meth:`generate_sample`."""
        return [self.generate_sample(emotion_id) for _ in range(num_samples)]

    def generate_sample(self, emotion_id = 0, max_length = 7):
        """Sample one sentence of at most ``max_length`` words.

        The first word is drawn from 'i' / 'im' / 'ive' with probabilities
        0.7 / 0.2 / 0.1; each following word is drawn in proportion to the
        emotion-biased scores of the current word's observed successors.
        Generation stops early when the current word has no successor.

        Raises ``KeyError`` if the chosen start word is not in the vocabulary.
        """
        current_word = np.random.choice(['i', 'im', 'ive'], p=[0.7, 0.2, 0.1])
        sample = [current_word]

        for _ in range(max_length - 1):
            probabilities = self.calculate_probability_emotion_row(current_word, emotion_id)

            # Dead end: the current word was never followed by anything.
            if np.all(probabilities == 0):
                break

            probabilities /= probabilities.sum()

            next_word_index = np.random.choice(self.vocab_size, p=probabilities)
            current_word = self.index_vocabulary[next_word_index]
            sample.append(current_word)

        return ' '.join(sample)
# Example usage: train the array-based model on the corpus. `bigram_model` is the
# instance the sample-generation cell calls `generate_samples` on.
corpus_path = 'corpus.txt'
bigram_model = BigramLM_efficient()
bigram_model.learn(corpus_path)
# Optional: build the full probability table (mode=1 selects Laplace smoothing;
# `discount` is only used by mode=2).
# bigram_model.build_probability_matrix(mode=1, discount=0.5)


In [None]:
# Generate `length` sentences per emotion and write them to gen_5_<emotion>.txt,
# one sentence per line.
emotions = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
length = 10
# generate_samples expects the integer position of the label in the classifier
# output, so iterate over (index, name) pairs instead of passing the name itself.
for emotion_id, emotion in enumerate(emotions):
    generated_samples = bigram_model.generate_samples(emotion_id=emotion_id, num_samples=length)
    output_file = f'gen_5_{emotion}.txt'

    with open(output_file, 'w', encoding='utf-8') as file:
        for sample in generated_samples:
            file.write(sample + '\n')

In [7]:


# Training data: one text per line in the corpus file, one label per line in the labels file.
corpus_path = 'corpus.txt'
labels_path = 'labels.txt'


def _read_stripped_lines(path):
    """Return the lines of the UTF-8 text file at ``path`` with surrounding whitespace removed."""
    with open(path, 'r', encoding='utf-8') as file:
        return [line.strip() for line in file]


texts = _read_stripped_lines(corpus_path)
labels = _read_stripped_lines(labels_path)

X_train, y_train = texts, labels

# Test data: the filtered generated samples, one file per emotion.
emotions = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
testing_samples = []

X_test = []  # text samples
y_test = []  # emotion name for each entry of X_test

for emotion in emotions:
    emotion_file_path = f'Generated files/gen_{emotion}_filtered.txt'
    emotion_samples = _read_stripped_lines(emotion_file_path)

    # Every sample read from this file is labelled with the file's emotion.
    X_test += emotion_samples
    y_test += [emotion] * len(emotion_samples)

# Rebind y_test to a fresh list copy.
y_test = list(y_test)

In [None]:
# Grid search over TF-IDF and SVM hyper-parameters (5-fold CV on the training set),
# followed by an evaluation of the best estimator on the generated test samples.
# The estimator is named `svm_pipeline` so it does not rebind the `pipeline`
# function imported from transformers.
svm_pipeline = make_pipeline(TfidfVectorizer(), StandardScaler(with_mean = False), SVC())

# Keys follow make_pipeline's naming: "<lowercased class name>__<parameter>".
param_grid = {
    'tfidfvectorizer__ngram_range': [(1, 1), (1, 2), (2,2)],
    'tfidfvectorizer__max_features': [500, 1000, 3000, None],
    'svc__C': [0.1, 1, 10],
    'svc__kernel': ['linear', 'rbf', 'poly'],
}

# 108 parameter combinations x 5 folds = 540 fits; n_jobs=-1 runs them on all cores.
grid_search = GridSearchCV(svm_pipeline, param_grid, cv = 5, scoring='accuracy', n_jobs = -1)
grid_search.fit(X_train, y_train)

best_model = grid_search.best_estimator_

y_pred = best_model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
classification_rep = classification_report(y_test, y_pred)

print("Best Model Parameters:", grid_search.best_params_)
print("Accuracy on Testing Data:", accuracy)
print("Classification Report:\n", classification_rep)

In [8]:
# Fit the TF-IDF vocabulary on the training texts only, then reuse it for the test texts.
tfidf_vectorizer = TfidfVectorizer()
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

# NOTE(review): C and gamma are hard-coded; where these values come from is not
# shown in this notebook.
svc_model = SVC(
    kernel='rbf',
    C=113,
    gamma=0.0095,
    break_ties=True,
    probability=True,
).fit(X_train_tfidf, y_train)

# Score the fitted classifier on the generated samples.
y_pred = svc_model.predict(X_test_tfidf)
accuracy = accuracy_score(y_test, y_pred)
classification_rep = classification_report(y_test, y_pred)

print("Accuracy:", accuracy * 100)
print("Classification Report:\n", classification_rep)

Accuracy: 75.0
Classification Report:
               precision    recall  f1-score   support

       anger       0.83      0.40      0.54        50
        fear       0.89      0.68      0.77        50
         joy       0.58      0.68      0.62        50
        love       0.85      0.90      0.87        50
     sadness       0.59      0.86      0.70        50
    surprise       0.92      0.98      0.95        50

    accuracy                           0.75       300
   macro avg       0.78      0.75      0.74       300
weighted avg       0.78      0.75      0.74       300

