# **Data Processing**

In [1]:
# Mount Google Drive at /content/drive so the dataset CSV stored there can be read
# by later cells. This import only exists inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from collections import Counter
import torch
from torch.utils.data import Dataset, DataLoader
import csv

In [3]:
def tokenizer(text):
    """Split ``text`` into tokens on runs of whitespace.

    Args:
        text: The string to tokenize.

    Returns:
        A list of tokens; an empty or whitespace-only string yields ``[]``.
    """
    tokens = text.split()
    return tokens

def numericalize(text, vocab):
    """Map every token of ``text`` to its integer id in ``vocab``.

    Args:
        text: Raw string; it is split with ``tokenizer``.
        vocab: Mapping from token to integer id. It must contain ``'<unk>'``
            whenever ``text`` has at least one token.

    Returns:
        A list of ids, one per token. Tokens missing from ``vocab`` receive
        the id stored under ``'<unk>'``.
    """
    token_ids = []
    for token in tokenizer(text):
        # The fallback id is looked up per token so that a text without tokens
        # never touches vocab['<unk>'].
        token_ids.append(vocab.get(token, vocab['<unk>']))
    return token_ids

def pad_collate_fn(batch, pad_idx):
    """Collate ``(token_ids, label)`` samples into one right-padded batch.

    Args:
        batch: Sequence of ``(token_ids, label)`` pairs, where ``token_ids`` is
            a 1-D tensor whose length may differ between samples.
        pad_idx: Id written into the positions past the end of each sequence.

    Returns:
        ``(padded, labels)``: ``padded`` is a ``torch.long`` tensor of shape
        ``(len(batch), longest_sequence)`` and ``labels`` is a ``torch.float``
        tensor holding one label per sample.
    """
    sequences, targets = zip(*batch)
    lengths = [len(seq) for seq in sequences]
    # Start from a grid that is entirely padding, then copy each sequence over
    # the front of its own row.
    padded = torch.full((len(sequences), max(lengths)), pad_idx, dtype=torch.long)
    for row, (seq, seq_len) in enumerate(zip(sequences, lengths)):
        padded[row, :seq_len] = seq
    return padded, torch.tensor(targets, dtype=torch.float)

