In [None]:
# Tokenizer setup: load tokenizer and ensure [PAD] token exists
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('gpt2')
# A usable pad token means: one is configured AND the literal '[PAD]' entry
# exists in the vocabulary. Otherwise register '[PAD]' and select it.
has_pad_token = tokenizer.pad_token is not None and '[PAD]' in tokenizer.get_vocab()
if has_pad_token:
    print("[PAD] token already present in tokenizer.")
else:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    tokenizer.pad_token = '[PAD]'
    print("Added and set [PAD] token as tokenizer.pad_token.")
# len(tokenizer) counts added special tokens; later cells size the model from it.
vocab_size = len(tokenizer)
print(f'Tokenizer vocab size: {vocab_size}')

In [None]:
# Utility helpers to create tensors on the same device as the model
import torch
def _model_device():
    """Return the device of the global `model`'s first parameter, else CPU.

    Falls back to CPU when `model` is not defined yet (NameError), is not a
    module exposing `.parameters()` (AttributeError), or has no parameters
    (StopIteration). Other errors are allowed to propagate instead of being
    silently masked.
    """
    try:
        return next(model.parameters()).device
    except (NameError, AttributeError, StopIteration):
        return torch.device('cpu')


def to_model_device(tensor):
    """Move a tensor to the device where the model parameters live.

    Args:
        tensor: any object with a `.to(device)` method (typically a torch.Tensor).

    Returns:
        The result of `tensor.to(device)`; CPU is used when no model is available.
    """
    return tensor.to(_model_device())


def randint_on_model_device(low, high, size, dtype=torch.long):
    """Create a random integer tensor on the model device.

    Args:
        low: inclusive lower bound.
        high: exclusive upper bound.
        size: shape tuple of the result.
        dtype: integer dtype of the result (default: torch.long).

    Returns:
        Tensor of shape `size` with values in [low, high), allocated directly
        on the model's device (CPU when no model is available).
    """
    return torch.randint(low, high, size, dtype=dtype, device=_model_device())


print('Model device helpers installed (to_model_device, randint_on_model_device)')

In [None]:
# Duplicate pad-token check removed; consolidated at the top of the notebook.

# GourmetGPT: Recipe Generation Model Development

This notebook implements, trains, evaluates, and exports the GourmetGPT model in accordance with the project constitution and specification.

---

## 1. Import Dependencies and Set Up Environment
- Install and import required libraries (PyTorch, transformers, etc.)
- Set random seeds for reproducibility
- Configure Google Drive integration for artifact storage

In [None]:
# Install and import required libraries
!pip install torch transformers --quiet
import torch
import random
import numpy as np
from transformers import AutoTokenizer
from google.colab import drive

def set_seed(seed=123):
    """Seed torch, `random` and NumPy (plus every CUDA device when present).

    Args:
        seed: integer seed applied to all generators (default 123).
    """
    # Same order as before: torch first, then the stdlib RNG, then NumPy.
    for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
        seed_fn(seed)
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.manual_seed_all(seed)
# Seed every RNG before any model or data code runs.
set_seed(123)

# Ensure CUDA is available and set global device (Colab: Runtime -> Change runtime type -> GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type != 'cuda':
    # Fail fast: the training cell uses CUDA mixed precision.
    raise RuntimeError("CUDA device not available. Please enable GPU runtime in Colab: Runtime -> Change runtime type -> GPU.")
print('Using device:', device)

# Datasets are read from, and artifacts written to, the mounted Drive.
drive.mount('/content/drive')
print('Google Drive mounted.')

## 2. Load and Validate Datasets
- Load pretraining dataset from Google Drive (`[BOS]... [EOS]` format)
- Load fine-tuning dataset from Google Drive (no `[BOS]`/`[EOS]` tokens in `response`)
- Validate schema and completeness
- Perform automated checks as per constitution.md

