In [None]:
import os
import tarfile
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
import nltk
from nltk.data import find
from nltk.tokenize import TreebankWordTokenizer
from nltk.corpus import sentiwordnet as swn


# Extract IMDb Dataset
def extract_imdb_dataset(tar_path, extract_to="./imdb_dataset"):
    if not os.path.exists(tar_path):
        print(f"Dataset file '{tar_path}' not found.")
        print("Please download the IMDb dataset manually and place it in the specified path.")
        return None

    if not os.path.exists(extract_to) or not os.listdir(extract_to):
        print("Extracting IMDb dataset...")
        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(path=extract_to)
        print("Extraction complete.")
    else:
        print("IMDb dataset already extracted.")
    return extract_to


# Load IMDb Dataset
def load_imdb_data(dataset_dir):
    print("Loading data...")
    texts, labels = [], []
    for split in ["train", "test"]:
        for sentiment in ["pos", "neg"]:
            dir_path = os.path.join(dataset_dir, split, sentiment)
            label = 1 if sentiment == "pos" else 0
            for file in os.listdir(dir_path):
                with open(os.path.join(dir_path, file), "r", encoding="utf-8") as f:
                    texts.append(f.read())
                    labels.append(label)
    print("Data has been loaded.")
    return texts, labels


# Tokenize Texts and Build Vocabulary
def build_vocab(texts, tokenizer, special_tokens=["<PAD>", "<UNK>"]):
    print("Tokenizing data...")
    vocab = {token: idx for idx, token in enumerate(special_tokens)}
    for text in texts:
        tokens = tokenizer.tokenize(text.lower())
        for token in tokens:
            if token not in vocab:
                vocab[token] = len(vocab)
    print("Data has been tokenized and vocabulary has been built.")
    return vocab


# Define a function to build the lexicon
def build_lexicon_from_swn():
    print("Building lexicon...") 
    
    # Function to ensure `sentiwordnet` and `wordnet` resources are downloaded
    def ensure_resource_downloaded(resource_name):
        try:
            find(resource_name)
            print(f"Resource '{resource_name}' is already downloaded.")
        except LookupError:
            print(f"Resource '{resource_name}' not found. Downloading now...")
            nltk.download(resource_name)

    ensure_resource_downloaded("sentiwordnet")
    ensure_resource_downloaded("wordnet")
    
    lexicon = {}
    for word in swn.all_senti_synsets():
        sentiment_score = word.pos_score() - word.neg_score()
        if word.synset.name().split('.')[0] not in lexicon:
            lexicon[word.synset.name().split('.')[0]] = sentiment_score
    print("Lexicon has been built.")
    return lexicon


# Define Preprocessing Functions
def preprocess_text(text, vocab, max_len):
    tokenizer = TreebankWordTokenizer()
    tokens = tokenizer.tokenize(text.lower())
    token_indices = [vocab.get(token, vocab["<UNK>"]) for token in tokens]
    token_indices = token_indices[:max_len] + [0] * (max_len - len(token_indices))
    return token_indices

def get_lexicon_features(text, lexicon, vocab, max_len):
    tokenizer = TreebankWordTokenizer()
    tokens = tokenizer.tokenize(text.lower())
    features = [lexicon.get(token, 0.0) for token in tokens]
    features = features[:max_len] + [0.0] * (max_len - len(features))
    return features


# Define Dataset Class
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, lexicon, max_len=100):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.lexicon = lexicon
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        token_indices = preprocess_text(text, self.vocab, self.max_len)
        lexicon_features = get_lexicon_features(text, self.lexicon, self.vocab, self.max_len)
        return {
            "tokens": torch.tensor(token_indices, dtype=torch.long),
            "lexicon_features": torch.tensor(lexicon_features, dtype=torch.float),
            "label": torch.tensor(label, dtype=torch.long)
        }

In [None]:
# Path to IMDb dataset
base_dir = "/Users/sonnguyen/Desktop/Fall24/lign167/project/lign167/"  # Directory of the script
tar_path = os.path.join(base_dir, "imdb_dataset/aclImdb_v1.tar.gz")
extracted_path = os.path.join(base_dir, "imdb_dataset/aclImdb/aclImdb")

# Extract and load the dataset
extract_imdb_dataset(tar_path, extracted_path)
texts, labels = load_imdb_data(extracted_path)


# Tokenize and build vocabulary
tokenizer = TreebankWordTokenizer()
vocab = build_vocab(texts, tokenizer)


# Build the lexicon
lexicon = build_lexicon_from_swn()


# Create the PyTorch Dataset
dataset = SentimentDataset(texts, labels, vocab, lexicon, max_len=100)

In [3]:
lexicon

