In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import re
import json
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from wordcloud import WordCloud
from sklearn.metrics import accuracy_score,f1_score, precision_score, recall_score, confusion_matrix
import random
random.seed(0)
import numpy as np
np.random.seed(0)
from sklearn.model_selection import train_test_split,StratifiedKFold
import itertools
from sklearn.preprocessing import StandardScaler, LabelEncoder
import gensim
from gensim.scripts.glove2word2vec import glove2word2vec
import torch
torch.manual_seed(0)
import torch.nn as nn
from gensim.models import Word2Vec
from nltk.tokenize import word_tokenize
import nltk
from gensim.models.phrases import Phrases, Phraser
nltk.download('punkt')
from torch.utils.data import TensorDataset, DataLoader
import optuna

ModuleNotFoundError: No module named 'pandas'

In [None]:
%%capture
# Install torchtext for the import in the next cell; %%capture suppresses pip's console output.
!pip install torchtext

In [None]:
import torchtext
from torchtext import data
from torchtext.data import get_tokenizer #https://stackoverflow.com/questions/42711144/how-can-i-install-torchtext

<h1>Load our data</h1>

In [None]:
# Read the three competition splits from the Kaggle input directory.
# The train and validation frames are used with a 'Sentiment' label further down;
# the test frame is only used for prediction.
df_train_set = pd.read_csv('/kaggle/input/ys19-2023-assignment-3/train_set.csv')
df_test_set = pd.read_csv('/kaggle/input/ys19-2023-assignment-3/test_set.csv')
df_valid_set = pd.read_csv('/kaggle/input/ys19-2023-assignment-3/valid_set.csv')

<h1>We print our data and check for null values</h1><br>

We can observe from the output of the **info** method that there are no null values

In [None]:
# Show the first rows and the column/null summary of every split.
# DataFrame.info() prints its report itself and returns None, so it must not be
# wrapped in print() -- doing so only appends a stray "None" line to the output.
for frame in (df_train_set, df_valid_set, df_test_set):
    print(frame.head(), '\n')
    frame.info()
    print()

<h1>Barplots that illustrate the number of tweets and their sentiment for each party</h1>

In [None]:
# Count tweets per (Sentiment, Party) pair; reset_index turns the group sizes
# into a flat frame with a 'NumOfTweets' column that seaborn can plot directly.
group_df_by_sentiment_party_train = df_train_set.groupby(['Sentiment', 'Party']).size().reset_index(name='NumOfTweets')
group_df_by_sentiment_party_valid = df_valid_set.groupby(['Sentiment', 'Party']).size().reset_index(name='NumOfTweets')

In [None]:
# Grouped bars for the train set: one group per party, one bar per sentiment.
plt.figure(figsize=(12,8))
sns.barplot(x='Party', y='NumOfTweets', hue='Sentiment', data=group_df_by_sentiment_party_train)
plt.title('Number of Tweets/Sentiment per Party for Train set')
plt.show()

In [None]:
# Same grouped bar chart as above, for the validation set.
plt.figure(figsize=(12,8))
sns.barplot(x='Party', y='NumOfTweets', hue='Sentiment', data=group_df_by_sentiment_party_valid)
plt.title('Number of Tweets/Sentiment per Party for Valid set')
plt.show()

<h1>Plot the number of tweets for each party</h1>

In [None]:
# Tweets per party in the train set (value_counts orders parties by frequency).
df_train_set['Party'].value_counts().plot(kind='bar', figsize=(12,8))
plt.title('Number of Tweets/Party for Train set')
plt.ylabel('Num of Tweets')
plt.xlabel('Party')
plt.xticks(rotation=45)  # tilt the party names so they do not overlap
plt.show()

In [None]:
# Tweets per party in the validation set.
df_valid_set['Party'].value_counts().plot(kind='bar', figsize=(12,8))
plt.title('Number of Tweets/Party for Validation set')
plt.ylabel('Num of Tweets')
plt.xlabel('Party')
plt.xticks(rotation=45)  # tilt the party names so they do not overlap
plt.show()

In [None]:
# Tweets per party in the test set.
df_test_set['Party'].value_counts().plot(kind='bar', figsize=(12,8))
plt.title('Number of Tweets/Party for Test set')
plt.ylabel('Num of Tweets')
plt.xlabel('Party')
plt.xticks(rotation=45)  # tilt the party names so they do not overlap
plt.show()

<h1>Data preprocessing</h1>
<h4>Turn the categorical values to numerical</h4>

In [None]:
# Peek at the sentiment labels before they are label-encoded in the next cell.
df_train_set['Sentiment'].head()

In [None]:
le = LabelEncoder()

# Learn the label -> integer mapping from the training labels only and reuse it
# for the validation labels.  Re-fitting on the validation set could produce a
# different mapping (e.g. if a class is missing there) and would also replace
# the mapping that `le.inverse_transform` relies on at prediction time.
df_train_set['Sentiment'] = le.fit_transform(df_train_set['Sentiment'])
df_valid_set['Sentiment'] = le.transform(df_valid_set['Sentiment'])
print(df_train_set['Sentiment'].head())
print(df_valid_set['Sentiment'].head())

<h4>Function that turns the text of each tweet to lowercase and removes stopwords, special characters, URLs, mentions, etc.</h4>

