# Import Packages

In [1]:
import re

import numpy as np
import pandas as pd
import spacy
import torch
import torchtext
from sklearn.preprocessing import LabelEncoder
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from torchtext.vocab import build_vocab_from_iterator
from tqdm import tqdm

print(f"PyTorch version: {torch.__version__}")
print(f"torchtext version: {torchtext.__version__}")

PyTorch version: 2.2.1+cu118
torchtext version: 0.17.1+cpu


In [2]:
print("Test")

Test


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [ ]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and CUDA), and
    switches cuDNN to deterministic mode so repeated runs are reproducible.

    Args:
        seed: Integer seed applied to all generators.
    """
    # Local import: the notebook's import cell does not bring in ``random``.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed_everything(42)

# Load GloVe embedding

In [None]:
def load_embedding(embedding_file):
    """Read a whitespace-separated word-embedding text file into a dict.

    Each non-blank line is expected to hold a word followed by its vector
    components. Blank lines are skipped.

    Args:
        embedding_file: Path of the embedding text file (UTF-8).

    Returns:
        dict mapping each word to a 1-D ``np.float64`` array.
    """
    glove_embedding_dict = {}

    # A distinct handle name keeps the path parameter from being shadowed.
    with open(embedding_file, encoding="utf8") as handle:
        for line in handle:
            tokens = line.split()
            if not tokens:
                # Blank or whitespace-only line: nothing to parse.
                continue
            word = tokens[0]
            glove_embedding_dict[word] = np.array(tokens[1:], dtype=np.float64)

    return glove_embedding_dict

In [None]:
glove_embedding_dict = load_embedding("data/glove.6B/glove.6B.100d.txt")

# Load data

In [None]:
def read_data(file_name: str, data_dir: str = "data/semeval-tweets") -> pd.DataFrame:
    """Load a tab-separated tweet file into a DataFrame.

    Each non-blank line is split into at most three fields (id, sentiment,
    text), so tab characters inside the tweet text stay part of the text
    instead of producing extra columns. Blank lines are skipped.

    Args:
        file_name: File name without the ``.txt`` extension.
        data_dir: Directory containing the file; defaults to the location
            the notebook reads from.

    Returns:
        DataFrame with columns ``tweet_id``, ``tweet_sentiment`` and
        ``tweet_text`` (all read as strings).
    """
    data_list = []

    with open(f"{data_dir}/{file_name}.txt", encoding="utf8") as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            # maxsplit=2: only the first two tabs separate columns.
            data_list.append(stripped.split("\t", 2))

    df = pd.DataFrame(
        data=data_list,
        columns=[
            "tweet_id",
            "tweet_sentiment",
            "tweet_text",
        ],
    )

    return df

In [None]:
training_data = read_data("twitter-training-data")

In [None]:
development_data = read_data("twitter-dev-data")

In [None]:
testing_1_data = read_data("twitter-test1")

In [None]:
testing_2_data = read_data("twitter-test2")

In [None]:
testing_3_data = read_data("twitter-test3")

In [None]:
training_data.head()

# Build vocabulary

We build the vocabulary only on the training data.

In [None]:
def remove_user_mentions(tweet: str):
    """Return ``tweet`` with every ``@handle`` (letters, digits, underscore) deleted."""
    return re.sub("(@[a-zA-Z0-9_]+)", "", tweet)

In [None]:
def remove_tweet_hashtag(tweet: str):
    """Return ``tweet`` with every hashtag (``#`` plus the word after it) deleted.

    A lone ``#`` that is not followed by a word character is left in place.
    """
    # Raw string: ``\w`` in a normal string literal is an invalid escape sequence.
    hashtag_pattern = re.compile(r"#(\w+)")

    return hashtag_pattern.sub("", tweet)

In [None]:
def remove_url(tweet: str):
    """Return ``tweet`` with every ``http://`` / ``https://`` URL deleted."""
    return re.sub(
        "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
        "",
        tweet,
    )

In [None]:
def remove_special_characters(tweet: str):
    """Return ``tweet`` keeping only ASCII letters, digits and whitespace."""
    # Raw string: ``\s`` in a normal string literal is an invalid escape sequence.
    special_characters_pattern = re.compile(r"[^a-zA-Z0-9\s]")

    return special_characters_pattern.sub("", tweet)

In [None]:
def remove_digits(tweet: str):
    """Return ``tweet`` with stand-alone numbers deleted.

    Only digit runs delimited by word boundaries are removed, so digits
    embedded in a word (e.g. ``abc123``) are kept.
    """
    # An analogous pattern for single-character words would be \b\w{1}\b.
    return re.sub(r"\b\d+\b", "", tweet)

In [None]:
nlp = spacy.load("en_core_web_sm")

In [None]:
def preprocess_tweet(tweet: str, nlp) -> list[str]:
    """Clean a raw tweet and split it into content tokens.

    Cleaning order: URLs, user mentions, hashtags and stand-alone numbers
    are removed; whitespace runs are collapsed to one space; the text is
    stripped and lower-cased. The result is tokenized with ``nlp`` and
    stop words, punctuation and whitespace tokens are dropped.

    Args:
        tweet: Raw tweet text.
        nlp: Callable returning an iterable of tokens that expose ``text``,
            ``is_stop``, ``is_punct`` and ``is_space`` (the notebook passes
            a spaCy pipeline).

    Returns:
        The texts of the retained tokens, in order.
    """
    cleaned = remove_url(tweet)
    cleaned = remove_user_mentions(cleaned)
    cleaned = remove_tweet_hashtag(cleaned)
    cleaned = remove_digits(cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip().lower()

    kept_tokens = []
    for token in nlp(cleaned):
        if token.is_stop or token.is_punct or token.is_space:
            continue
        kept_tokens.append(token.text)

    return kept_tokens

In [None]:
training_tweets_preprocessed = [preprocess_tweet(tweet, nlp) for tweet in training_data["tweet_text"]]

In [None]:
development_tweets_preprocessed = [preprocess_tweet(tweet, nlp) for tweet in development_data["tweet_text"]]

In [None]:
# Special tokens: "<unk>" stands in for out-of-vocabulary words and "<pad>"
# is the filler used when batching sequences of different lengths.
special_tokens = ["<unk>", "<pad>"]
# The vocabulary is built from the preprocessed training tweets only.
vocab = build_vocab_from_iterator(training_tweets_preprocessed, specials=special_tokens)
# Any token not seen during vocabulary construction maps to the "<unk>" index.
vocab.set_default_index(vocab["<unk>"])

In [None]:
len(vocab)

# Build embedding matrix

In [None]:
# Prepare the embedding matrix: one row per vocabulary entry, initialised to zeros.
vocab_size = len(vocab)
# Must match the dimensionality of the GloVe file loaded earlier (glove.6B.100d).
embedding_dim = 100
embedding_matrix = torch.zeros((vocab_size, embedding_dim))

In [None]:
print(embedding_matrix.shape)

In [None]:
# Fill the embedding matrix row by row:
#   * "<pad>" keeps an all-zero vector so padded positions feed no signal
#     (Embedding.from_pretrained does not zero the padding row itself);
#   * words found in GloVe get their pre-trained vector;
#   * every other word (including "<unk>") gets a random normal vector.
pad_index = vocab["<pad>"]
for word, idx in tqdm(vocab.get_stoi().items()):
    if idx == pad_index:
        embedding_matrix[idx] = torch.zeros(embedding_dim)
    elif word in glove_embedding_dict:
        embedding_matrix[idx] = torch.tensor(glove_embedding_dict[word])
    else:
        embedding_matrix[idx] = torch.randn(embedding_dim)

In [None]:
embedding_matrix.shape

# Define Dataset

In [None]:
class TweetsDataset(Dataset):
    """Dataset of tokenized tweets with their ids and sentiment labels.

    Args:
        tweet_ids: Indexable collection of tweet identifiers.
        tweets: Indexable collection of token lists, one per tweet.
        labels: Indexable collection of raw sentiment labels.
        vocab: Object exposing ``lookup_indices(tokens)`` that maps a token
            list to a list of integer indices.
        label_encoder: Object exposing ``transform(labels)`` that maps raw
            labels to integer class ids.
    """

    def __init__(self, tweet_ids, tweets, labels, vocab, label_encoder):
        self.tweet_ids = tweet_ids
        self.tweets = tweets
        self.labels = labels
        self.vocab = vocab
        self.label_encoder = label_encoder

    def __len__(self):
        """Return the number of tweets."""
        return len(self.tweets)

    def __getitem__(self, idx):
        """Return ``(tweet_id, token_index_tensor, encoded_label)`` for one tweet.

        The label is whatever ``label_encoder.transform([label])`` returns
        for the single-element list.
        """
        tweet_id = self.tweet_ids[idx]
        tweet = self.tweets[idx]
        label = self.labels[idx]

        # Explicit int64 dtype: an empty token list would otherwise produce a
        # float32 tensor, which cannot be used as embedding indices.
        tweet_tensor = torch.tensor(self.vocab.lookup_indices(tweet), dtype=torch.long)
        tweet_label = self.label_encoder.transform([label])

        return tweet_id, tweet_tensor, tweet_label

In [None]:
encoder = LabelEncoder()

encoder.fit(training_data['tweet_sentiment'])

In [None]:
print(encoder.classes_)

In [None]:
train_dataset = TweetsDataset(training_data['tweet_id'], training_tweets_preprocessed, training_data["tweet_sentiment"],
                              vocab, encoder)

In [None]:
development_dataset = TweetsDataset(development_data['tweet_id'], development_tweets_preprocessed,
                                    development_data["tweet_sentiment"], vocab, encoder)

In [None]:
print(f"Training dataset size: {len(train_dataset)}")
print(f"Development dataset size: {len(development_dataset)}")

In [None]:
def collate_batch(batch):
    """Merge dataset items into one padded batch.

    Args:
        batch: Sequence of items whose first three entries are the tweet id,
            the token-index tensor and the encoded label.

    Returns:
        Tuple ``(tweet_ids, padded_tweets, labels)``: a NumPy array of ids,
        a batch-first tensor padded with the ``"<pad>"`` vocabulary index,
        and a tensor built from the stacked labels.
    """
    ids, sequences, targets = [], [], []
    for sample in batch:
        ids.append(sample[0])
        sequences.append(sample[1])
        targets.append(sample[2])

    id_array = np.array(ids)
    target_array = np.array(targets)

    # Shorter sequences are right-padded up to the longest one in the batch.
    padded = pad_sequence(sequences, batch_first=True, padding_value=vocab["<pad>"])

    return id_array, padded, torch.tensor(target_array)

In [None]:
train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True, collate_fn=collate_batch)

In [None]:
development_dataloader = DataLoader(development_dataset, batch_size=256, shuffle=False, collate_fn=collate_batch)

# Define Model

In [None]:
class LSTMClassifier(torch.nn.Module):
    """LSTM text classifier on top of a pre-trained embedding matrix.

    Args:
        embedding_matrix: 2-D float tensor; row ``i`` is the vector of
            vocabulary index ``i``. Its second dimension sets the LSTM
            input size.
        hidden_dim: Size of the LSTM hidden state per direction.
        output_dim: Number of output classes (logits).
        num_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability applied between LSTM layers (only when
            ``num_layers > 1``) and before the final linear layer.
        padding_idx: Index of the padding token. When ``None`` it is looked
            up as ``vocab["<pad>"]`` in the notebook's global vocabulary.
    """

    def __init__(self, embedding_matrix, hidden_dim, output_dim, num_layers, bidirectional, dropout,
                 padding_idx=None):
        super().__init__()

        if padding_idx is None:
            padding_idx = vocab["<pad>"]

        self.embedding = torch.nn.Embedding.from_pretrained(embedding_matrix, padding_idx=padding_idx)
        self.lstm = torch.nn.LSTM(
            embedding_matrix.size(1),
            hidden_dim,
            num_layers=num_layers,
            bidirectional=bidirectional,
            # Inter-layer dropout has no effect with one layer and only triggers a warning.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
        )
        # The classifier input is one final hidden state per direction.
        num_directions = 2 if bidirectional else 1
        self.fc = torch.nn.Linear(hidden_dim * num_directions, output_dim)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            x: Integer tensor of token indices, batch first.
        """
        embedded = self.embedding(x)
        # NOTE(review): padded positions are not masked or packed, so the
        # forward direction's final state is computed after the pad steps.
        _, (hidden, _) = self.lstm(embedded)

        if self.lstm.bidirectional:
            # The last two entries are the top layer's forward and backward states.
            final_state = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            final_state = hidden[-1, :, :]

        return self.fc(self.dropout(final_state))

In [None]:
# Instantiate the model
model = LSTMClassifier(embedding_matrix, hidden_dim=256, output_dim=3, num_layers=2, bidirectional=True,
                       dropout=0.2).to(device)

In [None]:
# Define loss function and optimizer.
# CrossEntropyLoss takes the raw logits produced by the model and integer class ids.
criterion = torch.nn.CrossEntropyLoss()
# Alternative optimizer that was tried:
# optimizer = torch.optim.Adam(model.parameters())
# NOTE(review): plain SGD with lr=0.001 and no momentum may converge slowly — confirm this is intended.
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

In [None]:
# Number of training epochs
n_epochs = 10

In [None]:
def train(dataloader, model, loss_fn, epoch):
    """Run one training epoch, updating ``model`` via the global ``optimizer``.

    Args:
        dataloader: Iterable of ``(tweet_ids, tweets, labels)`` batches.
        model: Module mapping a batch of token indices to class logits.
        loss_fn: Loss taking ``(logits, labels)``.
        epoch: Zero-based epoch number, used only for the progress-bar text.

    Returns:
        Tuple ``(mean_batch_loss, correct_count)`` where ``correct_count`` is
        the number of correctly classified samples (not a ratio).
    """
    model.train()

    epoch_loss = 0.0
    train_accuracy = 0.0

    for batch in tqdm(dataloader, desc=f"Training Epoch {epoch + 1}"):
        _, tweets, labels = batch

        tweets = tweets.to(device)
        # Flatten to 1-D. Unlike squeeze(), reshape(-1) keeps the batch
        # dimension when a batch holds a single sample.
        labels = labels.reshape(-1).to(device, dtype=torch.long)

        optimizer.zero_grad()

        output = model(tweets)
        predictions = torch.argmax(output, dim=1)

        train_accuracy += torch.sum(predictions == labels).item()

        loss = loss_fn(output, labels)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Guard against an empty dataloader (no batches -> mean loss reported as 0.0).
    return epoch_loss / max(len(dataloader), 1), train_accuracy

In [None]:
def evaluate(dataloader, model, loss_fn, epoch):
    """Evaluate ``model`` for one pass over ``dataloader`` without gradients.

    Args:
        dataloader: Iterable of ``(tweet_ids, tweets, labels)`` batches.
        model: Module mapping a batch of token indices to class logits.
        loss_fn: Loss taking ``(logits, labels)``.
        epoch: Zero-based epoch number, used only for the progress-bar text.

    Returns:
        Tuple ``(mean_batch_loss, correct_count)`` where ``correct_count`` is
        the number of correctly classified samples (not a ratio).
    """
    model.eval()

    epoch_loss = 0.0
    evaluation_accuracy = 0.0

    with torch.no_grad():
        for batch in tqdm(dataloader, desc=f"Evaluating Epoch {epoch + 1}"):
            _, tweets, labels = batch

            tweets = tweets.to(device)
            # Flatten to 1-D. Unlike squeeze(), reshape(-1) keeps the batch
            # dimension when a batch holds a single sample.
            labels = labels.reshape(-1).to(device, dtype=torch.long)

            output = model(tweets)
            predictions = torch.argmax(output, dim=1)

            evaluation_accuracy += torch.sum(predictions == labels).item()

            loss = loss_fn(output, labels)

            epoch_loss += loss.item()

    # Guard against an empty dataloader (no batches -> mean loss reported as 0.0).
    return epoch_loss / max(len(dataloader), 1), evaluation_accuracy

In [None]:
# Per-epoch history used for the plots that follow the loop.
train_loss_list = []
development_loss_list = []

train_accuracy_list = []
development_accuracy_list = []

# train()/evaluate() return the number of correct predictions; divide by the
# split size so the stored (and plotted) values are real accuracies in [0, 1]
# and the two curves share a scale.
n_train_samples = len(training_data)
n_development_samples = len(development_data)

for epoch in range(n_epochs):
    train_loss, train_correct = train(train_dataloader, model, criterion, epoch)
    development_loss, development_correct = evaluate(development_dataloader, model, criterion, epoch)

    train_accuracy = train_correct / n_train_samples
    development_accuracy = development_correct / n_development_samples

    train_loss_list.append(train_loss)
    development_loss_list.append(development_loss)

    train_accuracy_list.append(train_accuracy)
    development_accuracy_list.append(development_accuracy)

    print(f"[{epoch + 1}/{n_epochs}], Train Loss: {train_loss:.4f}, Development Loss: {development_loss:.4f}")
    print(f"[{epoch + 1}/{n_epochs}], Train Accuracy: {train_accuracy:.4f}, "
          f"Development Accuracy: {development_accuracy:.4f}")

    print("=" * 80)

In [None]:
import matplotlib.pyplot as plt

# Plot the per-epoch mean batch loss for the training and development sets.
plt.plot(train_loss_list, label="Train Loss")
plt.plot(development_loss_list, label="Development Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Plot the per-epoch values collected in the accuracy history lists.
plt.plot(train_accuracy_list, label="Train Accuracy")
plt.plot(development_accuracy_list, label="Development Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
sample_tweet = "I love this movie"

In [None]:
sample_tweet_tokens = preprocess_tweet(sample_tweet, nlp)

In [None]:
sample_tweet_tensor = torch.tensor(vocab.lookup_indices(sample_tweet_tokens)).unsqueeze(0)

In [None]:
if torch.cuda.is_available():
    sample_tweet_tensor = sample_tweet_tensor.cuda()

In [None]:
model(sample_tweet_tensor)

In [None]:
torch.argmax(model(sample_tweet_tensor))

In [None]:
sample_tweet = "I hate this movie"

In [None]:
sample_tweet_tokens = preprocess_tweet(sample_tweet, nlp)

In [None]:
sample_tweet_tensor = torch.tensor(vocab.lookup_indices(sample_tweet_tokens)).unsqueeze(0)

In [None]:
if torch.cuda.is_available():
    sample_tweet_tensor = sample_tweet_tensor.cuda()

In [None]:
model(sample_tweet_tensor)

In [None]:
torch.argmax(model(sample_tweet_tensor))