class TextDataset(Dataset):
    """Map-style dataset that numericalizes raw texts on access.

    Each item is ``(token_ids, label)`` where ``token_ids`` is a 1-D
    ``torch.long`` tensor produced by ``numericalize`` and ``label`` is a
    scalar ``torch.float`` tensor.
    """

    def __init__(self, texts, labels, vocab):
        """Store the data; no preprocessing happens until an item is requested.

        Args:
            texts: Indexable collection of raw strings.
            labels: Indexable collection of labels aligned with ``texts``.
            vocab: Token-to-id mapping passed to ``numericalize``.
        """
        self.texts = texts
        self.labels = labels
        self.vocab = vocab

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for the sample at ``idx``."""
        token_ids = numericalize(self.texts[idx], self.vocab)
        label = self.labels[idx]
        # dtype is pinned explicitly: torch.tensor([]) would otherwise infer
        # float32 for a text with no tokens, making the sample dtype depend on
        # the content of the text.
        return (
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor(label, dtype=torch.float),
        )

In [4]:
class DataLoaderFactory():
    """Build train/validation/test ``DataLoader`` objects from a labelled-text CSV.

    On construction the CSV is read, a vocabulary is built, the rows are split
    into three subsets and one ``DataLoader`` per subset is created. The
    results are exposed as ``vocab``, ``train_loader``, ``val_loader`` and
    ``test_loader`` (or via ``get_loaders``).
    """

    def __init__(self, csv_path, text_col='text', label_col='generated',
                 train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, random_state=42,
                 batch_size_train=32, batch_size_val=32, batch_size_test=1):
        """Validate the configuration and immediately prepare the loaders.

        Args:
            csv_path: Path of the CSV file to read.
            text_col: Name of the column holding the raw text.
            label_col: Name of the column holding the label.
            train_ratio, val_ratio, test_ratio: Fractions of the data assigned
                to each subset; they must sum to 1.0.
            random_state: Seed forwarded to both ``train_test_split`` calls.
            batch_size_train, batch_size_val, batch_size_test: Batch size of
                the corresponding loader.

        Raises:
            ValueError: If the three ratios do not sum to 1.0 (tolerance 1e-5).
        """
        self.csv_path = csv_path
        self.text_col = text_col
        self.label_col = label_col

        # Reject inconsistent ratios before any file I/O happens.
        if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-5:
            raise ValueError("Train, validation and test ratios must sum to 1.0")

        self.train_ratio = train_ratio
        self.val_ratio = val_ratio
        self.test_ratio = test_ratio
        self.random_state = random_state
        self.batch_size_train = batch_size_train
        self.batch_size_val = batch_size_val
        self.batch_size_test = batch_size_test

        self.vocab = None
        self.train_loader = None
        self.val_loader = None
        self.test_loader = None

        # Read the CSV and populate vocab and the three loaders.
        self._prepare_data()

    def _build_vocab(self, texts):
        """Return a token-to-id mapping covering every token in ``texts``.

        Id 0 is reserved for ``'<unk>'`` and id 1 for ``'<pad>'``; all other
        tokens receive consecutive ids from 2 upward in order of first
        appearance.
        """
        # The special tokens are inserted first so that a corpus which happens
        # to contain the literal strings '<unk>' or '<pad>' cannot leave a gap
        # in the id range. A gap would make len(vocab) smaller than the largest
        # id + 1 and break an embedding table sized with len(vocab).
        vocab = {'<unk>': 0, '<pad>': 1}
        for text in texts:
            for word in tokenizer(text):
                if word not in vocab:
                    vocab[word] = len(vocab)
        return vocab

    def _prepare_data(self):
        """Read the CSV, build the vocabulary, split the rows and create the loaders."""
        df = pd.read_csv(self.csv_path)
        texts = df[self.text_col].tolist()
        labels = df[self.label_col].tolist()

        # NOTE(review): the vocabulary is built from ALL rows, including those
        # that end up in the validation and test subsets, so '<unk>' is never
        # produced for this dataset - confirm that this is intended.
        self.vocab = self._build_vocab(texts)

        # First split: training rows versus the combined validation+test rows.
        holdout_ratio = self.test_ratio + self.val_ratio
        texts_train, texts_val_test, labels_train, labels_val_test = train_test_split(
            texts, labels, test_size=holdout_ratio, random_state=self.random_state)

        # Second split: divide the held-out rows into validation and test.
        # test_size is expressed relative to the held-out subset.
        texts_val, texts_test, labels_val, labels_test = train_test_split(
            texts_val_test, labels_val_test,
            test_size=self.test_ratio / holdout_ratio,
            random_state=self.random_state)

        train_dataset = TextDataset(texts_train, labels_train, self.vocab)
        val_dataset = TextDataset(texts_val, labels_val, self.vocab)
        test_dataset = TextDataset(texts_test, labels_test, self.vocab)

        # One collate callable shared by all three loaders.
        pad_idx = self.vocab['<pad>']

        def collate(batch):
            return pad_collate_fn(batch, pad_idx)

        # Only the training loader shuffles.
        self.train_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size_train,
            shuffle=True,
            collate_fn=collate
        )
        self.val_loader = DataLoader(
            val_dataset,
            batch_size=self.batch_size_val,
            shuffle=False,
            collate_fn=collate
        )
        self.test_loader = DataLoader(
            test_dataset,
            batch_size=self.batch_size_test,
            shuffle=False,
            collate_fn=collate
        )

    def get_loaders(self):
        """Return ``(train_loader, val_loader, test_loader)``."""
        return self.train_loader, self.val_loader, self.test_loader

In [6]:
    # Sanity check for the data pipeline: build the loaders once and report how many
    # batches each one yields.
    # NOTE(review): every line of this cell is indented one level; presumably it only
    # runs because the notebook front end dedents cell input - confirm before exporting
    # this notebook to a plain .py script.
    csv_file = '/content/drive/MyDrive/RNN_HW1/AI_Human.csv'
    # Initialize DataLoaderFactory with the CSV file path and split ratios
    # (70% train / 15% validation / 15% test; batch sizes keep their defaults).
    data_factory = DataLoaderFactory(csv_file, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15)
    train_loader, val_loader, test_loader = data_factory.get_loaders()

    # Print number of batches for each loader as a check.
    print(f"Train loader: {len(train_loader)} batches")
    print(f"Validation loader: {len(val_loader)} batches")
    print(f"Test loader: {len(test_loader)} batches")

Train loader: 10659 batches
Validation loader: 2284 batches
Test loader: 73086 batches


# **LSTM Train**

In [7]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from rich.progress import track
from torch.utils.tensorboard import SummaryWriter
import argparse

**LSTMClassifier**

In [8]:
class LSTMClassifier(nn.Module):
    """Text classifier: embedding -> (multi-layer) LSTM -> linear head.

    The logits are computed from the final hidden state of the last LSTM
    layer. Padded positions are fed through the LSTM like ordinary tokens
    (no packing or masking is applied).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=padding_idx,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, text):
        """Return logits of shape ``(batch, output_dim)`` for a ``(batch, seq_len)`` id tensor."""
        # A batch with zero-length sequences cannot go through the LSTM;
        # answer with all-zero logits instead.
        if text.size(1) == 0:
            return torch.zeros(text.size(0), self.fc.out_features, device=text.device)
        # h_n stacks the final hidden state of every layer; the last entry
        # belongs to the top layer.
        _, (h_n, _) = self.lstm(self.embedding(text))
        return self.fc(h_n[-1])

