# Assignment Two:  Sentiment Classification

For this exercise you will be using the "SemEval 2017 task 4" corpus provided on the module website, available through the following link: https://warwick.ac.uk/fac/sci/dcs/teaching/material/cs918/semeval-tweets.tar.bz2. You will focus particularly on Subtask A, i.e. classifying the overall sentiment of a tweet as positive, negative or neutral.

You are requested to produce a Jupyter notebook for the coursework submission. The input to your program is the downloaded SemEval data. Note that the TAs need to run your program on their own machines using the original SemEval data. As such, do not submit a Python program that takes preprocessed files as input.

#### Define Utility Functions

In [135]:
from collections.abc import Generator
from os import path, getcwd, listdir, makedirs


def read_file_lines_from(file_path: str, /) -> Generator[str, None, None]:
    """
    Lazily iterate over the lines of a UTF-8 text file.

    The given path is joined onto the current working directory, so a
    relative path is resolved against it.

    Args:
        file_path (str): Location of the file to read.

    Yields:
        str: The next line, with surrounding whitespace (including the
        line terminator) removed.
    """
    one_mebibyte = 1 << 20
    absolute_path = path.join(getcwd(), file_path)
    with open(absolute_path, mode='r', buffering=one_mebibyte, encoding='utf8') as handle:
        # The file stays open only while the generator is being consumed.
        yield from map(str.strip, handle)


def ls(dir_path: str, /) -> tuple[str, ...]:
    """
    Return the names of the entries found directly inside a directory.

    Args:
        dir_path (str): Directory to inspect; joined onto the current
            working directory.

    Returns:
        tuple[str, ...]: Entry names in the order reported by ``listdir``.
    """
    target_dir = path.join(getcwd(), dir_path)
    entries = listdir(target_dir)
    return (*entries,)


def mkdir(dir_path: str, /) -> None:
    """
    Create a directory (and any missing parents) if it does not exist yet.

    Args:
        dir_path (str): Directory to create; joined onto the current
            working directory.

    Raises:
        FileExistsError: If the path already exists but is not a directory.
    """
    full_path = path.join(getcwd(), dir_path)
    # exist_ok=True makes the call idempotent without a separate
    # "exists?" check, which could race with another thread creating
    # the same directory between the check and the creation.
    makedirs(full_path, exist_ok=True)


def path_exists(location: str, /) -> bool:
    """
    Tell whether something exists at the given location.

    Args:
        location (str): Path to probe; joined onto the current working
            directory.

    Returns:
        bool: The result of ``os.path.exists`` for the joined path.
    """
    return path.exists(path.join(getcwd(), location))

In [136]:
from collections.abc import Callable
from threading import Thread, Lock
from typing import final


@final
class BackgroundTask:
    """
    Represents a background task that can be executed concurrently.

    The task is run on a daemon thread that is started as soon as the
    object is constructed.

    Args:
        task (Callable): The function or method to be executed as a background task.
        *args: Variable length argument list to be passed to the task.
        **kwargs: Arbitrary keyword arguments to be passed to the task.
    """

    def __init__(self, task: Callable[..., None], *args, **kwargs):
        self.__task = Thread(
            target=task,
            args=args,
            kwargs=kwargs,
            daemon=True
        )
        # Starting a thread needs no locking. The previous `with Lock():`
        # created a fresh, unshared lock on every call and therefore
        # protected nothing.
        self.__task.start()

    def wait(self) -> None:
        """
        Block the calling thread until the background task has finished.
        """
        self.__task.join()

In [137]:
def unzip_file_to(file_path: str, /, destination: str) -> None:
    """
    Extract every member of a zip archive into a directory.

    Args:
        file_path (str): Location of the archive; joined onto the current
            working directory.
        destination (str): Directory that receives the extracted members;
            passed to the extractor as given.
    """
    from zipfile import ZipFile

    archive_path = path.join(getcwd(), file_path)
    with ZipFile(archive_path, mode='r') as archive:
        archive.extractall(destination)

In [138]:
from typing import Any, Final, Optional
from shelve import open as shelve_open