In [None]:
# Load pretraining dataset (.txt)
# The file is one long text; '[EOS]' acts as the record separator and is
# dropped by the split. Whitespace-only fragments are discarded.
pretrain_path = '/content/drive/MyDrive/PhD/GourmetGPT/Dataset/structured_recipes_pretrain.txt'
with open(pretrain_path, 'r', encoding='utf-8') as f:
    raw_corpus = f.read()
stripped_chunks = (chunk.strip() for chunk in raw_corpus.split('[EOS]'))
pretrain_data = [chunk for chunk in stripped_chunks if chunk]
print(f'Loaded {len(pretrain_data)} pretraining recipes.')

# Validate pretraining schema (spot check on the first three records only)
required_markers = ('[BOS]', 'Title:', 'Ingredients:', 'Instructions:')
for i, recipe in enumerate(pretrain_data[:3]):
    assert all(marker in recipe for marker in required_markers), f"Schema error in recipe {i}"
print('Pretraining dataset schema validated.')

# Load fine-tuning dataset (.jsonl): one JSON object per line.
import json
finetune_path = '/content/drive/MyDrive/PhD/GourmetGPT/Dataset/structured_recipes_finetune.jsonl'
finetune_data = []
with open(finetune_path, 'r', encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if not line:
            # Blank lines (e.g. a trailing newline at end of file) are not
            # records; json.loads('') would raise JSONDecodeError.
            continue
        finetune_data.append(json.loads(line))
print(f'Loaded {len(finetune_data)} fine-tuning samples.')

# Validate fine-tuning schema (no [BOS]/[EOS] required).
# Spot check on the first three samples only.
for i, sample in enumerate(finetune_data[:3]):
    assert 'instruction' in sample and 'response' in sample, f"Schema error in sample {i}"
    assert 'Title:' in sample['response'] and 'Ingredients:' in sample['response'] and 'Instructions:' in sample['response'], f"Response schema error in sample {i}"
print('Fine-tuning dataset schema validated.')

## 3. Design and Implement GPT Architecture
- Define the GPT model architecture using PyTorch
- Configure tokenizer and model configuration parameters

In [None]:
# Define GPT model architecture (simplified, for demonstration)
import torch.nn as nn

# `vocab_size` was set to len(tokenizer) in the tokenizer-setup cell; echo it
# here so it can be compared with the model printed below.
print(f'Using tokenizer vocab_size = {vocab_size}')

class GPTModel(nn.Module):
    """Token-level language model built on `nn.Transformer`.

    The same embedded sequence is fed to both the encoder and the decoder.
    Causal masks are applied to encoder self-attention, decoder self-attention
    and decoder-to-encoder attention, so the logits at position ``i`` depend
    only on tokens ``0..i``. Without these masks every position can see future
    tokens, which makes next-token training and sampling meaningless.

    Parameter names and shapes are the same as in the unmasked version, so
    existing state_dicts still load.

    NOTE(review): there is no explicit positional encoding; only token
    embeddings are fed to the transformer. Confirm this is intended.

    Args:
        vocab_size: number of token ids (rows of the embedding / output logits).
        embed_dim: model width (`d_model`).
        num_heads: attention heads per layer.
        num_layers: number of encoder layers and of decoder layers.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.transformer = nn.Transformer(
            d_model=embed_dim,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            batch_first=True
        )
        self.fc_out = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        """Map token ids of shape (batch, seq) to logits (batch, seq, vocab_size)."""
        seq_len = x.size(1)
        # Boolean mask: True marks pairs that must NOT attend (strictly future keys).
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device),
            diagonal=1,
        )
        hidden = self.embedding(x)
        hidden = self.transformer(
            hidden,
            hidden,
            src_mask=causal_mask,
            tgt_mask=causal_mask,
            memory_mask=causal_mask,
        )
        return self.fc_out(hidden)

# Example config (now derived from tokenizer)
embed_dim = 256
num_heads = 8
num_layers = 6
model = GPTModel(vocab_size, embed_dim, num_heads, num_layers)
print(model)

# If tokenizer is changed (e.g., added special tokens) after model creation,
# resize embedding and output layers to match new vocab size while preserving weights where possible.
new_vocab = len(tokenizer)
if new_vocab != model.embedding.num_embeddings:
    old_embed = model.embedding
    old_fc_out = model.fc_out
    # Build the replacements on the device the model already lives on.
    param_device = old_embed.weight.device
    new_embed = nn.Embedding(new_vocab, embed_dim).to(param_device)
    new_fc_out = nn.Linear(embed_dim, new_vocab).to(param_device)
    # Copy the rows shared by old and new vocabularies, for BOTH layers, so
    # the output projection is not silently re-initialised.
    num_to_copy = min(old_embed.num_embeddings, new_vocab)
    with torch.no_grad():
        new_embed.weight[:num_to_copy] = old_embed.weight[:num_to_copy]
        new_fc_out.weight[:num_to_copy] = old_fc_out.weight[:num_to_copy]
        new_fc_out.bias[:num_to_copy] = old_fc_out.bias[:num_to_copy]
    model.embedding = new_embed
    model.fc_out = new_fc_out
    # Keep the global in step with the model: later cells use `vocab_size`
    # for shape assertions, loss reshaping and the exported config.
    vocab_size = new_vocab
    print(f'Resized embedding and output layer to new vocab size: {new_vocab}')

In [None]:
# Pre-flight tests: verify tokenizer, embeddings, and a dummy forward pass
print('Running pre-flight checks...')
# 1) pad token present and set
assert tokenizer.pad_token is not None, 'tokenizer.pad_token is None'
print(f'pad_token: {tokenizer.pad_token}')
# 2) vocab vs embeddings
assert vocab_size == model.embedding.num_embeddings, f'vocab_size ({vocab_size}) != embedding.num_embeddings ({model.embedding.num_embeddings})'
print('vocab_size matches model.embedding.num_embeddings')
# 3) dummy batched forward pass (small shapes to validate indices)
import torch
# Create inputs on the model's current device. Use a dedicated name: rebinding
# the global `device` here would overwrite the CUDA device chosen in the setup
# cell with the model's current device, and the training cell's
# `model.to(device)` would then keep the model where it is.
preflight_device = next(model.parameters()).device
batch = torch.randint(0, vocab_size, (2, 8), device=preflight_device)
model.eval()
with torch.no_grad():
    out = model(batch)
assert out.shape == (2, 8, vocab_size), f'Unexpected output shape: {out.shape}'
print('Dummy forward pass success:', out.shape)
print('Pre-flight checks passed.')

## 4. Train Model and Save Checkpoints
- Train the model on the loaded datasets
- Periodically save checkpoints to Google Drive
- Log training metrics

In [None]:
# Optimized training loop for A100 GPU with mixed precision
from torch.utils.data import DataLoader, Dataset
# Use the global `device` set earlier (must be CUDA)
# NOTE(review): any cell that rebinds `device` before this point changes where
# training runs; confirm it still refers to the CUDA device here.
model = model.to(device)
class RecipeDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: indexable collection of strings.
        tokenizer: callable accepting the text plus `truncation`, `padding`,
            `max_length` and `return_tensors` keywords, returning a mapping
            with an 'input_ids' tensor.
        max_length: length every item is truncated / padded to.
    """

    def __init__(self, texts, tokenizer, max_length=256):
        self.texts, self.tokenizer, self.max_length = texts, tokenizer, max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the 'input_ids' of item `idx` with the leading batch dim removed."""
        text = self.texts[idx]
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        input_ids = encoded['input_ids']
        return input_ids.squeeze(0)
train_dataset = RecipeDataset(pretrain_data, tokenizer)
# Increase batch size for A100 (try 32, adjust if OOM)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=True)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = torch.cuda.amp.GradScaler()
checkpoint_path = '/content/drive/MyDrive/PhD/GourmetGPT/model_state_dict.pth'
# Built once (not per step). Padding positions are excluded from the loss so
# the model is not rewarded for predicting [PAD] on max_length-padded inputs.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
# Per-batch losses, kept for later plotting / perplexity reporting.
losses = []
model.train()
for epoch in range(1):  # Demo: 1 epoch
    for batch_idx, batch in enumerate(train_loader):
        batch = batch.to(device)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            output = model(batch)
            # Next-token objective: logits at position t are scored against
            # the token at t+1. Scoring them against the token at t (the
            # input itself) only teaches the model to copy its input.
            shifted_logits = output[:, :-1, :].reshape(-1, output.size(-1))
            shifted_targets = batch[:, 1:].reshape(-1)
            loss = criterion(shifted_logits, shifted_targets)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        losses.append(loss.item())
        if batch_idx % 10 == 0:
            print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
        # Save checkpoint every 100 batches
        if batch_idx % 100 == 0:
            torch.save(model.state_dict(), checkpoint_path)
# Persist the final weights; the periodic save above can lag by up to 99 batches.
torch.save(model.state_dict(), checkpoint_path)
print('Training complete. Final checkpoint saved.')

## 5. Export Model Artifacts
- Export trained model state_dict, tokenizer, and config files as artifacts for deployment

In [None]:
# Export model artifacts
# `json` is needed for the config dump regardless of which tokenizer branch
# runs, so import it up front instead of only inside the fallback branch.
import json

artifact_dir = '/content/drive/MyDrive/PhD/GourmetGPT/'
model_path = artifact_dir + 'model_state_dict.pth'
tokenizer_path = artifact_dir + 'tokenizer.json'
config_path = artifact_dir + 'config.json'

# Save model state_dict
torch.save(model.state_dict(), model_path)
print(f'Model weights saved to {model_path}')

# Save tokenizer
if hasattr(tokenizer, 'save_pretrained'):
    tokenizer.save_pretrained(artifact_dir)
    print(f'Tokenizer saved to {tokenizer_path}')
else:
    # Fallback for tokenizer objects without save_pretrained: dump their
    # attributes, stringifying anything JSON cannot represent.
    with open(tokenizer_path, 'w', encoding='utf-8') as f:
        json.dump(tokenizer.__dict__, f, default=str)
    print(f'Tokenizer config saved to {tokenizer_path}')

# Save config
config = {
    # Read from the model itself so the config always matches the saved weights.
    'vocab_size': model.embedding.num_embeddings,
    'embed_dim': embed_dim,
    'num_heads': num_heads,
    'num_layers': num_layers
}
with open(config_path, 'w', encoding='utf-8') as f:
    json.dump(config, f)
print(f'Model config saved to {config_path}')

## 6. Unit Tests for Model and Data
- Implement unit tests in notebook cells to verify model components and data integrity

In [None]:
# Unit test: Model output shape
# Create the sample input on the model's device to avoid a device mismatch.
# A dedicated name is used so the notebook-wide `device` is left untouched.
test_device = next(model.parameters()).device
sample_input = torch.randint(0, vocab_size, (2, 256), device=test_device)  # batch_size=2, seq_len=256
# Shape check only: disable dropout and autograd, then restore the previous mode.
was_training = model.training
model.eval()
try:
    with torch.no_grad():
        output = model(sample_input)
finally:
    model.train(was_training)
assert output.shape == (2, 256, vocab_size), f"Unexpected output shape: {output.shape}"
print('Model output shape test passed.')

# Unit test: Data integrity
assert all(isinstance(r, str) for r in pretrain_data), "Pretraining data not all strings."
assert all('instruction' in s and 'response' in s for s in finetune_data), "Fine-tuning data missing keys."
print('Data integrity tests passed.')

## 7. Quantize Model Weights
- Quantize the trained model weights to 4-bit precision for edge device deployment (currently disabled: the cell below contains only a commented-out reference implementation)

In [None]:
# Section 7: Quantize Model Weights (commented out for now)
# We keep this block as a reference. Disabled to avoid interference
# with current inference/debugging while device/index issues are resolved.
#
# Example quantization (COMMENTED):
# import torch
# # Placeholder quantization library import
# # from quant_lib import quantize_model_int4
#
# def quantize_and_save(model, path):
#     """Quantize model weights to int4 and save to `path`.
#     NOTE: Adjust to the quantization library and device requirements.
#     """
#     # Many quant libs expect model to be on CPU
#     model_cpu = model.cpu()
#     # qmodel = quantize_model_int4(model_cpu)
#     # torch.save(qmodel.state_dict(), path)
#     # print(f"Quantized model saved to {path}")
#
# # Example usage (disabled):
# # quantize_and_save(model, '/content/drive/MyDrive/PhD/GourmetGPT/model_state_dict_quantized.pth')
#
# End of commented quantization block.

## 8. Offline Inference Test
- Load exported artifacts and run offline recipe generation tests to validate inference (on the GPU when one is available, otherwise on CPU)

In [None]:
# Section: Offline inference test (device-safe)
import torch
import torch.nn.functional as F

# Choose device (CUDA if available)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# Read the checkpoint onto CPU first, load it into the existing model, then
# move the model to the chosen device and switch it to inference mode.
state = torch.load(model_path, map_location='cpu')
model.load_state_dict(state)
model.to(device).eval()

def generate_recipe(prompt, tokenizer, model, max_length=256, temperature=1.0, top_k=50):
    """Device-safe simple autoregressive generator.

    Args:
        prompt: text to condition on; must tokenize to at least one token.
        tokenizer: callable returning a mapping with an 'input_ids' tensor of
            shape (1, seq); must also provide `decode`.
        model: module mapping (1, seq) token ids to logits, returned either
            directly, as the first item of a tuple/list, or as `.logits`.
        max_length: maximum number of NEW tokens to sample.
        temperature: softmax temperature; values <= 0 are clamped to 1e-8.
        top_k: sample only from the k most likely tokens; falsy or <= 0
            samples from the full distribution. Values larger than the
            vocabulary are clamped to the vocabulary size.

    Returns:
        Decoded text for prompt + generated tokens (skip_special_tokens=True).

    Raises:
        ValueError: if the prompt tokenizes to an empty sequence.
    """
    model.eval()
    device = next(model.parameters()).device
    # Tokenize and move to model device
    inputs = tokenizer(prompt, return_tensors='pt', add_special_tokens=False)
    input_ids = inputs['input_ids'].to(device, dtype=torch.long)
    if input_ids.size(1) == 0:
        # There is no "last position" to read logits from for an empty prompt.
        raise ValueError("prompt produced no tokens; provide a non-empty prompt")

    # Determine eos token id if available
    eos_id = getattr(tokenizer, 'eos_token_id', None)
    if eos_id is None and getattr(tokenizer, 'eos_token', None):
        eos_id = tokenizer.convert_tokens_to_ids(tokenizer.eos_token)

    safe_temperature = max(temperature, 1e-8)

    with torch.no_grad():
        for _ in range(max_length):
            outputs = model(input_ids)
            # Support different model return types
            if isinstance(outputs, (list, tuple)):
                logits = outputs[0]
            elif hasattr(outputs, 'logits'):
                logits = outputs.logits
            else:
                logits = outputs

            # Scaling by a positive temperature keeps the top-k ranking unchanged.
            next_logits = logits[:, -1, :] / safe_temperature

            if top_k and top_k > 0:
                # torch.topk raises when k exceeds the last dimension.
                k = min(top_k, next_logits.size(-1))
                values, indices = torch.topk(next_logits, k, dim=-1)
                probs = torch.zeros_like(next_logits)
                probs.scatter_(1, indices, F.softmax(values, dim=-1))
            else:
                probs = F.softmax(next_logits, dim=-1)

            next_token = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_token], dim=1)

            if eos_id is not None and next_token.item() == eos_id:
                break

    generated = input_ids[0].cpu().tolist()
    return tokenizer.decode(generated, skip_special_tokens=True)