**ConvLSTMClassifier**

In [9]:
class ConvLSTMClassifier(nn.Module):
    """Text classifier: embedding -> Conv1d + ReLU -> LSTM -> linear head.

    The convolution (kernel size 3, padding 1) keeps both the sequence length
    and the channel count, so its output can be fed to the LSTM in place of
    the raw embeddings.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx, num_layers=1):
        super().__init__()
        # Token ids -> dense vectors.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        # Local feature extractor over the sequence axis.
        self.conv = nn.Conv1d(
            in_channels=embedding_dim,
            out_channels=embedding_dim,
            kernel_size=3,
            padding=1,
        )
        # Sequence model on top of the convolutional features.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        # Classification head producing the logits.
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text):
        """Return logits of shape ``(batch, output_dim)`` for a ``(batch, seq_len)`` id tensor."""
        # Zero-length sequences get all-zero logits.
        if text.size(1) == 0:
            return torch.zeros(text.size(0), self.fc.out_features, device=text.device)
        # (batch, seq, emb) -> (batch, emb, seq), the channel-first layout Conv1d expects.
        channels_first = self.embedding(text).transpose(1, 2)
        features = torch.relu(self.conv(channels_first))
        # Back to (batch, seq, emb) for the batch_first LSTM.
        _, (h_n, _) = self.lstm(features.transpose(1, 2))
        # Final hidden state of the top LSTM layer feeds the classifier.
        return self.fc(h_n[-1])

**BiStackedLSTMClassifier**

In [10]:
class BiStackedLSTMClassifier(nn.Module):
    """Text classifier: embedding -> stacked bidirectional LSTM -> linear head.

    The final forward and backward hidden states of the top LSTM layer are
    concatenated and mapped to the output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx, num_layers=2):
        super().__init__()
        # Token ids -> dense vectors.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        # Inputs are laid out as (batch, seq, emb) because batch_first=True.
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Two directions are concatenated, hence hidden_dim * 2 input features.
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

    def forward(self, text):
        """Return logits of shape ``(batch, output_dim)`` for a ``(batch, seq_len)`` id tensor."""
        # Zero-length sequences get all-zero logits.
        if text.size(1) == 0:
            return torch.zeros(text.size(0), self.fc.out_features, device=text.device)
        # Only the final hidden states are needed; per-step outputs are dropped.
        _, (h_n, _) = self.lstm(self.embedding(text))
        # The last two entries of h_n are the forward and backward states of
        # the top layer.
        top_forward, top_backward = h_n[-2], h_n[-1]
        return self.fc(torch.cat((top_forward, top_backward), dim=1))

