In [1]:
import math
import plotly.express as px
import pandas as pd
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

## Read the data from the files

In [2]:
training = pd.read_csv('training.txt', header=None, delimiter='@@@', names=['text', 'sentiment'], engine='python')
test = pd.read_csv('test.txt', header=None, delimiter='@@@', names=['text', 'sentiment'], engine='python')

## Get the vocabulary

In [3]:

def getVocab(input):
    """
    Get the vocabulary of the input

    Args:
        input (DataFrame): The input

    Returns:
        list: The vocabulary
    """
    vocab = {}
    for index, row in training.iterrows():
        for word in row['text'].split():
            if word not in vocab:
                vocab[word] = 0
            vocab[word] += 1
    vocab = sorted(vocab.items(), key=lambda x: x[1], reverse=True)

    return vocab

vocab = getVocab(training)

vocab[:10]

[('the', 3509),
 ('to', 1978),
 ('in', 1304),
 ('on', 1217),
 ('a', 1152),
 ('and', 1150),
 ('i', 1140),
 ('of', 1026),
 ('for', 985),
 ('is', 849)]

## Do the Naive Bayes training 

In [4]:
# Create functions to be used later
def vectorize(input, features):
    """
    Vectorize the input data using the features

    Args:
        input (pandas.DataFrame): The input data
        features (list): The vocabulary

    Returns:
        list: The vectorized data
        list: The labels
    """
    Vectors = []
    Labels = []

    for index, row in input.iterrows():
        temp = []
        for feature in features:
            if feature[0] in row['text'].split():
                #Calculate the frequency of the word in the document to use bag of words
                temp.append(row['text'].count(feature[0]))
            else:
                temp.append(0)
        Vectors.append(temp)
        Labels.append(row['sentiment'])

    return Vectors, Labels

def NaiveBayesTrain(trainingVectors, trainingLabels):
    """
    Train the Naive Bayes model

    Args:
        Vectors (list): The vectorized data
        labels (list): The labels

    Returns:
        dict: The Naive Bayes model with the probabilities of each word, the probabilities of each label, and the different labels
    """
    numberTrainingDocuments=len(trainingVectors)

    #Number of features in the vectors (training and test have the same number)
    NumberFeatures=len(trainingVectors[0])

    #Calculate the probability that a document is associated to a label
    differentLabels=set(trainingLabels)
    probability = {}
    for label in differentLabels:
        probability[label] = trainingLabels.count(label) / float(numberTrainingDocuments)

    #initialize the numerator and denominator for the p(xi|positive) and p(xi|negative) calculations.
    NumeratorProbability= {}
    for label in differentLabels:
        NumeratorProbability[label] = [1]*NumberFeatures

    DenominatorProbability = {}
    for label in differentLabels:
        DenominatorProbability[label] = 2
    
    #Iterate over training documents
    for x in range(numberTrainingDocuments):
        counter = 0
        for y in trainingVectors[x]:
            NumeratorProbability[trainingLabels[x]][counter] += y
            counter += 1
        DenominatorProbability[trainingLabels[x]] += sum(trainingVectors[x])
    
    WordProbability = {}
    for label in differentLabels:
        WordProbability[label] = []
        for x in NumeratorProbability[label]:
            WordProbability[label].append(math.log(x/float(DenominatorProbability[label])))
    
    return {"WordProbability": WordProbability, "probability": probability, "labels": differentLabels}

def NaiveBayesPredict(vector, model):
    """
    Predict the label of the vector using the Naive Bayes model

    Args:
        vector (list): The vector to predict
        model (dict): The Naive Bayes model

    Returns:
        string: The predicted label
    """

    probabilities = { label: [] for label in model["labels"]}

    #Go through the vector bag of words
    for counter, x in enumerate(vector):
        # Go through the different labels
        for label in model["labels"]:
            probabilities[label].append(x * model["WordProbability"][label][counter])

    p = {}
    for label in model["labels"]:
        p[label] = sum(probabilities[label]) + math.log(model["probability"][label])

    return max(p, key=p.get)

