<a href="https://colab.research.google.com/github/Jayasurya227/Build-Your-Own-GPT-Creating-a-Custom-Text-Generation-Engine./blob/main/Build_Your_Own_GPT_Creating_a_Custom_Text_Generation_Engine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Google Drive Mount**

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Library Installation And Data Loading**

In [3]:
# !pip install datasets

import warnings
warnings.filterwarnings("ignore")

import re
import torch
import random
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from datasets import load_dataset
from transformers import GPT2Tokenizer

dataset = load_dataset("roneneldan/TinyStories", split="train", streaming=True)

**Tiny Stories And Stream DataSet Class**

In [2]:
from torch.utils.data import IterableDataset

class TinyStoriesStreamDataset(IterableDataset):
    def __init__(self, dataset_stream, tokenizer, block_size=512, min_length=30):
        self.dataset = dataset_stream
        self.tokenizer = tokenizer
        self.block_size = block_size
        self.min_length = min_length

    def __iter__(self):
        for sample in self.dataset:
            text = sample["text"].strip()
            if len(text) < self.min_length:
                continue

            text = re.sub(r'\s+', ' ', text)
            text = re.sub(r'[“”]', '"', text)
            text = re.sub(r"[‘’]", "'", text)
            text = re.sub(r'[^a-zA-Z0-9.,!?\'"\s]', '', text)

            tokenized = self.tokenizer(
                text,
                truncation=True,
                add_special_tokens=True,
                padding="max_length",
                max_length=self.block_size,
                return_tensors="pt"
            )

            input_ids = tokenized["input_ids"][0]
            attention_mask = tokenized["attention_mask"][0]

            yield {
                "input_ids": input_ids[:-1],
                "labels": input_ids[1:],
                "attention_mask": attention_mask[:-1]
            }

**Load Tokenizer,Data Loader,Model And Optimizer Setup**

In [5]:
from transformers import GPT2Tokenizer
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import GPT2Config, GPT2LMHeadModel
from tqdm.auto import tqdm
import torch


total_samples = 2119719
batch_size = 52
max_batches_per_epoch = total_samples // batch_size


tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

stream_dataset = TinyStoriesStreamDataset(dataset, tokenizer)
train_loader = DataLoader(stream_dataset, batch_size=batch_size)

config = GPT2Config(
    vocab_size=len(tokenizer),
    n_positions=512,
    n_ctx=512,
    n_embd=256,
    n_layer=4,
    n_head=4,
    pad_token_id=tokenizer.pad_token_id)


