In [2]:
import copy
import os
import re

import numpy
import pandas
import random
import torch
import unicodedata
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from torch import nn
from torch.utils.data import Dataset, DataLoader
from matplotlib import pyplot
from sklearn.metrics import f1_score, recall_score, precision_score, confusion_matrix, ConfusionMatrixDisplay, \
classification_report

In [3]:
class Logger:
    """
    Utility class created to thoroughly document the outcomes of each experiment.

    Every instance creates a fresh ``EXPERIMENT-<n>`` directory underneath
    ``<current working directory>/<name>`` and opens a ``LOG.txt`` file in it.
    ``<n>`` is one greater than the highest existing experiment index (0 when none exist).
    """

    def __init__(self, name):
        """
        :param name: Name of the parent directory (relative to the current working directory)
                     that groups the experiment directories. It is created when missing.
        :raises OSError: If the directories or the log file cannot be created.
        """
        # os.path.join keeps the layout portable; a hard-coded "\\" separator only works on Windows.
        baseDirectory = os.path.join(os.getcwd(), name)
        os.makedirs(baseDirectory, exist_ok=True)

        # Collect the numeric suffix of every existing EXPERIMENT-<n> entry.
        # Entries that do not follow that exact naming scheme are ignored instead of crashing int().
        indices = []
        for dirName in os.listdir(baseDirectory):
            match = re.fullmatch(r"EXPERIMENT-(\d+)", dirName)
            if match is not None:
                indices.append(int(match.group(1)))

        nextIndex = max(indices) + 1 if indices else 0
        self.__directory = os.path.join(baseDirectory, f"EXPERIMENT-{nextIndex}")
        os.mkdir(self.__directory)

        self.__log = open(os.path.join(self.__directory, "LOG.txt"), "w", encoding="utf-8")

    def log(self, toLog, newLine=True, title=None):
        """
        Write a message to the log file and flush it immediately.

        :param toLog: Text to write.
        :param newLine: When True a line break is appended to the text.
        :param title: Optional heading written on its own line before the text.
        """
        if newLine:
            toLog += "\n"
        if title is not None:
            self.__log.write(f"{title}\n")
        self.__log.write(toLog)
        self.__log.flush()

    def line(self):
        """Write two line breaks to visually separate log sections."""
        self.__log.write("\n\n")
        self.__log.flush()

    def directory(self):
        """Return the path of the directory created for this experiment."""
        return self.__directory

    def close(self):
        """Close the underlying log file."""
        self.__log.close()

In [4]:
class Embeddings:
    """
    Utility class that stores all the essential information related to the word embeddings.

    The embeddings file is expected to hold one entry per line: the token followed by its
    whitespace-separated vector components.
    """

    def __init__(self, path):
        """
        :param path: Path of the UTF-8 encoded embeddings text file.
        :raises OSError: If the file cannot be opened.
        :raises ValueError: If a vector component cannot be parsed as a float.
        """
        self.__embeddings = dict()
        self.__dimensions = -1

        # The context manager closes the file even when a malformed line raises.
        with open(path, "r", encoding="utf-8") as file:
            for line in file:
                tokens = line.split()

                # Blank lines (e.g. a trailing empty line) carry no embedding.
                if not tokens:
                    continue

                self.__embeddings[tokens[0]] = [float(value) for value in tokens[1:]]

                # The dimensionality is taken from the first entry of the file.
                if self.__dimensions == -1:
                    self.__dimensions = len(tokens) - 1

    def __contains__(self, item):
        return item in self.__embeddings

    def __getitem__(self, item):
        """Return the stored vector of ``item`` or a zero vector when the token is unknown."""
        vector = self.__embeddings.get(item)
        if vector is not None:
            return vector
        return [0.0 for _ in range(self.__dimensions)]

    def dimensions(self):
        """Return the vector size, or -1 when no embedding has been loaded."""
        return self.__dimensions

In [5]:
class Tweet:
    """
    Utility class that stores all the essential information related to a tweet.
    """
    # Two-way mapping between the sentiment label and its numeric class.
    classToNumber = {"POSITIVE": 0, "NEGATIVE": 1, "NEUTRAL": 2}
    numberToClass = {0: "POSITIVE", 1: "NEGATIVE", 2: "NEUTRAL"}

    def __init__(self):
        """Create an empty tweet; every field is filled in later through the setters."""
        self.__id = None
        self.__party = None
        self.__sentiment = None
        self.__text = None
        self.__hashtags = []
        self.__links = []
        self.__mentions = []

    # --- identifier -------------------------------------------------------

    def setID(self, id):
        self.__id = id

    def getID(self):
        return self.__id

    # --- links ------------------------------------------------------------

    def addLinks(self, links):
        """Append every element of ``links`` to the stored links."""
        self.__links.extend(links)

    def getLinks(self):
        return self.__links

    def getTotalLinks(self):
        return len(self.__links)

    # --- hashtags ---------------------------------------------------------

    def addHashtags(self, hashtags):
        """Append every element of ``hashtags`` to the stored hashtags."""
        self.__hashtags.extend(hashtags)

    def getHashtags(self):
        return self.__hashtags

    def getTotalHashtags(self):
        return len(self.__hashtags)

    # --- mentions ---------------------------------------------------------

    def addMentions(self, mentions):
        """Append every element of ``mentions`` to the stored mentions."""
        self.__mentions.extend(mentions)

    def getMentions(self):
        return self.__mentions

    def getTotalMentions(self):
        return len(self.__mentions)

    # --- party / sentiment / text -----------------------------------------

    def setParty(self, party):
        self.__party = party

    def getParty(self):
        return self.__party

    def setSentiment(self, sentiment):
        self.__sentiment = sentiment

    def getSentiment(self):
        return self.__sentiment

    def setText(self, text):
        self.__text = text

    def getText(self):
        return self.__text

In [6]:
class Stopwords:
    """
    Utility class that stores all the essential information related to Greek stopwords.

    The stopword file is expected to contain one stopword per line.
    """

    def __init__(self, path):
        """
        :param path: Path of the UTF-8 encoded stopword file.
        :raises OSError: If the file cannot be opened.
        """
        # The context manager guarantees the handle is released even if reading fails.
        with open(path, "r", encoding="utf8") as stopwordFile:
            # Only the line break is removed so every entry is stored exactly as written.
            self.__stopwords = {line.replace("\n", "") for line in stopwordFile}

    def __contains__(self, item):
        return item in self.__stopwords

In [7]:
"""
Python implementation of the Greek stemmer based on the paper of Georgios Ntais which can be found at the following link https://people.dsv.su.se/~hercules/papers/Ntais_greek_stemmer_thesis_final.pdf
"""

# Step-1 lookup table of the stemmer: maps each irregular ending to the text that replaces it.
# The table is generated from (replacement, endings) groups; insertion order is preserved.
__cases = {
    ending: replacement
    for replacement, endings in (
        ("ΦΑ", ("ΦΑΓΙΑ", "ΦΑΓΙΟΥ", "ΦΑΓΙΩΝ")),
        ("ΣΚΑ", ("ΣΚΑΓΙΑ", "ΣΚΑΓΙΟΥ", "ΣΚΑΓΙΩΝ")),
        ("ΟΛΟ", ("ΟΛΟΓΙΟΥ", "ΟΛΟΓΙΑ", "ΟΛΟΓΙΩΝ")),
        ("ΣΟ", ("ΣΟΓΙΟΥ", "ΣΟΓΙΑ", "ΣΟΓΙΩΝ")),
        ("ΤΑΤΟ", ("ΤΑΤΟΓΙΑ", "ΤΑΤΟΓΙΟΥ", "ΤΑΤΟΓΙΩΝ")),
        ("ΚΡΕ", ("ΚΡΕΑΣ", "ΚΡΕΑΤΟΣ", "ΚΡΕΑΤΑ", "ΚΡΕΑΤΩΝ")),
        ("ΠΕΡ", ("ΠΕΡΑΣ", "ΠΕΡΑΤΟΣ", "ΠΕΡΑΤΑ", "ΠΕΡΑΤΩΝ")),
        ("ΤΕΡ", ("ΤΕΡΑΣ", "ΤΕΡΑΤΟΣ", "ΤΕΡΑΤΑ", "ΤΕΡΑΤΩΝ")),
        ("ΦΩ", ("ΦΩΣ", "ΦΩΤΟΣ", "ΦΩΤΑ", "ΦΩΤΩΝ")),
        ("ΚΑΘΕΣΤ", ("ΚΑΘΕΣΤΩΣ", "ΚΑΘΕΣΤΩΤΟΣ", "ΚΑΘΕΣΤΩΤΑ", "ΚΑΘΕΣΤΩΤΩΝ")),
        ("ΓΕΓΟΝ", ("ΓΕΓΟΝΟΣ", "ΓΕΓΟΝΟΤΟΣ", "ΓΕΓΟΝΟΤΑ", "ΓΕΓΟΝΟΤΩΝ")),
    )
    for ending in endings
}

# Character classes used by the stemmer; the "refined" class leaves out Υ.
__vowels = "[ΑΕΗΙΟΥΩ]"
__refinedVowels = "[ΑΕΗΙΟΩ]"