# Smoke test: sample one recipe from the reloaded model with default settings.
sample_prompt = "Generate a vegetarian pizza recipe."
recipe = generate_recipe(prompt=sample_prompt, tokenizer=tokenizer, model=model)
print('Generated Recipe:', recipe)

## 9. Zip and Download Artifacts
- Create a zip archive of all model artifacts and provide a cell to download them for handover

In [None]:
# Cell 9: Safe zip & download artifacts
import os
import zipfile
from pathlib import Path

# --- Configuration: ensure these variables exist in your notebook scope ---
# Expected variables (set earlier in the notebook):
#   - zip_dir (folder to save the zip to)
#   - model_path (path to model_state_dict.pth)
#   - tokenizer_path (path to tokenizer.json or folder containing tokenizer files)
#   - config_path (path to config.json)
#   - quantized_path (optional, may be None)

# Normalize vars from globals
def get_var(name, default=None):
    """Look up `name` in this module's global namespace.

    Args:
        name: variable name to look for.
        default: value returned when the name is not defined.

    Returns:
        The global's value, or `default` if no such global exists.
    """
    namespace = globals()
    if name in namespace:
        return namespace[name]
    return default

# Resolve the raw setting BEFORE wrapping it in Path: Path('') becomes
# Path('.'), which is always truthy, so testing the Path can never detect an
# unset value.
zip_dir_setting = get_var('zip_dir', None) or os.environ.get('ZIP_DIR')
model_path = get_var('model_path', None)
tokenizer_path = get_var('tokenizer_path', None)
config_path = get_var('config_path', None)
quantized_path = get_var('quantized_path', None)

