In [8]:
import os

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
from torch import optim
import torchtext
from torchtext.vocab import Vocab
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

In [2]:
os.chdir("../../datasets/SarcasmAmazonReviewsCorpus-master")
dataset_path = os.path.abspath(os.curdir)

data_list = list()
data_list.extend([os.path.join("Ironic", file) for file in os.listdir(os.path.join(dataset_path, "Ironic")) if file.endswith(".txt")])
data_list.extend([os.path.join("Regular", file) for file in os.listdir(os.path.join(dataset_path, "Regular")) if file.endswith(".txt")])

print(data_list[:10])
print(len(data_list))

['Ironic/31_9_R1GE9UI3OWCA8M.txt', 'Ironic/22_15_R3S3PYAPELGTG3.txt', 'Ironic/50_12_R24XCD6S26ID9C.txt', 'Ironic/33_7_R33YPAV3D4QU2P.txt', 'Ironic/45_4_R2S0DJ52DDQGF0.txt', 'Ironic/13_12_R37XGBTD0KEF0P.txt', 'Ironic/42_11_R1HNFW27RW2MWJ.txt', 'Ironic/51_5_R3V2K3R4BCRJ75.txt', 'Ironic/51_14_R22TEMV2FB5OD.txt', 'Ironic/29_18_R1WLZAH4TAPM55.txt']
1254


In [46]:
class CustomDataset(Dataset):
    def __init__(self, data_folder_path, tokenizer):
        self.data_folder_path = data_folder_path
        self.tokenizer = tokenizer
        self.vocabulary = list()

        self.data_list = list()
        self.data_list.extend([os.path.join("Ironic", file) for file in os.listdir(os.path.join(dataset_path, "Ironic")) if file.endswith(".txt")])
        self.data_list.extend([os.path.join("Regular", file) for file in os.listdir(os.path.join(dataset_path, "Regular")) if file.endswith(".txt")])

    def __len__(self):
        # Return the total number of samples in your dataset
        return len(self.data_list)
    
    def __getitem__(self, idx):
        # Load and process the text file at the given index and return it
        os.chdir(self.data_folder_path)
        file_path = self.data_list[idx]

        # Get the label from the file name
        label = 0 if 'Regular' in file_path else 1

        text_data = None
        with open(file_path, 'r', encoding="unicode_escape") as file:
            for line in file:
                if '<REVIEW>' in line:
                        text_data = file.readlines()

        # Search for the <\REVIEW> tag and remove it (and everything after it)
        for i in range(len(text_data)):
            if '</REVIEW>' in text_data[i]:
                text_data = text_data[:i]
                break

        # Remove \n and combine in one string
        text_data = ' '.join([line.replace('\n', '') for line in text_data])
        
        # Tokenize the text data
        text_data = self.tokenizer(text_data)

        input_ids = list()
        for token in text_data:
            if token not in self.vocabulary:
                self.vocabulary.append(token)
            input_ids.append(self.vocabulary.index(token))

        input_ids = torch.tensor(input_ids, dtype=torch.long)

        # Return the processed data and its corresponding label (if applicable)
        return {
            "text": text_data, 
            "input_ids": input_ids,
            "label": label
        }


In [48]:
dataset = CustomDataset(dataset_path, get_tokenizer('basic_english'))

print(dataset[12])

# Suffle the dataset
torch.manual_seed(0)
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices)

# Split the dataset into train, validation and test sets
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

print(len(train_dataset), len(val_dataset), len(test_dataset))