class GlobalCache:
    """
    A simple global cache with two layers.

    Values are kept in a dictionary shared by every instance and are also
    written to a ``shelve`` file named ``cache`` (resolved against the
    current working directory), so they can be read back later from disk.
    """

    # Class-level attributes: every GlobalCache instance sees the same data.
    __runtime_cache: Final[dict[str, Any]] = {}
    __cache_file_name: Final[str] = 'cache'

    def put(self, key: str, value: object, /) -> None:
        """
        Put a value into the cache.

        Args:
            key (str): The key to be used to store the value.
            value (object): The value to be stored. It is also written to
                the shelve file.
        """

        self.__runtime_cache[key] = value
        with shelve_open(self.__cache_file_name, 'c') as cache:
            cache[key] = value

    def get(self, key: str, /) -> Optional[Any]:
        """
        Get a value from the cache.

        The in-memory dictionary is consulted first; on a miss the shelve
        file is read and a value found there is kept in memory so later
        lookups do not have to open the file again.

        Args:
            key (str): The key to be used to retrieve the value.

        Returns:
            object: The value stored in the cache, or None when the key is
            unknown to both layers.
        """

        if key in self.__runtime_cache:
            return self.__runtime_cache[key]

        # 'c' (not 'r') so that a lookup before the first put() does not
        # fail because the shelve file does not exist yet.
        with shelve_open(self.__cache_file_name, 'c') as cache:
            if key not in cache:
                return None
            value = cache[key]

        self.__runtime_cache[key] = value
        return value

    def remove(self, key: str, /) -> None:
        """
        Remove a value from the cache.

        Removing a key that is not present is a no-op in both layers.

        Args:
            key (str): The key to be used to remove the value.
        """

        self.__runtime_cache.pop(key, None)
        with shelve_open(self.__cache_file_name, 'c') as cache:
            # pop() with a default mirrors the in-memory behaviour; a plain
            # `del` raised KeyError when the key was missing from the file.
            cache.pop(key, None)

    def clear(self) -> None:
        """
        Clear the cache, both in memory and in the shelve file.
        """

        self.__runtime_cache.clear()
        with shelve_open(self.__cache_file_name, 'c') as cache:
            cache.clear()

#### Package imports for Application logic

In [139]:
import pandas as pd
import numpy as np
import csv

from typing import Final, final
from types import NoneType
from functools import lru_cache
from enum import Enum
from collections import defaultdict
from huggingface_hub import hf_hub_download

#### Define global instances

In [140]:
# Root directory of all data files, and where the GloVe vectors live below it.
dataset_base_path: Final[str] = 'data'
glove_data_dir: Final[str] = '/'.join((dataset_base_path, 'glove'))
target_glove_file_name: Final[str] = 'glove.6B.100d.txt'

# File names of the SemEval splits, all located under `dataset_base_path`.
training_data_file_name: Final[str] = 'twitter-training-data.txt'
devlopment_data_file_name: Final[str] = 'twitter-dev-data.txt'
test_set_names: Final[tuple[str, ...]] = (
    'twitter-test1.txt',
    'twitter-test2.txt',
    'twitter-test3.txt',
)


@final
class Sentiment(Enum):
    """
    The three sentiment labels a tweet can carry.

    Member names match the label strings used for lookups such as
    ``Sentiment['positive']``.
    """
    positive = 1
    negative = -1
    neutral = 0

    @classmethod
    @lru_cache
    def gts(cls) -> frozenset[str]:
        """Return the set of member names (computed once, then cached)."""
        return frozenset(cls.__members__)


# Cache instance shared by the cells below.
global_cache = GlobalCache()

# Tweet identifiers are handled as plain strings.
TweetID = str
# Alias of NoneType used as a return annotation; judging by its name it marks
# functions intended to be run through BackgroundTask (as prepare_glove_data is).
ShouldMarkedAsBackground = NoneType

#### Define data retrieval functions