if not zip_dir_setting and model_path is not None:
    # Sensible default: put the archive next to the exported model.
    zip_dir_setting = Path(model_path).parent

if not zip_dir_setting:
    raise RuntimeError(
        "zip_dir is not set. Define `zip_dir` in the notebook, set the ZIP_DIR "
        "environment variable, or run the export cell so `model_path` exists."
    )

zip_dir = Path(zip_dir_setting)
zip_dir.mkdir(parents=True, exist_ok=True)

# Provide sensible defaults if some paths are None
if model_path is None:
    model_path = str(zip_dir / 'model_state_dict.pth')
if tokenizer_path is None:
    tokenizer_path = str(zip_dir / 'tokenizer.json')
if config_path is None:
    config_path = str(zip_dir / 'config.json')
if quantized_path is None:
    quantized_path = str(zip_dir / 'model_state_dict_quantized.pth')

# From here on every artifact location is a pathlib.Path.
model_path = Path(model_path)
tokenizer_path = Path(tokenizer_path)
config_path = Path(config_path)
quantized_path = Path(quantized_path)

# Try to save missing artifacts from in-memory objects, if available.
# Everything here is best-effort: failures are reported, never raised, and the
# required-artifact check that follows decides whether the cell can continue.
try:
    import torch
    if 'model' in globals() and not model_path.exists():
        try:
            torch.save(globals()['model'].state_dict(), model_path)
            print(f'Saved model state to {model_path}')
        except Exception as e:
            print(f'Could not save model state: {e}')
