<a href="https://colab.research.google.com/github/MartinFarres/transformers_Chatbot_QA/blob/main/transformers_Chatbot_Q%26Av2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chatbot Q&A - Seq2Seq Transformer Model

## 1. Data Preprocessing

In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import math
from collections import Counter
import re
import torch.nn.functional as F

# ============================================================================
# STEP 1: IMPROVED DATA PREPROCESSING
# ============================================================================

class DialogDataset(Dataset):
    """Question/answer pairs read from a tab-separated file, served as index tensors.

    Each line of ``file_path`` is split on tabs; only lines that yield exactly
    two fields (question, answer) are kept. Both fields are lower-cased.

    Args:
        file_path: Path to the UTF-8 encoded dialog file.
        vocab: Optional list of tokens whose position is the token index. It
            must contain ``<SOS>``, ``<EOS>`` and ``<UNK>``. When ``None`` the
            vocabulary is built from the loaded pairs.
        max_len: Upper bound on the length of every returned sequence,
            including the ``<SOS>`` and ``<EOS>`` markers.
    """

    def __init__(self, file_path, vocab=None, max_len=50):
        self.pairs = []
        self.max_len = max_len

        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split('\t')
                if len(parts) == 2:
                    question, answer = parts
                    self.pairs.append((question.lower(), answer.lower()))

        # Build or use existing vocabulary
        if vocab is None:
            self.vocab = self.build_vocab()
        else:
            self.vocab = vocab

        self.word2idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

    def build_vocab(self):
        """Return the special tokens followed by words seen at least 3 times.

        Words are ordered by descending frequency and the list is capped at
        5000 entries (special tokens included).
        """
        words = []
        for q, a in self.pairs:
            words.extend(self.tokenize(q))
            words.extend(self.tokenize(a))

        word_counts = Counter(words)

        vocab = ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        vocab.extend([word for word, count in word_counts.most_common() if count >= 3])

        # Cap vocabulary size
        MAX_VOCAB_SIZE = 5000
        if len(vocab) > MAX_VOCAB_SIZE:
            vocab = vocab[:MAX_VOCAB_SIZE]

        return vocab

    def tokenize(self, text):
        """Strip everything except word characters, whitespace and apostrophes, then split."""
        text = re.sub(r"[^\w\s']", '', text)
        return text.split()

    def encode(self, text):
        """Map ``text`` to a list of vocabulary indices (unknown words -> ``<UNK>``)."""
        unk = self.word2idx['<UNK>']
        return [self.word2idx.get(word, unk) for word in self.tokenize(text)]

    def _wrap(self, text):
        """Encode ``text`` and surround it with ``<SOS>``/``<EOS>`` within ``max_len``."""
        # Truncate the body *before* adding the markers so that a long
        # sentence keeps its <EOS>; slicing the finished sequence would cut
        # the end marker off.
        body = self.encode(text)[:max(self.max_len - 2, 0)]
        wrapped = [self.word2idx['<SOS>']] + body + [self.word2idx['<EOS>']]
        # Only has an effect when max_len < 2; keeps the length bound strict.
        return wrapped[:self.max_len]

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(src, tgt)`` index tensors for pair ``idx``."""
        question, answer = self.pairs[idx]
        return torch.tensor(self._wrap(question)), torch.tensor(self._wrap(answer))


def collate_fn(batch):
    """Pad a list of ``(src, tgt)`` tensor pairs into two batch-first tensors.

    Shorter sequences are right-padded with 0 up to the longest sequence of
    the batch, separately for sources and targets.

    Returns:
        ``(src_batch, tgt_batch)`` as produced by ``pad_sequence``.
    """
    pad = nn.utils.rnn.pad_sequence
    sources = [src for src, _ in batch]
    targets = [tgt for _, tgt in batch]
    return (
        pad(sources, batch_first=True, padding_value=0),
        pad(targets, batch_first=True, padding_value=0),
    )

In [None]:
# Load the dialog pairs with the default max_len and build the vocabulary
# from them; `dataset` is a notebook-level global read by the cells below.
dataset = DialogDataset('dialogs.txt')

# Quick sanity check: pair count, vocabulary size and the first encoded pair.
print(f"Number of dialog pairs: {len(dataset)}")
print(f"Vocabulary size: {len(dataset.vocab)}")
print(f"First pair (encoded): {dataset[0]}")

FileNotFoundError: [Errno 2] No such file or directory: 'dialogs.txt'

In [None]:
# Show how one sample sentence is tokenised and encoded.
# The sentence is kept in `sample_text` rather than `input`, so the builtin
# `input()` is not shadowed for the rest of the notebook session.
sample_text = "hi, how are you doing?"
tokens = dataset.tokenize(sample_text)
# Same framing as DialogDataset.__getitem__: <SOS> + word indices + <EOS>.
encode = [
    dataset.word2idx['<SOS>'],
    *dataset.encode(sample_text),
    dataset.word2idx['<EOS>'],
]
print(f"Input : {sample_text}")
print(f"Tokens : {tokens}")
print(f"Encode : {encode}")

## 2. Positional Encoding



In [None]:
# ============================================================================
# STEP 2: POSITIONAL ENCODING
# ============================================================================

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first embedding.

    For position ``pos`` and pair index ``i``:
        even column ``2i``   -> sin(pos / 10000^(2i / d_model))
        odd column  ``2i+1`` -> cos(pos / 10000^(2i / d_model))

    Args:
        d_model: Size of the last (feature) dimension; may be even or odd.
        max_len: Number of positions pre-computed. ``forward`` cannot handle
            inputs whose second dimension exceeds this value.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()

        # Table of shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()

        # One frequency per (sin, cos) column pair
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        angles = position * div_term

        # Sine on even columns
        pe[:, 0::2] = torch.sin(angles)

        # Cosine on odd columns. With an odd d_model there is one fewer odd
        # column than even columns, so the last frequency has no cosine slot.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])

        # Leading batch dimension -> (1, max_len, d_model); registered as a
        # buffer so it follows the module across devices but is not trained.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1)]


Word embedding:      [0.2, 0.5, 0.1, ...]

\+ Positional info:   [0.1, 0.0, 0.3, ...]

= Final embedding:   [0.3, 0.5, 0.4, ...]

## 3. Transformer Model

### Architecture - Transformer
    1. Embedding layer
    2. Positional encoding
    3. Transformer encoder (processes input question)
    4. Transformer decoder (generates answer)
    5. Output linear layer

### Encoder Flow

    Input Question → Embedding → Positional Encoding → Multi-Head Attention → Feed Forward → Output

### Decoder Flow
    Previous Words → Embedding → Positional Encoding →
    Masked Multi-Head Attention →
    Cross-Attention (with Encoder output) →
    Feed Forward →
    Output Prediction

In [None]:
# ============================================================================
# STEP 3: TRANSFORMER MODEL
# ============================================================================

class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder transformer mapping source token ids to target logits.

    Pipeline: token embedding (scaled by sqrt(d_model)) -> positional
    encoding -> ``nn.Transformer`` (batch-first) -> linear projection onto the
    vocabulary. Index 0 is treated as the padding token throughout.

    Args:
        vocab_size: Number of rows in both embedding tables and size of the
            output projection.
        d_model: Embedding / model width.
        nhead: Number of attention heads.
        num_encoder_layers: Encoder depth.
        num_decoder_layers: Decoder depth.
        dim_feedforward: Hidden width of the feed-forward sub-layers.
        dropout: Dropout probability inside the transformer.
        max_len: Number of positions available in the positional encoding.
    """

    def __init__(self, vocab_size, d_model=256, nhead=8,
                 num_encoder_layers=3, num_decoder_layers=3,
                 dim_feedforward=1024, dropout=0.15, max_len=100):
        super().__init__()

        self.d_model = d_model
        self.vocab_size = vocab_size

        # Embedding layers
        self.src_embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.tgt_embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)

        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )

        # Output layer
        self.fc_out = nn.Linear(d_model, vocab_size)

        # Initialize weights
        self.init_weights()

    def _zero_padding_rows(self):
        """Reset the padding row of both embedding tables to zero."""
        with torch.no_grad():
            for emb in (self.src_embedding, self.tgt_embedding):
                if emb.padding_idx is not None:
                    emb.weight[emb.padding_idx].zero_()

    def init_weights(self):
        """Uniformly initialise embeddings and output layer in [-0.1, 0.1]."""
        initrange = 0.1
        with torch.no_grad():
            self.src_embedding.weight.uniform_(-initrange, initrange)
            self.tgt_embedding.weight.uniform_(-initrange, initrange)
            self.fc_out.bias.zero_()
            self.fc_out.weight.uniform_(-initrange, initrange)
        # uniform_ also overwrites the padding row, which receives no gradient
        # and would otherwise stay at a random non-zero value.
        self._zero_padding_rows()

    def load_pretrained_embeddings(self, embeddings):
        """Copy pretrained vectors into both embedding tables.

        Args:
            embeddings: ``numpy.ndarray`` or tensor of shape
                ``(vocab_size, dim)`` with ``dim <= d_model``. Narrower
                vectors are widened with small Gaussian noise (std 0.01).

        Raises:
            ValueError: If the row count differs from ``vocab_size`` or the
                vectors are wider than ``d_model``.
        """
        if isinstance(embeddings, np.ndarray):
            embeddings = torch.from_numpy(embeddings)
        # Match the parameter dtype so concatenation below never mixes dtypes.
        embeddings = embeddings.to(dtype=self.src_embedding.weight.dtype)

        if embeddings.dim() != 2 or embeddings.shape[0] != self.vocab_size:
            raise ValueError(
                f"expected embeddings of shape ({self.vocab_size}, dim), "
                f"got {tuple(embeddings.shape)}"
            )
        if embeddings.shape[1] > self.d_model:
            raise ValueError(
                f"embedding dim {embeddings.shape[1]} exceeds d_model {self.d_model}"
            )

        if embeddings.shape[1] < self.d_model:
            # Small random values instead of zeros for the extra columns
            padding = torch.randn(
                embeddings.shape[0], self.d_model - embeddings.shape[1],
                dtype=embeddings.dtype, device=embeddings.device,
            ) * 0.01
            embeddings = torch.cat([embeddings, padding], dim=1)

        with torch.no_grad():
            self.src_embedding.weight.copy_(embeddings)
            self.tgt_embedding.weight.copy_(embeddings)
        # The noise columns (or the supplied row itself) may be non-zero.
        self._zero_padding_rows()

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask with ``-inf`` above the diagonal."""
        mask = torch.triu(torch.ones(sz, sz), diagonal=1)
        mask = mask.masked_fill(mask == 1, float('-inf'))
        return mask

    def create_padding_mask(self, seq, pad_idx=0):
        """Return a boolean mask that is ``True`` where ``seq`` equals ``pad_idx``."""
        return (seq == pad_idx)

    def forward(self, src, tgt):
        """Compute vocabulary logits for every target position.

        Args:
            src: Source token ids; indexed as ``(batch, src_len)``.
            tgt: Decoder input token ids; indexed as ``(batch, tgt_len)``.

        Returns:
            The transformer output projected by ``fc_out``.
        """
        # Create masks
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        src_padding_mask = self.create_padding_mask(src)
        tgt_padding_mask = self.create_padding_mask(tgt)

        # Embed and add positional encoding
        src_emb = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model))

        # Pass through transformer. The source padding mask is given a second
        # time as memory_key_padding_mask so that decoder cross-attention
        # ignores encoder outputs at padded source positions.
        output = self.transformer(
            src_emb,
            tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask
        )

        # Project to vocabulary
        return self.fc_out(output)

    def encode(self, src):
        """Run only the encoder on ``src`` and return its output (the memory)."""
        src_padding_mask = self.create_padding_mask(src)
        src_emb = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model))
        memory = self.transformer.encoder(src_emb, src_key_padding_mask=src_padding_mask)
        return memory

    def decode(self, tgt, memory, tgt_mask=None, memory_key_padding_mask=None):
        """Run only the decoder and project its output onto the vocabulary.

        Args:
            tgt: Decoder input token ids.
            memory: Encoder output as returned by :meth:`encode`.
            tgt_mask: Optional attention mask over target positions.
            memory_key_padding_mask: Optional boolean mask marking padded
                source positions in ``memory``; pass
                ``create_padding_mask(src)`` when decoding padded batches.
        """
        tgt_padding_mask = self.create_padding_mask(tgt)
        tgt_emb = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model))
        output = self.transformer.decoder(
            tgt_emb,
            memory,
            tgt_mask=tgt_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask
        )
        return self.fc_out(output)



In [None]:
def load_glove_embeddings(glove_file, word2idx, embedding_dim=100):
    """Build an embedding matrix for ``word2idx`` from a GloVe text file.

    Each non-empty line is read as ``word v1 v2 ... vN`` separated by
    whitespace. Only words present in ``word2idx`` are parsed.

    Args:
        glove_file: Path to the UTF-8 GloVe text file.
        word2idx: Mapping from word to row index in the returned matrix.
        embedding_dim: Expected vector length ``N``.

    Returns:
        ``numpy.ndarray`` of shape ``(len(word2idx), embedding_dim)``; rows of
        words missing from the file stay all-zero.

    Raises:
        ValueError: If a vector for a vocabulary word does not have
            ``embedding_dim`` components or is not numeric.
    """
    embeddings = np.zeros((len(word2idx), embedding_dim))

    with open(glove_file, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            values = line.split()
            if not values:
                # Blank line (e.g. trailing newline at end of file)
                continue

            # Look the word up first: converting the vector of every
            # out-of-vocabulary line would be wasted work on a large file.
            row = word2idx.get(values[0])
            if row is None:
                continue

            vector = np.array(values[1:], dtype='float32')
            if vector.shape[0] != embedding_dim:
                raise ValueError(
                    f"{glove_file}:{line_no}: expected {embedding_dim} values "
                    f"for {values[0]!r}, found {vector.shape[0]}"
                )
            embeddings[row] = vector

    return embeddings



## 4. Training Loop - Teacher Forcing

    Target: "I am fine"

    Input to decoder:        [<SOS>, I, am]
    Expected output:         [I, am, fine]
    Model tries to predict:  [I, am, fine]

If the model predicts incorrectly, we still feed it the correct word for the next step.

In [None]:
def train_epoch(model, dataloader, optimizer, criterion, device, epoch,
                scheduler=None, scaler=None, grad_clip=None):
    """Train ``model`` for one pass over ``dataloader`` with teacher forcing.

    For every batch the decoder input is the target without its last token
    and the expected output is the target without its first token.

    Args:
        model: Callable as ``model(src, tgt_input)`` returning logits.
        dataloader: Iterable of ``(src, tgt)`` tensor batches; must support ``len``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss taking ``(logits_2d, targets_1d)``.
        device: ``torch.device``; autocast is enabled only for CUDA.
        epoch: Current epoch index (not used inside the function).
        scheduler: Optional object whose ``step()`` is called after every batch.
        scaler: Optional ``GradScaler``. When omitted, a module-level
            ``scaler`` is used if one exists, otherwise a new one is created.
        grad_clip: Optional max gradient norm. When omitted, a module-level
            ``GRAD_CLIP`` is used if one exists, otherwise clipping is skipped.

    Returns:
        Mean batch loss as a float.
    """
    # Fall back to the notebook-level globals so existing call sites that
    # pass neither argument keep working.
    if scaler is None:
        scaler = globals().get('scaler')
    if scaler is None:
        scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))
    if grad_clip is None:
        grad_clip = globals().get('GRAD_CLIP')

    model.train()
    total_loss = 0.0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        optimizer.zero_grad()

        with torch.cuda.amp.autocast(enabled=(device.type == 'cuda')):
            output = model(src, tgt_input)
            output_flat = output.reshape(-1, output.shape[-1])
            tgt_flat = tgt_output.reshape(-1)
            loss = criterion(output_flat, tgt_flat)

        scaler.scale(loss).backward()
        # Gradients must be unscaled before their norm is measured/clipped.
        scaler.unscale_(optimizer)
        if grad_clip is not None:
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()

        if scheduler is not None:
            scheduler.step()

    return total_loss / len(dataloader)


def evaluate(model, dataloader, criterion, device):
    """Return the mean batch loss of ``model`` over ``dataloader`` without training.

    The model is switched to eval mode and gradients are disabled. As in
    training, the decoder input is the target minus its last token and the
    expected output is the target minus its first token.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for src, tgt in dataloader:
            src = src.to(device)
            tgt = tgt.to(device)
            decoder_in, expected = tgt[:, :-1], tgt[:, 1:]

            # Mixed precision only on CUDA devices
            with torch.cuda.amp.autocast(enabled=(device.type == 'cuda')):
                logits = model(src, decoder_in)
                loss = criterion(
                    logits.reshape(-1, logits.shape[-1]),
                    expected.reshape(-1),
                )

            batch_losses.append(loss.item())

    return sum(batch_losses) / len(dataloader)