In [None]:
# NOTE: To remove the stopwords, the stopwords-el.json file from the repository
# at https://github.com/stopwords-iso/stopwords-el was downloaded locally and
# uploaded to this notebook as an input dataset (read below from /kaggle/input/stopwords).

# Load Greek stopwords from the JSON file
with open('/kaggle/input/stopwords/stopwords_el_2.json', 'r', encoding='utf-8') as file:
    greek_stopwords = json.load(file)

# Patterns are compiled once: preprocess_tweet is applied to every row of three frames.
_MENTION_RE = re.compile(r'@\w+')
_URL_RE = re.compile(r'http\S+')
# Matches every character that is NOT a Greek letter, a digit or whitespace.
# Besides the plain and tonos-accented letters, the dialytika forms
# (\u03ca, \u03cb, \u0390, \u03b0) are whitelisted; without them those letters
# were silently deleted from inside words.  Latin letters are dropped on purpose:
# keeping only Greek characters was chosen while tuning for a higher F1 score.
_NON_GREEK_RE = re.compile(
    r'[^αβγδεζηθικλμνξοπρστυφχψωςάέίόώύή\u03ca\u03cb\u0390\u03b0ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ0-9\s]'
)


def preprocess_tweet(tweet, stopwords=None):
    """Normalise one tweet for the downstream lemmatisation / embedding steps.

    Steps, in order: lowercase and turn underscores into spaces, remove
    @mentions, remove URLs, remove every character that is not a Greek letter,
    digit or whitespace, and finally drop stopwords.

    Args:
        tweet: Raw tweet text (``str``).
        stopwords: Optional iterable of words to remove.  Defaults to the
            module-level ``greek_stopwords`` loaded from the JSON file.

    Returns:
        The cleaned tweet with the remaining words separated by single spaces
        (an empty string if nothing is left).
    """
    # A set gives O(1) membership tests instead of scanning the list per word.
    stopword_set = set(greek_stopwords if stopwords is None else stopwords)

    tweet = tweet.lower().replace('_', ' ')
    tweet = _MENTION_RE.sub('', tweet)    # delete mentions
    tweet = _URL_RE.sub('', tweet)        # delete urls
    tweet = _NON_GREEK_RE.sub('', tweet)  # keep Greek letters, digits, whitespace

    # split()/join also collapses repeated whitespace and trims both ends.
    return ' '.join(word for word in tweet.split() if word not in stopword_set)

In [None]:
# Clean the 'Text' column of every split in place; from here on 'Text' holds
# the preprocessed tweet rather than the raw one.
df_train_set['Text'] = df_train_set['Text'].apply(preprocess_tweet)
df_test_set['Text'] = df_test_set['Text'].apply(preprocess_tweet)
df_valid_set['Text'] = df_valid_set['Text'].apply(preprocess_tweet)

In [None]:
# Spot-check the cleaned text of each split.
print(df_train_set['Text'].head(), '\n')
print(df_test_set['Text'].head(), '\n')
print(df_valid_set['Text'].head(), '\n')

**We load the spaCy model to perform lemmatization and tokenization of Greek words** <br>
Sometimes it's necessary to restart the kernel in order for the following to work

<h1>ATTENTION:</h1><h2>The following command needs to be executed only one time. If an error occurs from the following spacy.load() command just restart the kernel and run all the commands except this one.</h2>

In [None]:
%%capture
# Upgrade spaCy. Run this only once: if the spacy.load() call in the next cell
# raises an error afterwards, restart the kernel and run every cell except this one.
!pip install -U spacy  

In [None]:
# Load the Greek spaCy pipeline (el_core_news_lg 3.7.0) from the uploaded Kaggle dataset path.
nlp = spacy.load('/kaggle/input/el-core-news-lg-4/el_core_news_lg_3/el_core_news_lg-3.7.0')

<h1>LEMMATIZATION - TOKENIZATION</h1>

In [None]:
# The el_core_news_lg model used for lemmatization/tokenization was downloaded
# locally, zipped and uploaded as a public dataset.
def lemmatize_tokenize_text(text):
    """Run ``text`` through the spaCy pipeline and return its lemmas joined by single spaces."""
    lemmas = (token.lemma_ for token in nlp(text))
    return ' '.join(lemmas)


# Replace the cleaned text of every split by its space-joined lemmas (in place).
df_train_set['Text'] = df_train_set['Text'].apply(lemmatize_tokenize_text)
df_test_set['Text'] = df_test_set['Text'].apply(lemmatize_tokenize_text)
df_valid_set['Text'] = df_valid_set['Text'].apply(lemmatize_tokenize_text)

In [None]:
# Spot-check the lemmatized text of each split.
print(df_train_set['Text'].head(), '\n')
print(df_test_set['Text'].head(), '\n')
print(df_valid_set['Text'].head(), '\n')

<h1> Number of Unique words - Wordcloud</h1>

In [None]:
def unique_words_num(tweets):
    """Return how many distinct whitespace-separated words occur across ``tweets``."""
    vocabulary = {token for tweet in tweets for token in tweet.split()}
    return len(vocabulary)

