In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import spacy
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset

In [2]:
def load_glove_embeddings(glove_file):
    embeddings = {}
    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = np.asarray(values[1:], dtype='float32')
            embeddings[word] = vector
    return embeddings


#GLOVE_PATH = 'C:\\Users\\Lee Ming Jia\\Desktop\\GloVe\\glove.6B.100d.txt'
glove_embeddings = load_glove_embeddings('C:\\Users\\Lee Ming Jia\\Desktop\\GloVe\\glove.6B.100d.txt')
embedding_dim = 100  # GloVe 100D embeddings

In [3]:
# Initialize spaCy for tokenization
nlp = spacy.load("en_core_web_sm")

def tokenize(text):
    return [token.text.lower() for token in nlp(text) if not token.is_punct and not token.is_stop]

def pad_sequence(sequence, max_len):
    return sequence + ['<pad>'] * (max_len - len(sequence))

def preprocess_data(texts, labels, max_len):
    tokenized_texts = [tokenize(text) for text in texts]
    padded_texts = [pad_sequence(text, max_len) for text in tokenized_texts]
    return padded_texts, labels

In [4]:
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        # Convert text to indices using the vocab
        indices = [self.vocab.get(word, self.vocab['<unk>']) for word in text]
        return torch.tensor(indices), torch.tensor(label)

In [5]:
def build_vocab(texts, glove_embeddings, max_vocab_size=20000):
    vocab = {'<pad>': 0, '<unk>': 1}
    for text in texts:
        for word in text:
            if word not in vocab and len(vocab) < max_vocab_size:
                vocab[word] = len(vocab)
    # Add GloVe embeddings for words in the vocab
    embedding_matrix = np.random.uniform(-0.25, 0.25, (len(vocab), embedding_dim))
    for word, idx in vocab.items():
        if word in glove_embeddings:
            embedding_matrix[idx] = glove_embeddings[word]
    return vocab, embedding_matrix

In [6]:

class CNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_filters, filter_sizes, output_dim, dropout, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.embedding.weight.data.copy_(torch.tensor(embedding_matrix))  # Initialize with pre-trained GloVe vectors

        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filters, kernel_size=(fs, embed_dim))
            for fs in filter_sizes
        ])

        self.fc = nn.Linear(len(filter_sizes) * num_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

 #   def forward(self, text):
 #       embedded = self.embedding(text).unsqueeze(1)  # Add channel dimension
  #      conved = [torch.relu(conv(embedded)).squeeze(3) for conv in self.convs]
  #      pooled = [torch.max(conv, dim=2)[0] for conv in conved]
  #      cat = self.dropout(torch.cat(pooled, dim=1))
  #      return self.fc(cat)
    
    
    def forward(self, text):
        # Ensure indices are within the valid range
        max_vocab_index = len(self.embedding.weight) - 1
        text = torch.clamp(text, max=max_vocab_index)  # Clamp indices to the valid range

        # Apply embedding
        embedded = self.embedding(text).unsqueeze(1)  # Add channel dimension
        conved = [torch.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        pooled = [torch.max(conv, dim=2)[0] for conv in conved]

        # Concatenate pooled features and pass through fully connected layer
        cat = torch.cat(pooled, dim=1)
        output = self.fc(cat)
        return output

In [7]:
import re
import unidecode

def load_contractions(file_path):
    contractions = {}
    with open(file_path, "r") as file:
        for line in file:
            line = line.strip()
            if line:
                contraction, expansion = line.split(":")
                contraction = contraction.strip()
                expansion = [word.strip() for word in expansion.split(",")]
                contractions[contraction] = expansion
    return contractions

def normalize_words(text): # stupid but works
    words = []
    contractions = load_contractions("contractions.txt")
    text = unidecode.unidecode(text) # e.g. "café" -> "cafe"
    text = text.lower()
    for contraction, expansion in contractions.items():
        text = re.sub(rf"\b{contraction}\b", " ".join(expansion), text) # e.g. "I'm" -> "I am"
    text = re.sub(r"[`\[\]\"]", "", text) # e.g. "`rock[n]roll`" -> "rocknroll"
    text = re.sub(r"'s(?=[^a-zA-Z]|$)", r" 's ", text) # e.g. "John's" -> "John 's"
    text = re.sub(r"s'(?=[^a-zA-Z]|$)", r"s 's ", text) # e.g. "dogs'" -> "dogs 's"
    text = re.sub(r"[/-]", " ", text) # e.g. "rock-n-roll" -> "rock n roll"
    text = re.sub(r"([@#&%+:,.?!$€£¥\(\)])", r" \1 ", text) # e.g. "rock&roll" -> "rock & roll"
    text = re.sub(r"(?<!s)'(?!s\b\s|s$)", "", text) # e.g. "rock'n'roll" -> "rocknroll"
    for word in text.split():
        words.append(word)
    return words


In [8]:
import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn

class GloveTokenizer:
    def __init__(self, glove_file_path, pad_token="<PAD>", unk_token="<UNK>"):
        # Load GloVe embeddings from file
        self.embeddings_index = self._load_glove_embeddings(glove_file_path)
        self.pad_token = pad_token
        self.pad_token_id = 0
        self.unk_token = unk_token
        self.unk_token_id = 1

        # Create word-to-index and index-to-word dictionaries
        self.word_index = {word: idx for idx, word in enumerate(self.embeddings_index.keys(), start=2)}
        self.word_index[self.pad_token] = self.pad_token_id
        self.word_index[self.unk_token] = self.unk_token_id
        self.index_word = {idx: word for word, idx in self.word_index.items()}

    def _load_glove_embeddings(self, glove_file_path):
        # Load the GloVe embeddings from file into a dictionary
        embeddings_index = {}
        with open(glove_file_path, 'r', encoding="utf-8") as f:
            for line in f:
                values = line.split()
                word = values[0]
                vector = np.asarray(values[1:], dtype='float32')
                embeddings_index[word] = vector
        return embeddings_index

    def _tokenize_with_subwords(self, word):
        # If word exists in vocabulary, return its index
        if word in self.word_index:
            return [self.word_index[word]]

        # Otherwise, tokenize word into subwords and try to match each subword
        subword_tokens = []
        subwords = normalize_words(word)
        for subword in subwords:
            if subword in self.word_index:
                subword_tokens.append(self.word_index[subword])

        # Return indices of matched subwords, or UNK token if no subwords matched
        if subword_tokens:
            return subword_tokens
        else:
            return [self.unk_token_id]

    def encode(self, texts, max_length=None, return_tensors="list"):
        # Convert single string to list for consistent handling
        if isinstance(texts, str):
            texts = [texts]

        # Tokenize and encode each text in batch
        all_input_ids = []
        for text in texts:
            input_ids = []
            for word in text.split():
                input_ids.extend(self._tokenize_with_subwords(word))
            all_input_ids.append(input_ids)

        # Pad sequences and create attention masks
        input_ids_padded = pad_sequence(
            [torch.tensor(seq, dtype=torch.long) for seq in all_input_ids],
            batch_first=True,
            padding_value=self.pad_token_id
        )

        attention_mask = (input_ids_padded != self.pad_token_id).long()

        # Trim/pad to max_length if specified
        if max_length:
            input_ids_padded = input_ids_padded[:, :max_length]
            attention_mask = attention_mask[:, :max_length]

            if input_ids_padded.shape[1] < max_length:
                pad_size = max_length - input_ids_padded.shape[1]
                padding = torch.full((input_ids_padded.shape[0], pad_size), self.pad_token_id, dtype=torch.long)
                input_ids_padded = torch.cat([input_ids_padded, padding], dim=1)
                
                mask_padding = torch.zeros((attention_mask.shape[0], pad_size), dtype=torch.long)
                attention_mask = torch.cat([attention_mask, mask_padding], dim=1)

        # Return in specified format
        if return_tensors == "pt":
            return {"input_ids": input_ids_padded, "attention_mask": attention_mask}
        else:
            return {"input_ids": input_ids_padded.tolist(), "attention_mask": attention_mask.tolist()}

    def decode(self, token_ids_batch, skip_special_tokens=False):
        # Convert tensor to list for consistent handling
        if isinstance(token_ids_batch, torch.Tensor):
            if token_ids_batch.dim() != 2:
                raise ValueError("Input tensor must be 2-dimensional.")
            token_ids_batch = token_ids_batch.tolist()

        # Ensure batch format
        if isinstance(token_ids_batch[0], int):
            token_ids_batch = [token_ids_batch]

        # Decode each sequence in batch
        all_texts = []
        for token_ids in token_ids_batch:
            words = []
            for idx in token_ids:
                word = self.index_word.get(idx, self.unk_token)
                if skip_special_tokens and word == self.pad_token:
                    continue
                words.append(word)
            all_texts.append(" ".join(words))

        # Return single or list of decoded texts
        return all_texts if len(all_texts) > 1 else all_texts[0]

class GloveTokenizerNoSub(GloveTokenizer):
    def __init__(self, glove_file_path, pad_token="<PAD>", unk_token="<UNK>"):
        super().__init__(glove_file_path, pad_token, unk_token)

    def _tokenize_with_subwords(self, word):
        if word in self.word_index:
            return [self.word_index[word]]
        else:
            return [self.unk_token_id]

class GloveEmbedding(nn.Module):
    def __init__(self, glove_file_path, embedding_dim=100, trainable=False, pad_token="<PAD>", unk_token="<UNK>"):
        super(GloveEmbedding, self).__init__()
        # Initialize vocabulary and embedding matrix with GloVe embeddings
        self.word_index, embedding_matrix = self._load_glove_embeddings(glove_file_path, embedding_dim, pad_token, unk_token)
        
        # Set up embedding layer with GloVe weights
        vocab_size = embedding_matrix.shape[0]
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding.weight.data.copy_(torch.from_numpy(embedding_matrix))
        self.embedding.weight.requires_grad = trainable

    def _load_glove_embeddings(self, glove_file_path, embedding_dim, pad_token, unk_token):
        # Load GloVe embeddings and build vocabulary and embedding matrix
        embeddings_index = {}
        with open(glove_file_path, 'r', encoding="utf-8") as f:
            for line in f:
                values = line.split()
                word = values[0]
                vector = np.asarray(values[1:], dtype='float32')
                embeddings_index[word] = vector
        
        # Create word index, and initialize embedding matrix
        word_index = {word: idx for idx, word in enumerate(embeddings_index.keys(), start=2)}
        word_index[pad_token] = 0
        word_index[unk_token] = 1

        vocab_size = len(word_index)
        embedding_matrix = np.zeros((vocab_size, embedding_dim))
        for word, idx in word_index.items():
            embedding_vector = embeddings_index.get(word)
            if embedding_vector is not None:
                embedding_matrix[idx] = embedding_vector
            elif word == unk_token:
                embedding_matrix[idx] = np.random.normal(size=(embedding_dim,))
            elif word == pad_token:
                embedding_matrix[idx] = np.zeros(embedding_dim)
        
        return word_index, embedding_matrix

    def forward(self, x):
        # Perform embedding lookup
        return self.embedding(x)

def test(tokenizer):
    embedding_layer = GloveEmbedding(glove_file_path, embedding_dim=100)

    # Example batch of texts
    texts = ["hello world this is <UNK> test",
            "another example sentence for testing",
            "batch processing with <PAD> and <UNK> tokens",
            "Check the price of the new gadget ($199) at 'Tech-Store', " \
        "and don't forget to use the discount code 'SAVE20' for 20% off on your next purchase! " \
        "For more info, call #123 or visit www.tech-store.com & sign up."]

    # Encode batch with tokenizer, returning list format
    encoded_list = tokenizer.encode(texts, max_length=60, return_tensors="list")
    print("Encoded Batch (List):", encoded_list)

    # Encode batch with tokenizer, returning torch tensor format
    encoded_tensor = tokenizer.encode(texts, max_length=60, return_tensors="pt")
    print("Encoded Batch (Torch):", encoded_tensor)
    print("Shape:", encoded_tensor["input_ids"].shape, encoded_tensor["attention_mask"].shape)

    # Decode batch with tokenizer
    decoded_texts = tokenizer.decode(encoded_tensor["input_ids"])
    print("Decoded Batch (Tensor):", decoded_texts)
    
    # Decode list format
    decoded_texts = tokenizer.decode(encoded_list["input_ids"])
    print("Decoded Batch (List):", decoded_texts)
    
    # Pass encoded input_ids through embedding layer
    embedded_output = embedding_layer(encoded_tensor["input_ids"])
    print("Embedded Output Shape (Batch):", embedded_output.shape)
    print("Embedded Output Shape (Batch):", embedded_output)
    
    # Verify alignment of embeddings
    test_word = "text"  # Word to check
    if test_word in tokenizer.word_index:
        test_index = tokenizer.word_index[test_word]
        
        # Retrieve GloVe vector
        glove_vector = torch.tensor(tokenizer.embeddings_index[test_word], dtype=torch.float32)
        
        # Retrieve nn.Embedding vector
        embedding_vector = embedding_layer.embedding.weight[test_index]
        
        # Print and compare vectors
        print(f"GloVe vector for '{test_word}':", glove_vector)
        print(f"Embedding vector for '{test_word}':", embedding_vector)
        
        # Check if vectors align
        if torch.allclose(glove_vector, embedding_vector, atol=1e-6):
            print(f"The embedding for '{test_word}' is correctly aligned with the original GloVe vector.")
        else:
            print(f"The embedding for '{test_word}' is NOT aligned with the original GloVe vector.")
    else:
        print(f"The word '{test_word}' is not in the tokenizer's vocabulary.")


In [9]:
# Load dataset

from datasets import load_dataset
dataset = load_dataset ("rotten_tomatoes")
train_data = dataset ['train']
val_data = dataset ['validation' ]
test_data = dataset ['test']

print(train_data[0])

{'text': 'the rock is destined to be the 21st century\'s new " conan " and that he\'s going to make a splash even greater than arnold schwarzenegger , jean-claud van damme or steven segal .', 'label': 1}


In [10]:
import numpy as np

def load_glove_embeddings(file_path, embedding_dim=100):
    embeddings_index = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = vector
    return embeddings_index

# Load the GloVe embeddings (adjust the file path as needed)
glove_file_path = 'C:\\Users\\Lee Ming Jia\\Desktop\\GloVe\\glove.6B.100d.txt'
glove_embeddings = load_glove_embeddings(glove_file_path)
print(f"Loaded {len(glove_embeddings)} words from GloVe embeddings.")

Loaded 400000 words from GloVe embeddings.


In [11]:
import torch
import numpy as np
from torch.utils.data import Dataset

class TextDataset(Dataset):
    def __init__(self, texts, labels, glove_embeddings, max_len=60, unk_token="<UNK>", pad_token="<PAD>"):
        self.texts = texts
        self.labels = labels
        self.glove_embeddings = glove_embeddings
        self.max_len = max_len
        self.unk_token = unk_token
        self.pad_token = pad_token

        # GloVe vectors for unknown and padding tokens
        self.unk_vector = np.random.normal(size=(100,))  # Random vector for OOV words
        self.pad_vector = np.zeros((100,))  # Zero vector for padding

    def tokenize_text(self, text):
        # Tokenize text into words and convert to GloVe vectors
        words = text.split()  # Simple whitespace tokenizer
        vectors = []
        for word in words:
            word = word.lower()  # Convert to lowercase to standardize
            if word in self.glove_embeddings:
                vectors.append(self.glove_embeddings[word])  # Use GloVe vector for known words
            else:
                vectors.append(self.unk_vector)  # Use random vector for unknown words
        return vectors

    def pad_sequence(self, sequence):
        # Pad sequence to max_len
        if len(sequence) < self.max_len:
            # Pad with pad_token vectors
            sequence.extend([self.pad_vector] * (self.max_len - len(sequence)))
        else:
            sequence = sequence[:self.max_len]  # Trim to max_len if sequence is too long
        return sequence

    def __getitem__(self, idx):
        # Fetch the text and its label for the given index
        text = self.texts[idx]
        label = self.labels[idx]

        # Tokenize and pad the text
        tokenized_text = self.tokenize_text(text)
        padded_text = self.pad_sequence(tokenized_text)

        # Convert to torch tensors (embeddings are float32, labels are long)
        return torch.tensor(padded_text, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

    def __len__(self):
        return len(self.texts)

In [14]:
from datasets import load_dataset

# Load the Rotten Tomatoes dataset (you can replace with your dataset)
dataset = load_dataset("rotten_tomatoes")
train_data = dataset["train"]
val_data = dataset["validation"]
test_data = dataset["test"]

# Extract texts and labels
train_texts = train_data["text"]
train_labels = train_data["label"]
val_texts = val_data["text"]
val_labels = val_data["label"]
test_texts = test_data["text"]
test_labels = test_data["label"]

vocab, embedding_matrix = build_vocab(train_texts, glove_embeddings)

# Create the dataset
train_dataset = TextDataset(train_texts, train_labels, glove_embeddings, max_len=60)
val_dataset = TextDataset(val_texts, val_labels, glove_embeddings, max_len=60)
test_dataset = TextDataset(test_texts, test_labels, glove_embeddings, max_len=60)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [25]:
# Example: iterate through the training data
#for batch_idx, (texts, labels) in enumerate(train_loader):
#    print(f"Batch {batch_idx + 1}")
#    print(f"Text Shape: {texts.shape}")  # Shape: (batch_size, max_len, embedding_dim)
#    print(f"Labels Shape: {labels.shape}")
#    break  # Just show the first batch for example
    
    
# Example: iterate through the training data
for batch_idx, (texts, labels) in enumerate(train_loader):
    print(f"Batch {batch_idx + 1}")
    print(f"Text Shape: {texts.shape}")  # Shape: (batch_size, max_len)
    print(f"Labels Shape: {labels.shape}")

    # Ensure that indices are within the valid range
    max_vocab_index = len(vocab) - 1  # The highest valid index in the vocab
    texts = torch.clamp(texts, max=max_vocab_index)  # Clamp indices to be within the valid range

    # Pass the clamped texts to the model
    predictions = model(texts)

    break  # Just show the first batch for example

Batch 1
Text Shape: torch.Size([32, 60, 100])
Labels Shape: torch.Size([32])


In [15]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CNN(len(vocab), embedding_dim, 100, [3, 4, 5], 1, 0.5, vocab['<pad>']).to(device)

optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss().to(device)

def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float()
    return correct.sum() / len(correct)

def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    epoch_acc = 0

    for batch in iterator:
        optimizer.zero_grad()
        
        # Get the text and label from the batch
        text, label = batch
        
        # Convert text to LongTensor for the embedding layer
        text = text.long()
        
        # Forward pass through the model
        predictions = model(text).squeeze(1)
        
        # Compute loss and accuracy
        loss = criterion(predictions, label)
        acc = binary_accuracy(predictions, label)

        # Backward pass and optimization step
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
        
        
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)


def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0

    with torch.no_grad():
        for batch in iterator:
            text, label = batch
            
            # Ensure text is a LongTensor for the embedding layer
            text = text.long()
            
            predictions = model(text).squeeze(1)
            loss = criterion(predictions, label)
            acc = binary_accuracy(predictions, label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
# Ensure texts are within the valid index range
max_vocab_index = len(vocab) - 1  # The highest valid index
texts = torch.clamp(texts, max=max_vocab_index)

# Ensure the tensor type is LongTensor
texts = texts.long()

# Pass the clamped texts to the model
predictions = model(texts)

In [18]:
print(f"Vocabulary Size: {len(vocab)}")
print(f"Embedding Layer Size: {model.embedding.num_embeddings}")

Vocabulary Size: 87
Embedding Layer Size: 87


In [19]:
# Check for NaNs or values greater than vocab size
print(torch.isnan(texts).any())  # Check if there are NaNs
print((texts >= len(vocab)).any())  # Check if any values exceed the vocab size

tensor(False)
tensor(False)


In [23]:
# Example: iterate through the training data
#for batch_idx, (texts, labels) in enumerate(train_loader):
#    print(f"Batch {batch_idx + 1}")
#    print(f"Text Shape: {texts.shape}")  # Shape: (batch_size, max_len, embedding_dim)
#    print(f"Labels Shape: {labels.shape}")
#    break  # Just show the first batch for example
    
    
# Example: iterate through the training data
for batch_idx, (texts, labels) in enumerate(train_loader):
    print(f"Batch {batch_idx + 1}")
    print(f"Text Shape: {texts.shape}")  # Shape: (batch_size, max_len)
    print(f"Labels Shape: {labels.shape}")

    # Ensure that indices are within the valid range
    #max_vocab_index = len(vocab) - 1  # The highest valid index in the vocab
    #texts = torch.clamp(texts, max=max_vocab_index)  # Clamp indices to be within the valid range
    
    # Assuming texts are indices
    texts = texts.view(-1, texts.size(2))  # Flatten to [batch_size * sequence_length, feature_size]

    # Ensure indices are within the valid range
    texts = torch.clamp(texts, 0, max_vocab_index)

    
    # Ensure that texts are of type LongTensor (indices for embeddings)
    texts = texts.long()


    # Pass the clamped texts to the model
    predictions = model(texts)

    break  # Just show the first batch for example
    
    


Batch 1
Text Shape: torch.Size([32, 60, 100])
Labels Shape: torch.Size([32])


In [24]:
N_EPOCHS = 5
for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion)
    print(f'Epoch: {epoch+1}, Train Loss: {train_loss:.3f}, Train Acc: {train_acc*100:.2f}%, Val. Loss: {valid_loss:.3f}, Val. Acc: {valid_acc*100:.2f}%')

IndexError: index out of range in self

In [None]:
test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f}, Test Acc: {test_acc*100:.2f}%')