except Exception as e:
    # Previously swallowed silently; report it so a missing model file is explainable.
    print(f'Skipped saving model state: {e}')

# Save tokenizer if tokenizer object exists and tokenizer file missing
try:
    tokenizer = globals().get('tokenizer', None)
    if tokenizer is not None and not tokenizer_path.exists():
        if hasattr(tokenizer, "save_pretrained"):
            try:
                save_dir = zip_dir / 'tokenizer_files'
                save_dir.mkdir(parents=True, exist_ok=True)
                tokenizer.save_pretrained(save_dir)
                # Prefer the single tokenizer.json when it was written;
                # otherwise treat the whole directory as the tokenizer artifact.
                if (save_dir / 'tokenizer.json').exists():
                    tokenizer_path = save_dir / 'tokenizer.json'
                else:
                    tokenizer_path = save_dir
                print(f'Saved tokenizer to {save_dir}')
            except Exception as e:
                print(f'Tokenizer save_pretrained failed: {e}')
        else:
            try:
                if hasattr(tokenizer, "to_json") or hasattr(tokenizer, "to_str"):
                    data = tokenizer.to_json() if hasattr(tokenizer, "to_json") else tokenizer.to_str()
                    tokenizer_path.write_text(data, encoding='utf-8')
                    print(f'Wrote tokenizer JSON to {tokenizer_path}')
            except Exception as e:
                print(f'Could not serialize tokenizer: {e}')