In [None]:
# Vocabulary size of each split after preprocessing and lemmatization.
print("Num of unique words in df_train_set:", unique_words_num(df_train_set['Text']))
print("Num of unique words in df_test_set:", unique_words_num(df_test_set['Text']))
print("Num of unique words in df_valid_set:", unique_words_num(df_valid_set['Text']))

In [None]:
def plot_wordcloud(tweets, title):
    """Draw a word cloud (at most 200 words) of all ``tweets`` under the heading ``title``."""
    corpus = ' '.join(tweets)
    generator = WordCloud(width=800, height=400, background_color='white', max_words=200)
    image = generator.generate(corpus)

    plt.figure(figsize=(10, 5))
    plt.imshow(image, interpolation='bilinear')
    plt.axis('off')  # a word cloud has no meaningful axes
    plt.title(title)
    plt.show()

# Plot one word cloud per dataframe, titled with the dataframe's name
plot_wordcloud(df_train_set['Text'], 'df_train_set')
plot_wordcloud(df_test_set['Text'], 'df_test_set')
plot_wordcloud(df_valid_set['Text'], 'df_valid_set')

<h1>Word2Vec</h1>
<p>Μετατροπή κειμένου σε vector representation με Word2Vec</p>
<p>Train Word2Vec model</p>

In [None]:
# The embedding corpus is the lemmatized text of all three splits (train, test, valid).
all_text = pd.concat([df_train_set['Text'], df_test_set['Text'], df_valid_set['Text']])
tokenized_text = [text.split() for text in all_text]

# training: 100-dimensional vectors, context window of 5; min_count=1 keeps
# every token, so each word of the corpus gets a vector.
# NOTE(review): with workers=4 the training is presumably not bit-for-bit
# reproducible between runs -- confirm against the gensim documentation.
w2v_model = Word2Vec(tokenized_text, vector_size=100, window=5, min_count=1, workers=4)

<h2>Let's examine the distribution of the sequence lengths</h2>

In [None]:
lengths = [len(text.split()) for text in all_text]
# One unit-wide bin [k, k+1) per possible length.  The edges start at 0 so that
# tweets left empty by preprocessing are counted, and run one past the maximum
# so the longest length gets its own bin (matplotlib closes the last bin on both
# sides, which previously merged the two largest lengths into a single bar).
plt.hist(lengths, bins=range(0, max(lengths) + 2))
plt.xlabel('Sequence Length')
plt.ylabel('Frequency')
plt.title('Distribution of Sequence Lengths')
plt.show()

In [None]:
# https://spotintelligence.com/2023/02/15/word2vec-for-text-classification/
def vectorize(sentence, w2v_model, max_length):
    """Turn ``sentence`` into a fixed-size matrix of word vectors.

    Words without a vector in ``w2v_model.wv`` are skipped, at most
    ``max_length`` vectors are kept, and the remaining rows are left as zeros
    (padding at the end).

    Returns:
        ``numpy.ndarray`` of shape ``(max_length, w2v_model.vector_size)``.
    """
    embeddings = w2v_model.wv
    known_vectors = (embeddings[token] for token in sentence.split() if token in embeddings)
    kept = list(itertools.islice(known_vectors, max_length))

    matrix = np.zeros((max_length, w2v_model.vector_size))
    if kept:
        matrix[:len(kept)] = kept

    return matrix  # padded sequence



In [None]:
# Work on independent copies: a plain assignment would only create a second
# name for the same DataFrame, so writing the vectorized 'Text' column would
# also overwrite the lemmatized text in the original frames.
df_train_set_vectorized = df_train_set.copy()
df_valid_set_vectorized = df_valid_set.copy()
df_test_set_vectorized = df_test_set.copy()

In [None]:
max_length = 30  # tweets are truncated / zero-padded to this many word vectors
# Build the vectorized frames from the untouched lemmatized text with assign(),
# which returns a new DataFrame.  The source frames keep their string 'Text'
# column, so this cell can be re-run without trying to vectorize arrays.
df_train_set_vectorized = df_train_set.assign(Text=df_train_set['Text'].apply(lambda x: vectorize(x, w2v_model, max_length)))
df_test_set_vectorized = df_test_set.assign(Text=df_test_set['Text'].apply(lambda x: vectorize(x, w2v_model, max_length)))
df_valid_set_vectorized = df_valid_set.assign(Text=df_valid_set['Text'].apply(lambda x: vectorize(x, w2v_model, max_length)))

In [None]:
# Each entry of 'Text' is now the padded matrix returned by vectorize().
df_train_set_vectorized['Text'].head()

In [None]:
# Embedding dimensionality; the models below are built with input_size=100 to match it.
w2v_model.vector_size

<h2>Create dataloaders</h2>

In [None]:
# Stack the per-tweet matrices into one numpy array per split first; this avoids
# UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow.
# convert list of numpy arrays to a single numpy array
X_train_np = np.stack(df_train_set_vectorized['Text'].values)
X_valid_np = np.stack(df_valid_set_vectorized['Text'].values)
X_test_np = np.stack(df_test_set_vectorized['Text'].values)

