In [None]:
# Importing the needed packages
import torch 
from torch import nn
import pandas as pd 
import re 
import numpy as np
from typing import Tuple

# Spliting the data into train and test
from sklearn.model_selection import train_test_split

In [None]:
# Reading the data 
d = pd.read_csv('input/Tweets.csv', header=None)

# Adding the columns 
d.columns = ['INDEX', 'GAME', "SENTIMENT", 'TEXT']

# Leaving only the positive and the negative sentiments 
d = d[d['SENTIMENT'].isin(['Positive', 'Negative'])]

# Encoding the sentiments that the negative will be 1 and the positive 0
d['SENTIMENT'] = d['SENTIMENT'].apply(lambda x: 0 if x == 'Positive' else 1)

# Dropping missing values
d = d.dropna()

In [None]:
d.sample(10)[['SENTIMENT', 'TEXT']]

In [None]:
# Spliting to train test 
train, test = train_test_split(d, test_size=0.2, random_state=42)

# Reseting the indexes 
train.reset_index(drop=True, inplace=True)
test.reset_index(drop=True, inplace=True)

print(f'Train shape: {train.shape}')
print(f'Test shape: {test.shape}')

In [None]:
def create_word_index(x: str, shift_for_padding: bool = False, char_level: bool = False) -> Tuple[dict, dict]: 
    """
    Function that scans a given text and creates two dictionaries:
    - word2idx: dictionary mapping words to integers
    - idx2word: dictionary mapping integers to words

    Args:
        x (str): text to scan
        shift_for_padding (bool, optional): If True, the function will add 1 to all the indexes.
            This is done to reserve the 0 index for padding. Defaults to False.
        char_level (bool, optional): If True, the function will create a character level dictionary.
        
    Returns:
        Tuple[dict, dict]: word2idx and idx2word dictionaries
    """
    # Ensuring that the text is a string
    if not isinstance(x, str):
        try: 
            x = str(x)
        except:
            raise Exception('The text must be a string or a string convertible object')
        
    # Spliting the text into words
    words = []
    if char_level:
        # The list() function of a string will return a list of characters
        words = list(x)
    else:
        # Spliting the text into words by spaces
        words = x.split(' ')

    # Creating the word2idx dictionary 
    word2idx = {}
    for word in words: 
        if word not in word2idx: 
            # The len(word2idx) will always ensure that the 
            # new index is 1 + the length of the dictionary so far
            word2idx[word] = len(word2idx)

    # Adding the <UNK> token to the dictionary; This token will be used 
    # on new texts that were not seen during training.
    # It will have the last index. 
    word2idx['<UNK>'] = len(word2idx)

    if shift_for_padding:
        # Adding 1 to all the indexes; 
        # The 0 index will be reserved for padding
        word2idx = {k: v + 1 for k, v in word2idx.items()}

    # Reversing the above dictionary and creating the idx2word dictionary
    idx2word = {idx: word for word, idx in word2idx.items()}

    # Returns the dictionaries
    return word2idx, idx2word

In [None]:
# Joining all the texts into one string
text = ' '.join(train['TEXT'].values)

# Creating the word2idx and idx2word dictionaries
word2idx, idx2word = create_word_index(text, shift_for_padding=True, char_level=True)

# Printing the size of the vocabulary
print(f'The size of the vocabulary is: {len(word2idx)}')

In [None]:
# Printing the top 10 words
print(f'The top 10 words are: {list(word2idx.keys())[:10]}')

In [None]:
# For each row in the train and test set, we will create a list of integers
# that will represent the words in the text
train['text_int'] = train['TEXT'].apply(lambda x: [word2idx.get(word, word2idx['<UNK>']) for word in list(x)])
test['text_int'] = test['TEXT'].apply(lambda x: [word2idx.get(word, word2idx['<UNK>']) for word in list(x)])

# Calculating the length of sequences in the train set 
train['seq_len'] = train['text_int'].apply(lambda x: len(x))

# Describing the length of the sequences
train['seq_len'].describe()

In [None]:
train