except Exception as e:
    # Previously swallowed silently; report it instead.
    print(f'Skipped saving tokenizer: {e}')

# Required artifacts (must exist). Maps archive name -> path on disk.
required = {
    'model_state_dict.pth': model_path,
    'config.json': config_path,
}

# The tokenizer is either a directory of files (each archived individually)
# or a single tokenizer.json that is required like the other artifacts.
tokenizer_is_dir = tokenizer_path.is_dir()
if tokenizer_is_dir:
    tokenizer_entries = [(str(p.relative_to(tokenizer_path.parent)), p) for p in tokenizer_path.rglob('*') if p.is_file()]
else:
    required['tokenizer.json'] = tokenizer_path

# Optional artifacts are included only when already present on disk.
optional = {
    'model_state_dict_quantized.pth': quantized_path if quantized_path.exists() else None
}

missing_required = [name for name, p in required.items() if not Path(p).exists()]
if missing_required:
    raise FileNotFoundError(
        "Missing required artifact(s): "
        + ", ".join(f"{name} ({required[name]})" for name in missing_required)
    )

# Build list of archive entries as (name inside the zip, path on disk).
# `required` already contains tokenizer.json when the tokenizer is a single
# file, so it must not be appended a second time (duplicate zip member).
to_zip = [(arcname, Path(path)) for arcname, path in required.items()]