def stemWord(w: str, exceptions: dict = None):
    """
    Reduce a Greek word to its stem by applying a fixed sequence of suffix-stripping steps.

    All patterns are written with uppercase, unaccented Greek letters, so the input is
    presumably expected to be uppercased and accent-free already (verify against callers).

    :param w: The word to stem.
    :param exceptions: Optional mapping of words to predefined results; when ``w`` is one of
                       its keys the mapped value is returned without any further processing.
    :return: The mapped exception value, ``w`` itself when it is shorter than 4 characters,
             otherwise the word after all steps have been applied.

    NOTE(review): several exception patterns below have no leading ``^`` but are evaluated
    with ``re.match``, which always anchors at the start of the string. Combined with the
    trailing ``$`` they only match when the whole remaining stem equals one of the
    alternatives. If "stem ends with ..." was intended, ``re.search`` would be required.
    Changing this alters the produced stems, so confirm before touching it.
    """
    stem = None
    suffix = None
    # test1 stays True only while none of the flag-clearing steps below has fired;
    # it gates the generic suffix removal of step 6.
    test1 = True

    if exceptions is not None and w in exceptions.keys():
        return exceptions[w]

    # Words shorter than 4 characters are returned unchanged.
    if len(w) < 4:
        return w

    pattern = None
    pattern2 = None
    pattern3 = None
    pattern4 = None

    # Step 1: replace an irregular ending with its entry from the __cases lookup table.
    pattern = re.compile(
        r"(.*)(ΦΑΓΙΑ|ΦΑΓΙΟΥ|ΦΑΓΙΩΝ|ΣΚΑΓΙΑ|ΣΚΑΓΙΟΥ|ΣΚΑΓΙΩΝ|ΟΛΟΓΙΟΥ|ΟΛΟΓΙΑ|ΟΛΟΓΙΩΝ|ΣΟΓΙΟΥ|ΣΟΓΙΑ|ΣΟΓΙΩΝ|ΤΑΤΟΓΙΑ|ΤΑΤΟΓΙΟΥ|ΤΑΤΟΓΙΩΝ|ΚΡΕΑΣ|ΚΡΕΑΤΟΣ|ΚΡΕΑΤΑ|ΚΡΕΑΤΩΝ|ΠΕΡΑΣ|ΠΕΡΑΤΟΣ|ΠΕΡΑΤΑ|ΠΕΡΑΤΩΝ|ΤΕΡΑΣ|ΤΕΡΑΤΟΣ|ΤΕΡΑΤΑ|ΤΕΡΑΤΩΝ|ΦΩΣ|ΦΩΤΟΣ|ΦΩΤΑ|ΦΩΤΩΝ|ΚΑΘΕΣΤΩΣ|ΚΑΘΕΣΤΩΤΟΣ|ΚΑΘΕΣΤΩΤΑ|ΚΑΘΕΣΤΩΤΩΝ|ΓΕΓΟΝΟΣ|ΓΕΓΟΝΟΤΟΣ|ΓΕΓΟΝΟΤΑ|ΓΕΓΟΝΟΤΩΝ)$")

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        suffix = fp[1]
        w = stem + __cases[suffix]
        test1 = False

    # Step 2a: strip -ΑΔΕΣ/-ΑΔΩΝ; "ΑΔ" is appended again unless reg1 matches the stem.
    pattern = re.compile(r"^(.+?)(ΑΔΕΣ|ΑΔΩΝ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        reg1 = re.compile(r"(ΟΚ|ΜΑΜ|ΜΑΝ|ΜΠΑΜΠ|ΠΑΤΕΡ|ΓΙΑΓΙ|ΝΤΑΝΤ|ΚΥΡ|ΘΕΙ|ΠΕΘΕΡ)$")

        if not reg1.match(w):
            w = w + "ΑΔ"

    # Step 2b: strip -ΕΔΕΣ/-ΕΔΩΝ; "ΕΔ" is appended again when except2 matches the stem.
    pattern2 = re.compile(r"^(.+?)(ΕΔΕΣ|ΕΔΩΝ)$")
    if pattern2.match(w):
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem
        except2 = re.compile(r"(ΟΠ|ΙΠ|ΕΜΠ|ΥΠ|ΓΗΠ|ΔΑΠ|ΚΡΑΣΠ|ΜΙΛ)$")
        if except2.match(w):
            w = w + "ΕΔ"

    # Step 2c: strip -ΟΥΔΕΣ/-ΟΥΔΩΝ; "ΟΥΔ" is appended again when except3 matches the stem.
    pattern3 = re.compile(r"^(.+?)(ΟΥΔΕΣ|ΟΥΔΩΝ)$")
    if pattern3.match(w):
        fp = pattern3.match(w).groups()
        stem = fp[0]
        w = stem
        except3 = re.compile(r"(ΑΡΚ|ΚΑΛΙΑΚ|ΠΕΤΑΛ|ΛΙΧ|ΠΛΕΞ|ΣΚ|Σ|ΦΛ|ΦΡ|ΒΕΛ|ΛΟΥΛ|ΧΝ|ΣΠ|ΤΡΑΓ|ΦΕ)$")
        if except3.match(w):
            w = w + "ΟΥΔ"

    # Step 2d: strip -ΕΩΣ/-ΕΩΝ; "Ε" is appended again when the stem is one of the listed words.
    pattern4 = re.compile("^(.+?)(ΕΩΣ|ΕΩΝ)$")
    if pattern4.match(w):
        fp = pattern4.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except4 = re.compile(r"^(Θ|Δ|ΕΛ|ΓΑΛ|Ν|Π|ΙΔ|ΠΑΡ)$")
        if except4.match(w):
            w = w + "Ε"

    # Step 3: strip -ΙΑ/-ΙΟΥ/-ΙΩΝ; "Ι" is appended again when the vowel pattern matches the stem.
    pattern = re.compile(r"^(.+?)(ΙΑ|ΙΟΥ|ΙΩΝ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        # NOTE(review): evaluated with re.match, so this only matches a stem that is a single vowel.
        pattern2 = re.compile(__vowels + "$")
        test1 = False
        if pattern2.match(w):
            w = stem + "Ι"

    # Step 4: strip -ΙΚΑ/-ΙΚΟ/-ΙΚΟΥ/-ΙΚΩΝ; "ΙΚ" is appended again for the listed stems
    # or when the vowel pattern matches.
    pattern = re.compile(r"^(.+?)(ΙΚΑ|ΙΚΟ|ΙΚΟΥ|ΙΚΩΝ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        pattern2 = re.compile(__vowels + "$")
        except5 = re.compile(
            r"^(ΑΛ|ΑΔ|ΕΝΔ|ΑΜΑΝ|ΑΜΜΟΧΑΛ|ΗΘ|ΑΝΗΘ|ΑΝΤΙΔ|ΦΥΣ|ΒΡΩΜ|ΓΕΡ|ΕΞΩΔ|ΚΑΛΠ|ΚΑΛΛΙΝ|ΚΑΤΑΔ|ΜΟΥΛ|ΜΠΑΝ|ΜΠΑΓΙΑΤ|ΜΠΟΛ|ΜΠΟΣ|ΝΙΤ|ΞΙΚ|ΣΥΝΟΜΗΛ|ΠΕΤΣ|ΠΙΤΣ|ΠΙΚΑΝΤ|ΠΛΙΑΤΣ|ΠΟΣΤΕΛΝ|ΠΡΩΤΟΔ|ΣΕΡΤ|ΣΥΝΑΔ|ΤΣΑΜ|ΥΠΟΔ|ΦΙΛΟΝ|ΦΥΛΟΔ|ΧΑΣ)$")
        if except5.match(w) or pattern2.match(w):
            w = w + "ΙΚ"

    # Step 5a: the word ΑΓΑΜΕ is special-cased, then the long -ΑΜΕ variants and finally
    # plain -ΑΜΕ are stripped ("ΑΜ" is appended again for the listed stems).
    pattern = re.compile(r"^(.+?)(ΑΜΕ)$")
    pattern2 = re.compile(r"^(.+?)(ΑΓΑΜΕ|ΗΣΑΜΕ|ΟΥΣΑΜΕ|ΗΚΑΜΕ|ΗΘΗΚΑΜΕ)$")
    if w == "ΑΓΑΜΕ":
        w = "ΑΓΑΜ"

    if pattern2.match(w):
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except6 = re.compile(r"^(ΑΝΑΠ|ΑΠΟΘ|ΑΠΟΚ|ΑΠΟΣΤ|ΒΟΥΒ|ΞΕΘ|ΟΥΛ|ΠΕΘ|ΠΙΚΡ|ΠΟΤ|ΣΙΧ|Χ)$")
        if except6.match(w):
            w = w + "ΑΜ"

    # Step 5b: strip the long -ΑΝΕ variants first, then plain -ΑΝΕ.
    pattern2 = re.compile(r"^(.+?)(ΑΝΕ)$")
    pattern3 = re.compile(r"^(.+?)(ΑΓΑΝΕ|ΗΣΑΝΕ|ΟΥΣΑΝΕ|ΙΟΝΤΑΝΕ|ΙΟΤΑΝΕ|ΙΟΥΝΤΑΝΕ|ΟΝΤΑΝΕ|ΟΤΑΝΕ|ΟΥΝΤΑΝΕ|ΗΚΑΝΕ|ΗΘΗΚΑΝΕ)$")
    if pattern3.match(w):
        fp = pattern3.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        # pattern3 is rebound here to the exception list of this branch.
        pattern3 = re.compile(r"^(ΤΡ|ΤΣ)$")
        if pattern3.match(w):
            w = w + "ΑΓΑΝ"

    if pattern2.match(w):
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        pattern2 = re.compile(__refinedVowels + "$")
        except7 = re.compile(
            r"^(ΒΕΤΕΡ|ΒΟΥΛΚ|ΒΡΑΧΜ|Γ|ΔΡΑΔΟΥΜ|Θ|ΚΑΛΠΟΥΖ|ΚΑΣΤΕΛ|ΚΟΡΜΟΡ|ΛΑΟΠΛ|ΜΩΑΜΕΘ|Μ|ΜΟΥΣΟΥΛΜ|Ν|ΟΥΛ|Π|ΠΕΛΕΚ|ΠΛ|ΠΟΛΙΣ|ΠΟΡΤΟΛ|ΣΑΡΑΚΑΤΣ|ΣΟΥΛΤ|ΤΣΑΡΛΑΤ|ΟΡΦ|ΤΣΙΓΓ|ΤΣΟΠ|ΦΩΤΟΣΤΕΦ|Χ|ΨΥΧΟΠΛ|ΑΓ|ΟΡΦ|ΓΑΛ|ΓΕΡ|ΔΕΚ|ΔΙΠΛ|ΑΜΕΡΙΚΑΝ|ΟΥΡ|ΠΙΘ|ΠΟΥΡΙΤ|Σ|ΖΩΝΤ|ΙΚ|ΚΑΣΤ|ΚΟΠ|ΛΙΧ|ΛΟΥΘΗΡ|ΜΑΙΝΤ|ΜΕΛ|ΣΙΓ|ΣΠ|ΣΤΕΓ|ΤΡΑΓ|ΤΣΑΓ|Φ|ΕΡ|ΑΔΑΠ|ΑΘΙΓΓ|ΑΜΗΧ|ΑΝΙΚ|ΑΝΟΡΓ|ΑΠΗΓ|ΑΠΙΘ|ΑΤΣΙΓΓ|ΒΑΣ|ΒΑΣΚ|ΒΑΘΥΓΑΛ|ΒΙΟΜΗΧ|ΒΡΑΧΥΚ|ΔΙΑΤ|ΔΙΑΦ|ΕΝΟΡΓ|ΘΥΣ|ΚΑΠΝΟΒΙΟΜΗΧ|ΚΑΤΑΓΑΛ|ΚΛΙΒ|ΚΟΙΛΑΡΦ|ΛΙΒ|ΜΕΓΛΟΒΙΟΜΗΧ|ΜΙΚΡΟΒΙΟΜΗΧ|ΝΤΑΒ|ΞΗΡΟΚΛΙΒ|ΟΛΙΓΟΔΑΜ|ΟΛΟΓΑΛ|ΠΕΝΤΑΡΦ|ΠΕΡΗΦ|ΠΕΡΙΤΡ|ΠΛΑΤ|ΠΟΛΥΔΑΠ|ΠΟΛΥΜΗΧ|ΣΤΕΦ|ΤΑΒ|ΤΕΤ|ΥΠΕΡΗΦ|ΥΠΟΚΟΠ|ΧΑΜΗΛΟΔΑΠ|ΨΗΛΟΤΑΒ)$")
        if (pattern2.match(w)) or (except7.match(w)):
            w = w + "ΑΝ"

    # Step 5c: strip -ΗΣΕΤΕ first, then plain -ΕΤΕ.
    pattern3 = re.compile(r"^(.+?)(ΕΤΕ)$")
    pattern4 = re.compile(r"^(.+?)(ΗΣΕΤΕ)$")
    if pattern4.match(w):
        fp = pattern4.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False

    if pattern3.match(w):
        fp = pattern3.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        pattern3 = re.compile(__refinedVowels + "$")
        except8 = re.compile(
            r"(ΟΔ|ΑΙΡ|ΦΟΡ|ΤΑΘ|ΔΙΑΘ|ΣΧ|ΕΝΔ|ΕΥΡ|ΤΙΘ|ΥΠΕΡΘ|ΡΑΘ|ΕΝΘ|ΡΟΘ|ΣΘ|ΠΥΡ|ΑΙΝ|ΣΥΝΔ|ΣΥΝ|ΣΥΝΘ|ΧΩΡ|ΠΟΝ|ΒΡ|ΚΑΘ|ΕΥΘ|ΕΚΘ|ΝΕΤ|ΡΟΝ|ΑΡΚ|ΒΑΡ|ΒΟΛ|ΩΦΕΛ)$")
        except9 = re.compile(
            r"^(ΑΒΑΡ|ΒΕΝ|ΕΝΑΡ|ΑΒΡ|ΑΔ|ΑΘ|ΑΝ|ΑΠΛ|ΒΑΡΟΝ|ΝΤΡ|ΣΚ|ΚΟΠ|ΜΠΟΡ|ΝΙΦ|ΠΑΓ|ΠΑΡΑΚΑΛ|ΣΕΡΠ|ΣΚΕΛ|ΣΥΡΦ|ΤΟΚ|Υ|Δ|ΕΜ|ΘΑΡΡ|Θ)$")
        if (pattern3.match(w)) or (except8.match(w)) or (except9.match(w)):
            w = w + "ΕΤ"

    # Step 5d: strip -ΟΝΤΑΣ/-ΩΝΤΑΣ.
    pattern = re.compile(r"^(.+?)(ΟΝΤΑΣ|ΩΝΤΑΣ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except10 = re.compile(r"^(ΑΡΧ)$")
        except11 = re.compile(r"(ΚΡΕ)$")
        if except10.match(w):
            w = w + "ΟΝΤ"
        if except11.match(w):
            w = w + "ΩΝΤ"

    # Step 5e: strip -ΟΜΑΣΤΕ/-ΙΟΜΑΣΤΕ (the name except11 is reused for a different pattern).
    pattern = re.compile(r"^(.+?)(ΟΜΑΣΤΕ|ΙΟΜΑΣΤΕ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except11 = re.compile("^(ΟΝ)$")
        if except11.match(w):
            w = w + "ΟΜΑΣΤ"

    # Step 5f: strip -ΙΕΣΤΕ first, then plain -ΕΣΤΕ.
    pattern = re.compile(r"^(.+?)(ΕΣΤΕ)$")
    pattern2 = re.compile(r"^(.+?)(ΙΕΣΤΕ)$")
    if pattern2.match(w):
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        pattern2 = re.compile(r"^(Π|ΑΠ|ΣΥΜΠ|ΑΣΥΜΠ|ΑΚΑΤΑΠ|ΑΜΕΤΑΜΦ)$")
        if pattern2.match(w):
            w = w + "ΙΕΣΤ"

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except12 = re.compile(r"^(ΑΛ|ΑΡ|ΕΚΤΕΛ|Ζ|Μ|Ξ|ΠΑΡΑΚΑΛ|ΑΡ|ΠΡΟ|ΝΙΣ)$")
        if except12.match(w):
            w = w + "ΕΣΤ"

    # Step 5g: strip -ΗΘΗΚΑ/-ΗΘΗΚΕΣ/-ΗΘΗΚΕ first, then -ΗΚΑ/-ΗΚΕΣ/-ΗΚΕ.
    pattern = re.compile(r"^(.+?)(ΗΚΑ|ΗΚΕΣ|ΗΚΕ)$")
    pattern2 = re.compile(r"^(.+?)(ΗΘΗΚΑ|ΗΘΗΚΕΣ|ΗΘΗΚΕ)$")
    if pattern2.match(w):
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except13 = re.compile(r"(ΣΚΩΛ|ΣΚΟΥΛ|ΝΑΡΘ|ΣΦ|ΟΘ|ΠΙΘ)$")
        # The trailing "|" adds an empty alternative; the stem captured by (.+?) is never empty.
        except14 = re.compile(r"^(ΔΙΑΘ|Θ|ΠΑΡΑΚΑΤΑΘ|ΠΡΟΣΘ|ΣΥΝΘ|)$")
        if (except13.match(w)) or (except14.match(w)):
            w = w + "ΗΚ"

    # Step 5h: strip -ΟΥΣΑ/-ΟΥΣΕΣ/-ΟΥΣΕ.
    pattern = re.compile(r"^(.+?)(ΟΥΣΑ|ΟΥΣΕΣ|ΟΥΣΕ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except15 = re.compile(
            r"^(ΦΑΡΜΑΚ|ΧΑΔ|ΑΓΚ|ΑΝΑΡΡ|ΒΡΟΜ|ΕΚΛΙΠ|ΛΑΜΠΙΔ|ΛΕΧ|Μ|ΠΑΤ|Ρ|Λ|ΜΕΔ|ΜΕΣΑΖ|ΥΠΟΤΕΙΝ|ΑΜ|ΑΙΘ|ΑΝΗΚ|ΔΕΣΠΟΖ|ΕΝΔΙΑΦΕΡ|ΔΕ|ΔΕΥΤΕΡΕΥ|ΚΑΘΑΡΕΥ|ΠΛΕ|ΤΣΑ)$")
        except16 = re.compile(r"(ΠΟΔΑΡ|ΒΛΕΠ|ΠΑΝΤΑΧ|ΦΡΥΔ|ΜΑΝΤΙΛ|ΜΑΛΛ|ΚΥΜΑΤ|ΛΑΧ|ΛΗΓ|ΦΑΓ|ΟΜ|ΠΡΩΤ)$")
        if (except15.match(w)) or (except16.match(w)):
            w = w + "ΟΥΣ"

    # Step 5i: strip -ΑΓΑ/-ΑΓΕΣ/-ΑΓΕ.
    pattern = re.compile(r"^(.+?)(ΑΓΑ|ΑΓΕΣ|ΑΓΕ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except17 = re.compile(r"^(ΨΟΦ|ΝΑΥΛΟΧ)$")
        except20 = re.compile(r"(ΚΟΛΛ)$")
        except18 = re.compile(
            r"^(ΑΒΑΣΤ|ΠΟΛΥΦ|ΑΔΗΦ|ΠΑΜΦ|Ρ|ΑΣΠ|ΑΦ|ΑΜΑΛ|ΑΜΑΛΛΙ|ΑΝΥΣΤ|ΑΠΕΡ|ΑΣΠΑΡ|ΑΧΑΡ|ΔΕΡΒΕΝ|ΔΡΟΣΟΠ|ΞΕΦ|ΝΕΟΠ|ΝΟΜΟΤ|ΟΛΟΠ|ΟΜΟΤ|ΠΡΟΣΤ|ΠΡΟΣΩΠΟΠ|ΣΥΜΠ|ΣΥΝΤ|Τ|ΥΠΟΤ|ΧΑΡ|ΑΕΙΠ|ΑΙΜΟΣΤ|ΑΝΥΠ|ΑΠΟΤ|ΑΡΤΙΠ|ΔΙΑΤ|ΕΝ|ΕΠΙΤ|ΚΡΟΚΑΛΟΠ|ΣΙΔΗΡΟΠ|Λ|ΝΑΥ|ΟΥΛΑΜ|ΟΥΡ|Π|ΤΡ|Μ)$")
        except19 = re.compile(r"(ΟΦ|ΠΕΛ|ΧΟΡΤ|ΛΛ|ΣΦ|ΡΠ|ΦΡ|ΠΡ|ΛΟΧ|ΣΜΗΝ)$")
        # NOTE(review): both except18 AND except19 must match the same stem here; with the
        # re.match anchoring the two lists share no alternative, so "ΑΓ" appears never to be
        # re-appended. Confirm against the reference algorithm whether OR was intended.
        if (except18.match(w) and except19.match(w)) and not ((except17.match(w)) or (except20.match(w))):
            w = w + "ΑΓ"

    # Step 5j: strip -ΗΣΕ/-ΗΣΟΥ/-ΗΣΑ.
    pattern = re.compile("^(.+?)(ΗΣΕ|ΗΣΟΥ|ΗΣΑ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except21 = re.compile(r"^(Ν|ΧΕΡΣΟΝ|ΔΩΔΕΚΑΝ|ΕΡΗΜΟΝ|ΜΕΓΑΛΟΝ|ΕΠΤΑΝ)$")
        if except21.match(w):
            w = w + "ΗΣ"

    # Step 5k: strip -ΗΣΤΕ.
    pattern = re.compile(r"^(.+?)(ΗΣΤΕ)$")

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except22 = re.compile(r"^(ΑΣΒ|ΣΒ|ΑΧΡ|ΧΡ|ΑΠΛ|ΑΕΙΜΝ|ΔΥΣΧΡ|ΕΥΧΡ|ΚΟΙΝΟΧΡ|ΠΑΛΙΜΨ)$")
        if except22.match(w):
            w = w + "ΗΣΤ"

    # Step 5l: strip -ΟΥΝΕ/-ΗΣΟΥΝΕ/-ΗΘΟΥΝΕ.
    pattern = re.compile("^(.+?)(ΟΥΝΕ|ΗΣΟΥΝΕ|ΗΘΟΥΝΕ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except23 = re.compile("^(Ν|Ρ|ΣΠΙ|ΣΤΡΑΒΟΜΟΥΤΣ|ΚΑΚΟΜΟΥΤΣ|ΕΞΩΝ)$")
        if except23.match(w):
            w = w + "ΟΥΝ"

    # Step 5m: strip -ΟΥΜΕ/-ΗΣΟΥΜΕ/-ΗΘΟΥΜΕ.
    pattern = re.compile(r"^(.+?)(ΟΥΜΕ|ΗΣΟΥΜΕ|ΗΘΟΥΜΕ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem
        test1 = False
        except24 = re.compile(r"^(ΠΑΡΑΣΟΥΣ|Φ|Χ|ΩΡΙΟΠΛ|ΑΖ|ΑΛΛΟΣΟΥΣ|ΑΣΟΥΣ)$")
        if except24.match(w):
            w = w + "ΟΥΜ"

    # Step 6: reduce -ΜΑΤΑ/-ΜΑΤΩΝ/-ΜΑΤΟΣ to "ΜΑ"; the generic ending list of pattern2 is
    # only stripped when no earlier step cleared test1.
    pattern = re.compile(r"^(.+?)(ΜΑΤΑ|ΜΑΤΩΝ|ΜΑΤΟΣ)$")
    pattern2 = re.compile(
        r"^(.+?)(Α|ΑΓΑΤΕ|ΑΓΑΝ|ΑΕΙ|ΑΜΑΙ|ΑΝ|ΑΣ|ΑΣΑΙ|ΑΤΑΙ|ΑΩ|Ε|ΕΙ|ΕΙΣ|ΕΙΤΕ|ΕΣΑΙ|ΕΣ|ΕΤΑΙ|Ι|ΙΕΜΑΙ|ΙΕΜΑΣΤΕ|ΙΕΤΑΙ|ΙΕΣΑΙ|ΙΕΣΑΣΤΕ|ΙΟΜΑΣΤΑΝ|ΙΟΜΟΥΝ|ΙΟΜΟΥΝΑ|ΙΟΝΤΑΝ|ΙΟΝΤΟΥΣΑΝ|ΙΟΣΑΣΤΑΝ|ΙΟΣΑΣΤΕ|ΙΟΣΟΥΝ|ΙΟΣΟΥΝΑ|ΙΟΤΑΝ|ΙΟΥΜΑ|ΙΟΥΜΑΣΤΕ|ΙΟΥΝΤΑΙ|ΙΟΥΝΤΑΝ|Η|ΗΔΕΣ|ΗΔΩΝ|ΗΘΕΙ|ΗΘΕΙΣ|ΗΘΕΙΤΕ|ΗΘΗΚΑΤΕ|ΗΘΗΚΑΝ|ΗΘΟΥΝ|ΗΘΩ|ΗΚΑΤΕ|ΗΚΑΝ|ΗΣ|ΗΣΑΝ|ΗΣΑΤΕ|ΗΣΕΙ|ΗΣΕΣ|ΗΣΟΥΝ|ΗΣΩ|Ο|ΟΙ|ΟΜΑΙ|ΟΜΑΣΤΑΝ|ΟΜΟΥΝ|ΟΜΟΥΝΑ|ΟΝΤΑΙ|ΟΝΤΑΝ|ΟΝΤΟΥΣΑΝ|ΟΣ|ΟΣΑΣΤΑΝ|ΟΣΑΣΤΕ|ΟΣΟΥΝ|ΟΣΟΥΝΑ|ΟΤΑΝ|ΟΥ|ΟΥΜΑΙ|ΟΥΜΑΣΤΕ|ΟΥΝ|ΟΥΝΤΑΙ|ΟΥΝΤΑΝ|ΟΥΣ|ΟΥΣΑΝ|ΟΥΣΑΤΕ|Υ|ΥΣ|Ω|ΩΝ)$")

    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem + "ΜΑ"

    if pattern2.match(w) and test1:
        fp = pattern2.match(w).groups()
        stem = fp[0]
        w = stem

    # Step 7 (ΠΑΡΑΘΕΤΙΚΑ): strip the comparative/superlative endings.
    pattern = re.compile(r"^(.+?)(ΕΣΤΕΡ|ΕΣΤΑΤ|ΟΤΕΡ|ΟΤΑΤ|ΥΤΕΡ|ΥΤΑΤ|ΩΤΕΡ|ΩΤΑΤ)$")
    if pattern.match(w):
        fp = pattern.match(w).groups()
        stem = fp[0]
        w = stem

    return w

In [8]:
class Sanitize:
    """
    Collection of static helpers used to tokenize and normalize the text of a tweet.
    """
    HYPERLINKS_REGEX = r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)|(www\.[^ \s]+)'
    HASHTAGS_REGEX = r'#\w+'
    MENTIONS_REGEX = r'@\w+'

    _stopwordsPath = "Data/Greek-Stopwords.txt"
    greekStopwords = Stopwords(_stopwordsPath)

    @staticmethod
    def _clean(text, breakSpecials):
        """
        Separate special characters from the surrounding words and collapse whitespace.

        With ``breakSpecials`` enabled every special character becomes its own token
        ("!!!" -> "!", "!", "!"); otherwise a run of the same special character is kept
        together as a single token ("!!!" -> "!!!").
        """
        pieces = []
        length = len(text)
        position = 0

        while position < length:
            symbol = text[position]

            # Letters, digits and plain spaces are copied through untouched.
            if symbol == " " or symbol.isalnum():
                pieces.append(symbol)
                position += 1
                continue

            runEnd = position + 1
            if not breakSpecials:
                # Extend the token over every immediately repeated copy of the same character.
                while runEnd < length and text[runEnd] == symbol:
                    runEnd += 1

            pieces.append(f" {text[position:runEnd]} ")
            position = runEnd

        return " ".join("".join(pieces).split())

    @staticmethod
    def customTokenize(text, breakSpecials):
        """Return the tokens of the cleaned text together with an (always empty) second list."""
        tokens = Sanitize._clean(text, breakSpecials).split()
        return tokens, []

    @staticmethod
    def removeAccents(token):
        """
        Utility method that strips the accents of a token by dropping every combining mark
        of its NFD-decomposed form.
        """
        decomposed = unicodedata.normalize('NFD', token)
        kept = [character for character in decomposed if unicodedata.category(character) != 'Mn']
        return ''.join(kept)

    @staticmethod
    def removeSpecialCharacters(token):
        """
        Utility method that keeps only the alphanumeric characters of a token.
        """
        return "".join(filter(str.isalnum, token))

    @staticmethod
    def isNumeric(token):
        """
        Utility method that checks if a token is numeric.
        A token is considered numeric if it consists only of digits and its length is not equal to 4,
        accounting for cases like dates.
        """
        onlyDigits = all(character.isdigit() for character in token)
        return onlyDigits and len(token) != 4

    @staticmethod
    def refineHashtag(hashtag):
        """
        Utility method to refine a hashtag by:
        Capitalizing it.
        Removing accents.
        Eliminating special characters, as hashtags like #EKLOGES2019 and #EKLOGES_2019 signify the same topic.
        """
        withoutAccents = Sanitize.removeAccents(hashtag.upper())
        return f"#{Sanitize.removeSpecialCharacters(withoutAccents)}"

In [9]:
class Model(nn.Module):
    """
    Utility class that implements a neural network following a TensorFlow-like structure.

    Layers are queued with the ``add*`` methods and turned into an ``nn.Sequential`` by
    ``build()``. After ``build()`` has been called the ``add*`` methods are silently ignored.
    """

    def __init__(self, inputSize, seed=None):
        """
        :param inputSize: Number of features of a single input instance.
        :param seed: Optional seed; when given, the torch, numpy and random generators are seeded.
        """
        super(Model, self).__init__()
        if seed is not None:
            Model.enableDeterminism(seed)
        self.__inputSize = inputSize
        self.__created = False
        self.__model = None
        self.__layers = []
        # -1 means that no dense layer has been added yet.
        self.__previousLayerOutput = -1
        self.__shouldEvaluate = False

    def __currentWidth(self):
        """Return the number of features produced by the layers queued so far."""
        if self.__previousLayerOutput == -1:
            return self.__inputSize
        return self.__previousLayerOutput

    def build(self):
        """Freeze the layer list and assemble the sequential network."""
        self.__created = True
        self.__model = nn.Sequential(*self.__layers)

    def forward(self, x):
        """Run ``x`` through the network; returns None when ``build()`` has not been called."""
        if self.__created:
            return self.__model(x)
        return None

    def addDropout(self, p):
        if not self.__created:
            self.__layers.append(nn.Dropout(p))
            self.__shouldEvaluate = True

    def addDense(self, neurons):
        if not self.__created:
            self.__layers.append(nn.Linear(self.__currentWidth(), neurons))
            self.__previousLayerOutput = neurons

    def addRelu(self):
        if not self.__created:
            self.__layers.append(nn.ReLU())

    def addLeakyRelu(self, alpha=1e-2):
        if not self.__created:
            self.__layers.append(nn.LeakyReLU(negative_slope=alpha))

    def addSoftplus(self, beta=1, threshold=20):
        if not self.__created:
            self.__layers.append(nn.Softplus(beta=beta, threshold=threshold))

    def addSigmoid(self):
        if not self.__created:
            self.__layers.append(nn.Sigmoid())

    def addBatchNormalization(self):
        # NOTE(review): BatchNorm1d also behaves differently in train and eval mode, yet only
        # addDropout raises the shouldEvaluate flag - confirm whether this layer should too.
        if not self.__created:
            self.__layers.append(nn.BatchNorm1d(self.__currentWidth()))

    def shouldEvaluate(self):
        """Return True when a dropout layer has been added to the network."""
        return self.__shouldEvaluate

    @staticmethod
    def saveModel(stateDict, path):
        """
        Save ``stateDict`` as ``Model.pth`` inside the directory ``path``.
        """
        # os.path.join keeps the destination portable; a hard-coded "\\" only works on Windows.
        torch.save(stateDict, os.path.join(path, "Model.pth"))

    @staticmethod
    def enableDeterminism(seed):
        """Seed the torch, numpy and random generators with ``seed``."""
        torch.manual_seed(seed)
        numpy.random.seed(seed)
        random.seed(seed)

In [10]:
class EarlyStopper:
    """
    A utility class implementing the early stopping concept with parameters for patience and delta.
    It keeps a snapshot of the best epoch seen so far: the epoch number, the value of the monitored
    metric, the model state and the true/predicted labels of the training and validation sets.
    """

    def __init__(self, model, metricName, patience, delta, maximize):
        self.__metricName = metricName.upper()
        self.__maximize = maximize
        self.__patience = patience
        self.__delta = delta
        self.__counter = 0
        # Start from the worst possible value so that the first update always counts as an improvement.
        if maximize:
            self.__bestValue = float('-inf')
        else:
            self.__bestValue = float('inf')
        self.__bestEpoch = 0
        self.__state = copy.deepcopy(model.state_dict())
        self.__shouldStop = False
        self.__trainingLabels = None
        self.__predictedTrainingLabels = None
        self.__validationLabels = None
        self.__predictedValidationLabels = None

    def update(self, value, epoch, model, trainingLabels, predictedTrainingLabels, validationLabels, predictedValidationLabels):
        # The value must beat the best one by more than delta, in the direction being optimized.
        if self.__maximize:
            improved = value > self.__bestValue + self.__delta
        else:
            improved = value < self.__bestValue - self.__delta

        if not improved:
            # No progress: count the epoch and check whether the patience has run out.
            self.__counter += 1
            if self.__counter >= self.__patience:
                self.__shouldStop = True
            return

        # Progress: store the snapshot of this epoch and reset the counter.
        self.__bestValue = value
        self.__bestEpoch = epoch
        self.__state = copy.deepcopy(model.state_dict())
        self.__trainingLabels = copy.deepcopy(trainingLabels)
        self.__predictedTrainingLabels = copy.deepcopy(predictedTrainingLabels)
        self.__validationLabels = copy.deepcopy(validationLabels)
        self.__predictedValidationLabels = copy.deepcopy(predictedValidationLabels)
        self.__counter = 0

    def shouldStop(self):
        return self.__shouldStop

    def __str__(self):
        summary = [
            "EarlyStopper:",
            f"Metric: {self.__metricName}",
            f"Maximize: {self.__maximize}",
            f"Value: {self.__bestValue}",
            f"Epoch: {self.__bestEpoch}",
            f"Patience: {self.__patience}",
            f"Delta: {self.__delta}",
        ]
        return "\n".join(summary)

    def getMetric(self):
        return self.__metricName

    def getEpoch(self):
        return self.__bestEpoch

    def getState(self):
        return self.__state

    def getTrainingLabels(self):
        return self.__trainingLabels, self.__predictedTrainingLabels

    def getValidationLabels(self):
        return self.__validationLabels, self.__predictedValidationLabels

In [11]:
class CustomScheduler:
    """
    Utility class implementing a custom scheduler that triggers the decay of the learning rate
    only after a specified number of epochs.
    """

    def __init__(self, scheduler, epoch=None):
        """
        :param scheduler: The wrapped scheduler; it must expose ``step()`` and, for ``__str__``,
                          the ``gamma`` and ``step_size`` attributes.
        :param epoch: First epoch at which the wrapped scheduler is allowed to step.
                      When None the wrapped scheduler steps on every call.
        """
        self.__scheduler = scheduler
        self.__epoch = epoch

    def step(self, epoch):
        """
        Advance the wrapped scheduler, unless a starting epoch was configured and
        ``epoch`` has not reached it yet.

        :param epoch: The current epoch.
        """
        # Previously both branches stepped the scheduler, so the starting epoch had no effect.
        if self.__epoch is None or epoch >= self.__epoch:
            self.__scheduler.step()

    def __str__(self):
        return (f"Custom Scheduler\n"
                f"Epoch: {self.__epoch}\n"
                f"Scheduler:\n"
                f"gamma: {self.__scheduler.gamma}\n"
                f"step: {self.__scheduler.step_size}")

In [12]:
def _maskMatches(pattern, text, placeholder, joinGroups=False):
    """
    Find every match of ``pattern`` in ``text`` and replace each one with ``placeholder``.

    The substitution is driven by the regular expression itself, so every placeholder
    corresponds to exactly one returned match, in order, and a match that is a prefix of
    another one (e.g. "#A" and "#AB") cannot corrupt the longer one.

    :param joinGroups: Set for patterns with several capture groups, where re.findall
                       returns tuples; the groups of a match are concatenated.
    :return: Tuple of (list of matched strings, masked text).
    """
    matches = re.findall(pattern, text)
    if joinGroups:
        matches = ["".join(groups) for groups in matches]
    return matches, re.sub(pattern, f" {placeholder} ", text)


def sanitizeDataset(path, isTest):
    """
    A utility function designed to preprocess the given dataset.

    Every row of the CSV file becomes a Tweet whose text is the space-joined list of its
    refined tokens (hashtags, designated mentions and stemmed words).

    :param path: Path of the UTF-8 encoded CSV file. The columns 'New_ID', 'Party' and 'Text'
                 are read, plus 'Sentiment' when ``isTest`` is False.
    :param isTest: When True the sentiment label is not read.
    :return: List of Tweet objects, one per row.
    """
    dataframe = pandas.read_csv(path, encoding="utf-8")
    tweets = []

    hashtagPlaceholder = "TWEETHASHTAGPLACEHOLDER"
    linkPlaceholder = "TWEETLINKPLACEHOLDER"
    mentionPlaceholder = "TWEETMENTIONPLACEHOLDER"
    designatedMentions = {"@ATSIPRAS", "@YANISVAROUFAKIS", "@KMITSOTAKIS", "@VELOPKY", "@ELLINIKILISI",
                          "@ADONISGEORGIADI", "@SKAIGR", "@NEADEMOKRATIA", "@AVGIONLINE", "@ΝΕΑ"}

    for _, row in dataframe.iterrows():

        tweet = Tweet()

        tweet.setID(row['New_ID'])
        tweet.setParty(row['Party'])

        if not isTest:
            tweet.setSentiment(Tweet.classToNumber[row['Sentiment']])

        text = row['Text']
        # An empty cell is read by pandas as a float NaN; treat it as a tweet without text.
        if not isinstance(text, str):
            text = ""

        # Links, mentions and hashtags are extracted (in that order) and replaced with placeholders.
        # The placeholders retain the position of each entity inside the tweet, which matters for
        # the context used during the training of the word embeddings model.
        links, text = _maskMatches(Sanitize.HYPERLINKS_REGEX, text, linkPlaceholder, joinGroups=True)
        tweet.addLinks(links)

        mentions, text = _maskMatches(Sanitize.MENTIONS_REGEX, text, mentionPlaceholder)
        tweet.addMentions(mentions)

        hashtags, text = _maskMatches(Sanitize.HASHTAGS_REGEX, text, hashtagPlaceholder)
        tweet.addHashtags(hashtags)

        # Transform the text of the tweet to uppercase and replace Ν.Δ. with ΝΔ.
        text = text.upper()
        text = text.replace("Ν.Δ.", " ΝΔ ")

        tweetTokens, _ = Sanitize.customTokenize(text, breakSpecials=True)
        refinedTokens = []
        hashtagIndex = 0
        mentionIndex = 0

        for token in tweetTokens:

            # If the corresponding token is a hashtag, refine and append it to the list of tokens.
            # The bounds check keeps a tweet that literally contains the placeholder word from crashing.
            if token == hashtagPlaceholder and hashtagIndex < len(hashtags):
                refinedTokens.append(Sanitize.refineHashtag(hashtags[hashtagIndex]))
                hashtagIndex += 1
                continue

            # If the corresponding token is a link ignore it.
            if token == linkPlaceholder:
                continue

            # If the corresponding token is a mention, convert it to uppercase and
            # include it in the list of tokens only if it belongs to the specified set of designated mentions.
            if token == mentionPlaceholder and mentionIndex < len(mentions):
                refinedToken = mentions[mentionIndex].upper()
                mentionIndex += 1
                if refinedToken in designatedMentions:
                    refinedTokens.append(refinedToken)
                continue

            # Eliminate tokens that consist only of special characters.
            if len(Sanitize.removeSpecialCharacters(token)) == 0:
                continue

            # Eliminate numeric tokens.
            if Sanitize.isNumeric(token):
                continue

            # Eliminate accents.
            refinedToken = Sanitize.removeAccents(token)

            # If the corresponding token is a stopword ignore it.
            if refinedToken in Sanitize.greekStopwords:
                continue

            # Stem the corresponding token.
            refinedToken = stemWord(refinedToken)

            if len(refinedToken) > 0:
                refinedTokens.append(refinedToken)

        tweet.setText(" ".join(refinedTokens))

        tweets.append(tweet)

    return tweets

In [13]:
class TweetDataset(Dataset):
    """
    Dataset that converts sanitized tweets into fixed-size feature vectors.

    Every instance is the aggregated embedding of the tweet's tokens followed by two extra columns:
    the normalized identifier of the tweet's party and a flag stating whether the tweet contains links.
    The party identifiers and the scaler are kept at class level, so whatever the training dataset
    records is reused by every dataset that is created afterwards.
    """

    embeddingsPath = "Data/Vectors.txt"
    embeddings = Embeddings(embeddingsPath)
    # Party -> normalized numerical identifier; filled in by a training dataset while it is still empty.
    encounteredParties = dict()
    # Next raw identifier to be assigned to a party that has not been encountered yet.
    partyIndex = 1.0
    # Scaler provided along with the training dataset (None means that no scaling is applied).
    scaler = None
    # Embedding dimensions plus the party column and the links column.
    dimensions = embeddings.dimensions() + 2

    def __init__(self, tweets, isTraining, aggregator, scaler=None):
        """
        Build the dataset out of the given tweets.

        :param tweets: the sanitized tweets to transform.
        :param isTraining: whether the tweets form the training set. Only then is the given scaler stored
                           (and later fitted) and are the parties recorded.
        :param aggregator: callable that receives a tweet's tokens and returns its aggregated embedding.
        :param scaler: scaler to use, honoured only when isTraining is True; otherwise the scaler stored
                       at class level is used.
        """
        self.__instances = None
        self.__labels = None
        self.__ids = None
        if isTraining:
            TweetDataset.scaler = scaler

        self.__transform(tweets, isTraining, aggregator)

    def __len__(self):
        # The instances always exist, whereas the labels remain None for an unlabeled (testing) set.
        # Relying on the labels would raise a TypeError for such a dataset.
        return len(self.__instances)

    def __getitem__(self, index):
        # Labels are required here, an unlabeled dataset is meant to be consumed through getInstances().
        return self.__instances[index], self.__labels[index]

    def getInstances(self):
        return self.__instances

    def getIDS(self):
        return self.__ids

    def __transform(self, tweets, isTraining, aggregator):
        """
        Utility function that converts the tweets to a tensor of instances along with their labels and ids.
        """
        ids = []
        labels = []
        instances = []

        # If referring to the training set and the parties have not yet been recorded.
        if isTraining and len(TweetDataset.encounteredParties) == 0:

            for tweet in tweets:

                # Assign a distinct numerical identifier to each party.
                if tweet.getParty() not in TweetDataset.encounteredParties:
                    TweetDataset.encounteredParties[tweet.getParty()] = TweetDataset.partyIndex
                    TweetDataset.partyIndex += 1.0

            # Normalize the value of each party by dividing it by the highest party value.
            for key in TweetDataset.encounteredParties:
                TweetDataset.encounteredParties[key] /= (TweetDataset.partyIndex - 1.0)

        for tweet in tweets:

            ids.append(tweet.getID())
            labels.append(tweet.getSentiment())

            tokens = tweet.getText().split()

            # Aggregate the embeddings/tokens of the respective tweet.
            instance = aggregator(tokens)

            # Party feature column: a party that was not encountered in the training set is encoded as 0.0.
            party = TweetDataset.encounteredParties.get(tweet.getParty(), 0.0)

            # Links feature column: 1.0 if the tweet has at least one link, 0.0 otherwise.
            links = 1.0 if len(tweet.getLinks()) > 0 else 0.0

            # Append both feature columns after the aggregated embedding.
            instance = torch.cat((instance, torch.tensor([party, links], dtype=torch.float32)))

            instances.append(instance)

        instances = torch.stack(instances)

        if TweetDataset.scaler is not None:
            # The scaler is fitted on the training set only, the other sets just reuse it.
            if isTraining:
                TweetDataset.scaler.fit(instances.numpy())

            # Apply scaling to the instances.
            instances = TweetDataset.scaler.transform(instances.numpy())

        # as_tensor handles both the tensor (no scaler) and the NumPy array (scaled) case, without the
        # copy-construct warning that torch.tensor emits when it is given a tensor.
        self.__instances = torch.as_tensor(instances, dtype=torch.float32)

        # The labels are stored only when every tweet has one.
        if None not in labels:
            self.__labels = torch.LongTensor(labels)
        self.__ids = ids

    @staticmethod
    def average(tokens):
        """
        Utility function to aggregate a tweet's embeddings by averaging the embeddings of its tokens.
        """

        # If no embeddings are found for a tweet's tokens or if the tweet is empty, a tensor of zeros will be returned.
        instance = torch.zeros((TweetDataset.embeddings.dimensions(),), dtype=torch.float32)

        words = 0.0
        for token in tokens:
            if token in TweetDataset.embeddings:
                # Increment the counter for each token in the tweet for which an embedding exists.
                words += 1.0
                instance = instance + torch.tensor(TweetDataset.embeddings[token], dtype=torch.float32)

        if words != 0.0:
            instance = instance / words

        return instance

    @staticmethod
    def sum(tokens):
        """
        Utility function to aggregate a tweet's embeddings by summing the embeddings of its tokens.
        """

        # An empty tweet results in a tensor of zeros.
        instance = torch.zeros((TweetDataset.embeddings.dimensions(),), dtype=torch.float32)

        # NOTE(review): unlike average(), every token is looked up without checking whether it belongs to
        # the embeddings, so this relies on how Embeddings handles unknown tokens - confirm.
        for token in tokens:
            instance = instance + torch.tensor(TweetDataset.embeddings[token], dtype=torch.float32)

        return instance

    @staticmethod
    def max(tokens):
        """
        Utility function to aggregate a tweet's embeddings by choosing the maximum value across each dimension.
        """

        tweetEmbeddings = []

        # NOTE(review): unlike average(), every token is looked up without checking whether it belongs to
        # the embeddings, so this relies on how Embeddings handles unknown tokens - confirm.
        for token in tokens:
            tweetEmbeddings.append(TweetDataset.embeddings[token])

        # Choose the maximum value across each dimension
        if len(tweetEmbeddings) > 0:
            tweetEmbeddings = numpy.max(tweetEmbeddings, axis=0)

        # If the tweet is empty, return an array of zeros.
        else:
            tweetEmbeddings = numpy.zeros((TweetDataset.embeddings.dimensions(),), dtype=numpy.float32)

        return torch.tensor(tweetEmbeddings, dtype=torch.float32)

In [14]:
# Locations of the three dataset splits.
trainingSetPath, validationSetPath, testingSetPath = (
    "Data/train_set.csv",
    "Data/valid_set.csv",
    "Data/test_set.csv",
)

# Sanitize the splits in order: training, validation and finally testing, which is the only
# split that is loaded with isTest=True.
trainingTweets, validationTweets, testingTweets = (
    sanitizeDataset(path=splitPath, isTest=isTestSplit)
    for splitPath, isTestSplit in (
        (trainingSetPath, False),
        (validationSetPath, False),
        (testingSetPath, True),
    )
)

In [16]:
# Seed every source of randomness that is used by the notebook.
seed = 69
random.seed(seed)
numpy.random.seed(seed)
torch.manual_seed(seed)

# Available options, followed by the ones selected for the current experiment.
aggregators = [TweetDataset.sum, TweetDataset.average, TweetDataset.max]
scalers = [None, MinMaxScaler(feature_range=(-1, 1)), StandardScaler()]
aggregator, scaler = aggregators[0], scalers[1]

# The training dataset has to be created first, as it is the one that records the parties and fits
# the scaler which are reused by the validation and testing datasets.
trainingDataset, validationDataset, testingDataset = (
    TweetDataset(tweets=splitTweets, isTraining=isTrainingSplit, scaler=scaler, aggregator=aggregator)
    for splitTweets, isTrainingSplit in (
        (trainingTweets, True),
        (validationTweets, False),
        (testingTweets, False),
    )
)

# Only the training batches are shuffled.
batchSize = 64
trainDataloader = DataLoader(trainingDataset, batch_size=batchSize, shuffle=True)
validationDataloader = DataLoader(validationDataset, batch_size=batchSize, shuffle=False)

In [17]:
def evaluateModel(predictions, labels, average="weighted"):
    """
    Utility function that scores the predicted labels against the true labels.
    Returns the F1, recall, precision and accuracy scores, in that order.
    """
    # The scoring functions operate on NumPy arrays rather than tensors.
    predicted = predictions.numpy()
    expected = labels.numpy()

    # F1, recall and precision share the same signature, hence they are computed one after the other.
    f1, recall, precision = (
        scorer(expected, predicted, average=average)
        for scorer in (f1_score, recall_score, precision_score)
    )

    # The accuracy is the fraction of the predictions that match their label.
    accuracy = numpy.sum(predicted == expected) / len(expected)

    return f1, recall, precision, accuracy


def plotLearningCurve(xAxis, toPlot, xAxisName, yAxisName, logger):
    """
    Utility function to plot one or more curves against a common x axis, e.g. the training and
    validation values of a metric over the epochs.

    xAxis: the values of the x axis, shared by every curve.
    toPlot: iterable of (values, label) pairs, one for each curve.
    xAxisName: label of the x axis.
    yAxisName: label of the y axis, also used as the file name of the stored plot.
    logger: if not None, the plot is stored as '<yAxisName>.png' in the logger's directory.
    """
    pyplot.figure(figsize=(10, 5))

    # Plot each array in 'toPlot' with a corresponding label.
    for array, label in toPlot:
        pyplot.plot(xAxis, array, label=label)

    # Set axis labels, legend and grid.
    pyplot.xlabel(xAxisName)
    pyplot.ylabel(yAxisName)
    pyplot.legend()
    pyplot.grid(True)

    # Store the plot if a logger is provided. The path is joined with the separator of the running
    # platform rather than a hard-coded backslash.
    if logger is not None:
        pyplot.savefig(os.path.join(logger.directory(), f"{yAxisName}.png"))

    pyplot.show()


def plotConfusionMatrix(labels, predictedLabels, logger, title):
    """
    Utility function to plot and store the confusion matrix based on true and predicted labels.

    labels: the true class indices.
    predictedLabels: the predicted class indices.
    logger: if not None, the plot is stored as '<title>.png' in the logger's directory.
    title: file name (without extension) of the stored plot.
    """
    # The class indices 0, 1 and 2 are displayed with the following names, in that order.
    sentimentLabels = ["POSITIVE", "NEGATIVE", "NEUTRAL"]

    # Calculate and display the confusion matrix. All three classes are always listed, even if
    # one of them is absent from the given labels.
    confusionMatrix = confusion_matrix(labels, predictedLabels, labels=[0, 1, 2])
    confusionMatrixDisplay = ConfusionMatrixDisplay(confusion_matrix=confusionMatrix, display_labels=sentimentLabels)
    confusionMatrixDisplay.plot(colorbar=False)

    # Store the plot if a logger is provided. The path is joined with the separator of the running
    # platform rather than a hard-coded backslash.
    if logger is not None:
        pyplot.savefig(os.path.join(logger.directory(), f"{title}.png"))

    pyplot.show()


def classificationReport(labels, predictedLabels, logger, title):
    """
    Utility function that builds the classification report of the given true and predicted labels,
    stores it through the logger (if one is provided) and prints it under the given title.
    """
    # The class indices and the names they are reported with, in matching order.
    classIndices = [0, 1, 2]
    classNames = ["POSITIVE", "NEGATIVE", "NEUTRAL"]

    report = classification_report(labels, predictedLabels, labels=classIndices, target_names=classNames)

    # Store the classification report if a logger is provided.
    if logger is not None:
        logger.log(report, title=title)

    # Display the title followed by the report.
    print(title, report, sep="\n")

def train(model, criterion, optimizer, scheduler, epochs, earlyStop, logger):
    """
    Train the model, keep track of its loss and metrics per epoch, report the outcome and store the
    predictions for the testing set.

    Apart from its parameters, the function reads the notebook-level names aggregator, batchSize, scaler,
    trainDataloader, validationDataloader and testingDataset. The predictions for the testing set are
    written to 'submission.csv'.

    model: the network to train. It must also provide shouldEvaluate(), which states whether the training
           set has to be re-evaluated (in evaluation mode) once the training is over.
    criterion: the loss function, called as criterion(logits, labels).
    optimizer: the optimizer that updates the parameters of the model.
    scheduler: object whose step(epoch) is called at the end of every epoch, or None.
    epochs: the maximum number of epochs.
    earlyStop: the early stopper that is updated after every epoch, or None.
    logger: the logger that stores the outcomes of the experiment, or None to only print them.
    """

    def announce(message):
        # Store the message through the logger (if one is provided) and print it as well.
        if logger is not None:
            logger.log(message)
        print(message)

    def formatSummary(name, loss, metrics):
        # Single line summary of a loss and the (F1, recall, precision, accuracy) metrics of a dataset.
        return (f"{name}:\n"
                f"Loss: {loss:.6f} "
                f"F1: {metrics[0]:.6f} "
                f"Recall: {metrics[1]:.6f} "
                f"Precision: {metrics[2]:.6f} "
                f"Accuracy: {metrics[3]:.6f}")

    def evaluate(dataloader):
        # Run the model in evaluation mode over every batch of the dataloader, with gradient computation
        # turned off. Returns the mean batch loss, the labels and the predicted labels, both in the
        # order in which the dataloader produced them.
        model.eval()
        totalLoss = 0.0
        seenLabels = torch.LongTensor([])
        seenPredictions = torch.LongTensor([])

        with torch.no_grad():
            for instances, labels in dataloader:
                seenLabels = torch.cat((seenLabels, labels))

                # Forward pass.
                logits = model(instances)
                probabilities = nn.functional.softmax(logits, dim=1)
                predictedLabels = torch.argmax(probabilities, dim=1)
                seenPredictions = torch.cat((seenPredictions, predictedLabels))

                # Loss calculation.
                totalLoss += criterion(logits, labels).item()

        # Each batch contributes its own loss, hence the division by the number of batches.
        return totalLoss / len(dataloader), seenLabels, seenPredictions

    trainingLoss = []
    validationLoss = []
    trainingMetrics = []
    validationMetrics = []

    trainingLabels = None
    predictedTrainingLabels = None
    validationLabels = None
    predictedValidationLabels = None

    # Document the configuration of the experiment.
    for setting in (f"aggregator: {aggregator.__name__}",
                    f"epochs: {epochs}",
                    f"batchSize: {batchSize}",
                    f"model:\n{model}\n",
                    f"optimizer:\n{str(optimizer)}\n",
                    f"scaler:\n{str(scaler)}\n",
                    f"scheduler:\n{str(scheduler)}\n\n"):
        announce(setting)

    for epoch in range(epochs):

        # Switch the model to training mode.
        model.train()
        epochLoss = 0.0

        # Tensors for holding the labels and the predicted labels of the training set in the order in which
        # they were processed, as the DataLoader may shuffle the instances. They are utilized later to assess
        # the model's performance on the training set.
        trainingLabels = torch.LongTensor([])
        predictedTrainingLabels = torch.LongTensor([])

        for instances, labels in trainDataloader:

            # Zero the gradients.
            optimizer.zero_grad()

            trainingLabels = torch.cat((trainingLabels, labels))

            # Forward pass.
            logits = model(instances)
            probabilities = nn.functional.softmax(logits, dim=1)
            predictedLabels = torch.argmax(probabilities, dim=1)
            predictedTrainingLabels = torch.cat((predictedTrainingLabels, predictedLabels))

            # Loss calculation.
            loss = criterion(logits, labels)
            epochLoss += loss.item()

            # Backward pass and optimization.
            loss.backward()
            optimizer.step()

        # The cumulative epoch loss is divided by the total number of batches in the training set,
        # as each batch contributes its own individual loss.
        epochLoss /= len(trainDataloader)
        trainingLoss.append(epochLoss)

        # Assess and store the training metrics.
        trainingMetrics.append(evaluateModel(predictions=predictedTrainingLabels, labels=trainingLabels))

        # Assess the model on the validation set and store the validation loss and metrics.
        epochLoss, validationLabels, predictedValidationLabels = evaluate(validationDataloader)
        validationLoss.append(epochLoss)
        validationMetrics.append(evaluateModel(predictions=predictedValidationLabels, labels=validationLabels))

        if scheduler is not None:
            scheduler.step(epoch)

        # The log file receives only the loss and the F1 score of each epoch.
        if logger is not None:
            logger.log(f'Epoch {epoch + 1}/{epochs}\n'
                       f'Training Loss: {trainingLoss[-1]:.6f}, Validation Loss: {validationLoss[-1]:.6f}\n'
                       f'Training F1: {trainingMetrics[-1][0]:.6f}, Validation F1: {validationMetrics[-1][0]:.6f}\n')
            logger.line()

        print(f'Epoch {epoch + 1}/{epochs}\n'
              f'Training Loss: {trainingLoss[-1]:.6f}, Validation Loss: {validationLoss[-1]:.6f}\n'
              f'Training F1: {trainingMetrics[-1][0]:.6f}, Validation F1: {validationMetrics[-1][0]:.6f}\n'
              f'Training Recall: {trainingMetrics[-1][1]:.6f}, Validation Recall: {validationMetrics[-1][1]:.6f}\n'
              f'Training Precision: {trainingMetrics[-1][2]:.6f}, Validation Precision: {validationMetrics[-1][2]:.6f}\n'
              f'Training Accuracy: {trainingMetrics[-1][3]:.6f}, Validation Accuracy: {validationMetrics[-1][3]:.6f}\n')

        if earlyStop is not None:
            # Position of each metric inside the tuple returned by evaluateModel. ACCURACY is found at
            # index 3 (index 2 holds the precision). Any other metric name falls back to the validation loss.
            metricPositions = {"F1": 0, "RECALL": 1, "PRECISION": 2, "ACCURACY": 3}
            monitored = earlyStop.getMetric()
            if monitored in metricPositions:
                metric = validationMetrics[-1][metricPositions[monitored]]
            else:
                metric = validationLoss[-1]

            # Update the early stopper.
            earlyStop.update(metric, epoch + 1, model, trainingLabels, predictedTrainingLabels, validationLabels, predictedValidationLabels)

            if earlyStop.shouldStop():
                announce(f"earlyStop:\n{earlyStop}")
                break

    # If an evaluation on the training set is intended after the training session, such as for a Dropout layer.
    if model.shouldEvaluate():
        evaluationLoss, evaluationLabels, predictedEvaluationLabels = evaluate(trainDataloader)

        # Assess the training metrics of the after training evaluation.
        evaluationMetrics = evaluateModel(predictions=predictedEvaluationLabels, labels=evaluationLabels)

    # If early stopping is applied
    if earlyStop is not None:
        if logger is not None:
            Model.saveModel(earlyStop.getState(), logger.directory())

        # The optimal epoch is the one determined by the early stopper.
        epochs = earlyStop.getEpoch()

        # Retrieve the labels and predicted labels for both datasets at the optimal epoch determined by the early stopper.
        trainingLabels, predictedTrainingLabels = earlyStop.getTrainingLabels()
        validationLabels, predictedValidationLabels = earlyStop.getValidationLabels()

    else:
        if logger is not None:
            Model.saveModel(model.state_dict(), logger.directory())

    epochsAxis = list(range(1, epochs + 1))

    # Obtain the metrics until reaching the optimal epoch, whether it is determined by the early stopper or the final regular training epoch.
    trainingLoss = trainingLoss[:epochs]
    validationLoss = validationLoss[:epochs]
    trainingMetrics = trainingMetrics[:epochs]
    validationMetrics = validationMetrics[:epochs]

    plotLearningCurve(epochsAxis, [(validationLoss, 'Validation'), (trainingLoss, 'Training')], 'Epoch', 'Loss', logger)

    METRICS = ["F1", "RECALL", "PRECISION", "ACCURACY"]
    for index, metric in enumerate(METRICS):
        tMetric = torch.tensor([m[index] for m in trainingMetrics], dtype=torch.float32)
        vMetric = torch.tensor([m[index] for m in validationMetrics], dtype=torch.float32)
        plotLearningCurve(epochsAxis, [(vMetric, 'Validation'), (tMetric, 'Training')], 'Epoch', metric, logger)

    # Compute, display, and store the confusion matrix and the classification report associated with the training set.
    if not model.shouldEvaluate():
        plotConfusionMatrix(trainingLabels.numpy(), predictedTrainingLabels.numpy(), logger, "Training")
        classificationReport(trainingLabels.numpy(), predictedTrainingLabels.numpy(), logger, "Training")

    # Same as above, based on the AFTER TRAINING EVALUATION of the training set.
    else:
        plotConfusionMatrix(evaluationLabels.numpy(), predictedEvaluationLabels.numpy(), logger, "Training")
        classificationReport(evaluationLabels.numpy(), predictedEvaluationLabels.numpy(), logger, "Training")

    # Compute, display, and store the confusion matrix and the classification report associated with the validation set.
    plotConfusionMatrix(validationLabels.numpy(), predictedValidationLabels.numpy(), logger, "Validation")
    classificationReport(validationLabels.numpy(), predictedValidationLabels.numpy(), logger, "Validation")

    # Summarize the training set, based on the after training evaluation when one took place and on the
    # last retained epoch otherwise.
    if model.shouldEvaluate():
        announce(formatSummary("Training", evaluationLoss, evaluationMetrics))
    else:
        announce(formatSummary("Training", trainingLoss[-1], trainingMetrics[-1]))

    # Summarize the validation set based on the last retained epoch.
    announce(formatSummary("Validation", validationLoss[-1], validationMetrics[-1]))

    # Generate predictions on the testing set and store them according to the provided instructions.
    testingIDS = testingDataset.getIDS()
    testingInstances = testingDataset.getInstances()

    # Switch the model to evaluation mode.
    model.eval()

    # Turn off gradient computation since we are in inference/evaluation mode.
    with torch.no_grad():
        # Forward pass.
        logits = model(testingInstances)
        probabilities = nn.functional.softmax(logits, dim=1)
        testingLabels = torch.argmax(probabilities, dim=1)

    # Convert the predicted class indices to the respective class names.
    testingLabels = [Tweet.numberToClass[label] for label in testingLabels.numpy()]

    values = list(zip(testingIDS, testingLabels))
    dataframe = pandas.DataFrame(values, columns=['Id', 'Predicted'])
    dataframe.to_csv('submission.csv', index=False)

In [None]:
# The size of the input layer is the feature vector size exposed by the dataset class.
inputSize = TweetDataset.dimensions

# Assemble the network layer by layer. Going by the builder method names, these are three Dense layers
# (64, 64 and 32 neurons), each followed by a ReLU activation.
model = Model(inputSize=inputSize, seed=seed)
model.addDense(neurons=64)
model.addRelu()
model.addDense(neurons=64)
model.addRelu()
model.addDense(neurons=32)
model.addRelu()

# Output just 3 raw/unnormalized logits, one for each class, as
# PyTorch's CrossEntropyLoss applies the softmax operation itself when provided with the class indices as the target.
# https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
model.addDense(neurons=3)
model.build()

# Candidate optimizers, all of them created over the same model parameters and learning rate.
# Only the one selected below is handed to the training session.
learningRate = 0.01
optimizers = [torch.optim.ASGD(model.parameters(), lr=learningRate),
              torch.optim.Adam(model.parameters(), lr=learningRate),
              torch.optim.SGD(model.parameters(), lr=learningRate),
              torch.optim.Adagrad(model.parameters(), lr=learningRate)]

optimizer = optimizers[0]
# NOTE(review): StepLR is configured with step_size=10 and gamma=0.85; how CustomScheduler drives it
# is defined elsewhere in the notebook - confirm.
customScheduler = CustomScheduler(torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.85))
criterion = torch.nn.CrossEntropyLoss(reduction='mean')
epochs = 92
# None disables early stopping, so the training runs for all the epochs.
earlyStop = None

# None disables the logger, so the outcomes are only printed/displayed and nothing is stored through it.
logger = None

# Launch the training session.
train(model=model,
      criterion=criterion,
      optimizer=optimizer,
      scheduler=customScheduler,
      epochs=epochs,
      earlyStop=earlyStop,
      logger=logger)