model = GPT2LMHeadModel(config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = torch.nn.DataParallel(model)

optimizer = AdamW(model.parameters(), lr=5e-5)

**Training Loop And CheckPoint And Sampling**

In [None]:
from pathlib import Path
import json
from tqdm.auto import tqdm
from torch.nn.utils import clip_grad_norm_

# Define checkpoint directory
checkpoint_dir = Path("/content/drive/MyDrive/TinyLLM/model/")

epochs = 10
history = []

model.train()

for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")
    epoch_loss = 0.0

    for i, batch in enumerate(tqdm(train_loader, total=max_batches_per_epoch)):
        if i >= max_batches_per_epoch:
            break

        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        outputs = model(input_ids=input_ids, labels=labels, attention_mask=attention_mask)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / max_batches_per_epoch
    history.append(avg_loss)
    print(f"Average Loss: {avg_loss:.4f}")

    # Save model after every epoch
    epoch_checkpoint = checkpoint_dir / f"tinygpt2_epoch{epoch+1}"
    epoch_checkpoint.mkdir(parents=True, exist_ok=True)
    model.save_pretrained(epoch_checkpoint)
    tokenizer.save_pretrained(epoch_checkpoint)
    print(f"Model checkpoint saved at {epoch_checkpoint}")

    # Generate sample output
    model.eval()
    sample_input = tokenizer.encode("Once upon a time", return_tensors="pt").to(device)
    generated_ids = model.generate(
        sample_input,
        max_length=50,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
    )
    generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    print(f"Sample Output:\n{generated_text}")
    model.train()

history_path = Path("/content/drive/MyDrive/TinyLLM/training_history.json")
with open(history_path, "w") as f:
    json.dump(history, f)
print(f"\nTraining history saved to {history_path}")

**Resume Training And Check Point**

In [None]:
from pathlib import Path
from tqdm.auto import tqdm
from torch.nn.utils import clip_grad_norm_
from transformers import GPT2Tokenizer
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import GPT2Config, GPT2LMHeadModel
from tqdm.auto import tqdm
import torch

# Load model and tokenizer from checkpoint (epoch 6)
checkpoint_path = Path("/content/drive/MyDrive/TinyLLM/model/tinygpt2_epoch6")
model = GPT2LMHeadModel.from_pretrained(checkpoint_path)
tokenizer = GPT2Tokenizer.from_pretrained(checkpoint_path)

total_samples = 2119719
batch_size = 52
max_batches_per_epoch = total_samples // batch_size

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = torch.nn.DataParallel(model)

# Optimizer
optimizer = AdamW(model.parameters(), lr=5e-5)

# Training parameters
checkpoint_dir = Path("/content/drive/MyDrive/TinyLLM/model/")
epochs = 12  # Continue up to epoch 10
start_epoch = 6  # Start from epoch 6

model.train()

for epoch in range(start_epoch, epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")
    epoch_loss = 0.0

    for i, batch in enumerate(tqdm(train_loader, total=max_batches_per_epoch)):
        if i >= max_batches_per_epoch:
            break

        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        outputs = model(input_ids=input_ids, labels=labels, attention_mask=attention_mask)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / max_batches_per_epoch
    print(f"Average Loss: {avg_loss:.4f}")

    # Save model after each epoch
    epoch_checkpoint = checkpoint_dir / f"tinygpt2_epoch{epoch+1}"
    epoch_checkpoint.mkdir(parents=True, exist_ok=True)
    model.save_pretrained(epoch_checkpoint)
    tokenizer.save_pretrained(epoch_checkpoint)
    print(f"Model checkpoint saved at {epoch_checkpoint}")

    # Generate sample output
    model.eval()
    sample_input = tokenizer.encode("Once upon a time", return_tensors="pt").to(device)
    generated_ids = model.generate(
        sample_input,
        max_length=50,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
    )
    generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    print(f"Sample Output:\n{generated_text}")
    model.train()

**Ganarate Text From  a Saved GPT-2 Check Point**

In [None]:
from transformers import GPT2Tokenizer, GPT2LMHeadModel

model_directory = "epoch_5"

tokenizer = GPT2Tokenizer.from_pretrained(model_directory)
model = GPT2LMHeadModel.from_pretrained(model_directory)


def generate(input_text, max_len):

  tokenizer.pad_token = tokenizer.eos_token

  inputs = tokenizer(
      input_text,
      return_tensors='pt',
      padding=True,
      return_attention_mask=True
  )

  output = model.generate(
      input_ids=inputs['input_ids'],
      attention_mask=inputs['attention_mask'],
      max_length=max_len
  )

  generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
  return generated_text

print(generate("Once there was little boy",30))
print(generate("Once there was little girl",30))
print(generate("Once there was a cute",30))
print(generate("Once there was a cute little",30))
print(generate("Once there was a handsome",30))

**Inferance With PreTrained Tiny Stories Model**

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig

model = AutoModelForCausalLM.from_pretrained('roneneldan/TinyStories-3M')

tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-125M")

prompt = "Once upon a time there was"


def generate(input_text, max_len):

  tokenizer.pad_token = tokenizer.eos_token

  inputs = tokenizer(
      input_text,
      return_tensors='pt',
      padding=True,
      return_attention_mask=True
  )

  output = model.generate(
      input_ids=inputs['input_ids'],
      attention_mask=inputs['attention_mask'],
      max_length=max_len
  )

  generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
  return generated_text

  return output_text

print(generate("Once there was little boy",30))
print(generate("Once there was little girl",30))
print(generate("Once there was a cute",30))
print(generate("Once there was a cute little",30))
print(generate("Once there was a handsome",30))