# convert numpy arrays to torch tensors: float features, long (integer) labels.
# No label tensor is built for the test split.
X_train = torch.tensor(X_train_np, dtype=torch.float)
y_train = torch.tensor(df_train_set_vectorized['Sentiment'].values, dtype=torch.long)
X_valid = torch.tensor(X_valid_np, dtype=torch.float)
y_valid = torch.tensor(df_valid_set_vectorized['Sentiment'].values, dtype=torch.long)
X_test = torch.tensor(X_test_np, dtype=torch.float)

# (num_samples, sequence_length, embedding_size)
print("Shape of X_train:", X_train.shape)
print("Shape of X_valid:", X_valid.shape)
print("Shape of X_test:", X_test.shape)

In [None]:
# Wrap the tensors in datasets; the test dataset holds features only, so each
# of its batches is a one-element sequence.
train_dataset = TensorDataset(X_train, y_train)
valid_dataset = TensorDataset(X_valid, y_valid)
test_dataset = TensorDataset(X_test)
# Only the training batches are shuffled; validation and test keep their row
# order (the test predictions are later paired with the test ids by position).
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

<h2>Plotting function</h2>

In [None]:
def plot_f1_recall_precision(train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision):
    """Plot per-epoch train/validation curves for F1, precision and recall.

    The three metrics are drawn side by side (in that order) in one 1x3 figure.
    Every argument is a sequence with one score per epoch.
    """
    # (axis label, panel title, training curve, validation curve) per panel, left to right
    panels = (
        ('F1 Score', 'Training and Validation F1 Score', train_f1, valid_f1),
        ('Precision Score', 'Training and Validation Precision Scores', train_precision, valid_precision),
        ('Recall Score', 'Training and Validation Recall Score', train_recall, valid_recall),
    )

    plt.figure(figsize=(18, 6))
    for position, (metric, heading, train_curve, valid_curve) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.plot(train_curve, label=f'Train {metric}')
        plt.plot(valid_curve, label=f'Validation {metric}')
        plt.title(heading)
        plt.xlabel('Epochs')
        plt.ylabel(metric)
        plt.legend()

    plt.tight_layout()  # keep the three panels from overlapping
    plt.show()

<h1>Experiments with layers</h1>
<p>Initially we construct an RNN with LSTM cells</p>