if tokenizer_is_dir:
    # Tokenizer files are flattened into a single tokenizer_files/ folder.
    for arc_rel, p in tokenizer_entries:
        arcname = f"tokenizer_files/{Path(arc_rel).name}"
        to_zip.append((arcname, p))

if optional.get('model_state_dict_quantized.pth'):
    to_zip.append(('model_state_dict_quantized.pth', Path(optional['model_state_dict_quantized.pth'])))

zip_path = zip_dir / "gourmetgpt_artifacts.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
    for arcname, filepath in to_zip:
        try:
            zf.write(filepath, arcname=arcname)
            print(f"Added {filepath} as {arcname}")
        except Exception as e:
            # Best-effort: report the failure and continue with the other files.
            print(f"Warning: could not add {filepath}: {e}")

# Report the archive location and size.
print(f"Created zip: {zip_path} ({zip_path.stat().st_size / (1024*1024):.2f} MB)")

# Try to download in Colab (best-effort)
try:
    from google.colab import files as colab_files
    print("Attempting to download zip via Colab (may be blocked for large files)...")
    colab_files.download(str(zip_path))
except Exception as e:
    print(f"Colab download not available or failed: {e}")
    print(f"Download the zip manually from your Drive: {zip_path}")

In [None]:
# Count the elements of every parameter that takes part in gradient updates.
trainable = filter(lambda tensor: tensor.requires_grad, model.parameters())
total_params = sum(map(torch.Tensor.numel, trainable))
print(f"Total trainable parameters: {total_params:,}")

## 10. Training Metrics and Visualization for IEEE Reporting
- Track and visualize training loss and perplexity
- Example plots for inclusion in IEEE papers