In [None]:
def pad_sequences(x: list, pad_length: int) -> list:
    """
    Function that pads a given list of integers to a given length

    Args:
        x (list): list of integers to pad
        pad_length (int): length to pad

    Returns:
        list: padded list of integers
    """
    # Getting the length of the list
    len_x = len(x)

    # Checking if the length of the list is less than the pad_length
    if len_x < pad_length: 
        # Padding the list with 0s
        x = x + [0] * (pad_length - len_x)
    else: 
        # Truncating the list to the desired length
        x = x[:pad_length]

    # Returning the padded list
    return x

In [None]:
# Padding the train and test sequences 
train['text_int'] = train['text_int'].apply(lambda x: pad_sequences(x, 200))
test['text_int'] = test['text_int'].apply(lambda x: pad_sequences(x, 200))

In [None]:
train

# Defining the activation functions 

In [None]:
def sigmoid(x: float) -> float: 
    """
    Function that calculates the sigmoid of a given value

    Args:
        x (float): value to calculate the sigmoid

    Returns:
        float: sigmoid of the given value in (0, 1)
    """
    return 1 / (1 + np.exp(-x))

def tanh(x: float) -> float: 
    """
    Function that calculates the tanh of a given value

    Args:
        x (float): value to calculate the tanh

    Returns:
        float: tanh of the given value in (-1, 1)
    """
    return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))

# Defining the forget gate

In [None]:
class ForgetGate: 
    """
    Class that implements the forget gate of an LSTM cell
    """
    def __init__(
            self, 
            w1: float = np.random.normal(), 
            w2: float = np.random.normal(),
            b1: float = np.random.normal(),
            long_term_memory: float = np.random.normal(), 
            short_term_memory: float = np.random.normal(), 
            ):
        """
        Constructor of the class

        Args:
            long_term_memory (float): long term memory
            short_term_memory (float): short term memory
            w1 (float): weight 1
            w2 (float): weight 2
            b1 (float): bias term 1
        """
        # Saving the input
        self.long_term_memory = long_term_memory
        self.short_term_memory = short_term_memory
        self.w1 = w1
        self.w2 = w2
        self.b1 = b1

    def forward(self, x: float) -> float: 
        """
        Function that calculates the output of the forget gate

        Args:
            x (float): input to the forget gate

        Returns:
            float: output of the forget gate
        """
        # Calculates the percentage of the long term memory that will be kept
        percentage_to_keep = sigmoid((self.w1 * x  + self.w2 * self.short_term_memory) + self.b1)

        # Updating the long term memory
        self.long_term_memory = self.long_term_memory * percentage_to_keep

        # The output of the forget gate is the new long term memory and the short term memory
        return self.long_term_memory, self.short_term_memory

In [None]:
# Initiating 
forget_gate = ForgetGate()

print(f'Initial long term memory: {forget_gate.long_term_memory}')
print(f'Initial short term memory: {forget_gate.short_term_memory}')

# Calculating the output of the forget gate
lt, st = forget_gate.forward(0.5)

print(f'Long term memory: {lt}')
print(f'Short term memory: {st}')

# Defining the input gate

In [None]:
class InputGate:
    def __init__(
            self, 
            w3: float = np.random.normal(), 
            w4: float = np.random.normal(),
            w5: float = np.random.normal(),
            w6: float = np.random.normal(),
            b2: float = np.random.normal(),
            b3: float = np.random.normal(),
            long_term_memory: float = np.random.normal(), 
            short_term_memory: float = np.random.normal(), 
            ):
        """
        Constructor of the class

        Args:
            long_term_memory (float): long term memory
            short_term_memory (float): short term memory
            w3 (float): weight 3
            w4 (float): weight 4
            w5 (float): weight 5
            w6 (float): weight 6
            b2 (float): bias 2
            b3 (float): bias 3
        """
        # Saving the input
        self.long_term_memory = long_term_memory
        self.short_term_memory = short_term_memory
        self.w3 = w3
        self.w4 = w4
        self.w5 = w5
        self.w6 = w6
        self.b2 = b2
        self.b3 = b3

    def forward(self, x: float) -> float:
        """
        Function that calculates the output of the input gate

        Args:
            x (float): input to the input gate

        Returns:
            float: output of the input gate
        """
        # Calculating the memory signal 
        memory_signal = tanh((self.w3 * x + self.w4 * self.short_term_memory) + self.b2)

        # Calculating the percentage of memory to keep 
        percentage_to_keep = sigmoid((self.w5 * x + self.w6 * self.short_term_memory) + self.b3) 

        # Multiplying the memory signal by the percentage to keep
        memory_signal = memory_signal * percentage_to_keep

        # Updating the long term memory
        self.long_term_memory = self.long_term_memory + memory_signal

        # The output of the input gate is the new long term memory and the short term memory
        return self.long_term_memory, self.short_term_memory