In [141]:
@lru_cache(typed=True)
def get_tweets_from(file_name_: str, /) -> dict[TweetID, tuple[str, Sentiment]]:
    """
    Reads a file containing tweets and their associated sentiment labels,
    and returns a dictionary mapping tweet IDs to tuples of tweet content and sentiment.

    Each non-blank line is expected to hold tab-separated fields: the tweet
    ID, the sentiment label, then the tweet text. Tabs inside the text are
    replaced by single spaces. Blank lines are skipped.

    The result is memoised per file name, so every caller receives the same
    dictionary object and must not mutate it.

    Parameters:
    file_name_ (str): The name of the file to read, relative to `dataset_base_path`.

    Returns:
    dict[TweetID, tuple[str, Sentiment]]: A dictionary mapping tweet IDs to tuples
    containing the tweet content and sentiment label.

    Raises:
    ValueError: If a non-blank line has fewer than two tab-separated fields.
    KeyError: If a label is not the name of a `Sentiment` member.
    """
    id_gts: dict[TweetID, tuple[str, Sentiment]] = {}
    lines = read_file_lines_from(f'{dataset_base_path}/{file_name_}')
    for line_number, line in enumerate(lines, start=1):
        if not line:
            # A blank line (e.g. at the end of the file) carries no tweet;
            # without this guard it crashed with IndexError below.
            continue

        fields = line.split('\t')
        if len(fields) < 2:
            raise ValueError(
                f'{file_name_}, line {line_number}: expected tab-separated '
                f'"id, label, text" but got {line!r}'
            )

        tweet_id, gt = fields[0], fields[1]
        content = ' '.join(fields[2:])
        id_gts[tweet_id] = content, Sentiment[gt]

    return id_gts

#### Define GloVe data preparation functions

In [142]:
def prepare_glove_data() -> ShouldMarkedAsBackground:
    """
    Make sure the GloVe vectors are available under `glove_data_dir`.

    When that directory already exists and holds exactly four entries the
    function returns immediately. Otherwise the archive 'glove.6B.zip' is
    fetched from the 'stanfordnlp/glove' hub repository (pinned revision)
    into `dataset_base_path` and extracted into `glove_data_dir`.
    """
    archive_name = 'glove.6B.zip'

    if path_exists(glove_data_dir):
        if len(ls(glove_data_dir)) == 4:
            # Already downloaded and extracted.
            return

    hf_hub_download(
        local_dir=dataset_base_path,
        repo_id='stanfordnlp/glove',
        filename=archive_name,
        revision='1db2080b2d94def6e5b0386a523102f9d8849e9d',
    )

    # Extraction is done from Python rather than via a shell command so that
    # the whole preparation can run on a Python-managed thread.
    mkdir(glove_data_dir)
    archive_location = f'{dataset_base_path}/{archive_name}'
    unzip_file_to(archive_location, destination=glove_data_dir)

In [143]:
@lru_cache(typed=True)
def parse_glove_data(file_name_: str) -> tuple[dict[int, str], np.ndarray]:
    """
    Parse the GloVe data from a given file.

    Every line of the file is a token followed by its space-separated
    vector components. The result is memoised per file name.

    Args:
        file_name_ (str): The name of the file containing the GloVe data,
            relative to `glove_data_dir`.

    Returns:
        tuple[dict[int, str], np.ndarray]: A tuple containing a dictionary mapping
        row numbers to words and a NumPy float64 array with one word vector per row.
    """
    file_frame = pd.read_csv(
        f"{glove_data_dir}/{file_name_}",
        delimiter=' ',
        quoting=csv.QUOTE_NONE,
        header=None,
        encoding='utf-8',
        skip_blank_lines=True,
        # The first column holds the tokens: always read it as text so that
        # numeric-looking tokens are not converted to numbers.
        dtype={0: str},
        # By default pandas turns strings such as "null", "nan" or "NA" into
        # float NaN, which would silently destroy those vocabulary entries.
        keep_default_na=False,
    )

    return file_frame[0].to_dict(), file_frame.iloc[:, 1:].to_numpy(dtype=np.float64)

#### Define confusion matrix function