In [None]:
class RNNModel(nn.Module):
    """Stacked LSTM classifier: the output at the last time step feeds a linear layer."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        # Both sizes are needed in forward() to build the initial states.
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        hidden_init = torch.zeros(*state_shape).to(x.device)
        cell_init = torch.zeros(*state_shape).to(x.device)

        sequence_out, _ = self.lstm(x, (hidden_init, cell_init))  # (batch_size, seq_length, hidden_size)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

In [None]:
def train_model(model, loss_func, optimizer, num_epochs, train_loader, valid_loader, patience=20):
    """Train ``model`` with early stopping on the validation F1 score.

    Every epoch runs one optimisation pass over ``train_loader`` and one
    evaluation pass over ``valid_loader`` and records the weighted F1,
    precision and recall of both.  Training stops early once the validation F1
    (rounded to two decimals) has not improved for ``patience`` consecutive
    epochs.  Before returning, the model is reset to the weights of the epoch
    that achieved the best validation F1, so callers do not receive the weights
    of the last, non-improving epoch.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to class scores.
        loss_func: Loss called as ``loss_func(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        valid_loader: Iterable of ``(inputs, labels)`` validation batches.
        patience: Epochs without validation-F1 improvement tolerated before
            stopping (default 20).

    Returns:
        ``(model, train_f1, valid_f1, train_recall, valid_recall,
        train_precision, valid_precision)``; the metric lists hold one value
        per completed epoch.
    """

    def _weighted_scores(labels, preds):
        # Weighted-average F1, precision and recall for one pass over a loader.
        return (f1_score(labels, preds, average='weighted', zero_division=0),
                precision_score(labels, preds, average='weighted', zero_division=0),
                recall_score(labels, preds, average='weighted', zero_division=0))

    train_f1, valid_f1 = [], []
    train_precision, valid_precision = [], []
    train_recall, valid_recall = [], []

    best_valid_f1 = 0
    best_state = None  # snapshot of the weights at the best validation epoch
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        model.train()
        train_preds, train_labels = [], []

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_func(outputs, labels.long())
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())

        f1, precision, recall = _weighted_scores(train_labels, train_preds)
        train_f1.append(f1)
        train_precision.append(precision)
        train_recall.append(recall)

        # Validation phase: predictions only -- no loss is needed for the metrics.
        model.eval()
        valid_preds, valid_labels = [], []
        with torch.no_grad():
            for inputs, labels in valid_loader:
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                valid_preds.extend(predicted.cpu().numpy())
                valid_labels.extend(labels.cpu().numpy())

        f1, precision, recall = _weighted_scores(valid_labels, valid_preds)
        valid_f1.append(f1)
        valid_precision.append(precision)
        valid_recall.append(recall)

        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train F1: {train_f1[-1]:.2f}, Valid F1: {valid_f1[-1]:.2f}, '
              f'Train Precision: {train_precision[-1]:.2f}, Valid Precision: {valid_precision[-1]:.2f}, '
              f'Train Recall: {train_recall[-1]:.2f}, Valid Recall: {valid_recall[-1]:.2f}')

        # Rounding means only gains of about 0.01 or more count as an improvement.
        temp_f1 = round(valid_f1[-1], 2)
        if temp_f1 > best_valid_f1:
            best_valid_f1 = temp_f1
            epochs_no_improve = 0
            # state_dict() returns references to the live tensors, so clone them.
            best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f'Early stopping triggered after {epoch + 1} epochs')
            break

    if best_state is not None:
        model.load_state_dict(best_state)

    print(f'MEAN TRAIN F1 SCORE: {np.mean(train_f1):.2f}\n'
          f'MEAN VALIDATION F1 SCORE: {np.mean(valid_f1):.2f}\n'
          f'MEAN TRAIN PRECISION: {np.mean(train_precision):.2f}\n'
          f'MEAN VALIDATION PRECISION: {np.mean(valid_precision):.2f}\n'
          f'MEAN TRAIN RECALL: {np.mean(train_recall):.2f}\n'
          f'MEAN VALIDATION RECALL: {np.mean(valid_recall):.2f}\n')
    return model, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision

In [None]:
def objective(trial):
    """Optuna objective for the depth experiment: train an LSTM with a sampled
    number of layers (1-10) and return the validation F1 of the last epoch run."""
    num_layers = trial.suggest_int('num_layers', 1, 10)

    # Everything except the depth is fixed for this experiment.
    network = RNNModel(input_size=100, hidden_size=128, num_layers=num_layers, num_classes=3)
    criterion = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(network.parameters(), lr=0.001)

    # train_model returns (model, train_f1, valid_f1, train_recall, valid_recall,
    # train_precision, valid_precision); only the metric histories are needed here.
    history = train_model(network, criterion, adam, 100, train_loader, valid_loader)[1:]
    print(f'\n\nFOR num_layers={num_layers}\n\n')
    plot_f1_recall_precision(*history)

    valid_f1 = history[1]
    return valid_f1[-1]  # f1 score of the last epoch

In [None]:
# Run 10 trials, maximising the value returned by objective (validation F1).
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=10)

# Get the best number of layers
optimal_layers = study.best_params['num_layers']
print(f'Optimal Number of Layers: {optimal_layers}')

<h1>Experiments with hidden_size</h1>

In [None]:
def objective_hidden_size(trial):
    """Optuna objective for the width experiment: train a 2-layer LSTM with a
    sampled hidden size in [32, 256] and return the validation F1 of the last epoch run."""
    hidden_size = trial.suggest_int('hidden_size', 32, 256)

    network = RNNModel(input_size=100, hidden_size=hidden_size, num_layers=2, num_classes=3)
    criterion = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(network.parameters(), lr=0.001)

    print(f'\n\nFOR hidden_size={hidden_size}\n\n')
    # Drop the returned model; keep the six per-epoch metric histories.
    history = train_model(network, criterion, adam, 100, train_loader, valid_loader)[1:]
    plot_f1_recall_precision(*history)

    # The f1 score of the last epoch is the metric to optimize.
    valid_f1 = history[1]
    return valid_f1[-1]

In [None]:
# Run 10 trials over the hidden size, maximising the returned validation F1.
study_hidden_size = optuna.create_study(direction='maximize')
study_hidden_size.optimize(objective_hidden_size, n_trials=10)

optimal_hidden_size = study_hidden_size.best_params['hidden_size']
print(f'Optimal Hidden Size: {optimal_hidden_size}')

<h1>Experiments with cell type</h1>

In [None]:
class RNNModel(nn.Module):
    """Recurrent classifier with a selectable cell type ('LSTM' or 'GRU').

    The recurrent output at the last time step is passed to a linear layer.
    This definition replaces the LSTM-only ``RNNModel`` defined earlier in the
    notebook.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, cell_type='LSTM'):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.cell_type = cell_type

        recurrent_layers = {'LSTM': nn.LSTM, 'GRU': nn.GRU}
        if cell_type not in recurrent_layers:
            raise ValueError("Unsupported RNN cell type. Choose 'LSTM' or 'GRU'.")
        self.rnn = recurrent_layers[cell_type](input_size, hidden_size, num_layers, batch_first=True)

        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        hidden_init = torch.zeros(*state_shape).to(x.device)
        if self.cell_type == 'LSTM':
            # An LSTM takes a (hidden, cell) pair as its initial state.
            initial_state = (hidden_init, torch.zeros(*state_shape).to(x.device))
        else:
            # A GRU only has a hidden state.
            initial_state = hidden_init

        sequence_out, _ = self.rnn(x, initial_state)
        return self.fc(sequence_out[:, -1, :])


In [None]:
def objective(trial):
    """Optuna objective for the cell-type experiment: train a 2-layer network
    (hidden size 88) with an LSTM or GRU cell and return the validation F1 of
    the last epoch run."""
    cell_type = trial.suggest_categorical('cell_type', ['LSTM', 'GRU'])

    network = RNNModel(input_size=100, hidden_size=88, num_layers=2, num_classes=3, cell_type=cell_type)
    criterion = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(network.parameters(), lr=0.001)

    print(f'\n\nFOR cell type={cell_type}\n\n')
    # Drop the returned model; keep the six per-epoch metric histories.
    history = train_model(network, criterion, adam, 100, train_loader, valid_loader)[1:]
    plot_f1_recall_precision(*history)

    valid_f1 = history[1]
    return valid_f1[-1]  # f1 score of the last epoch