{'able': 0.125,
 'unable': -0.75,
 'abaxial': 0.0,
 'adaxial': 0.0,
 'acroscopic': 0.0,
 'basiscopic': 0.0,
 'abducent': 0.0,
 'adducent': 0.0,
 'nascent': 0.0,
 'emergent': 0.0,
 'dissilient': 0.25,
 'parturient': 0.25,
 'dying': 0.0,
 'moribund': 0.0,
 'last': 0.0,
 'abridged': 0.0,
 'cut': 0.0,
 'half-length': 0.0,
 'potted': 0.0,
 'unabridged': 0.0,
 'full-length': 0.5,
 'absolute': 0.5,
 'direct': 0.75,
 'implicit': 0.0,
 'infinite': 0.125,
 'living': 0.375,
 'relative': -0.25,
 'relational': 0.0,
 'absorbent': 0.0,
 'absorbefacient': 0.375,
 'assimilating': -0.75,
 'hygroscopic': 0.0,
 'receptive': -0.125,
 'shock-absorbent': 0.0,
 'spongy': 0.0,
 'thirsty': 0.0,
 'nonabsorbent': -0.5,
 'repellent': -0.5,
 'adsorbent': 0.0,
 'chemisorptive': -0.25,
 'nonadsorbent': -0.25,
 'absorbable': 0.5,
 'adsorbable': 0.25,
 'abstemious': 0.0,
 'abstinent': -0.625,
 'ascetic': 0.25,
 'gluttonous': 0.0,
 'crapulous': 0.0,
 'crapulent': -0.5,
 'edacious': 0.0,
 'greedy': 0.0,
 'hoggish': -0.12

In [4]:
class CNNWithLexicon(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_classes, lexicon_dim=1, kernel_sizes=[3, 4, 5], num_filters=100):
        super(CNNWithLexicon, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # Convolutional layers for text
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (k, embedding_dim + lexicon_dim))
            for k in kernel_sizes
        ])
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)
        self.dropout = nn.Dropout(0.5)
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, tokens, lexicon_features):
        # Word embeddings
        embed = self.embedding(tokens)  # [batch_size, seq_len, embedding_dim]
        
        # Combine word embeddings with lexicon features
        lexicon_features = lexicon_features.unsqueeze(-1)  # [batch_size, seq_len, 1]
        combined = torch.cat((embed, lexicon_features), dim=-1)  # [batch_size, seq_len, embedding_dim + lexicon_dim]
        
        # Add a channel dimension for CNN
        combined = combined.unsqueeze(1)  # [batch_size, 1, seq_len, embedding_dim + lexicon_dim]
        
        # Convolution and pooling
        conv_outputs = [
            torch.relu(conv(combined)).squeeze(3) for conv in self.convs
        ]  # List of [batch_size, num_filters, seq_len - kernel_size + 1]
        pooled_outputs = [torch.max(conv, dim=2)[0] for conv in conv_outputs]  # [batch_size, num_filters]
        
        # Concatenate and classify
        concatenated = torch.cat(pooled_outputs, dim=1)  # [batch_size, len(kernel_sizes) * num_filters]
        dropped = self.dropout(concatenated)
        output = self.fc(dropped)  # [batch_size, num_classes]
        return self.softmax(output)

In [5]:
# Split the Dataset
train_split = 0.8
train_size = int(train_split * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Display split information
print(f"Training samples: {len(train_dataset)}")
print(f"Testing samples: {len(test_dataset)}")


# Create model
vocab_size = len(vocab)
embedding_dim = 100
num_classes = 2

model = CNNWithLexicon(vocab_size, embedding_dim, num_classes)
sample_batch = next(iter(train_loader))
tokens = sample_batch['tokens']
lexicon_features = sample_batch['lexicon_features']
outputs = model(tokens, lexicon_features)

print("Model Outputs:", outputs)

Training samples: 40000
Testing samples: 10000
Model Outputs: tensor([[0.3235, 0.6765],
        [0.1042, 0.8958],
        [0.3507, 0.6493],
        [0.4043, 0.5957],
        [0.1861, 0.8139],
        [0.3174, 0.6826],
        [0.5031, 0.4969],
        [0.4350, 0.5650],
        [0.6352, 0.3648],
        [0.2937, 0.7063],
        [0.7657, 0.2343],
        [0.9307, 0.0693],
        [0.9252, 0.0748],
        [0.5533, 0.4467],
        [0.4714, 0.5286],
        [0.5683, 0.4317],
        [0.2497, 0.7503],
        [0.6957, 0.3043],
        [0.3937, 0.6063],
        [0.1394, 0.8606],
        [0.3118, 0.6882],
        [0.3103, 0.6897],
        [0.4219, 0.5781],
        [0.4100, 0.5900],
        [0.3278, 0.6722],
        [0.3143, 0.6857],
        [0.4965, 0.5035],
        [0.5376, 0.4624],
        [0.7980, 0.2020],
        [0.7103, 0.2897],
        [0.7137, 0.2863],
        [0.1026, 0.8974]], grad_fn=<SoftmaxBackward0>)


In [None]:
def train_model(model, dataloader, optimizer, criterion, epochs=5):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in dataloader:
            tokens = batch['tokens']
            lexicon_features = batch['lexicon_features']
            labels = batch['label']
            
            optimizer.zero_grad()
            outputs = model(tokens, lexicon_features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(dataloader)}")

# Training example
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

train_model(model, train_loader, optimizer, criterion, epochs=20)