In [None]:
# Creating the input gate object 
input_gate = InputGate(long_term_memory=lt, short_term_memory=st)

# Forward propagating 
lt, st = input_gate.forward(0.5)

print(f'Long term memory: {lt}')
print(f'Short term memory: {st}')

# Defining the output gate 

In [None]:
class OutputGate:
    def __init__(
            self, 
            w7: float = np.random.normal(), 
            w8: float = np.random.normal(),
            b4: float = np.random.normal(),
            long_term_memory: float = np.random.normal(), 
            short_term_memory: float = np.random.normal(), 
            ):
        """
        Constructor of the class

        Args:
            long_term_memory (float): long term memory
            short_term_memory (float): short term memory
            w7 (float): weight 7
            w8 (float): weight 8
            w9 (float): weight 9
            w10 (float): weight 10
            b4 (float): bias 4
            b5 (float): bias 5
        """
        # Saving the input
        self.long_term_memory = long_term_memory
        self.short_term_memory = short_term_memory
        self.w7 = w7
        self.w8 = w8
        self.b4 = b4

    def forward(self, x: float) -> float:
        """
        Function that calculates the output of the output gate

        Args:
            x (float): input to the output gate

        Returns:
            float: output of the output gate
        """
        # Calculating the short term memory signal 
        short_term_memory_signal = tanh(self.long_term_memory)

        # Calculating the percentage of short term memory to keep 
        percentage_to_keep = sigmoid((self.w7 * x + self.w8 * self.short_term_memory) + self.b4) 

        # Multiplying the short term memory signal by the percentage to keep
        short_term_memory_signal = short_term_memory_signal * percentage_to_keep

        # Updating the short term memory
        self.short_term_memory = short_term_memory_signal

        # The output of the output gate is the new long term memory and the short term memory
        return self.long_term_memory, self.short_term_memory

In [None]:
# Creating the output gate object
output_gate = OutputGate(long_term_memory=lt, short_term_memory=st)

# Forward propagating
lt, st = output_gate.forward(0.5)

print(f'Long term memory: {lt}')
print(f'Short term memory: {st}')

# Defining a simple LSTM class 

In [None]:
# Redefining the forget, input and output gates as functions 
def forget_gate(x: float, w1: float, w2: float, b1: float, long_term_memory: float, short_term_memory: float) -> Tuple[float, float]:
    """
    Function that calculates the output of the forget gate

    Args:
        x (float): input to the forget gate
        w1 (float): weight 1
        w2 (float): weight 2
        b1 (float): bias 1
        long_term_memory (float): long term memory
        short_term_memory (float): short term memory

    Returns:
        Tuple[float, float]: output of the forget gate
    """
    # Calculates the percentage of the long term memory that will be kept
    percentage_to_keep = sigmoid((w1 * x  + w2 * short_term_memory) + b1)

    # Updating the long term memory
    long_term_memory = long_term_memory * percentage_to_keep

    # The output of the forget gate is the new long term memory and the short term memory
    return long_term_memory, short_term_memory

def input_gate(x: float, w3: float, w4: float, w5: float, w6: float, b2: float, b3: float, long_term_memory: float, short_term_memory: float) -> Tuple[float, float]:
    """
    Function that calculates the output of the input gate

    Args:
        x (float): input to the input gate
        w3 (float): weight 3
        w4 (float): weight 4
        w5 (float): weight 5
        w6 (float): weight 6
        b2 (float): bias 2
        b3 (float): bias 3
        long_term_memory (float): long term memory
        short_term_memory (float): short term memory

    Returns:
        Tuple[float, float]: output of the input gate
    """
    # Calculating the memory signal 
    memory_signal = tanh((w3 * x + w4 * short_term_memory) + b2)

    # Calculating the percentage of memory to keep 
    percentage_to_keep = sigmoid((w5 * x + w6 * short_term_memory) + b3) 

    # Multiplying the memory signal by the percentage to keep
    memory_signal = memory_signal * percentage_to_keep

    # Updating the long term memory
    long_term_memory = long_term_memory + memory_signal

    # The output of the input gate is the new long term memory and the short term memory
    return long_term_memory, short_term_memory