{'text': ['i', 'am', 'an', 'acquisitions', 'officer', 'for', 'an', 'artillery', 'unit', 'in', 'the', 'russian', 'army', '.', 'since', 'mafia', 'hooligans', 'stole', 'all', 'of', 'our', 'equipment', 'to', 'sell', 'to', 'kyrgyzstani', 'rebels', ',', 'we', 'have', 'been', 'looking', 'for', 'a', 'low-cost', 'alternative', 'to', 'the', 't-80', 'main', 'battle', 'tank', '.', 'after', 'successful', 'trials', 'at', 'a', 'facility', 'in', 'moscow', ',', 'this', 'so-called', 'badonkadonk', 'was', 'approved', 'for', 'use', 'in', 'the', 'chechen', 'theatre', '.', 'initial', 'reports', 'were', 'favorable', ',', 'but', 'then', 'somebody', 'noticed', 'that', 'the', 'tank', 'lacked', 'a', 'cannon', ',', 'treads', ',', 'and', 'armor', ',', 'and', 'possessed', 'the', 'engine', 'of', 'an', 'electric', 'bicycle', '.', 'it', 'did', ',', 'however', ',', 'have', 'an', 'excellent', 'audio', 'system', ',', 'but', 'this', 'failed', 'to', 'compensate', 'for', 'its', 'disappointing', '100%', 'mortality', 'rate', 

In [30]:
# Build vocabulary
tokens = []
for data in train_dataset:
    tokens.extend(data['text'])

vocab = build_vocab_from_iterator([tokens])

print(vocab.get_itos()[:10])
print(len(vocab))

['.', 'the', ',', 'and', 'to', 'a', 'i', 'of', "'", 'it']
19036


In [5]:
class Network_model(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, label_size):
        super(Network_model, self).__init__()

        # Embedding
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # LSTM
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, dropout=0.2)

        # Linear
        self.fc1 = nn.Linear(hidden_dim, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, label_size)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(1, 0, 2)
        x, _ = self.lstm(x)
        x = x[-1, :, :]
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

In [64]:
# Evaluation function
def evaluate(model, loss_fn, data_loader, device):
    model.eval() # Set the model to evaluation mode
    loss = 0.0
    accuracy = 0.0

    with torch.no_grad(): # Disable gradient calculation
        correct = 0
        total = 0
        
        for data in data_loader:
            input_ids = data['input_ids']
            label = data['label']
            input_ids, label = input_ids.to(device), label.to(device)

            # Get the prediction
            outputs = model(input_ids)
            _, predicted = torch.max(outputs.data, 1)

            # Calculate the loss
            loss += loss_fn(outputs, label).item()

            # Count the total number of correct predictions
            total += label.size(0)
            correct += (predicted == label).sum().item()

        accuracy = correct / total
        loss /= len(data_loader)
    
    return loss, accuracy


# Training function
def train(num_epochs, model, loss_fn, optimizer, train_loader, validation_loader, device):

    evaluate(model, loss_fn, validation_loader, device)

    for epoch in range(num_epochs):
        train_loss = 0.0
        correct = 0
        total = 0
        model.train()

        for batch_i, data in enumerate(train_loader):
            input_ids = data['input_ids']
            label = data['label']
            input_ids, label = input_ids.to(device), label.to(device)

            # Forward pass
            optimizer.zero_grad() # Clear the gradients
            outputs = model(input_ids)

            # Calculate the loss
            loss = loss_fn(outputs, label)
            train_loss += loss.item()

            # Backpropagation
            loss.backward()

            # Update weights
            optimizer.step()

        # Evaluate the model after each epoch
        valid_loss, valid_accuracy = evaluate(model, loss_fn, validation_loader, device)

        # Print Loss every epoch
        print('Epoch: [{}/{}], Training Loss: {:.6f}, Validation Loss: {:.6f}, Validation Accuracy {:.6f}'.format(epoch + 1, num_epochs, train_loss, valid_loss, valid_accuracy))


In [9]:
# Hyperparameters
num_epochs = 10
learning_rate = 0.001
batch_size = 32
embedding_dim = 128
hidden_dim = 64

In [55]:
# Build iterator
train_loader = DataLoader(train_dataset)
val_loader = DataLoader(val_dataset)
test_loader = DataLoader(test_dataset)

In [61]:
for data in train_loader:
    print(type(data['input_ids']))
    print(type(data['label']))
    break

<class 'torch.Tensor'>
<class 'torch.Tensor'>


In [65]:
# Execution device (mps or cpu)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print("The model will be running on", device, "device")

model = Network_model(embedding_dim, hidden_dim, len(vocab), 2)
model.to(device)

# Loss function
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
train(num_epochs, model, loss_fn, optimizer, train_loader, val_loader, device)
print("Training finished!")

The model will be running on mps device
Epoch: [1/10], Training Loss: 656.916227, Validation Loss: 0.658690, Validation Accuracy 0.712000
Epoch: [2/10], Training Loss: 528.581546, Validation Loss: 0.790340, Validation Accuracy 0.632000
Epoch: [3/10], Training Loss: 264.667727, Validation Loss: 1.119410, Validation Accuracy 0.608000
Epoch: [4/10], Training Loss: 88.248260, Validation Loss: 1.555002, Validation Accuracy 0.600000
Epoch: [5/10], Training Loss: 16.898428, Validation Loss: 1.818501, Validation Accuracy 0.688000
Epoch: [6/10], Training Loss: 1.235500, Validation Loss: 1.986187, Validation Accuracy 0.696000
Epoch: [7/10], Training Loss: 0.193590, Validation Loss: 2.107672, Validation Accuracy 0.696000
Epoch: [8/10], Training Loss: 0.092499, Validation Loss: 2.230078, Validation Accuracy 0.688000
Epoch: [9/10], Training Loss: 0.049571, Validation Loss: 2.353275, Validation Accuracy 0.688000
Epoch: [10/10], Training Loss: 0.026950, Validation Loss: 2.477211, Validation Accuracy 

In [66]:
# Test the model
test_loss, test_accuracy = evaluate(model, loss_fn, test_loader, device)
print("Test Loss: {:.6f}, Test Accuracy: {:.6f}".format(test_loss, test_accuracy))

Test Loss: 2.849635, Test Accuracy: 0.634921
