In [1]:
!pip install datasets
!pip install datasets transformers
!pip install --upgrade datasets



In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy
import matplotlib.pyplot as plt
import os
import shutil
import zipfile
import torch.optim as optim
import datasets
from datasets import load_dataset
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm
import re
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from transformers import GPT2Tokenizer

In [3]:
class Embedding(nn.Module):
    def __init__(self, vocab_size, embed_size, max_len):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Parameter(torch.randn(1, max_len, embed_size))

    def forward(self, token_ids):
        seq_len = token_ids.size(1)
        token_emb = self.token_embedding(token_ids)
        pos_emb = self.position_embedding[:, :seq_len, :].to(token_emb.device)
        return token_emb + pos_emb



In [None]:
class CasualSelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, max_seq_len=256):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.scale = self.head_dim ** -0.5
        self.max_seq_len = max_seq_len
        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
        self.register_buffer("mask", mask.unsqueeze(0))

    def forward(self, x):
        B, T, C = x.shape
        assert T <= self.max_seq_len, f"Sequence length {T} exceeds maximum {self.max_seq_len}"
        qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        attn_scores = (q @ k.transpose(-2, -1)) * self.scale
        attn_mask = self.mask[:, :T, :T]
        attn_scores = attn_scores.masked_fill(attn_mask == 0, float('-inf'))
        attn_probs = F.softmax(attn_scores, dim=-1)
        out = (attn_probs @ v).transpose(1, 2).reshape(B, T, C)
        return self.out_proj(out)


In [None]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, mlp_dim=3072, max_seq_len=256):
        super().__init__()
        self.ln1 = nn.LayerNorm(embed_dim)
        self.attn = CasualSelfAttention(embed_dim, num_heads, max_seq_len)
        self.ln2 = nn.LayerNorm(embed_dim)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_dim),
            nn.GELU(),
            nn.Linear(mlp_dim, embed_dim)
        )

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

In [None]:
class GPT(nn.Module):
    def __init__(self, vocab_size=50257, embed_dim=768, num_heads=12, num_layers=12, seq_len=1024):
        super().__init__()
        self.embed_dim = embed_dim
        self.seq_len = seq_len
        self.vocab_size = vocab_size
        self.embedding = Embedding(vocab_size, embed_dim, seq_len)
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, max_seq_len=seq_len) for _ in range(num_layers)
        ])
        self.ln_final = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        if torch.max(x) >= self.vocab_size:
            raise ValueError(f"Input contains token IDs >= vocab_size ({self.vocab_size})")
        x = self.embedding(x)
        for block in self.transformer_blocks:
            x = block(x)
        x = self.ln_final(x)
        logits = self.head(x)
        return logits

In [4]:
print("Loading dataset...")
dataset = load_dataset("bavard/personachat_truecased")
train_dataset = dataset["train"].select(range(10000))  # Take first 10,000 examples
test_dataset = dataset["validation"].select(range(1000))  # Take first 1,000 examples

# Initialize tokenizer
print("Initializing tokenizer...")
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

# Print tokenizer info
print(f"Tokenizer vocabulary size: {tokenizer.vocab_size}")
print(f"Tokenizer pad token ID: {tokenizer.pad_token_id}")
print(f"Tokenizer eos token ID: {tokenizer.eos_token_id}")

# Sample dataset to check structure
sample = train_dataset[0]
print("Keys in the dataset example:", sample.keys())


Loading dataset...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Repo card metadata block was not found. Setting CardData to empty.


Initializing tokenizer...
Tokenizer vocabulary size: 50257
Tokenizer pad token ID: 50256
Tokenizer eos token ID: 50256
Keys in the dataset example: dict_keys(['personality', 'candidates', 'history', 'conv_id', 'utterance_idx'])


In [5]:
class PersonaChatDataset(Dataset):
    def __init__(self, dataset, tokenizer, max_length=256):
        self.data = []
        for example in dataset:
            persona = " ".join(example["personality"])
            dialogue = " ".join(example["history"])
            text = f"Persona: {persona} Dialogue: {dialogue}"
            tokens = tokenizer(text, padding="max_length", truncation=True, max_length=max_length, return_tensors="pt")
            self.data.append(tokens["input_ids"].squeeze(0))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


In [6]:
print("Creating datasets...")
train_data = PersonaChatDataset(train_dataset, tokenizer, max_length=256)
test_data = PersonaChatDataset(test_dataset, tokenizer, max_length=256)

sample_tokens = train_data[0]
print(f"Sample tokens - min: {sample_tokens.min().item()}, max: {sample_tokens.max().item()}")
print(f"Sample tokens shape: {sample_tokens.shape}")
if sample_tokens.max() >= tokenizer.vocab_size:
    print(f"WARNING: Found token IDs >= vocab_size ({tokenizer.vocab_size})")

Creating datasets...
Sample tokens - min: 11, max: 50256
Sample tokens shape: torch.Size([256])


In [None]:
batch_size = 8
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)


print("Initializing model...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


vocab_size = tokenizer.vocab_size
model = GPT(vocab_size=vocab_size, embed_dim=384, num_heads=12, num_layers=12, seq_len=256).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=1e-3)
scaler = GradScaler()
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params:,}")

In [None]:
num_epochs = 15
train_losses = []
test_losses = []
accuracies = []
best_test_loss = float('inf')
best_model_path = "best_model.pth"