In [None]:
# Four trials over a two-value categorical parameter, so the sampler may try
# the same cell type more than once.
study_cell_type = optuna.create_study(direction='maximize')
study_cell_type.optimize(objective, n_trials=4)

optimal_cell_type = study_cell_type.best_params['cell_type']
print(f'Optimal Cell Type: {optimal_cell_type}')

<h1>Experiment with skip connections</h1>

In [None]:
class RNNModelSkipConns(nn.Module):
    """LSTM classifier with optional residual (skip) connections between layers.

    With ``skip_connections`` falsy this is a regular stacked ``nn.LSTM``
    followed by a linear layer on the last time step.

    With ``skip_connections`` truthy the stack is built from single-layer
    LSTMs, and from the second layer on each layer's output is added to its
    input before being passed upwards; the classifier reads the last time step
    of the final residual sum.  The first layer has no residual because it may
    change the feature size from ``input_size`` to ``hidden_size``.

    The previous implementation re-applied the *whole* stacked LSTM once per
    layer (which fails unless ``input_size == hidden_size``) and classified
    from the output taken before the residual addition, so the skip connection
    never influenced the prediction.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, skip_connections):
        super(RNNModelSkipConns, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.skip_connections = skip_connections

        if skip_connections:
            # One single-layer LSTM per level so a residual can be inserted between levels.
            self.lstm_layers = nn.ModuleList(
                nn.LSTM(input_size if level == 0 else hidden_size, hidden_size, 1, batch_first=True)
                for level in range(num_layers)
            )
        else:
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        if not self.skip_connections:
            # Initial hidden/cell states default to zeros.
            out, _ = self.lstm(x)  # (batch_size, seq_length, hidden_size)
            return self.fc(out[:, -1, :])

        for level, layer in enumerate(self.lstm_layers):
            layer_out, _ = layer(x)
            # From the second layer on, input and output both have hidden_size
            # features, so the residual addition is always shape-compatible.
            x = layer_out + x if level > 0 else layer_out
        return self.fc(x[:, -1, :])


In [None]:
def objective(trial):
    """Optuna objective for the skip-connection experiment: train a 2-layer
    LSTM (hidden size 100) with or without skip connections and return the
    validation F1 of the last epoch run."""
    skip_conn = trial.suggest_categorical('skip_connections', [True, False])

    network = RNNModelSkipConns(input_size=100, hidden_size=100, num_layers=2, num_classes=3, skip_connections=skip_conn)
    criterion = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(network.parameters(), lr=0.001)

    print(f'\n\nFOR skip connection={skip_conn}\n\n')
    # Drop the returned model; keep the six per-epoch metric histories.
    history = train_model(network, criterion, adam, 100, train_loader, valid_loader)[1:]
    plot_f1_recall_precision(*history)

    valid_f1 = history[1]
    return valid_f1[-1]

In [None]:
# Four trials over a boolean choice, so the sampler may repeat a setting.
study_skip_connections = optuna.create_study(direction='maximize')
study_skip_connections.optimize(objective, n_trials=4)

optimal_skip_connections = study_skip_connections.best_params['skip_connections']
print(f'Optimal Skip Connections value: {optimal_skip_connections}')

<h1>Experiments with gradient clipping</h1>

In [None]:
def train_model_with_clipping(model, loss_func, optimizer, num_epochs, train_loader, valid_loader, clip=None, patience=20):
    """Train ``model`` with optional gradient-norm clipping and early stopping.

    Every epoch runs one optimisation pass over ``train_loader`` and one
    evaluation pass over ``valid_loader`` and records the weighted F1,
    precision and recall of both.  When ``clip`` is given, the total gradient
    norm is clipped to that value before every optimizer step.  Training stops
    early once the validation F1 (rounded to two decimals) has not improved
    for ``patience`` consecutive epochs.  Before returning, the model is reset
    to the weights of the epoch that achieved the best validation F1, so
    callers do not receive the weights of the last, non-improving epoch.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to class scores.
        loss_func: Loss called as ``loss_func(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        valid_loader: Iterable of ``(inputs, labels)`` validation batches.
        clip: Maximum gradient norm, or ``None`` to disable clipping.
        patience: Epochs without validation-F1 improvement tolerated before
            stopping (default 20).

    Returns:
        ``(model, train_f1, valid_f1, train_recall, valid_recall,
        train_precision, valid_precision)``; the metric lists hold one value
        per completed epoch.
    """

    def _weighted_scores(labels, preds):
        # Weighted-average F1, precision and recall for one pass over a loader.
        return (f1_score(labels, preds, average='weighted', zero_division=0),
                precision_score(labels, preds, average='weighted', zero_division=0),
                recall_score(labels, preds, average='weighted', zero_division=0))

    train_f1, valid_f1 = [], []
    train_precision, valid_precision = [], []
    train_recall, valid_recall = [], []

    best_valid_f1 = 0
    best_state = None  # snapshot of the weights at the best validation epoch
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        model.train()
        train_preds, train_labels = [], []

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_func(outputs, labels.long())
            loss.backward()
            if clip is not None:
                # Clip after backward() and before step() so the update uses the clipped gradients.
                torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())

        f1, precision, recall = _weighted_scores(train_labels, train_preds)
        train_f1.append(f1)
        train_precision.append(precision)
        train_recall.append(recall)

        # Validation phase: predictions only -- no loss is needed for the metrics.
        model.eval()
        valid_preds, valid_labels = [], []
        with torch.no_grad():
            for inputs, labels in valid_loader:
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                valid_preds.extend(predicted.cpu().numpy())
                valid_labels.extend(labels.cpu().numpy())

        f1, precision, recall = _weighted_scores(valid_labels, valid_preds)
        valid_f1.append(f1)
        valid_precision.append(precision)
        valid_recall.append(recall)

        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train F1: {train_f1[-1]:.2f}, Valid F1: {valid_f1[-1]:.2f}, '
              f'Train Precision: {train_precision[-1]:.2f}, Valid Precision: {valid_precision[-1]:.2f}, '
              f'Train Recall: {train_recall[-1]:.2f}, Valid Recall: {valid_recall[-1]:.2f}')

        # Rounding means only gains of about 0.01 or more count as an improvement.
        temp_f1 = round(valid_f1[-1], 2)
        if temp_f1 > best_valid_f1:
            best_valid_f1 = temp_f1
            epochs_no_improve = 0
            # state_dict() returns references to the live tensors, so clone them.
            best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f'Early stopping triggered after {epoch + 1} epochs')
            break

    if best_state is not None:
        model.load_state_dict(best_state)

    print(f'MEAN TRAIN F1 SCORE: {np.mean(train_f1):.2f}\n'
          f'MEAN VALIDATION F1 SCORE: {np.mean(valid_f1):.2f}\n'
          f'MEAN TRAIN PRECISION: {np.mean(train_precision):.2f}\n'
          f'MEAN VALIDATION PRECISION: {np.mean(valid_precision):.2f}\n'
          f'MEAN TRAIN RECALL: {np.mean(train_recall):.2f}\n'
          f'MEAN VALIDATION RECALL: {np.mean(valid_recall):.2f}\n')

    return model, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision

In [None]:
# Candidate maximum gradient norms: five evenly spaced values starting at 0.5
# with 10 excluded (0.5, 2.4, 4.3, 6.2, 8.1), rounded to one decimal.
clip_vals = [round(val, 1) for val in list(np.linspace(0.5,10, 5, endpoint=False))]
# Collects one row per tried clipping value.
max_f1_dt = {'Clipping value':[], 'Val f1 score':[]}

In [None]:
# Start from an empty results table and build the candidate list locally (the
# numeric values plus None = no clipping) instead of appending to clip_vals, so
# re-running this cell neither duplicates rows nor trains the None case twice.
max_f1_dt = {'Clipping value': [], 'Val f1 score': []}
clip_candidates = [val for val in clip_vals if val is not None] + [None]

for val in clip_candidates:
    hidden_size = 88
    num_layers = 2
    num_epochs = 100
    model = RNNModel(input_size=100, hidden_size=hidden_size, num_layers=num_layers, num_classes=3)

    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    print(f'\n\nFOR clipping value={val}\n\n')
    model, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision = train_model_with_clipping(model, loss_func, optimizer, num_epochs, train_loader, valid_loader, val)
    plot_f1_recall_precision(train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision)
    max_f1_dt['Clipping value'].append(val)
    # Each setting is scored by its validation F1 averaged over all epochs run.
    max_f1_dt['Val f1 score'].append(np.mean(valid_f1))

max_df = pd.DataFrame.from_dict(max_f1_dt)
# Pick the winner from the plain lists: inside the DataFrame the None entry is
# stored as NaN, so a winning "no clipping" run would be reported as nan.
best_index = int(np.argmax(max_f1_dt['Val f1 score']))
optimal_clip_value = max_f1_dt['Clipping value'][best_index]
print(f'The optimal value for gradient clipping is {optimal_clip_value}')

<h1>Experiments with dropout</h1>

In [None]:
class RNNModelDropout(nn.Module):
    """Stacked LSTM classifier whose LSTM is built with ``dropout=dropout_rate``."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.0):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout_rate, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        hidden_init = torch.zeros(*state_shape).to(x.device)
        cell_init = torch.zeros(*state_shape).to(x.device)

        sequence_out, _ = self.lstm(x, (hidden_init, cell_init))
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

In [None]:
dropout_vals = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
max_f1_dt = {'Dropout value': [], 'Val f1 score': []}
num_epochs = 100
model_dropout_best = None
best_valid_f1 = 0  # best mean validation F1 seen so far
for val in dropout_vals:
    print(f'\n\nFOR dropout value={val}\n\n')
    model = RNNModelDropout(input_size=100, hidden_size=88, num_layers=2, num_classes=3, dropout_rate=val)
    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    model, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision = train_model(model, loss_func, optimizer, num_epochs, train_loader, valid_loader)

    # Each setting is scored by its validation F1 averaged over all epochs run.
    mean_valid_f1 = np.mean(valid_f1)
    if mean_valid_f1 > best_valid_f1:
        # Record the new best score as well: without this update every model
        # beat the initial 0 and the *last* model was always kept as "best".
        best_valid_f1 = mean_valid_f1
        model_dropout_best = model
    plot_f1_recall_precision(train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision)
    max_f1_dt['Dropout value'].append(val)
    max_f1_dt['Val f1 score'].append(mean_valid_f1)

max_df = pd.DataFrame.from_dict(max_f1_dt)
optimal_dropout_value = max_df.loc[max_df['Val f1 score'].idxmax(), 'Dropout value']
print(f'The optimal value for dropout is {optimal_dropout_value}')


<h1>Attention</h1>

In [None]:
class Attention(nn.Module):
    """Attention pooling: scores each time step with a linear layer and returns
    the softmax-weighted sum of the steps."""

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.attn = nn.Linear(hidden_size, 1)

    def forward(self, outputs):
        """Pool ``outputs`` over dimension 1.

        Returns:
            ``(context, attn_weights)``: the weighted sum over dimension 1 and
            the weights themselves (softmax-normalised along dimension 1).
        """
        step_scores = self.attn(outputs)
        attn_weights = torch.softmax(step_scores, dim=1)
        context = (attn_weights * outputs).sum(dim=1)
        return context, attn_weights


In [None]:
class RNNModelAttention(nn.Module):
    """Stacked LSTM whose per-step outputs are pooled by ``Attention`` before
    the final linear classifier."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.0):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout_rate, batch_first=True)
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        hidden_init = torch.zeros(*state_shape).to(x.device)
        cell_init = torch.zeros(*state_shape).to(x.device)

        step_outputs, _ = self.lstm(x, (hidden_init, cell_init))
        # The attention weights are computed but not returned to the caller.
        context, _ = self.attention(step_outputs)
        return self.fc(context)


In [None]:
num_epochs = 100
# Positional arguments: input_size=100, hidden_size=88, num_layers=2, num_classes=3, dropout_rate=0.1
model = RNNModelAttention(100, 88, 2, 3, 0.1)
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Keep the trained network under its own name (model_attention).
model_attention, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision = train_model(model, loss_func, optimizer, num_epochs, train_loader, valid_loader)
plot_f1_recall_precision(train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision)

<h1>Enhanced attention</h1>

In [None]:
class EnhancedAttention(nn.Module):
    """Attention pooling with a learned query vector.

    Each time step is linearly transformed and scored by its dot product with
    the query; the softmax of the scores weights the original LSTM outputs.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # The query is created before the linear layer, as in the original
        # construction order, so seeded initialisation stays the same.
        self.query = nn.Parameter(torch.randn(hidden_size), requires_grad=True)
        self.attn = nn.Linear(hidden_size, hidden_size)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, lstm_outputs):
        """Return ``(context, attn_weights)`` for ``lstm_outputs`` of shape
        ``(batch_size, seq_len, hidden_size)``."""
        projected = self.attn(lstm_outputs)            # (batch_size, seq_len, hidden_size)
        step_scores = projected.matmul(self.query)     # (batch_size, seq_len)
        attn_weights = self.softmax(step_scores)       # (batch_size, seq_len)
        weighted_steps = lstm_outputs * attn_weights.unsqueeze(2)
        return weighted_steps.sum(dim=1), attn_weights