**AttentionBiStackedLSTMClassifier**

In [11]:
class Attention(nn.Module):
    """Additive attention that pools encoder outputs into one context vector.

    A query (the hidden state) and the keys (the encoder outputs) are each
    projected linearly, summed and passed through ``tanh``; a dot product with
    the learnable vector ``v`` turns the result into one score per time step.
    The softmax of these scores weights the encoder outputs.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Projection applied to the query.
        self.W = nn.Linear(hidden_dim, hidden_dim)
        # Projection applied to the keys.
        self.U = nn.Linear(hidden_dim, hidden_dim)
        # Scoring vector that reduces each combined projection to a scalar.
        self.v = nn.Parameter(torch.randn(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        """Return the context vector.

        Args:
            hidden: Query of shape ``(batch_size, hidden_dim)``.
            encoder_outputs: Keys/values of shape
                ``(batch_size, seq_length, hidden_dim)``.

        Returns:
            Tensor of shape ``(batch_size, hidden_dim)``.
        """
        # (batch, 1, hidden) so the query broadcasts over every time step.
        query = self.W(hidden.unsqueeze(1))
        keys = self.U(encoder_outputs)
        # (batch, seq, hidden)
        energy = torch.tanh(query + keys)
        # (batch, seq): one unnormalised score per time step.
        scores = torch.matmul(energy, self.v)
        # Normalise over the sequence axis.
        weights = F.softmax(scores, dim=1)
        # Weighted sum of the encoder outputs over time -> (batch, hidden).
        return (weights.unsqueeze(-1) * encoder_outputs).sum(dim=1)

In [12]:
class AttenationBiStackedLSTMClassifier(nn.Module):
    """Bidirectional stacked LSTM classifier with attention pooling.

    Pipeline: embedding -> bidirectional LSTM -> attention over the per-step
    LSTM outputs (queried with the concatenated final hidden states of the top
    layer) -> linear layer producing the logits.

    The class name carries a historical misspelling ("Attenation"). It is kept
    so existing code continues to work; the correctly spelled alias
    ``AttentionBiStackedLSTMClassifier`` is defined right after the class.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx, num_layers=2):
        super().__init__()
        # Token ids -> dense vectors.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        # Bidirectional, so per-step outputs have hidden_dim * 2 features.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, num_layers=num_layers, bidirectional=True)
        # Maps the attention context vector to the logits.
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        # Attention works in the concatenated (forward + backward) space.
        self.attention = Attention(hidden_dim * 2)

    def forward(self, text):
        """Return logits of shape ``(batch, output_dim)`` for a ``(batch, seq_len)`` id tensor."""
        # Zero-length sequences get all-zero logits.
        if text.size(1) == 0:
            return torch.zeros(text.size(0), self.fc.out_features, device=text.device)
        embedded = self.embedding(text)

        # outputs: [batch_size, seq_length, hidden_dim*2]
        # hidden:  [num_layers*2, batch_size, hidden_dim]
        outputs, (hidden, _) = self.lstm(embedded)

        # hidden[-2] is the forward state and hidden[-1] the backward state of
        # the last layer; together they form the attention query.
        hidden_cat = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)

        # Query = final hidden states, keys/values = per-step LSTM outputs.
        context = self.attention(hidden_cat, outputs)

        return self.fc(context)


# Correctly spelled alias; both names refer to the same class object.
AttentionBiStackedLSTMClassifier = AttenationBiStackedLSTMClassifier