## 5. Generating Responses

In [None]:
def generate_response_diverse(model, src, dataset, max_len=20, device='cpu',
                              temperature=0.85, top_p=0.9,
                              repeat_penalty=2.0, recent_window=5):
    """Generate a reply by nucleus sampling with a penalty on recent tokens.

    Args:
        model: Object providing ``eval()``, ``encode(src)``,
            ``decode(tgt, memory, tgt_mask)`` and
            ``generate_square_subsequent_mask(size)``.
        src: Raw question text.
        dataset: Object providing ``encode(text)``, ``word2idx`` and ``idx2word``.
        max_len: Maximum number of tokens to generate.
        device: Device the input tensors are moved to.
        temperature: Logits are divided by this value before the softmax.
        top_p: Tokens are kept while the cumulative probability (sorted
            descending) stays at or below this value; the most likely token
            is always kept.
        repeat_penalty: Amount subtracted from the logit of each recently
            generated token.
        recent_window: How many of the most recently generated tokens are
            penalised.

    Returns:
        The generated words joined by spaces, or ``"..."`` when no regular
        word was produced.
    """
    from collections import deque

    model.eval()

    sos_idx = dataset.word2idx['<SOS>']
    eos_idx = dataset.word2idx['<EOS>']
    src_tokens = [sos_idx] + dataset.encode(src) + [eos_idx]

    tgt_tokens = [sos_idx]
    # A bounded deque keeps the *latest* tokens in generation order; a set
    # has no order, so slicing it cannot select the most recent entries.
    recent_tokens = deque(maxlen=recent_window)

    # The encoder pass is inference too, so it belongs inside no_grad.
    with torch.no_grad():
        src_tensor = torch.tensor([src_tokens]).to(device)
        memory = model.encode(src_tensor)

        for _ in range(max_len):
            tgt_tensor = torch.tensor([tgt_tokens]).to(device)
            tgt_mask = model.generate_square_subsequent_mask(len(tgt_tokens)).to(device)

            output = model.decode(tgt_tensor, memory, tgt_mask)
            # Work on a copy so the model's output tensor is not modified.
            logits = output[:, -1, :].clone()

            # Penalize recently used tokens (each distinct token once)
            for token in set(recent_tokens):
                logits[0, token] -= repeat_penalty

            # Temperature-scaled nucleus sampling
            probs = F.softmax(logits / temperature, dim=-1)

            sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
            cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

            keep = cumulative_probs <= top_p
            keep[..., 0] = True

            filtered_probs = sorted_probs.clone()
            filtered_probs[~keep] = 0
            filtered_probs = filtered_probs / filtered_probs.sum()

            # The sample is a position in the sorted order; map it back.
            sampled_pos = torch.multinomial(filtered_probs, num_samples=1).item()
            next_token = sorted_indices[0, sampled_pos].item()

            if next_token == eos_idx:
                break

            tgt_tokens.append(next_token)
            recent_tokens.append(next_token)

    specials = ('<PAD>', '<SOS>', '<EOS>', '<UNK>')
    words = (dataset.idx2word.get(idx, '<UNK>') for idx in tgt_tokens[1:])
    response_tokens = [word for word in words if word not in specials]

    return ' '.join(response_tokens) if response_tokens else "..."

