# Model 1: Siamese BiLSTM for Natural Language Inference - Google Colab Version

Binary classification: entails (0) vs neutral (1)


In [None]:
# check if running on colab and gpu availability
import os

# Detect the Colab runtime by attempting to import its package. Only
# ImportError is caught: a bare `except:` would also swallow
# KeyboardInterrupt/SystemExit and hide unrelated failures.
try:
    import google.colab  # noqa: F401 -- imported only to probe the environment
    IN_COLAB = True
    print("Running on Google Colab")
except ImportError:
    IN_COLAB = False
    print("Not running on Google Colab")

import torch
print(f"\nPyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

# Report details of GPU 0 when CUDA is usable; otherwise tell the user how to
# enable a GPU runtime.
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("WARNING: GPU not detected!")
    print("Please enable GPU: Runtime → Change runtime type → Hardware accelerator → GPU")


In [None]:
# upload preprocessed data file
# Off Colab the file is expected in the working directory; on Colab the user is
# prompted to upload it. Either way this cell only reports the outcome -- it
# does not stop execution when the file is missing.
if not IN_COLAB:
    # assume file is in current directory when not on colab
    if not os.path.exists('preprocessed_data.pkl'):
        print("❌ Error: preprocessed_data.pkl not found in current directory")
    else:
        print("✓ preprocessed_data.pkl found in current directory")
else:
    from google.colab import files
    print("Please upload your preprocessed_data.pkl file:")
    uploaded = files.upload()

    # the size report below takes len() of the entry stored under the file name
    if 'preprocessed_data.pkl' not in uploaded:
        print("❌ Error: preprocessed_data.pkl not found. Please upload it.")
    else:
        print(f"✓ Successfully uploaded preprocessed_data.pkl ({len(uploaded['preprocessed_data.pkl']) / 1e6:.2f} MB)")


In [None]:
# additional imports
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pickle
import numpy as np
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

print("✓ All packages imported successfully")

In [None]:
# device configuration
# USE_GPU is read by later cells (e.g. to choose the batch size); `device` is
# where the model and every batch are moved.
USE_GPU = torch.cuda.is_available()
device = torch.device("cuda" if USE_GPU else "cpu")
print(f"✓ Using device: {device}")

if not USE_GPU:
    print("  No GPU available, training will be slower")
else:
    print(f"  GPU: {torch.cuda.get_device_name(0)}")
    print(f"  GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

Using device: cpu


In [3]:
# load preprocessed data
# SECURITY: pickle.load can execute arbitrary code embedded in the file. Only
# load a preprocessed_data.pkl that you produced yourself or otherwise trust.
with open('preprocessed_data.pkl', 'rb') as f:
    data = pickle.load(f)

# Fail fast with one readable message if the pickle does not have the expected
# layout, instead of a bare KeyError halfway through the unpacking below.
REQUIRED_KEYS = (
    'train_premise_idx', 'train_hypothesis_idx', 'train_labels',
    'test_premise_idx', 'test_hypothesis_idx', 'test_labels',
    'val_premise_idx', 'val_hypothesis_idx', 'val_labels',
    'word_to_ix', 'vocab_size', 'label_to_ix', 'ix_to_label',
)
missing_keys = [key for key in REQUIRED_KEYS if key not in data]
if missing_keys:
    raise KeyError(f"preprocessed_data.pkl is missing expected keys: {missing_keys}")

train_premise_idx = data['train_premise_idx']
train_hypothesis_idx = data['train_hypothesis_idx']
train_labels = data['train_labels']

test_premise_idx = data['test_premise_idx']
test_hypothesis_idx = data['test_hypothesis_idx']
test_labels = data['test_labels']

val_premise_idx = data['val_premise_idx']
val_hypothesis_idx = data['val_hypothesis_idx']
val_labels = data['val_labels']

word_to_ix = data['word_to_ix']
vocab_size = data['vocab_size']
label_to_ix = data['label_to_ix']
ix_to_label = data['ix_to_label']

# Each split is consumed index-by-index (premise i, hypothesis i, label i), so
# the three containers of a split must have the same length.
assert len(train_premise_idx) == len(train_hypothesis_idx) == len(train_labels), "train split is misaligned"
assert len(val_premise_idx) == len(val_hypothesis_idx) == len(val_labels), "validation split is misaligned"
assert len(test_premise_idx) == len(test_hypothesis_idx) == len(test_labels), "test split is misaligned"

print(f"Training examples: {len(train_premise_idx)}")
print(f"Validation examples: {len(val_premise_idx)}")
print(f"Test examples: {len(test_premise_idx)}")
print(f"Vocabulary size: {vocab_size}")


Training examples: 23088
Validation examples: 1304
Test examples: 2126
Vocabulary size: 20499


In [None]:
# check sequence length statistics
# token counts per training example, before any truncation
premise_lens = list(map(len, train_premise_idx))
hyp_lens = list(map(len, train_hypothesis_idx))

print(f"Premise lengths - Max: {max(premise_lens)}, Mean: {np.mean(premise_lens):.1f}, 95th percentile: {np.percentile(premise_lens, 95):.1f}")
print(f"Hypothesis lengths - Max: {max(hyp_lens)}, Mean: {np.mean(hyp_lens):.1f}, 95th percentile: {np.percentile(hyp_lens, 95):.1f}")
# NOTE(review): the 200 below is a literal; it presumably mirrors the truncation
# limit used when batching -- keep the two in sync if either changes.
print(f"\nNote: Sequences will be truncated to {200} tokens to manage memory")


In [4]:
# dataset class
class NLIDataset(Dataset):
    """Index-aligned view over premise sequences, hypothesis sequences and labels.

    The three containers are stored as given (no copy). Example ``i`` is built
    from the ``i``-th entry of each container, and the dataset length is
    ``len(labels)``.
    """

    def __init__(self, premise_idx, hypothesis_idx, labels):
        self.premise_idx, self.hypothesis_idx, self.labels = premise_idx, hypothesis_idx, labels

    def __len__(self):
        # the label container defines how many examples exist
        n_examples = len(self.labels)
        return n_examples

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict with keys 'premise', 'hypothesis', 'label'."""
        example = {}
        example['premise'] = self.premise_idx[idx]
        example['hypothesis'] = self.hypothesis_idx[idx]
        example['label'] = self.labels[idx]
        return example


In [None]:
# collate function for padding with length limiting
MAX_SEQ_LENGTH = 200  # limit sequence length to prevent memory issues


def _pad_to_tensor(seqs, pad_idx):
    """Right-pad each sequence in ``seqs`` with ``pad_idx`` and stack into a LongTensor.

    The width is the longest sequence in ``seqs`` but never less than 1, so a
    batch made only of empty sequences still yields a (batch, 1) tensor rather
    than a zero-width one.
    """
    width = max(max((len(s) for s in seqs), default=0), 1)
    padded = [s + [pad_idx] * (width - len(s)) for s in seqs]
    return torch.tensor(padded, dtype=torch.long)


def collate_fn(batch, max_len=None, pad_idx=0):
    """Turn a list of dataset examples into padded tensors.

    Args:
        batch: list of dicts with keys 'premise', 'hypothesis' (sequences of
            token ids) and 'label' (an integer class id).
        max_len: sequences longer than this are truncated. ``None`` (default)
            uses the module-level ``MAX_SEQ_LENGTH``, read at call time.
        pad_idx: id used to right-pad shorter sequences (default 0).

    Returns:
        ``(premises, hypotheses, labels)`` LongTensors. Premises and hypotheses
        are padded independently, each to its own longest sequence in the batch.
    """
    if max_len is None:
        max_len = MAX_SEQ_LENGTH

    # list(...) copies and normalises the container type, so tuples or arrays
    # of ids pad the same way as plain lists instead of raising or broadcasting.
    premises = [list(item['premise'][:max_len]) for item in batch]
    hypotheses = [list(item['hypothesis'][:max_len]) for item in batch]
    labels = [item['label'] for item in batch]

    premises_tensor = _pad_to_tensor(premises, pad_idx)
    hypotheses_tensor = _pad_to_tensor(hypotheses, pad_idx)
    labels_tensor = torch.tensor(labels, dtype=torch.long)

    return premises_tensor, hypotheses_tensor, labels_tensor


In [None]:
# create datasets
train_dataset = NLIDataset(train_premise_idx, train_hypothesis_idx, train_labels)
val_dataset = NLIDataset(val_premise_idx, val_hypothesis_idx, val_labels)
test_dataset = NLIDataset(test_premise_idx, test_hypothesis_idx, test_labels)

# reproducibility: seed the RNGs before any weights are initialised or any
# batch is shuffled, so a fresh Restart & Run All repeats the same run.
# (Some GPU kernels may still be non-deterministic -- treat GPU results as
# approximately, not bitwise, repeatable.)
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# hyperparameters - automatically adjust batch size based on GPU
BATCH_SIZE = 64 if USE_GPU else 32  # use 64 for GPU, 32 for CPU
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
NUM_CLASSES = 2
LEARNING_RATE = 0.001
NUM_EPOCHS = 10

print(f"Hyperparameters:")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Embedding dim: {EMBEDDING_DIM}")
print(f"  Hidden dim: {HIDDEN_DIM}")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  Number of epochs: {NUM_EPOCHS}")
print(f"  Random seed: {SEED}")

# create dataloaders
# only the training loader shuffles; validation and test keep dataset order
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

print(f"\nDataloaders created:")
print(f"  Training batches: {len(train_loader)}")
print(f"  Validation batches: {len(val_loader)}")
print(f"  Test batches: {len(test_loader)}")


Number of training batches: 361
Number of validation batches: 21
Number of test batches: 34


In [7]:
# siamese bilstm model
class SiameseBiLSTM(nn.Module):
    """Siamese sentence-pair classifier built on one shared bidirectional LSTM.

    Premise and hypothesis are encoded by the same embedding + BiLSTM; the two
    encodings are concatenated and classified by a two-layer MLP.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden size per direction.
        num_classes: number of output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        super().__init__()

        # embedding layer (index 0 is the padding id)
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        # bidirectional lstm
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)

        # classification layers: input is [premise_fwd, premise_bwd, hyp_fwd, hyp_bwd]
        self.fc1 = nn.Linear(4 * hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(0.3)

    def encode(self, x):
        """Encode a (batch, seq_len) tensor of token ids into (batch, 2 * hidden_dim).

        Padding positions are excluded from the recurrence by packing the batch
        with each row's true length. Without this, the backward direction would
        start on padding and the forward direction would end on it, making the
        encoding depend on how much padding the batch happens to contain.
        """
        # embed sequence
        embeds = self.embedding(x)

        # true length of each row = 1-based position of its last non-pad token;
        # rows that are entirely padding are treated as length 1 because packed
        # sequences cannot have zero length
        pad_idx = self.embedding.padding_idx
        positions = torch.arange(1, x.size(1) + 1, device=x.device)
        lengths = ((x != pad_idx).long() * positions).max(dim=1).values.clamp(min=1)

        # pass through bilstm (pack_padded_sequence expects lengths on the CPU)
        packed = nn.utils.rnn.pack_padded_sequence(
            embeds, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        _, (h_n, _) = self.lstm(packed)

        # concat forward and backward final hidden states of the last layer
        hidden = torch.cat((h_n[-2], h_n[-1]), dim=1)
        return hidden

    def forward(self, premise, hypothesis):
        """Return (batch, num_classes) logits for a batch of premise/hypothesis pairs."""
        # encode both sequences with shared encoder
        premise_vec = self.encode(premise)
        hypothesis_vec = self.encode(hypothesis)

        # concatenate encodings
        combined = torch.cat((premise_vec, hypothesis_vec), dim=1)

        # classification
        out = torch.relu(self.fc1(combined))
        out = self.dropout(out)
        out = self.fc2(out)
        return out


In [8]:
# initialize model
# build the network first, then move it to the selected device
model = SiameseBiLSTM(vocab_size, EMBEDDING_DIM, HIDDEN_DIM, NUM_CLASSES)
model = model.to(device)

# cross-entropy loss on the logits; Adam updates every model parameter
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

print("Model initialized")
print(f"Total parameters: {sum(p.numel() for p in model.parameters())}")


Model initialized
Total parameters: 3677314


In [9]:
# training function
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: module called as ``model(premises, hypotheses)`` returning logits.
        dataloader: iterable yielding ``(premises, hypotheses, labels)`` tensors.
        criterion: loss applied to ``(logits, labels)``. The per-sample average
            below assumes it returns the batch mean (the default reduction of
            ``nn.CrossEntropyLoss``).
        optimizer: optimizer over the model's parameters.
        device: device each batch is moved to.

    Returns:
        ``(avg_loss, accuracy)`` averaged over samples. An empty dataloader
        yields ``(0.0, 0.0)`` instead of dividing by zero.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for premises, hypotheses, labels in dataloader:
        premises = premises.to(device)
        hypotheses = hypotheses.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(premises, hypotheses)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # weight each batch's mean loss by its size so a smaller final batch
        # does not count as much as a full one
        batch_size = labels.size(0)
        loss_sum += loss.item() * batch_size

        # count correct predictions directly instead of collecting them
        preds = torch.argmax(outputs, dim=1)
        n_correct += (preds == labels).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        return 0.0, 0.0

    avg_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    return avg_loss, accuracy


In [10]:
# evaluation function
def evaluate(model, dataloader, criterion, device):
    """Score ``model`` on ``dataloader`` without updating it.

    Args:
        model: module called as ``model(premises, hypotheses)`` returning logits.
        dataloader: iterable yielding ``(premises, hypotheses, labels)`` tensors.
        criterion: loss applied to ``(logits, labels)``. The per-sample average
            below assumes it returns the batch mean (the default reduction of
            ``nn.CrossEntropyLoss``).
        device: device each batch is moved to.

    Returns:
        ``(avg_loss, accuracy, all_preds, all_labels)``. Loss and accuracy are
        averaged over samples; the two lists hold the predicted and true class
        ids in iteration order. An empty dataloader yields
        ``(0.0, 0.0, [], [])`` instead of dividing by zero.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for premises, hypotheses, labels in dataloader:
            premises = premises.to(device)
            hypotheses = hypotheses.to(device)
            labels = labels.to(device)

            outputs = model(premises, hypotheses)
            loss = criterion(outputs, labels)

            # weight each batch's mean loss by its size so a smaller final
            # batch does not count as much as a full one
            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size

            preds = torch.argmax(outputs, dim=1)
            n_correct += (preds == labels).sum().item()
            n_seen += batch_size

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if n_seen == 0:
        return 0.0, 0.0, all_preds, all_labels

    avg_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    return avg_loss, accuracy, all_preds, all_labels


In [None]:
# training loop
# NOTE: this cell trains the existing `model` object in place; re-running it
# continues from the current weights rather than starting from scratch.

# Start below any reachable accuracy so the first epoch always writes a
# checkpoint. Starting at 0 with a strict `>` would leave no checkpoint on disk
# if validation accuracy never rose above 0.
best_val_acc = float("-inf")
train_losses = []
val_losses = []
train_accs = []
val_accs = []

for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc, _, _ = evaluate(model, val_loader, criterion, device)

    # keep the per-epoch history for plotting and for the saved results
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)

    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    print("-" * 50)

    # save best model (ties keep the earlier checkpoint)
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_siamese_bilstm.pth')
        print(f"Best model saved with validation accuracy: {best_val_acc:.4f}")
    print()


In [None]:
# load best model
# map_location places the saved tensors on the current device, so a checkpoint
# written on a GPU runtime still loads when this cell runs on a CPU-only one.
model.load_state_dict(torch.load('best_siamese_bilstm.pth', map_location=device))
print("Best model loaded")

In [None]:
# evaluate on test set
# NOTE(review): `test_labels` is rebound here to the labels returned by
# evaluate(), replacing the variable of the same name loaded from the pickle.
test_loss, test_acc, test_preds, test_labels = evaluate(model, test_loader, criterion, device)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")
print("\nClassification Report:")

# NOTE(review): class names are written out by hand and presumably follow the
# 0 = entails / 1 = neutral encoding -- confirm against label_to_ix.
class_names = ['entails', 'neutral']
report = classification_report(test_labels, test_preds, target_names=class_names)
print(report)


In [None]:
# plot training history
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

# one row per panel: axis, train curve + legend label, val curve + legend
# label, y-axis label, panel title (left = loss, right = accuracy)
curve_specs = (
    (ax1, train_losses, 'Train Loss', val_losses, 'Val Loss', 'Loss', 'Training and Validation Loss'),
    (ax2, train_accs, 'Train Acc', val_accs, 'Val Acc', 'Accuracy', 'Training and Validation Accuracy'),
)

for axis, train_curve, train_label, val_curve, val_label, y_label, panel_title in curve_specs:
    axis.plot(train_curve, label=train_label)
    axis.plot(val_curve, label=val_label)
    axis.set_xlabel('Epoch')
    axis.set_ylabel(y_label)
    axis.set_title(panel_title)
    axis.legend()

plt.tight_layout()
plt.show()


In [None]:
# save training results
# settings used for this run, stored next to the metrics they produced
hyperparameters = {
    'batch_size': BATCH_SIZE,
    'embedding_dim': EMBEDDING_DIM,
    'hidden_dim': HIDDEN_DIM,
    'learning_rate': LEARNING_RATE,
    'num_epochs': NUM_EPOCHS
}

# per-epoch curves, final test metrics, and the settings above
results = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'train_accs': train_accs,
    'val_accs': val_accs,
    'test_loss': test_loss,
    'test_acc': test_acc,
    'hyperparameters': hyperparameters
}

with open('bilstm_results.pkl', 'wb') as f:
    pickle.dump(results, f)

print("Results saved to bilstm_results.pkl")


In [None]:
# download trained model and results
# Off Colab there is nothing to transfer; on Colab each artifact is offered as
# a browser download if it exists, and a missing one is reported instead.
if not IN_COLAB:
    print("Not running on Colab - files are already in your local directory")
    print(f"  Model: best_siamese_bilstm.pth")
    print(f"  Results: bilstm_results.pkl")
else:
    from google.colab import files

    print("Downloading trained model and results...")

    # download the best model
    if not os.path.exists('best_siamese_bilstm.pth'):
        print("Error: best_siamese_bilstm.pth not found")
    else:
        files.download('best_siamese_bilstm.pth')
        print("Downloaded: best_siamese_bilstm.pth")

    # download the results
    if not os.path.exists('bilstm_results.pkl'):
        print("Error: bilstm_results.pkl not found")
    else:
        files.download('bilstm_results.pkl')
        print("Downloaded: bilstm_results.pkl")

    # printed regardless of whether either file was missing above
    print("All files downloaded! You can now test the model locally.")