def output_gate(x: float, w7: float, w8: float, b4: float, long_term_memory: float, short_term_memory: float) -> Tuple[float, float]:
    """
    Function that calculates the output of the output gate

    Args:
        x (float): input to the output gate
        w7 (float): weight 7
        w8 (float): weight 8
        b4 (float): bias 4
        long_term_memory (float): long term memory
        short_term_memory (float): short term memory

    Returns:
        Tuple[float, float]: output of the output gate
    """
    # Calculating the short term memory signal 
    short_term_memory_signal = tanh(long_term_memory)

    # Calculating the percentage of short term memory to keep 
    percentage_to_keep = sigmoid((w7 * x + w8 * short_term_memory) + b4) 

    # Multiplying the short term memory signal by the percentage to keep
    short_term_memory_signal = short_term_memory_signal * percentage_to_keep

    # Updating the short term memory
    short_term_memory = short_term_memory_signal

    # The output of the output gate is the new long term memory and the short term memory
    return long_term_memory, short_term_memory 


class simpleLSTM: 
    def __init__(
            self, 
            w1: float = np.random.normal(),
            w2: float = np.random.normal(),
            w3: float = np.random.normal(),
            w4: float = np.random.normal(),
            w5: float = np.random.normal(),
            w6: float = np.random.normal(),
            w7: float = np.random.normal(),
            w8: float = np.random.normal(),
            b1: float = np.random.normal(),
            b2: float = np.random.normal(),
            b3: float = np.random.normal(),
            b4: float = np.random.normal(),
            long_term_memory: float = np.random.normal(),
            short_term_memory: float = np.random.normal(),
            ):
        """
        Constructor of the class

        Args:
            long_term_memory (float): long term memory
            short_term_memory (float): short term memory
            w1 (float): weight 1
            w2 (float): weight 2
            w3 (float): weight 3
            w4 (float): weight 4
            w5 (float): weight 5
            w6 (float): weight 6
            w7 (float): weight 7
            w8 (float): weight 8
            b1 (float): bias 1
            b2 (float): bias 2
            b3 (float): bias 3
            b4 (float): bias 4
        """

        # Saving the input
        self.long_term_memory = long_term_memory
        self.short_term_memory = short_term_memory
        self.w1 = w1
        self.w2 = w2
        self.w3 = w3
        self.w4 = w4
        self.w5 = w5
        self.w6 = w6
        self.w7 = w7
        self.w8 = w8
        self.b1 = b1
        self.b2 = b2
        self.b3 = b3
        self.b4 = b4

    def forward(self, x: float) -> float:
        """
        Function that calculates the output of the simple LSTM cell

        Args:
            x (float): input to the simple LSTM cell

        Returns:
            float: output of the simple LSTM cell
        """
        # Calculating the output of the forget gate
        lt, st = forget_gate(x, self.w1, self.w2, self.b1, self.long_term_memory, self.short_term_memory)

        # Updating the long term memory
        self.long_term_memory = lt

        # Calculating the output of the input gate
        lt, st = input_gate(x, self.w3, self.w4, self.w5, self.w6, self.b2, self.b3, self.long_term_memory, self.short_term_memory)

        # Updating the long term memory
        self.long_term_memory = lt

        # Calculating the output of the output gate
        lt, st = output_gate(x, self.w7, self.w8, self.b4, self.long_term_memory, self.short_term_memory)

        # Updating the short term memory
        self.short_term_memory = st

        # The output of the simple LSTM cell is the new long term memory and the short term memory
        return self.long_term_memory, self.short_term_memory
        
    def forward_sequence(self, x: list) -> list:
        """
        Function that forward propagates a sequence of inputs through the simple LSTM cell

        Args:
            x (list): sequence of inputs to the simple LSTM cell

        Returns:
            list: sequence of outputs of the simple LSTM cell
        """
        # Creating a list to store the outputs
        outputs = []

        # Forward propagating each input
        for input in x: 
            # Forward propagating the input
            _, st = self.forward(input)

            # Appending the output to the list
            outputs.append(st)

        # Returning the list of outputs
        return outputs

