# Code Generator AI - Full Training Pipeline

This notebook implements full training with:
1. Real code data
2. Tokenization
3. Validation
4. Early stopping
5. GPU acceleration

## 1. Setup Environment

In [None]:
!pip install torch transformers datasets wandb numpy tqdm

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer
from datasets import load_dataset
import wandb
import numpy as np
from tqdm.auto import tqdm
import json
import os
from pathlib import Path

# Check GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
!nvidia-smi

## 2. Mount Google Drive & Setup Project

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Create project directories
!mkdir -p /content/code_generator_ai
!mkdir -p /content/drive/MyDrive/code_generator_ai/{checkpoints,data,logs}

# Upload local files
!cp -r /content/drive/MyDrive/code_generator_ai/* /content/code_generator_ai/

# Add to Python path
import sys
sys.path.append('/content/code_generator_ai')

## 3. Load Model and Data

In [None]:
# Load the pretrained CodeBERT tokenizer; the rest of the notebook refers to it
# through this module-level name.
tokenizer = RobertaTokenizer.from_pretrained(
    'microsoft/codebert-base'
)

class CodeDataset(Dataset):
    """Map-style dataset that yields tokenized (source text, target code) pairs.

    Every item is a dict of 1-D tensors of length ``max_length``:
        'input_ids'      - token ids of the source text
        'attention_mask' - the tokenizer's mask for the source text
        'labels'         - token ids of the target text

    Args:
        split: Split name forwarded to ``load_dataset``.
        dataset_name: Dataset identifier forwarded to ``load_dataset``.
        prompt_field: Record key holding the source text.
        code_field: Record key holding the target text.
        max_length: Fixed length both sequences are padded/truncated to.
        text_tokenizer: Tokenizer to use; when ``None`` the notebook-level
            ``tokenizer`` is used (the original behaviour).
    """

    def __init__(self, split='train', dataset_name='codeparrot/codeparrot-clean',
                 prompt_field='prompt', code_field='code', max_length=512,
                 text_tokenizer=None):
        # NOTE(review): the defaults keep the original 'prompt'/'code' keys. The public
        # dataset card for codeparrot-clean appears to list a single `content` column -
        # confirm the schema and pass prompt_field/code_field accordingly.
        self.dataset = load_dataset(dataset_name, split=split)
        self.tokenizer = text_tokenizer if text_tokenizer is not None else tokenizer
        self.max_length = max_length
        self.prompt_field = prompt_field
        self.code_field = code_field

    def __len__(self):
        """Return the number of records in the underlying dataset."""
        return len(self.dataset)

    def _encode(self, text):
        """Tokenize ``text`` to exactly ``max_length`` tokens, returned as PyTorch tensors."""
        return self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Tokenize record ``idx`` and return its input ids, attention mask and labels."""
        item = self.dataset[idx]
        source = self._encode(item[self.prompt_field])
        target = self._encode(item[self.code_field])

        # squeeze(0) drops only the leading batch axis added by return_tensors='pt'.
        # A bare squeeze() would also remove the sequence axis whenever it has size 1.
        return {
            'input_ids': source['input_ids'].squeeze(0),
            'attention_mask': source['attention_mask'].squeeze(0),
            'labels': target['input_ids'].squeeze(0)
        }

# Build one dataset object per split.
# NOTE(review): confirm that the Hub dataset really publishes a 'validation' split - TODO verify.
train_dataset = CodeDataset(split='train')
val_dataset = CodeDataset(split='validation')

# Batch iterators; only the training loader requests shuffling.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=8,
    shuffle=True,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=8,
)

## 4. Training Configuration

In [None]:
from model.transformer import CodeGeneratorTransformer

# Model hyperparameters, defined once so that the model and the logged run
# configuration cannot silently drift apart.
d_model = 512
nhead = 8
num_encoder_layers = 6
num_decoder_layers = 6

# Initialize model
model = CodeGeneratorTransformer(
    vocab_size=tokenizer.vocab_size,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers
).to(device)

# Training parameters
num_epochs = 10
learning_rate = 1e-4
patience = 3  # Early stopping patience (epochs without validation improvement)

# Initialize optimizer and loss
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
# Label positions equal to the pad token id are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Initialize wandb for tracking. Values are read from the objects/variables that
# actually drive training rather than repeated as literals.
wandb.init(
    project='code-generator-ai',
    config={
        'learning_rate': learning_rate,
        'epochs': num_epochs,
        'batch_size': train_loader.batch_size,
        'model_type': 'transformer',
        'd_model': d_model,
        'nhead': nhead
    }
)

## 5. Training Loop with Early Stopping

In [None]:
def validate(model, val_loader):
    """Return the mean per-batch loss of ``model`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled. The loss is
    computed with the notebook-level ``criterion`` on the notebook-level ``device``.

    Args:
        model: Called as ``model(input_ids, decoder_input)``; its output is treated as
            logits whose last dimension is the vocabulary
            (assumed (batch, tgt_len, vocab) - TODO confirm against the model class).
        val_loader: Sized iterable of dict batches holding 'input_ids' and 'labels' tensors.

    Returns:
        float: Sum of the batch losses divided by the number of batches.

    Raises:
        ValueError: If ``val_loader`` contains no batches.
    """
    num_batches = len(val_loader)
    if num_batches == 0:
        # Fail with a clear message instead of a bare ZeroDivisionError at the end.
        raise ValueError('val_loader is empty; cannot compute a validation loss')

    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)
            # NOTE(review): the batch's 'attention_mask' is not passed to the model, so
            # source padding is not masked - confirm whether the model accepts a mask.

            # Teacher forcing: feed labels without the last token, score against
            # labels without the first token (targets shifted by one position).
            outputs = model(input_ids, labels[:, :-1])
            # reshape (not view) also works when the tensors are non-contiguous.
            loss = criterion(
                outputs.reshape(-1, outputs.size(-1)),
                labels[:, 1:].reshape(-1)
            )
            total_loss += loss.item()

    return total_loss / num_batches

# Training loop with best-checkpoint saving and early stopping
best_val_loss = float('inf')
patience_counter = 0
checkpoints_dir = Path('/content/drive/MyDrive/code_generator_ai/checkpoints')
# Create the checkpoint directory up front so a missing folder is detected now
# rather than at the first save, after a full epoch of training.
checkpoints_dir.mkdir(parents=True, exist_ok=True)

for epoch in range(num_epochs):
    model.train()  # validate() leaves the model in eval mode, so re-enable training mode
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')

    for batch in progress_bar:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)
        # NOTE(review): the batch's 'attention_mask' is not passed to the model, so
        # source padding is not masked - confirm whether the model accepts a mask.

        # Teacher forcing: decoder input is labels[:-1], targets are labels[1:]
        outputs = model(input_ids, labels[:, :-1])
        loss = criterion(outputs.view(-1, outputs.size(-1)), labels[:, 1:].contiguous().view(-1))

        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer step
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Read the scalar once and reuse it for the running total, progress bar and wandb
        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix({'loss': batch_loss})

        # Log to wandb
        wandb.log({
            'train_batch_loss': batch_loss,
            'epoch': epoch
        })

    # Validation
    val_loss = validate(model, val_loader)
    train_epoch_loss = total_loss / len(train_loader)
    print(f'Epoch {epoch+1}, Train Loss: {train_epoch_loss:.4f}, Val Loss: {val_loss:.4f}')

    # Log to wandb
    wandb.log({
        'train_epoch_loss': train_epoch_loss,
        'val_loss': val_loss,
        'epoch': epoch
    })

    # Save checkpoint if best model; otherwise count one more epoch without improvement
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
        }, checkpoints_dir / 'best_model.pt')
    else:
        patience_counter += 1

    # Early stopping
    if patience_counter >= patience:
        print(f'Early stopping triggered after {epoch+1} epochs')
        break

wandb.finish()

## 6. Test Model Generation

In [None]:
def generate_code(prompt, max_length=100):
    """Greedily decode text for ``prompt`` using the notebook-level model and tokenizer.

    Args:
        prompt: Text to condition the generation on.
        max_length: Maximum number of tokens to generate.

    Returns:
        The generated token sequence decoded to a string, special tokens skipped.
    """
    model.eval()
    with torch.no_grad():
        # Encode the prompt and move the encoding to the active device
        encoded_prompt = tokenizer(
            prompt,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=512
        ).to(device)
        source_ids = encoded_prompt['input_ids']

        # The decoder input starts as a single BOS token (batch of one)
        generated_ids = torch.tensor([[tokenizer.bos_token_id]]).to(device)

        for _step in range(max_length):
            logits = model(source_ids, generated_ids)
            # Greedy pick at the last position, kept as a (batch, 1) column
            chosen = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)
            generated_ids = torch.cat((generated_ids, chosen), dim=-1)

            # Stop as soon as the end-of-sequence token has been produced
            if chosen.item() == tokenizer.eos_token_id:
                break

        return tokenizer.decode(generated_ids[0], skip_special_tokens=True)

# Smoke-test the generator on a few natural-language requests
test_prompts = [
    "Write a Python function to find the factorial of a number",
    "Create a function to check if a string is palindrome",
    "Write a binary search implementation",
]

for prompt in test_prompts:
    # Header first, so it is visible while generation is still running
    print(f"\nPrompt: {prompt}\nGenerated Code:")
    # Generated text followed by a 50-dash separator on its own line
    print(generate_code(prompt), "-" * 50, sep="\n")