In [None]:
# Track and visualize training loss and perplexity for IEEE reporting
import matplotlib.pyplot as plt
import math
# Reuse losses collected during training when they exist. Unconditionally
# resetting to [] here would throw them away and make the plot branch
# unreachable.
losses = globals().get('losses', [])
# Example: Append loss.item() inside your training loop
# losses.append(loss.item())
if losses:
    plt.figure(figsize=(8,4))
    plt.plot(losses, label='Training Loss')
    plt.xlabel('Batch')
    plt.ylabel('Loss')
    plt.title('Training Loss Curve')
    plt.legend()
    plt.grid(True)
    plt.show()
    # Perplexity is the exponential of the mean per-batch loss.
    avg_loss = sum(losses) / len(losses)
    perplexity = math.exp(avg_loss)
    print(f'Final Perplexity: {perplexity:.2f}')
else:
    print('No loss data collected. Please append loss values during training.')

## 11. Additional Metrics for Constitution Compliance
- Ingredient F1 Score
- Human Evaluation
- Model Footprint (parameters, memory)
- Latency (inference time)

In [None]:
# Ingredient F1 Score (example implementation)
def ingredient_f1(predicted, reference):
    """F1 overlap between two comma-separated ingredient lists.

    Items are compared case-insensitively after trimming surrounding
    whitespace, and empty items are ignored. Without trimming, "tomato, egg"
    and "egg, tomato" would share no items because of the leading spaces.

    Args:
        predicted: comma-separated ingredients produced by the model.
        reference: comma-separated ground-truth ingredients.

    Returns:
        F1 score in [0, 1]; 0.0 when there is no overlap or a list is empty.
    """
    def _items(text):
        # Normalised, de-duplicated ingredient names.
        return {part.strip() for part in text.lower().split(',') if part.strip()}

    pred_set = _items(predicted)
    ref_set = _items(reference)
    tp = len(pred_set & ref_set)
    fp = len(pred_set - ref_set)
    fn = len(ref_set - pred_set)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)
# Worked example for the ingredient F1 metric.
predicted_ingredients = "egg, spinach, tomato, cheese"
reference_ingredients = "egg, tomato, cheese, onion"
f1_score = ingredient_f1(predicted=predicted_ingredients, reference=reference_ingredients)
print(f'Ingredient F1 Score: {f1_score:.2f}')

# Human Evaluation (manual entry example)
human_scores = [4, 5, 3, 4]  # Example: ratings from 1-5
avg_human_score = sum(human_scores) / len(human_scores)
print(f'Average Human Evaluation Score: {avg_human_score:.2f}')

# Model Footprint (parameters, memory)
import torch
# Trainable element count, and bytes occupied by ALL parameters.
trainable_counts = [p.numel() for p in model.parameters() if p.requires_grad]
total_params = sum(trainable_counts)
param_bytes = sum(p.nelement() * p.element_size() for p in model.parameters())
model_size_mb = param_bytes / (1024 ** 2)
print(f'Total parameters: {total_params:,}')
print(f'Model size (MB): {model_size_mb:.2f}')

# Latency (inference time) of a single forward pass over a short prompt.
import time
prompt = "Healthy breakfast recipe"
# The tokenizer returns CPU tensors; move them to wherever the model lives,
# otherwise the forward pass fails with a device mismatch on a GPU model.
latency_device = next(model.parameters()).device
input_ids = tokenizer(prompt, return_tensors='pt')['input_ids'].to(latency_device)
model.eval()
# CUDA kernels run asynchronously: synchronise before and after so the timer
# covers the actual computation, not just the kernel launches.
if latency_device.type == 'cuda':
    torch.cuda.synchronize()
start_time = time.perf_counter()
with torch.no_grad():
    output = model(input_ids)
if latency_device.type == 'cuda':
    torch.cuda.synchronize()
latency = time.perf_counter() - start_time
print(f'Inference latency: {latency*1000:.2f} ms')