##### Importing essential libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import re
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
import seaborn as sns

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
is_cuda = torch.cuda.is_available()
if is_cuda:
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU not available, CPU used")

##### The following code snippet provided in the course material (Snippet for Implementing RNN model).

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class TwoLayerBiLSTMAttention(nn.Module):
    """
    A PyTorch module that combines:
      - A 2-layer (stacked) Bidirectional LSTM
      - A simple attention mechanism that computes scalar 'attention weights' per time step
      - A final linear (fully connected) layer to produce outputs (e.g., classification)
    """
    def __init__(self, input_dim, hidden_dim, output_dim,
                 num_layers=2, dropout=0.2, bidirectional=True):
        """
        Args:
            input_dim (int):
                Dimensionality of each input feature vector (features per time step).
            hidden_dim (int):
                Dimensionality of the hidden state in each LSTM layer.
            output_dim (int):
                Number of output units (e.g., for classification: number of classes).
            num_layers (int):
                Number of stacked LSTM layers (default=2).
            dropout (float):
                Dropout probability applied between LSTM layers (default=0.2).
            bidirectional (bool):
                Whether to use a bidirectional LSTM (default=True).
        """
        super(TwoLayerBiLSTMAttention, self).__init__()

        # Store constructor parameters
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional

        # Define a multi-layer (num_layers) LSTM.
        # If bidirectional=True, it will have two directions (forward and backward) per layer,
        # effectively doubling the hidden state size for that layer.
        # batch_first=True -> input & output tensors have shape: (batch, seq_len, feature)
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=bidirectional
        )

        # If the LSTM is bidirectional, the output at each time step
        # will have size 2 * hidden_dim (concatenated forward and backward states).
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim

        # The attention mechanism:
        # We'll transform each LSTM output vector (size = lstm_output_dim) down to a single score.
        #   - First, we project from lstm_output_dim -> 64, apply a Tanh nonlinearity
        #   - Then, project from 64 -> 1
        # The result is a scalar "attention" for each time step, used to compute attention weights.
        self.attn = nn.Sequential(
            nn.Linear(lstm_output_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )

        # After computing the weighted sum of LSTM outputs (the "context" vector),
        # we map that context vector to the final output dimensionality.
        self.fc = nn.Linear(lstm_output_dim, output_dim)

    def forward(self, x):
        """
        Forward pass through the model.

        Args:
            x (Tensor):
                Input tensor of shape (batch_size, seq_length, input_dim).

        Returns:
            out (Tensor):
                Output tensor of shape (batch_size, output_dim).
        """
        # 1) Pass the input through the LSTM.
        #    Output shape: lstm_out -> (batch, seq_len, lstm_output_dim)
        #    h_n, c_n -> final hidden and cell states for each layer & direction (not used here).
        lstm_out, (h_n, c_n) = self.lstm(x)

        # 2) Apply the attention network to each time step of the LSTM output.
        #    attention shape: (batch, seq_len, 1), since we produce a single score per time step.
        energy = self.attn(lstm_out)

        # 3) Convert the energy scores into attention weights.
        #    We apply a softmax across the sequence dimension (dim=1).
        #    attention_weights shape: (batch, seq_len, 1)
        attention_weights = F.softmax(energy, dim=1)

        # 4) Compute a weighted sum of the LSTM outputs based on attention weights.
        #    Multiplying elementwise, then summing across the sequence dimension
        #    gives us the "context" vector of shape (batch, lstm_output_dim).
        context = (lstm_out * attention_weights).sum(dim=1)

        # 5) Map the context vector to the final output dimension (e.g., number of classes).
        out = self.fc(context)

        # Return the model's predictions or logits of shape (batch_size, output_dim).
        return out

##### Data Preprocessing: Loading the dataset and preprocessing steps as in Exercise 1.

In [16]:
def remove_unwanted(text):
    text = re.sub(r'http\S+|www\S+', '', text)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'#\w+', '', text)
    emojis = re.compile(
        "[\U0001F600-\U0001F64F" # Emoticons
        "\U0001F300-\U0001F5FF"  # Symbols & Pictographs
        "\U0001F680-\U0001F6FF"  # Transport & Map Symbols
        "\U0001F700-\U0001F77F"  # Alchemical Symbols
        "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
        "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
        "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
        "\U0001FA00-\U0001FA6F"  # Chess Symbols
        "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
        "\U00002702-\U000027B0"  # Dingbats
        "\U000024C2-\U0001F251"  # Enclosed characters
        "]", flags=re.UNICODE
    )
    text = emojis.sub(r'', text)
    return text

def preprocessing(sentence):
    sentence = remove_unwanted(sentence)
    sentence = sentence.lower()
    tokens = word_tokenize(sentence, language='english', preserve_line=True)
    stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()
    filtered_tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]
    text = " ".join(filtered_tokens)
    return filtered_tokens

tweets_df = pd.read_csv("datasets/Tweets.csv", encoding="utf-8")
tokens = [preprocessing(sentence) for sentence in tweets_df['text']]
tweets_df['text'] = [" ".join(token) for token in tokens]

label_encoder = LabelEncoder()
tweets_df['label'] = label_encoder.fit_transform(tweets_df['airline_sentiment'])

# Tokenize text data (you can also use other advanced tokenizers like from HuggingFace)
from sklearn.feature_extraction.text import CountVectorizer
vectorizer = CountVectorizer(stop_words='english')
X = vectorizer.fit_transform(tweets_df['text']).toarray()
y = tweets_df['label'].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)

# DataLoader
train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

In [17]:
input_dim = X.shape[1]  # number of TF-IDF features
hidden_dim = 128
output_dim = len(label_encoder.classes_)

# Instantiate the model
model = TwoLayerBiLSTMAttention(input_dim, hidden_dim, output_dim)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Assume we have training and validation data loaders: train_loader, val_loader
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    correct_preds = 0
    total_preds = 0
    
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_preds += labels.size(0)
    
    accuracy = correct_preds / total_preds
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss/len(train_loader)}, Accuracy: {accuracy}")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x64 and 256x3)

##### Initialize the model.

In [None]:
model = TwoLayerBiLSTMAttention(input_dim=X_train.shape[1], hidden_dim=64, output_dim=len(np.unique(y)))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Assume we have training and validation data loaders: train_loader, val_loader
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    correct_preds = 0
    total_preds = 0
    
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_preds += labels.size(0)
    
    accuracy = correct_preds / total_preds
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss/len(train_loader)}, Accuracy: {accuracy}")

In [None]:
# After training, evaluate on validation/test set
model.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for inputs, labels in val_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        y_true.extend(labels.numpy())
        y_pred.extend(predicted.numpy())

# Confusion Matrix and Classification Report
conf_matrix = confusion_matrix(y_true, y_pred)
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

print(classification_report(y_true, y_pred))