In [144]:
def show_confusion(*, predict_results: dict[TweetID, Sentiment], test_set_file_name_: str) -> None:
    """
    Display the confusion matrix based on the predicted results and the sentiment labels from the test set file.

    Rows are predicted sentiments and columns are the gold sentiments; each
    cell is the share of that row's tweets carrying the column's gold label.
    Tweets without a prediction are counted as predicted neutral.

    Args:
        predict_results (dict[TweetID, Sentiment]): A dictionary containing the predicted sentiment for each tweet ID.
        test_set_file_name_ (str): The file name of the test set containing the sentiment labels for each tweet ID.
    """
    id_sentiments = get_tweets_from(test_set_file_name_)

    # conf[predicted][gold] -> number of tweets
    conf: Final[dict[Sentiment, dict[Sentiment, int]]] = defaultdict(
        lambda: {
            Sentiment.positive: 0,
            Sentiment.negative: 0,
            Sentiment.neutral: 0,
        }
    )

    for tweet_id, [_, sentiment] in id_sentiments.items():
        pred = predict_results.get(tweet_id, Sentiment.neutral)
        conf[pred][sentiment] += 1

    # Build the header from the enum itself so the column titles are in the
    # same order as the columns printed below (a frozenset has no order).
    print(''.ljust(12) + '  '.join(member.name for member in Sentiment))

    for c1 in Sentiment:
        print(c1.name.ljust(12), end='')
        # Row total, computed once per row. The former
        # `c1_sum := sum(...) > 0` bound the *comparison result* to c1_sum,
        # so every cell was divided by 1.0 and raw counts were printed.
        c1_sum = sum(conf[c1].values())
        for c2 in Sentiment:
            p = conf[c1][c2] / c1_sum if c1_sum > 0 else 0.0
            print(f"{p:.3f}     ", end='')
        print()
    print()

#### Define evaluation functions

In [145]:
def evaluate(predict_results: dict[TweetID, Sentiment], test_set_file_name_: str, classifier_name_: str) -> None:
    """
    Evaluate the performance of a sentiment classifier by comparing the predicted results with the ground truth sentiment labels.

    Prints one line with the mean of the F1 scores of the positive and the
    negative class (the neutral class only contributes through the false
    positives and false negatives it causes). Tweets without a prediction
    are treated as predicted neutral.

    Parameters:
        - predict_results: A dictionary mapping TweetIDs to predicted Sentiments.
        - test_set_file_name_: The name of the test set file.
        - classifier_name_: The name of the classifier.
    """
    id_sentiments = get_tweets_from(test_set_file_name_)

    acc_by_class: Final[dict[Sentiment, dict[str, int]]] = defaultdict(
        lambda: {'tp': 0, 'fp': 0, 'fn': 0}
    )

    for tweet_id, [_, sentiment] in id_sentiments.items():
        pred = predict_results.get(tweet_id, Sentiment.neutral)

        if sentiment == pred:
            acc_by_class[sentiment]['tp'] += 1
        else:
            acc_by_class[sentiment]['fn'] += 1
            acc_by_class[pred]['fp'] += 1

    # Only the positive and negative classes enter the reported score.
    # The earlier test `cat in ['positive', 'negative']` compared enum
    # members with strings, was never true, and left the score at 0.000.
    # The macro/micro averages that used to be accumulated here were never
    # reported (and the micro average could divide by zero), so they are
    # no longer computed.
    f1_sum = 0.0
    for cat in (Sentiment.positive, Sentiment.negative):
        acc = acc_by_class[cat]
        predicted_as_cat = acc['tp'] + acc['fp']
        actually_cat = acc['tp'] + acc['fn']

        p = acc['tp'] / predicted_as_cat if predicted_as_cat > 0 else 0.0
        r = acc['tp'] / actually_cat if actually_cat > 0 else 0.0
        f1_sum += 2 * p * r / (p + r) if (p + r) > 0 else 0.0

    sem_eval_macro_f1 = f1_sum / 2

    print(
        f"{test_set_file_name_} ({classifier_name_}): {sem_eval_macro_f1:.3f}"
    )

#### Load training set, dev set and testing set
Here, you need to load the training set, the development set and the test set. For better classification results, you may need to preprocess tweets before sending them to the classifiers.

In [146]:
# Parse every split once; each result maps tweet ID -> (text, sentiment).
training_data = get_tweets_from(training_data_file_name)
dev_data = get_tweets_from(devlopment_data_file_name)
# One dictionary per test file, in the order given by `test_set_names`.
test_datas = list(map(get_tweets_from, test_set_names))

#### Download network resources