In [None]:
# Creating the simple LSTM cell object
simple_lstm = simpleLSTM()

# Creating a sequence of x
x = [0.5, 0.6, 0.7, 0.8, 0.9]

# Forward propagating the sequence
outputs = simple_lstm.forward_sequence(x)

# Rounding 
outputs = [round(output, 2) for output in outputs]

# Printing the outputs
print(f'The outputs of the simple LSTM cell are: {outputs}')

In [None]:
# Defining the torch model for sentiment classification 
class SentimentClassifier(torch.nn.Module):
    """
    Class that defines the sentiment classifier model
    """
    def __init__(self, vocab_size, embedding_dim):
        super(SentimentClassifier, self).__init__()

        self.embedding = nn.Embedding(vocab_size + 1, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=1, batch_first=True)
        self.fc = nn.Linear(1, 1)  # Output with a single neuron for binary classification
        self.sigmoid = nn.Sigmoid()  # Sigmoid activation

    def forward(self, x):
        x = self.embedding(x)  # Embedding layer
        output, _ = self.lstm(x)  # RNN layer

        # Use the short term memory from the last time step as the representation of the sequence
        x = output[:, -1, :]

        # Fully connected layer with a single neuron
        x = self.fc(x) 
        
        # Converting to probabilities
        x = self.sigmoid(x)

        # Flattening the output
        x = x.squeeze()
        
        return x

# Initiating the model 
model = SentimentClassifier(vocab_size=len(word2idx), embedding_dim=16)

# Initiating the criterion and the optimizer
criterion = nn.BCELoss() # Binary cross entropy loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Defining the data loader 
from torch.utils.data import Dataset, DataLoader

class TextClassificationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # The x is named as text_int and the y as airline_sentiment
        x = self.data.iloc[idx]['text_int']
        y = self.data.iloc[idx]['SENTIMENT']
        
        # Converting the x and y to torch tensors
        x = torch.tensor(x)
        y = torch.tensor(y)

        # Converting the y variable to float 
        y = y.float()

        # Returning the x and y
        return x, y
    
# Creating the train and test loaders
train_loader = DataLoader(TextClassificationDataset(train), batch_size=32, shuffle=True)
test_loader = DataLoader(TextClassificationDataset(test), batch_size=32, shuffle=True)

In [None]:
# Defining the number of epochs
epochs = 100

# Setting the model to train mode
model.train()

# Saving of the loss values
losses = []

# Iterating through epochs
for epoch in range(epochs):
    # Initiating the total loss 
    total_loss = 0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        # Zero the gradients
        optimizer.zero_grad()  # Zero the gradients
        outputs = model(inputs)  # Forward pass

        loss = criterion(outputs, labels)  # Compute the loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update the model's parameters

        # Adding the loss to the total loss
        total_loss += loss.item()

    # Calculating the average loss
    avg_loss = total_loss / len(train_loader)

    # Appending the loss to the list containing the losses
    losses.append(avg_loss)

    # Printing the loss every n epochs
    if epoch % 20 == 0:
        print(f'Epoch: {epoch}, Loss: {avg_loss}')

In [None]:
# Ploting the loss 
import matplotlib.pyplot as plt

plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss vs. Epoch')
plt.show()

In [None]:
# Setting the model to eval model
model.eval()

# List to track the test acc 
total_correct = 0
total_obs = 0

# Iterating over the test set
for batch_idx, (inputs, labels) in enumerate(test_loader):
    outputs = model(inputs)  # Forward pass

    # Getting the number of correct predictions 
    correct = ((outputs > 0.5).float() == labels).float().sum()

    # Getting the total number of predictions
    total = labels.size(0)

    # Updating the total correct and total observations
    total_correct += correct
    total_obs += total

print(f'The test accuracy is: {total_correct / total_obs}')