print("Starting training...")
for epoch in range(num_epochs):
    model.train()
    epoch_train_loss = 0
    correct_train = 0
    total_train = 0

    progress = tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}",
                   bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}',
                   leave=True)

    for i, batch in enumerate(train_loader):
        try:
            # Ensure batch is within vocab range
            if batch.max() >= vocab_size:
                batch = torch.clamp(batch, max=vocab_size-1)

            inputs = batch.to(device)
            targets = inputs[:, 1:].contiguous()

            optimizer.zero_grad()

            with torch.cuda.amp.autocast(enabled=device.type == 'cuda'):
                outputs = model(inputs[:, :-1])
                # Use reshape instead of view for safety
                loss = F.cross_entropy(outputs.reshape(-1, vocab_size), targets.reshape(-1))

            # Use gradient scaling for mixed precision training
            scaler.scale(loss).backward()

            # Clip gradients to avoid exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            scaler.step(optimizer)
            scaler.update()

            epoch_train_loss += loss.item()

            # Calculate accuracy
            with torch.no_grad():
                preds = outputs.argmax(dim=-1)
                correct_train += (preds == targets).sum().item()
                total_train += targets.numel()

            # Update progress bar with current loss and accuracy
            current_loss = epoch_train_loss / (i + 1)
            current_acc = correct_train / total_train if total_train > 0 else 0
            progress.set_postfix(loss=f"{current_loss:.4f}", acc=f"{current_acc:.4f}")
            progress.update(1)

        except Exception as e:
            print(f"Error in batch {i}: {e}")
            # Skip this batch and continue with the next one
            continue

    progress.close()

    avg_train_loss = epoch_train_loss / len(train_loader) if len(train_loader) > 0 else float('inf')
    train_losses.append(avg_train_loss)

    # Evaluation
    model.eval()
    epoch_test_loss = 0
    correct = 0
    total = 0

    print("Evaluating...")
    with torch.no_grad():
        for batch in test_loader:
            try:
                # Ensure batch is within vocab range
                if batch.max() >= vocab_size:
                    batch = torch.clamp(batch, max=vocab_size-1)

                inputs = batch.to(device)
                targets = inputs[:, 1:].contiguous()

                with torch.cuda.amp.autocast(enabled=device.type == 'cuda'):
                    outputs = model(inputs[:, :-1])
                    # Use reshape instead of view for safety
                    loss = F.cross_entropy(outputs.reshape(-1, vocab_size), targets.reshape(-1))
                    epoch_test_loss += loss.item()

                    preds = outputs.argmax(dim=-1)
                    correct += (preds == targets).sum().item()
                    total += targets.numel()

            except Exception as e:
                print(f"Error in evaluation batch: {e}")
                # Skip this batch and continue with the next one
                continue

    avg_test_loss = epoch_test_loss / len(test_loader) if len(test_loader) > 0 else float('inf')
    accuracy = correct / total if total > 0 else 0
    test_losses.append(avg_test_loss)
    accuracies.append(accuracy)

    # Print epoch summary
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}, Accuracy: {accuracy:.4f}")

    # Save the best model
    if avg_test_loss < best_test_loss:
        best_test_loss = avg_test_loss
        torch.save(model.state_dict(), best_model_path)
        print(f"Saved best model with Test Loss: {best_test_loss:.4f}")

print("Training completed!")

In [None]:
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Test Loss')

plt.subplot(1, 2, 2)
plt.plot(accuracies)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Test Accuracy')
plt.tight_layout()
plt.show()


In [23]:
print("Loading best model for text generation...")
best_model = GPT(vocab_size=vocab_size, embed_dim=384, num_heads=8, num_layers=6, seq_len=256).to(device)
best_model.load_state_dict(torch.load(best_model_path))
best_model.eval()

Loading best model for text generation...


  best_model.load_state_dict(torch.load(best_model_path))


GPT(
  (embedding): Embedding(
    (token_embedding): Embedding(50257, 384)
  )
  (transformer_blocks): ModuleList(
    (0-5): 6 x TransformerBlock(
      (ln1): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
      (attn): CasualSelfAttention(
        (qkv): Linear(in_features=384, out_features=1152, bias=True)
        (out_proj): Linear(in_features=384, out_features=384, bias=True)
      )
      (ln2): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
      (mlp): Sequential(
        (0): Linear(in_features=384, out_features=3072, bias=True)
        (1): GELU(approximate='none')
        (2): Linear(in_features=3072, out_features=384, bias=True)
      )
    )
  )
  (ln_final): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
  (head): Linear(in_features=384, out_features=50257, bias=True)
)

In [27]:
def generate_text(model, tokenizer, prompt, max_length=50, temperature=1.0):
    model.eval()

    # Tokenize the prompt
    input_ids = tokenizer.encode(prompt, return_tensors='pt').to(device)

    # Generate text
    with torch.no_grad():
        for _ in range(max_length):
            # Get model predictions
            outputs = model(input_ids)
            next_token_logits = outputs[:, -1, :] / temperature

            # Apply softmax to get probabilities
            probs = F.softmax(next_token_logits, dim=-1)

            # Sample from the distribution
            next_token = torch.multinomial(probs, num_samples=1)

            # Append the next token to input_ids
            input_ids = torch.cat([input_ids, next_token], dim=1)

            # Stop if EOS token is generated
            if next_token.item() == tokenizer.eos_token_id:
                break

    # Decode the generated text
    return tokenizer.decode(input_ids[0], skip_special_tokens=True)

In [28]:
prompt = "Persona: Hello how was your day. Dialogue:"
generated_text = generate_text(best_model, tokenizer, prompt, max_length=20)
print(f"Generated text:\n{generated_text}")

Generated text:
Persona: Hello how was your day. Dialogue: Drop. I hate Hurricanes? My two I'm a college mer at poll. I'm great years