In [None]:
def generate_response_beam(model, src, dataset, beam_width=5, max_len=100,
                           device='cpu', length_penalty=0.6):
    """Generate a reply with length-normalised beam search.

    Args:
        model: Object providing ``eval()``, ``encode(src)``,
            ``decode(tgt, memory, tgt_mask)`` and
            ``generate_square_subsequent_mask(size)``.
        src: Raw question text.
        dataset: Object providing ``encode(text)``, ``word2idx`` and ``idx2word``.
        beam_width: Number of hypotheses kept after every step (>= 1).
        max_len: Maximum number of decoding steps.
        device: Device the input tensors are moved to.
        length_penalty: Exponent ``a`` in the ranking score
            ``sum(log_probs) / len(tokens) ** a``.

    Returns:
        The best hypothesis as space-joined words with special tokens
        removed, or ``"..."`` when nothing remains.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    model.eval()
    eos_idx = dataset.word2idx['<EOS>']
    sos_idx = dataset.word2idx['<SOS>']

    src_tokens = [sos_idx] + dataset.encode(src) + [eos_idx]

    # Pure inference: no autograd graph is needed for any of the passes.
    with torch.no_grad():
        src_tensor = torch.tensor([src_tokens]).to(device)
        memory = model.encode(src_tensor)

        # Each hypothesis stores the raw cumulative log-probability
        # ('logprob') separately from the length-normalised ranking value
        # ('score'), so normalisation is applied exactly once per comparison.
        beam = [{'tokens': [sos_idx], 'logprob': 0.0, 'score': 0.0}]

        for _ in range(max_len):
            candidates = []
            for hyp in beam:
                if hyp['tokens'][-1] == eos_idx:
                    # Finished hypotheses compete unchanged.
                    candidates.append(hyp)
                    continue

                tgt_tensor = torch.tensor([hyp['tokens']]).to(device)
                tgt_mask = model.generate_square_subsequent_mask(tgt_tensor.size(1)).to(device)

                output = model.decode(tgt_tensor, memory, tgt_mask)
                log_probs = F.log_softmax(output[:, -1, :], dim=-1)

                # topk cannot return more entries than the vocabulary holds.
                k = min(beam_width, log_probs.size(-1))
                top_log_probs, top_indices = torch.topk(log_probs, k, dim=-1)
                for log_prob, next_token in zip(top_log_probs[0].tolist(),
                                                top_indices[0].tolist()):
                    new_tokens = hyp['tokens'] + [next_token]
                    total = hyp['logprob'] + log_prob
                    candidates.append({
                        'tokens': new_tokens,
                        'logprob': total,
                        'score': total / len(new_tokens) ** length_penalty,
                    })

            # Select top beam_width
            beam = sorted(candidates, key=lambda c: c['score'], reverse=True)[:beam_width]

            if all(hyp['tokens'][-1] == eos_idx for hyp in beam):
                break

    # Best sequence without the leading <SOS> and a trailing <EOS>
    best_tokens = beam[0]['tokens'][1:]
    if best_tokens and best_tokens[-1] == eos_idx:
        best_tokens = best_tokens[:-1]

    special_ids = {dataset.word2idx['<PAD>'], sos_idx, eos_idx, dataset.word2idx['<UNK>']}
    response = ' '.join(dataset.idx2word.get(idx, '<UNK>')
                        for idx in best_tokens if idx not in special_ids)
    return response if response else "..."

## 6. Run the Model

In [None]:
# Hyperparameters for the training run in this cell.
BATCH_SIZE = 128  # batch size of both DataLoaders; watch GPU memory
EPOCHS = 50  # upper bound; the loop below can stop earlier
LEARNING_RATE = 0.0002  # initial AdamW lr; InverseSqrtScheduler.step() overwrites it
D_MODEL = 256  # embedding / model width
NHEAD = 8  # attention heads
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3
DIM_FEEDFORWARD = 1024  # feed-forward width inside the transformer
DROPOUT = 0.15
MAX_LEN = 100  # dataset truncation length and positional-encoding size
BEAM_WIDTH = 5  # beam width intended for generate_response_beam
GRAD_CLIP = 0.5  # max gradient norm used by train_epoch
LABEL_SMOOTHING = 0.05  # passed to CrossEntropyLoss

# Device: CUDA when available, otherwise CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load dataset (replaces the `dataset` global from the earlier cell).
# NOTE(review): the vocabulary is built from all pairs before the split
# below, so validation text also contributes to it.
dataset = DialogDataset('dialogs.txt', max_len=MAX_LEN)

# 80/20 train/validation split with a fixed seed so it is reproducible
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=generator
)

print(f"\nTrain size: {train_size}, Val size: {val_size}")

# Create dataloaders; only the training loader shuffles
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,  # load in the main process (was 2) for compatibility
    pin_memory=True if device.type == 'cuda' else False
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn,
    num_workers=0,
    pin_memory=True if device.type == 'cuda' else False
)

# Create the model from the hyperparameters above and move it to the device
model = Seq2SeqTransformer(
    vocab_size=len(dataset.vocab),
    d_model=D_MODEL,
    nhead=NHEAD,
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    dim_feedforward=DIM_FEEDFORWARD,
    dropout=DROPOUT,
    max_len=MAX_LEN
).to(device)

print(f"\nModel parameters: {sum(p.numel() for p in model.parameters()):,}")

# Load 200-d GloVe vectors (narrower than D_MODEL=256, widened below)
print("\nLoading GloVe embeddings...")
glove_embeddings = load_glove_embeddings(
    './drive/MyDrive/glove.6B.200d.txt',  # 200-d file; must match embedding_dim
    dataset.word2idx,
    embedding_dim=200
)

# Zero-pad the vectors up to D_MODEL columns.
# NOTE(review): the guard is `!=`, so vectors wider than D_MODEL would ask
# np.zeros for a negative size; this only works while embedding_dim <= D_MODEL.
if glove_embeddings.shape[1] != D_MODEL:
    padding = np.zeros((glove_embeddings.shape[0], D_MODEL - glove_embeddings.shape[1]))
    glove_embeddings = np.concatenate([glove_embeddings, padding], axis=1)

# Copies the matrix into both the source and the target embedding table
model.load_pretrained_embeddings(glove_embeddings)

# Criterion with label smoothing; padded target positions are ignored
pad_idx = dataset.word2idx.get('<PAD>', 0)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx, label_smoothing=LABEL_SMOOTHING)

# Optimizer: AdamW with weight decay; lr is later overwritten by the scheduler
optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=0.01,
    betas=(0.9, 0.98)
)

# Scheduler
# Previous choice, kept for reference (disabled):
# scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
#     optimizer,
#     T_0=5,
#     T_mult=2,
#     eta_min=1e-6
# )
# New Scheduler with Warmup (replace CosineAnnealing)
class InverseSqrtScheduler:
    """Warmup followed by inverse-square-root decay of the learning rate.

    After ``n`` calls to :meth:`step` every parameter group's ``lr`` is

        d_model ** -0.5 * min(n ** -0.5, n * warmup_steps ** -1.5)

    so the rate grows linearly for ``warmup_steps`` steps and then decays
    with ``1 / sqrt(n)``. The optimizer's own initial ``lr`` is not used.
    """

    def __init__(self, optimizer, warmup_steps=4000, d_model=256):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.d_model = d_model
        self.current_step = 0

    def _rate(self):
        """Learning rate for the current step counter."""
        n = self.current_step
        decay = n ** -0.5
        warmup = n * (self.warmup_steps ** -1.5)
        return (self.d_model ** -0.5) * min(decay, warmup)

    def step(self):
        """Advance one step and write the new rate into every param group."""
        self.current_step += 1
        new_lr = self._rate()
        for group in self.optimizer.param_groups:
            group['lr'] = new_lr

scheduler = InverseSqrtScheduler(optimizer, warmup_steps=4000, d_model=D_MODEL)

# Mixed precision (the scaler is a no-op unless running on CUDA)
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))

# Fixed prompts used to eyeball generation quality during training
test_questions = [
    "hi, how are you doing?",
    "what's your favorite movie?",
    "how's the weather?"
]

# Training loop
print("\n" + "="*60)
print("Starting training with OPTIMIZED hyperparameters")
print("="*60)
best_val_loss = float('inf')
patience = 6  # epochs without improvement tolerated before stopping
patience_cnt = 0

for epoch in range(EPOCHS):
    # The scheduler is stepped once per batch inside train_epoch
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device, epoch, scheduler)
    val_loss = evaluate(model, val_loader, criterion, device)

    print(f"\nEpoch {epoch+1}/{EPOCHS}")
    print(f"  Train Loss: {train_loss:.4f}")
    print(f"  Val Loss: {val_loss:.4f}")
    print(f"  LR: {optimizer.param_groups[0]['lr']:.6e}")

    # Require an improvement of more than 1e-4 to count as a new best
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
        }, 'best_chatbot_model.pt')
        patience_cnt = 0
        print("  → Model saved (new best)!")
    else:
        patience_cnt += 1
        print(f"  No improvement. patience {patience_cnt}/{patience}")
        if patience_cnt >= patience:
            print("Early stopping triggered.")
            break

    # Sample replies every 3 epochs. The decoder used here is beam search,
    # so the label says so and the configured BEAM_WIDTH is passed through.
    if (epoch + 1) % 3 == 0:
        print("\n  Sample generations (Beam Search):")
        for q in test_questions:
            response = generate_response_beam(
                model, q, dataset, beam_width=BEAM_WIDTH, device=device
            )
            print(f"    Q: {q}")
            print(f"    A: {response}")
        print()

print("\n" + "="*60)
print("Training complete!")
print("="*60)

# Load best model


In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import math
from collections import Counter
import re
import torch.nn.functional as F

import builtins

# Restore the best weights. The training loop saves to
# 'best_chatbot_model.pt' (no leading dot); map_location lets a checkpoint
# written on GPU be loaded on a CPU-only machine.
checkpoint = torch.load('best_chatbot_model.pt', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"\nLoaded best model from epoch {checkpoint['epoch']+1}")
print(f"Best validation loss: {checkpoint['val_loss']:.4f}")

# Interactive chat
print("\n" + "="*60)
print("Interactive Chat (type 'quit' to exit)")
print("="*60)

while True:
    # builtins.input is used explicitly because an earlier cell may have
    # rebound the global name `input` to a string.
    user_input = builtins.input("\nYou: ")
    if user_input.strip().lower() == 'quit':
        break

    # Answer what the user just typed (not a leftover loop variable).
    response = generate_response_beam(model, user_input, dataset, device=device)
    print(f"Bot: {response}")

FileNotFoundError: [Errno 2] No such file or directory: '.best_chatbot_model.pt'