def calculate_results(predictions, testLabels):
    """
    Calculate the accuracy, precision, recall, and F1 score of the model

    Args:
        predictions (list): The predicted labels
        testLabels (list): The true labels

    Returns:
        dict: The results of the model
    """
    results = {}

    results['test_accuracy'] = accuracy_score(testLabels, predictions)
    results['test_precision_macro'] = precision_score(testLabels, predictions, average='macro')
    results['test_recall_macro'] = recall_score(testLabels, predictions, average='macro')
    results['test_f1_macro'] = f1_score(testLabels, predictions, average='macro')

    return results

def model_NB(train, test, numFeatures):
    # Vectorize the data
    trainingVectors, trainingLabels = vectorize(train, vocab[:numFeatures])
    testVectors, testLabels = vectorize(test, vocab[:numFeatures])
    # Manual Naive Bayes
    model = NaiveBayesTrain(trainingVectors, trainingLabels)
    predictions = []
    for index, row in enumerate(testVectors):
        predictions.append(NaiveBayesPredict(row, model))
    manual_results = calculate_results(predictions, testLabels)

    # Sklearn Naive Bayes
    model = MultinomialNB()
    model.fit(trainingVectors, trainingLabels)
    predictions = model.predict(testVectors)
    result = calculate_results(predictions, testLabels)

    return manual_results, result

# Compare the results with different number of features
list_results = {'test_accuracy': [], 'test_precision_macro': [], 'test_recall_macro': [], 'test_f1_macro': []}
list_sklearn_results = {'test_accuracy': [], 'test_precision_macro': [], 'test_recall_macro': [], 'test_f1_macro': []}
for numFeatures in [20, 40, 60, 80, 100, 120, 200]:
    manual_results, result = model_NB(training, test, numFeatures)
    list_results['test_accuracy'].append(manual_results['test_accuracy'])
    list_results['test_precision_macro'].append(manual_results['test_precision_macro'])
    list_results['test_recall_macro'].append(manual_results['test_recall_macro'])
    list_results['test_f1_macro'].append(manual_results['test_f1_macro'])

    list_sklearn_results['test_accuracy'].append(result['test_accuracy'])
    list_sklearn_results['test_precision_macro'].append(result['test_precision_macro'])
    list_sklearn_results['test_recall_macro'].append(result['test_recall_macro'])
    list_sklearn_results['test_f1_macro'].append(result['test_f1_macro'])

## Results

In [5]:
manual_results, result = model_NB(training, test, 200)
px.bar(y=[manual_results['test_accuracy'], result['test_accuracy']], x=["Manual Naive Bayes", "Sklearn Naive Bayes"], title="Accuracy")

In [6]:
df_results = pd.DataFrame(list_results)

fig = px.line(df_results, y=['test_accuracy', 'test_precision_macro', 'test_recall_macro', 'test_f1_macro'], x=[20, 40, 60, 80, 100, 120, 200], title="Results", range_y=[0, 1])
fig.show()

## Rebalance the training set

In [7]:
#Get the category with the least number of rows in the training set
number = training['sentiment'].value_counts().min()

print(number)

# Drop the rows that exceed the number of each category
training = training.groupby('sentiment').head(number).reset_index(drop=True)

training['sentiment'].value_counts()

859


sentiment
positive    859
negative    859
neutral     859
Name: count, dtype: int64

### Compare before and after rebalancing

In [8]:
new_manual_results, new_result = model_NB(training, test, 200)

# Compare the results without and with rebalancing
px.line(y=[manual_results['test_accuracy'], new_manual_results['test_accuracy']], x=["Without rebalancing", "With rebalancing"], title="Accuracy", range_y=[0, 1])

## Other strategies to try to improve the model

In [12]:
# Remove stopwords from the vocabulary
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

vocab = [word for word in vocab if word not in ENGLISH_STOP_WORDS]

trainingVectors, trainingLabels = vectorize(training, vocab[:200])
testVectors, testLabels = vectorize(test, vocab[:200])

model = MultinomialNB()
model.fit(trainingVectors, trainingLabels)
predictions = model.predict(testVectors)
result = calculate_results(predictions, testLabels)

print(result)

{'test_accuracy': 0.5155709342560554, 'test_precision_macro': 0.5127051369175314, 'test_recall_macro': 0.5176457310535523, 'test_f1_macro': 0.5051328270697142}