In [13]:
def train(model, iterator, optimizer, criterion, device, current_epoch, total_epochs):
    """Run one optimisation pass over ``iterator`` and return the mean batch loss.

    Args:
        model: Module called as ``model(texts)``; dimension 1 of its output is
            squeezed before the loss is computed.
        iterator: Sized iterable yielding ``(texts, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device the batches are moved to.
        current_epoch, total_epochs: Only used in the progress-bar caption.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    batch_losses = []

    progress = track(iterator, description=f"[bold][cyan]Epoch {current_epoch}/{total_epochs}[/bold]")
    for texts, labels in progress:
        texts = texts.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        logits = model(texts).squeeze(1)
        loss = criterion(logits, labels)

        # Backpropagate, then apply the parameter update.
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(iterator)

def evaluate(model, iterator, criterion, device):
    """Score ``model`` on ``iterator`` without updating any parameters.

    Args:
        model: Module called as ``model(texts)``; dimension 1 of its output is
            squeezed before use.
        iterator: Sized iterable yielding ``(texts, labels)`` batches.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)``: the summed batch losses divided by
        ``len(iterator)`` and the fraction of samples whose rounded sigmoid
        output equals the label.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0, 0, 0
    with torch.no_grad():
        for texts, labels in track(iterator, description="[bold][yellow]Validation[/bold]"):
            texts = texts.to(device)
            labels = labels.to(device)
            logits = model(texts).squeeze(1)
            loss_sum += criterion(logits, labels).item()

            # Logits -> probabilities -> hard 0/1 decisions.
            hard_preds = torch.sigmoid(logits).round()
            n_correct += (hard_preds == labels).sum().item()
            n_seen += labels.size(0)

    return loss_sum / len(iterator), n_correct / n_seen

def test_best_model(model, test_loader, criterion, device, model_path="./models/model_best.pth", write_path="./runs/test/test_results.txt"):
    # Load the saved best model weights from disk.
    best_model_path = model_path
    model.load_state_dict(torch.load(best_model_path))
    model.to(device)
    model.eval()  # Set the model to evaluation mode.

    total_loss = 0
    correct = 0
    total = 0
    # Evaluate the model on the test dataset.
    with torch.no_grad():
        for texts, labels in track(test_loader, description="[bold][yellow]Testing[/bold]"):
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts).squeeze(1)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            # Convert model outputs to probabilities then round them to obtain final predictions.
            preds = torch.round(torch.sigmoid(outputs))
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Calculate average loss and accuracy.
    avg_loss = total_loss / len(test_loader)
    accuracy = correct / total
    print(f'Final Test Loss: {avg_loss:.4f}, Final Test Accuracy: {accuracy:.4f}')

    # Append the test results to a file.
    with open(write_path, 'a') as f:
        f.write(f'\nFinal Test Loss: {avg_loss:.4f}, Final Test Accuracy: {accuracy:.4f}')


In [18]:
# --- Experiment configuration: paths and names used by the training cell ---
# `test_model` is only used as a truthy switch after training; the checkpoint
# that actually gets evaluated is `test_model_path`.
test_model = './models/lstm_model5.pth'
csv_path = '/content/drive/MyDrive/RNN_HW1/AI_Human.csv'
log_path = 'runs/lstm_experiment5'
best_model_path = './models/lstm_model5_best.pth'
final_model_path = './models/lstm_model5.pth'
# NOTE(review): this is the same file as final_model_path, not best_model_path,
# so the test run scores the final weights - confirm that this is intended.
test_model_path = './models/lstm_model5.pth'
write_result_path = './runs/test/test_results.txt'
model_folder = './models'
model_name = 'lstm_model5'

# Seed torch's global RNG so weight initialisation and batch shuffling are
# repeatable from one run to the next.
random_seed = 42
torch.manual_seed(random_seed)

# Create every output directory up front, including the one that receives the
# test results, so no later write fails because a folder is missing.
os.makedirs(log_path, exist_ok=True)
os.makedirs(model_folder, exist_ok=True)
os.makedirs(os.path.dirname(write_result_path), exist_ok=True)

data_loader_factory = DataLoaderFactory(csv_path, batch_size_train=64, batch_size_val=64)
train_loader = data_loader_factory.train_loader
test_loader = data_loader_factory.test_loader
val_loader = data_loader_factory.val_loader

vocab = data_loader_factory.vocab

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# **Switch your classifier**
Available classifiers: `LSTMClassifier`, `ConvLSTMClassifier`, `BiStackedLSTMClassifier`, `AttenationBiStackedLSTMClassifier`

In [None]:
# Model hyperparameters.
embedding_dim = 100
hidden_dim = 256
output_dim = 1
num_layers = 1