In [147]:
# Run the GloVe download/extraction on a worker thread, then block until it
# has finished. Because wait() is called right away, nothing else runs in
# parallel with the download here.
glove_prepare_task = BackgroundTask(prepare_glove_data)
glove_prepare_task.wait()

In [148]:
# Try the cache first: parsing the GloVe text file is the expensive path.
glove_word_indexes = global_cache.get('glove_word_indexes')
glove_word_vectors = global_cache.get('glove_word_vectors')

# Keep the explicit `is None` checks: the vectors are a NumPy array, so a
# truthiness or equality test on them would be evaluated element-wise.
if glove_word_indexes is None or glove_word_vectors is None:
    glove_word_indexes, glove_word_vectors = parse_glove_data(
        target_glove_file_name
    )
    # Store both parts so the next lookup can skip the parsing step.
    global_cache.put('glove_word_indexes', glove_word_indexes)
    global_cache.put('glove_word_vectors', glove_word_vectors)

#### TF-IDF Vectorization

#### Build sentiment classifiers
You need to create your own classifiers (at least 3 classifiers). For each classifier, you can choose between the bag-of-word features and the word-embedding-based features. Each classifier has to be evaluated over 3 test sets. Make sure your classifiers produce consistent performance across the test sets. Marking will be based on the performance over all 5 test sets (2 of them are not provided to you).

In [149]:
# Build traditional sentiment classifiers. An example classifier name 'svm' is given
# in the code below. You should replace the other two classifier names
# with your own choices. For features used for classifier training,
# the 'bow' feature is given in the code. But you could also explore the
# use of other features.
#
# NOTE(review): this cell is still the unfilled template - the classifier and
# feature names are placeholders and no model is trained or applied yet.
for classifier in ('svm', '<classifier-2-name>', '<classifier-3-name>',):
    for features in ('bow', '<feature-2-name>',):
        # Skeleton: Creation and training of the classifiers
        if classifier == 'svm':
            # write the svm classifier here
            print('Training ' + classifier)
        elif classifier == '<classifier-2-name>':
            # write the classifier 2 here
            print('Training ' + classifier)
        elif classifier == '<classifier-3-name>':
            # write the classifier 3 here
            print('Training ' + classifier)
        elif classifier == 'LSTM':
            # write the LSTM classifier here
            # NOTE(review): unreachable as long as 'LSTM' is not listed in
            # the tuple of classifier names iterated above.
            if features == 'bow':
                continue
            print('Training ' + classifier)
        else:
            print('Unknown classifier name' + classifier)
            continue

        # Prediction performance of the classifiers
        for test_set_name in test_set_names:
            # Empty for now: evaluate() treats every tweet without a
            # prediction as predicted neutral.
            id_predicts = {}
            # write the prediction and evaluation code here
            evaluate(id_predicts, test_set_name, features + '-' + classifier)
            

Training svm
twitter-test1.txt (bow-svm): 0.000
twitter-test2.txt (bow-svm): 0.000
twitter-test3.txt (bow-svm): 0.000
Training svm
twitter-test1.txt (<feature-2-name>-svm): 0.000
twitter-test2.txt (<feature-2-name>-svm): 0.000
twitter-test3.txt (<feature-2-name>-svm): 0.000
Training <classifier-2-name>
twitter-test1.txt (bow-<classifier-2-name>): 0.000
twitter-test2.txt (bow-<classifier-2-name>): 0.000
twitter-test3.txt (bow-<classifier-2-name>): 0.000
Training <classifier-2-name>
twitter-test1.txt (<feature-2-name>-<classifier-2-name>): 0.000
twitter-test2.txt (<feature-2-name>-<classifier-2-name>): 0.000
twitter-test3.txt (<feature-2-name>-<classifier-2-name>): 0.000
Training <classifier-3-name>
twitter-test1.txt (bow-<classifier-3-name>): 0.000
twitter-test2.txt (bow-<classifier-3-name>): 0.000
twitter-test3.txt (bow-<classifier-3-name>): 0.000
Training <classifier-3-name>
twitter-test1.txt (<feature-2-name>-<classifier-3-name>): 0.000
twitter-test2.txt (<feature-2-name>-<classifier