# Sentiment Analysis of Tweets

## Dependencies

This project uses some Python 3.10 features (for example `match` statements), so make sure you have Python **>= 3.10** installed.

To install the external modules used in this project, run `pip install -r requirements.txt`.

After that, run the following cell to download the required NLTK resources:

In [1]:
import nltk

nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /home/tom/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /home/tom/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package stopwords to /home/tom/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

## Data

I'm using the [SemEval-2013](https://aclanthology.org/S13-2052/) dataset for this project.

## Preparation

I'm defining some functions and classes to keep the codebase clean.

In [2]:
class InvalidPatternException(Exception):
    """
    Exception raised when the pattern is not supported.

    Attributes:
        pattern     The pattern that could not be matched.
    """

    def __init__(self, pattern: str):
        self.pattern = pattern
        self.message = f"'{pattern}' is not a valid pattern"

        super().__init__(self.message)


In [3]:
from enum import Enum

class Sentiment(Enum):
    """
    A sentiment (either POSITIVE, NEGATIVE or NEUTRAL).
    """

    POSITIVE = 1,
    NEGATIVE = 2,
    NEUTRAL = 0,
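
One detail of the enum above is easy to miss: the trailing commas turn each member value into a one-element tuple, which is why the label extraction further down indexes into the value. The following is a minimal sketch of that behaviour; `WithCommas` and `WithoutCommas` are hypothetical names used only for illustration.

```python
from enum import Enum


class WithCommas(Enum):
    # The trailing comma makes the value the tuple (1,), not the int 1.
    POSITIVE = 1,


class WithoutCommas(Enum):
    # Without the comma the value is the plain int 1.
    POSITIVE = 1


print(WithCommas.POSITIVE.value)      # (1,)
print(WithCommas.POSITIVE.value[0])   # 1
print(WithoutCommas.POSITIVE.value)   # 1
```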

In [4]:
def create_sentiment(s: str) -> Sentiment:
    """
    Matches a sentiment in form of a string from the dataset to a Sentiment.

    :param s: The 'sentiment' string from the dataset.

    :raises InvalidPatternException: If s does not match any known sentiment.
    """

    match s:
        case "positive": return Sentiment.POSITIVE
        case "negative": return Sentiment.NEGATIVE
        case "neutral": return Sentiment.NEUTRAL
        case _: raise InvalidPatternException(s)


In [5]:
from dataclasses import dataclass

@dataclass
class DataPoint():
    """
    A dataclass that represents a 'unit' of data.

    Attributes:
        sentiment   The sentiment of the tweet.
        tweet       The tweet as a list of words (tokens).
    """

    sentiment: Sentiment
    tweet: list[str]
      

In [6]:
from nltk.tokenize import word_tokenize
import re

def read_data(file: str, lang: str = "english",
              remove_unicode: bool = True,
              remove_links: bool = True,
              do_remove_punctuation: bool = True) -> list[DataPoint]:
    """
    Reads the raw data from a file and creates a list of DataPoint objects.

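    :param file:                  The file to read the data from.
    :param lang:                  The language passed to the NLTK tokenizer.
    :param remove_unicode:        Removes escaped unicode sequences if true.
    :param remove_links:          Removes links if true.
    :param do_remove_punctuation: Removes punctuation characters if true.
    :returns:                     List of DataPoint objects.

    :raises AssertionError: If a line does not match the regular expression. This should never happen.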
    """

    data = list()
    with open(file) as f:
        lines = f.readlines()

    pattern = r"^([0-9]+)\s+(positive|negative|neutral)\s+\"?(.+)\"?$"
    for line in lines:
        match = re.match(pattern, line)

        assert match
        
        clean_string = match.group(3)
        
        if remove_links:
            clean_string = re.sub(r"https?:\/\/www\.[a-z0-9]+\.[a-z]+(\.[a-z]+)?(\/[a-z0-9]+)*", "", clean_string)
            
        if remove_unicode:
            clean_string = re.sub(r"\\u[a-fA-F0-9]+", "", clean_string)
        
        if do_remove_punctuation:
            clean_string = re.sub(r"[#@\.;,\?!:\-%\\'\$]", "", clean_string)

        sentiment = create_sentiment(match.group(2))
        words = word_tokenize(clean_string, lang)

        data_point = DataPoint(sentiment, words)

        data.append(data_point)

    return data
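
To make the cleaning steps easier to follow, here is a minimal sketch that applies the same regular expressions to two hypothetical input lines (the sample text and the constant names are assumptions, not taken from the dataset). It also shows two properties of the patterns: the greedy `(.+)` group keeps the closing quote of a quoted tweet, and the link pattern only matches lower-case URLs whose host starts with `www.`.

```python
import re

# Patterns copied from read_data; the sample lines below are hypothetical.
LINE_PATTERN = r"^([0-9]+)\s+(positive|negative|neutral)\s+\"?(.+)\"?$"
LINK_PATTERN = r"https?:\/\/www\.[a-z0-9]+\.[a-z]+(\.[a-z]+)?(\/[a-z0-9]+)*"
PUNCTUATION_PATTERN = r"[#@\.;,\?!:\-%\\'\$]"

line = '123\tpositive\t"Great show at http://www.example.com/tour tonight!"\n'

match = re.match(LINE_PATTERN, line)
assert match is not None

sentiment = match.group(2)
text = match.group(3)

# (.+) is greedy, so it also captures the closing quote of the tweet.
print(repr(text))

# Remove the link first, then the punctuation, as read_data does.
text = re.sub(LINK_PATTERN, "", text)
text = re.sub(PUNCTUATION_PATTERN, "", text)
print(repr(text))

# The link pattern requires a "www." host, so this hypothetical
# shortened link is not removed by the link step.
short_link = "see http://t.co/abc"
print(re.sub(LINK_PATTERN, "", short_link))
```

If quoted tweets or shortened links occur in the data, making the text group non-greedy or loosening the link pattern would be possible adjustments; whether they are needed depends on the actual file contents.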

### Pre-processing the data

Apart from 'cleaning' the data (removing links, unicode and punctuation), I'm also doing some preprocessing.

I use the `stopwords` from the `nltk.corpus` package to remove stopwords from the data.

In [7]:
from nltk.corpus import stopwords

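def remove_stopwords(s: list[str], lang: str = "english") -> list[str]:
    """
    Removes stopwords from a given list of words.

    :param s:    The input list of words.
    :param lang: The language of the stopword list.
    :returns:    The input list without stopwords.
    """

    # Build the stopword set once instead of reloading the list for every word.
    stop_words = set(stopwords.words(lang))

    words = [word for word in s if word not in stop_words]

    return words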
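
The membership test above is case-sensitive, and NLTK's English stopword list is lower case, so capitalised tokens such as "We" or "The" pass through unless the words are lower-cased first. The following minimal sketch illustrates the difference; `STOP_WORDS` is a small hypothetical stand-in for the real list and `remove_stopwords_sketch` is an illustrative helper, not part of the project.

```python
# A small hypothetical stand-in for stopwords.words("english").
STOP_WORDS = {"the", "to", "i", "we"}


def remove_stopwords_sketch(words, stop_words, ignore_case=False):
    """Filter out stopwords, optionally comparing in lower case."""
    if ignore_case:
        return [w for w in words if w.lower() not in stop_words]
    return [w for w in words if w not in stop_words]


tokens = ["We", "went", "to", "the", "concert"]

print(remove_stopwords_sketch(tokens, STOP_WORDS))
# ['We', 'went', 'concert']
print(remove_stopwords_sketch(tokens, STOP_WORDS, ignore_case=True))
# ['went', 'concert']
```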


Stemmers and/or lemmatizers can be used to 'normalize' the words. Additionally, the words can be converted to lower case.

In [9]:
from typing import Callable, Optional

def normalize(s: list[str],
              lower: bool = False,
              Stemmer = None,
              stemmer_args: Optional[list] = None,
              Lemmatizer = None) -> list[str]:
    """
    Normalizes a list of words using methods specified in the function parameters.

    :param s:            The input list of words.
    :param lower:        Converts every word to lower case if true.
    :param Stemmer:      A Stemmer class that can optionally be used in normalization.
    :param stemmer_args: The constructor arguments of the Stemmer (if any).
    :param Lemmatizer:   A Lemmatizer class that can optionally be used in normalization.

    :returns: The normalized list of words.
    """
    
    if lower:
        s = [word.lower() for word in s]

    if Stemmer:
        stemmer = Stemmer(*stemmer_args) if stemmer_args else Stemmer()

        s = [stemmer.stem(word) for word in s]

    if Lemmatizer:
        lemmatizer = Lemmatizer()

        s = [lemmatizer.lemmatize(word) for word in s]

    return s


I'm defining a configurable wrapper function so that these functions don't have to be called individually.

In [39]:
def preprocess_data(data: list[DataPoint],
                    do_remove_stopwords: bool = False,
                    do_normalize: bool = False,
                    lower: bool = False,
                    stemmer = None,
                    stemmer_args = None,
                    lemmatizer = None) -> list[DataPoint]:
    """
    Preprocesses the data. The data points are modified in place.

    :param data:                 A list of data points containing the data.
    :param do_remove_stopwords:  Tells the function to remove the stopwords if true.
    :param do_normalize:         Tells the function to do some normalization if true.
    :param lower:                Converts every letter to lower case if true. Only takes effect if 'do_normalize' is True.
    :param stemmer:              The stemmer that is supposed to be used. Only takes effect if 'do_normalize' is True.
    :param stemmer_args:         Arguments for the stemmer. Only takes effect if 'do_normalize' is True and a stemmer has been selected.
    :param lemmatizer:           The lemmatizer that is supposed to be used. Only takes effect if 'do_normalize' is True.

    :returns: The same list of data points, with the processed tweets.
    """

    for d in data:
        tweet = d.tweet

        if do_remove_stopwords:
            tweet = remove_stopwords(tweet)

        if do_normalize:
            
            tweet = normalize(tweet,
                              lower=lower,
                              Stemmer=stemmer,
                              stemmer_args=stemmer_args,
                              Lemmatizer=lemmatizer)

        d.tweet = tweet

    return data


### Extracting the features and creating a "bag of words"

In [40]:
def extract_features(train_data: list[DataPoint]) -> list[str]:
    """
    Extracts the features from the training data.

    This is done by creating a list that contains every word in the dataset.
    So every word basically is a feature.

    :param train_data: The training data that the feature set is built upon.

    :returns: A list that contains each word in the dataset once.
    """

    wordlist = set()

    for d in train_data:
        wordlist.update(d.tweet)

    wordlist = list(wordlist)

    return wordlist
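
Because the word list is built from a `set` of strings, its order is not guaranteed to be the same between interpreter runs (string hashing is randomized by default), so the feature columns can be arranged differently each time. If a reproducible column order matters, sorting is one option. The following is a minimal sketch; `extract_features_sorted` is a hypothetical variant that works on plain token lists instead of `DataPoint` objects.

```python
def extract_features_sorted(tweets):
    """Collect every distinct token and return them in sorted order."""
    vocabulary = set()
    for tweet in tweets:
        vocabulary.update(tweet)
    # sorted() gives the same order on every run, unlike list(set).
    return sorted(vocabulary)


tweets = [["I", "did", "good"], ["I", "went", "to", "the", "concert"]]

print(extract_features_sorted(tweets))
# ['I', 'concert', 'did', 'good', 'the', 'to', 'went']
```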


In [41]:
import numpy as np

def create_bag_of_words(
        wordlist: list[str],
        data: list[DataPoint],
        force_positive_numbers: bool = False) -> list[list[int]]:
    """
    Creates an MxN matrix where M is the number of tweets and N is the total
    number of unique words in the wordlist.

    The matrix is filled with integers, where the integer for each word represents how
    often it appears in that tweet.

    So if we had three tweets looking like this:

    I did good.
    I went to the concert.
    We went to see him.

    And assuming the wordlist generated would look like this:

    ["I", "did", "good", "went", "to", "the", "concert", "We", "see", "him"]

    The matrix would look like this:

    [
    [1, 1, 1, 0, 0, 0, 0, 0, 0, 0],
    [1, 0, 0, 1, 1, 1, 1, 0, 0, 0],
    [0, 0, 0, 1, 1, 0, 0, 1, 1, 1]
    ]

    Negative numbers are possible. With the wordlist ["I", "did", "not", "good"], the tweet

    I did not good.

    generates the following matrix:

    [[1, 1, 1, -1]]

    The reason for this is that the sign of every word after a "not" is inverted
    (a second "not" flips it back).
    This can be avoided by setting the 'force_positive_numbers' flag to true,
    which clamps negative entries to 0.

    :param wordlist:                The list of words that are defined as features.
    :param data:                    The list of data points that represents the data set.
    :param force_positive_numbers:  Optional flag to avoid negative values in the matrix.

    :returns: A list with one word-count vector (numpy array) per data point.
    """

    bag_of_words = list()

    for d in data:
        mod = 1
        word_vector = np.zeros((len(wordlist), ), dtype=np.int64)
        for word in d.tweet:
            try:
                word_index = wordlist.index(word)
                word_vector[word_index] += (1 * mod)
                if word == "not":
                    mod *= -1
            except ValueError:
                # NOTE: We just skip the word if it's not in our wordlist
                continue

        bag_of_words.append(word_vector)

    if force_positive_numbers:
        for lst in bag_of_words:
            for i, value in enumerate(lst):
                lst[i] = max(value, 0)

    return bag_of_words
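
`wordlist.index(word)` scans the list linearly for every token, which gets slow for a large vocabulary. A minimal sketch of the same counting logic with a dictionary lookup is shown below. `bag_of_words_sketch` is a hypothetical helper that works on plain token lists and returns plain Python lists rather than numpy arrays, and it assumes the word list contains no duplicates.

```python
def bag_of_words_sketch(wordlist, tweets):
    """Count words per tweet, flipping the sign after each "not"."""
    # Map each word to its column once, so lookups are O(1).
    index = {word: i for i, word in enumerate(wordlist)}

    rows = []
    for tweet in tweets:
        sign = 1
        row = [0] * len(wordlist)
        for word in tweet:
            column = index.get(word)
            if column is None:
                # Unknown words are skipped. As in the original loop, a "not"
                # that is missing from the wordlist does not flip the sign.
                continue
            row[column] += sign
            if word == "not":
                sign *= -1
        rows.append(row)
    return rows


wordlist = ["I", "did", "not", "good"]
print(bag_of_words_sketch(wordlist, [["I", "did", "not", "good"]]))
# [[1, 1, 1, -1]]
```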


### Reading the data

Now I can start reading the data:

In [42]:
train_data = read_data("./semeval2013/twitter-2013train-A.txt")
dev_data = read_data("./semeval2013/twitter-2013dev-A.txt", do_remove_punctuation=False)
test_data = read_data("./semeval2013/twitter-2013test-A.txt", do_remove_punctuation=False)

Preprocessing and extracting features:

In [60]:
data = preprocess_data(train_data, do_remove_stopwords=True, do_normalize=False)

word_list = extract_features(data)

Creating the training and testing sets:

In [61]:
x_train = create_bag_of_words(word_list, data)
y_train = [d.sentiment.value[0] for d in data]

x_dev = create_bag_of_words(word_list, dev_data)
y_dev = [d.sentiment.value[0] for d in dev_data]

### Classifier

I'm using the [Random Forest Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html) for this project.

In [66]:
from sklearn.ensemble import RandomForestClassifier


random_forest_classifier = RandomForestClassifier(n_estimators=900,
                                                  max_depth=8,
                                                  verbose=1,
                                                  n_jobs=50,
                                                  class_weight="balanced")


random_forest_classifier.fit(x_train, y_train)



[Parallel(n_jobs=50)]: Using backend ThreadingBackend with 50 concurrent workers.
[Parallel(n_jobs=50)]: Done 100 tasks      | elapsed:   14.5s
[Parallel(n_jobs=50)]: Done 350 tasks      | elapsed:   44.1s
[Parallel(n_jobs=50)]: Done 700 tasks      | elapsed:  1.4min
[Parallel(n_jobs=50)]: Done 900 out of 900 | elapsed:  1.8min finished


RandomForestClassifier(class_weight='balanced', max_depth=8, n_estimators=900,
                       n_jobs=50, verbose=1)

In [67]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score


predictions = random_forest_classifier.predict(x_train)

print(classification_report(predictions, y_train))
print(confusion_matrix(predictions, y_train))
print(accuracy_score(predictions, y_train))

[Parallel(n_jobs=50)]: Using backend ThreadingBackend with 50 concurrent workers.
[Parallel(n_jobs=50)]: Done 100 tasks      | elapsed:    0.1s
[Parallel(n_jobs=50)]: Done 350 tasks      | elapsed:    0.3s


              precision    recall  f1-score   support

           0       0.86      0.73      0.79      5381
           1       0.65      0.83      0.73      2826
           2       0.68      0.67      0.67      1477

    accuracy                           0.75      9684
   macro avg       0.73      0.74      0.73      9684
weighted avg       0.77      0.75      0.75      9684

[[3930 1081  370]
 [ 370 2354  102]
 [ 286  205  986]]
0.7507228418009088


[Parallel(n_jobs=50)]: Done 700 tasks      | elapsed:    0.5s
[Parallel(n_jobs=50)]: Done 900 out of 900 | elapsed:    0.6s finished


In [68]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

predictions = random_forest_classifier.predict(x_dev)

print(classification_report(predictions, y_dev))
print(confusion_matrix(predictions, y_dev))
print(accuracy_score(predictions, y_dev))

[Parallel(n_jobs=50)]: Using backend ThreadingBackend with 50 concurrent workers.
[Parallel(n_jobs=50)]: Done 100 tasks      | elapsed:    0.0s
[Parallel(n_jobs=50)]: Done 350 tasks      | elapsed:    0.1s


              precision    recall  f1-score   support

           0       0.81      0.61      0.70       987
           1       0.49      0.70      0.58       399
           2       0.46      0.59      0.52       268

    accuracy                           0.63      1654
   macro avg       0.59      0.63      0.60      1654
weighted avg       0.68      0.63      0.64      1654

[[601 250 136]
 [ 72 281  46]
 [ 66  44 158]]
0.6287787182587666


[Parallel(n_jobs=50)]: Done 700 tasks      | elapsed:    0.2s
[Parallel(n_jobs=50)]: Done 900 out of 900 | elapsed:    0.3s finished


In [65]:
x_test = create_bag_of_words(word_list, test_data)
y_test = [d.sentiment.value[0] for d in test_data]


predictions = random_forest_classifier.predict(x_test)

print(classification_report(predictions, y_test))
print(confusion_matrix(predictions, y_test))
print(accuracy_score(predictions, y_test))

[Parallel(n_jobs=50)]: Using backend ThreadingBackend with 50 concurrent workers.
[Parallel(n_jobs=50)]: Done 100 tasks      | elapsed:    0.1s
[Parallel(n_jobs=50)]: Done 350 tasks      | elapsed:    0.2s


              precision    recall  f1-score   support

           0       0.91      0.55      0.69      2505
           1       0.42      0.84      0.56       736
           2       0.32      0.58      0.41       306

    accuracy                           0.61      3547
   macro avg       0.55      0.66      0.55      3547
weighted avg       0.76      0.61      0.64      3547

[[1382  784  339]
 [  76  618   42]
 [  55   73  178]]
0.6140400338314068


[Parallel(n_jobs=50)]: Done 700 tasks      | elapsed:    0.3s
[Parallel(n_jobs=50)]: Done 900 out of 900 | elapsed:    0.4s finished
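
A note on reading the reports above: scikit-learn documents `classification_report`, `confusion_matrix` and `accuracy_score` as taking the true labels first (`y_true, y_pred`). The evaluation cells pass the predictions first, so the precision and recall columns appear to be exchanged, the support column counts predicted rather than true labels, and the confusion matrices are transposed. Accuracy and the per-class F1-scores are not affected, since they are symmetric in the two arguments. The following minimal sketch shows the effect with a hand-written metric; `precision_recall` and the label lists are hypothetical and only serve as illustration.

```python
def precision_recall(y_true, y_pred, label):
    """Compute precision and recall for one label from two label lists."""
    true_positives = sum(
        t == label and p == label for t, p in zip(y_true, y_pred)
    )
    predicted = sum(p == label for p in y_pred)
    actual = sum(t == label for t in y_true)

    precision = true_positives / predicted if predicted else 0.0
    recall = true_positives / actual if actual else 0.0
    return precision, recall


y_true = [1, 1, 1, 0, 0, 2]
y_pred = [1, 0, 0, 0, 0, 2]

# Correct order: precision is 1.0, recall is one third.
print(precision_recall(y_true, y_pred, label=1))

# Swapped order: the two numbers trade places.
print(precision_recall(y_pred, y_true, label=1))
```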


### Summary

With this model I reach an accuracy of **62.88%** on the development set and **61.40%** on the test set.
The F1-scores on the development set range from 0.52 (negative) to 0.70 (neutral).