# switch your Classifier # LSTMClassifier ConvLSTMClassifier BiStackedLSTMClassifier AttenationBiStackedLSTMClassifier
model = LSTMClassifier(
    vocab_size=len(vocab),
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    padding_idx=vocab['<pad>'],
    num_layers=num_layers
).to(device)

# The model emits one raw logit per sample, so the sigmoid is folded into the loss.
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = torch.optim.Adam(model.parameters())
writer = SummaryWriter(log_path)

best_acc = 0.0
total_epochs = 50
# Training loop. The writer is closed in `finally` so the TensorBoard log is
# closed cleanly even when the run is interrupted.
try:
    for epoch in range(1, total_epochs+1):
        train_loss = train(model, train_loader, optimizer, criterion, device, epoch, total_epochs)
        print(f'Epoch: {epoch}, Train Loss: {train_loss:.4f}')

        # These metrics come from the VALIDATION loader and are labelled as
        # such; the test loader is only used after training has finished.
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        print(f'Epoch: {epoch}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        writer.add_scalar('Loss/Train', train_loss, epoch)
        writer.add_scalar('Loss/Val', val_loss, epoch)
        writer.add_scalar('Accuracy/Val', val_acc, epoch)

        # Save best model based on validation accuracy
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), best_model_path)
            print(f'Best model saved at epoch {epoch} with Val Acc: {val_acc:.4f}')

        # Save model every 10 epochs with a custom name
        if epoch % 10 == 0:
            periodic_path = os.path.join(model_folder, f'{model_name}_{epoch}.pth')
            torch.save(model.state_dict(), periodic_path)
            print(f'Model saved at epoch {epoch} as {model_name}_{epoch}.pth')
finally:
    writer.close()

# Save the final model
torch.save(model.state_dict(), final_model_path)

if test_model:
    test_best_model(model, test_loader, criterion, device, model_path=test_model_path, write_path=write_result_path)

Output()

Output()

Epoch: 1, Train Loss: 0.2956


Epoch: 1, Test Loss: 0.0245, Test Acc: 0.9924


Output()

Best model saved at epoch 1 with Test Acc: 0.9924


Output()

Epoch: 2, Train Loss: 0.0141


Epoch: 2, Test Loss: 0.0135, Test Acc: 0.9966


Output()

Best model saved at epoch 2 with Test Acc: 0.9966


Output()

Epoch: 3, Train Loss: 0.0069


Epoch: 3, Test Loss: 0.0065, Test Acc: 0.9982


Output()

Best model saved at epoch 3 with Test Acc: 0.9982


Output()

Epoch: 4, Train Loss: 0.0030


Epoch: 4, Test Loss: 0.0030, Test Acc: 0.9990


Output()

Best model saved at epoch 4 with Test Acc: 0.9990


Output()

Epoch: 5, Train Loss: 0.0019


Output()

Epoch: 5, Test Loss: 0.0039, Test Acc: 0.9989


Output()

Epoch: 6, Train Loss: 0.0013


Epoch: 6, Test Loss: 0.0038, Test Acc: 0.9991


Output()

Best model saved at epoch 6 with Test Acc: 0.9991


Output()

Epoch: 7, Train Loss: 0.0011


Output()

Epoch: 7, Test Loss: 0.0035, Test Acc: 0.9991


Output()

Epoch: 8, Train Loss: 0.0009


Output()

Epoch: 8, Test Loss: 0.0049, Test Acc: 0.9986


Output()

Epoch: 9, Train Loss: 0.0007


Epoch: 9, Test Loss: 0.0023, Test Acc: 0.9994


Output()

Best model saved at epoch 9 with Test Acc: 0.9994


Output()

Epoch: 10, Train Loss: 0.0005


Epoch: 10, Test Loss: 0.0036, Test Acc: 0.9992


Output()

Model saved at epoch 10 as lstm_model5_10.pth


In [None]:
# Load the checkpoint stored at `test_model_path` into `model`, evaluate it on the
# test loader, and append the resulting loss/accuracy line to `write_result_path`.
test_best_model(model, test_loader, criterion, device, model_path=test_model_path, write_path=write_result_path)