In [None]:
class RNNModelWithEnhancedAttention(nn.Module):
    """Stacked LSTM whose per-step outputs are pooled by ``EnhancedAttention``
    before the final linear classifier."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.0):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout_rate, batch_first=True)
        self.attention = EnhancedAttention(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_size)`` to class scores ``(batch, num_classes)``."""
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        hidden_init = torch.zeros(*state_shape).to(x.device)
        cell_init = torch.zeros(*state_shape).to(x.device)

        step_outputs, _ = self.lstm(x, (hidden_init, cell_init))
        # The attention weights are computed but not returned to the caller.
        context, _ = self.attention(step_outputs)
        return self.fc(context)


In [None]:
num_epochs = 100
# Positional arguments: input_size=100, hidden_size=88, num_layers=2, num_classes=3, dropout_rate=0.1
model = RNNModelWithEnhancedAttention(100, 88, 2, 3, 0.1)
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# The trained network is kept as model_attention_enchanced and used for the test predictions below.
model_attention_enchanced, train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision = train_model(model, loss_func, optimizer, num_epochs, train_loader, valid_loader)
plot_f1_recall_precision(train_f1, valid_f1, train_recall, valid_recall, train_precision, valid_precision)

<h1>Predictions</h1>

In [None]:
# Predict with the second (enhanced) attention model, in evaluation mode.
model_attention_enchanced.eval()

test_preds = []
with torch.no_grad():
    # The test dataset holds features only, so every batch is a one-element sequence.
    for (features,) in test_loader:
        class_scores = model_attention_enchanced(features)
        best_class = torch.max(class_scores.data, 1)[1]
        test_preds.extend(best_class.cpu().numpy())

# Map the integer class ids back to the original sentiment labels.
predicted_labels = le.inverse_transform(test_preds)

In [None]:
# Pair each test id with its predicted label; the test loader is not shuffled,
# so the predictions are in the same row order as df_test_set.
results = pd.DataFrame({
    'Id': df_test_set['New_ID'],
    'Predicted': predicted_labels
})

# Write the submission file without the index column, then display the frame.
results.to_csv('submission.csv', index=False)
results