<a href="https://colab.research.google.com/github/BraedynL0530/PortfolioWebsite/blob/master/NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# README Summary Generator - FIXED VERSION
# Key fixes: Progress visibility, faster model, better error handling

# SETUP
from google.colab import drive
drive.mount('/content/drive')

!pip install -q transformers torch accelerate safetensors

# CONFIGURATION
import json
import os
import sys
from pathlib import Path
from tqdm.auto import tqdm
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Force output flush for Colab
sys.stdout.flush()

# Paths
# Every input and output of this cell lives in one folder on the mounted Drive.
DRIVE_BASE = '/content/drive/MyDrive/readme_training'
INPUT_FILE = f'{DRIVE_BASE}/training_data.json'  # JSON list of README records to summarize
CHECKPOINT_FILE = f'{DRIVE_BASE}/summaries_checkpoint.json'  # partial results, rewritten during the run
OUTPUT_FILE = f'{DRIVE_BASE}/summaries_final.json'  # complete results, written once at the end

os.makedirs(DRIVE_BASE, exist_ok=True)

# Hugging Face model id of the causal LM that drafts the summaries.
# Swap this single constant to try a different model.

MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

BATCH_SIZE = 1  # NOTE(review): not referenced anywhere in the visible code; READMEs are processed one by one
CHECKPOINT_INTERVAL = 5  # Save every 5 summaries

# Echo the effective settings so they are visible in the cell output.
print(f"‚úÖ Model: {MODEL_NAME}")
print(f"‚úÖ Checkpoint every: {CHECKPOINT_INTERVAL}")
print(f"‚úÖ Drive path: {DRIVE_BASE}")
sys.stdout.flush()

# LOAD MODEL
print("\nüì¶ Loading model... (this may take 2-5 minutes)")
sys.stdout.flush()

# Load the tokenizer and causal-LM weights for MODEL_NAME. Any failure is
# reported in the cell output and then re-raised so the cell stops.
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        dtype=torch.float16,
        low_cpu_mem_usage=True
    )

    # generate_summary() passes tokenizer.pad_token_id to generate();
    # reuse EOS when the tokenizer does not define a pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print("‚úÖ Model loaded successfully!")
    print(f"‚úÖ Using device: {model.device}")
    sys.stdout.flush()

except Exception as e:
    print(f"‚ùå MODEL LOAD FAILED: {e}")
    print("Try using: TinyLlama/TinyLlama-1.1B-Chat-v1.0")
    sys.stdout.flush()
    raise

# LOAD DATA
# Reads the README records, resumes from the checkpoint when one exists, and
# builds `remaining`: the records that still need a summary.
print("\nüìÇ Loading README data...")
sys.stdout.flush()

# JSON is UTF-8 by specification; do not depend on the platform default.
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
    readmes_data = json.load(f)

print(f"‚úÖ Loaded {len(readmes_data)} READMEs")
sys.stdout.flush()

# Load checkpoint
try:
    with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
        processed_summaries = json.load(f)
    processed_indices = {s['id'] for s in processed_summaries}
    print(f"üìã Resuming: {len(processed_summaries)} already done")
except FileNotFoundError:
    processed_summaries = []
    processed_indices = set()
    print("üìã Starting fresh")
except json.JSONDecodeError as e:
    # A run interrupted in the middle of a checkpoint write leaves truncated
    # JSON behind. Move the damaged file aside (so it can still be inspected
    # and is not silently overwritten) and start over instead of crashing.
    corrupt_path = CHECKPOINT_FILE + '.corrupt'
    os.replace(CHECKPOINT_FILE, corrupt_path)
    processed_summaries = []
    processed_indices = set()
    print(f"Checkpoint unreadable ({e}); moved to {corrupt_path}, starting fresh")

sys.stdout.flush()

# Add unique IDs if missing
# The list position is used as the id, so resuming relies on INPUT_FILE
# keeping the same order between runs.
for i, item in enumerate(readmes_data):
    if 'id' not in item:
        item['id'] = i

remaining = [r for r in readmes_data if r['id'] not in processed_indices]
print(f"üìä Remaining: {len(remaining)}")
sys.stdout.flush()

# SUMMARY GENERATOR
def generate_summary(readme_text, max_length=1500):
    """Generate a four-point summary of a README with the loaded causal LM.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        readme_text: Raw README text.
        max_length: Maximum number of README characters placed in the prompt;
            longer text is cut and suffixed with "...".

    Returns:
        A single-line string starting with "1. Purpose:". If tokenization or
        generation raises, the exception is printed and a string starting
        with "Error generating summary:" is returned instead.
    """

    # Truncate
    if len(readme_text) > max_length:
        readme_text = readme_text[:max_length] + "..."

    # The prompt ends with the first answer label so the model continues it.
    prompt = f"""Summarize this README in 4 lines:
1. Purpose:
2. Languages:
3. Frameworks:
4. Features:

If information is missing write "not specified". Do not make up information.

{readme_text}

Summary:
1. Purpose:"""

    try:
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048
        ).to(model.device)
        prompt_token_count = inputs["input_ids"].shape[1]

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=100,  # Reduced for speed
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )

        # generate() returns prompt + continuation for a causal LM. Decode
        # only the new tokens: searching the decoded text for "Summary:"
        # picks the wrong span whenever the README or the continuation
        # contains that string, and slicing by len(prompt) is unreliable
        # because decoding does not always reproduce the prompt exactly.
        completion = tokenizer.decode(
            outputs[0][prompt_token_count:],
            skip_special_tokens=True
        )

        # Re-attach the label the prompt ended with so the result reads as
        # the complete numbered summary.
        summary = "1. Purpose: " + completion.strip()

        # Clean up: keep the summary on one line
        summary = summary.replace('\n', ' ').strip()

        # Only trim if it's unreasonably long (over 1000 chars); cut at the
        # last sentence end, unless that would discard more than half.
        if len(summary) > 1000:
            last_period = summary[:1000].rfind('.')
            if last_period > 500:
                summary = summary[:last_period + 1]

        return summary

    except Exception as e:
        print(f"Generation error: {e}")
        return f"Error generating summary: {str(e)[:100]}"

# MAIN PROCESSING LOOP
# Summarizes every record in `remaining`, checkpointing to Drive as it goes,
# then writes the final output and prints a short preview.
print("\nüöÄ Starting generation...\n")
print("=" * 60)
sys.stdout.flush()

# generate_summary() reports a failure by returning a string with this prefix.
_ERROR_PREFIX = "Error generating summary:"

for i, readme_data in enumerate(remaining):
    try:
        print(f"\n[{i+1}/{len(remaining)}] Processing: {readme_data.get('repo_name', 'Unknown')}")
        sys.stdout.flush()

        # Generate
        summary = generate_summary(readme_data['readme'])

        # A failure placeholder must not be stored as a summary: it would end
        # up in the training data and the record would count as done, so it
        # would never be retried. Route it to the failure handler instead.
        if summary.startswith(_ERROR_PREFIX):
            raise RuntimeError(summary)

        # Save result
        readme_data['summary'] = summary
        processed_summaries.append(readme_data)

        # Show FULL summary for first 5, then preview for rest
        if i < 5:
            print(f"   ‚úì FULL: {summary}")
        else:
            preview = summary[:100] + "..." if len(summary) > 100 else summary
            print(f"   ‚úì {preview}")
        sys.stdout.flush()

        # Checkpoint
        if (i + 1) % CHECKPOINT_INTERVAL == 0:
            with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:
                json.dump(processed_summaries, f, indent=2)
            print(f"\nüíæ CHECKPOINT SAVED: {len(processed_summaries)} summaries")
            print("=" * 60)
            sys.stdout.flush()

    except Exception as e:
        # Best effort: report and move on. The record is not added to
        # processed_summaries, so a later run picks it up again.
        print(f"\n‚ùå FAILED {readme_data.get('repo_name', 'Unknown')}: {e}")
        sys.stdout.flush()

# FINAL SAVE
print("\n" + "=" * 60)
print("üíæ Saving final results...")
sys.stdout.flush()

# The checkpoint is refreshed too, so a re-run sees everything as done.
with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:
    json.dump(processed_summaries, f, indent=2)

with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
    json.dump(processed_summaries, f, indent=2)

print(f"""
‚ú® COMPLETE! ‚ú®

üìä Stats:
   Total: {len(processed_summaries)} summaries
   Checkpoint: {CHECKPOINT_FILE}
   Final: {OUTPUT_FILE}

üéØ Next: Download from Google Drive and train your model!
""")
sys.stdout.flush()

# PREVIEW
print("\nüìã Sample summaries:")
for i, item in enumerate(processed_summaries[:3]):
    print(f"\n{i+1}. {item.get('repo_name', 'Unknown')} ({item.get('stars', 0)} ‚≠ê)")
    print(f"   {item['summary']}")
sys.stdout.flush()

In [3]:
from torch.autograd import forward_ad
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math
from dataclasses import  dataclass
torch.manual_seed(42)

#config
@dataclass
class config:
  """Hyperparameters shared by every module of the transformer below.

  Field order is also the positional-argument order of the constructor.
  """
  vocab_size: int  # number of token ids: rows of the token embedding and width of the output logits
  block_size: int  # maximum sequence length (context window); sizes the position embedding and the causal mask
  n_layer: int   # number of stacked transformer Blocks
  n_head: int   # attention heads per layer; must divide n_embd evenly
  n_embd: int   # size of the vector that represents each token
  dropout: float  # dropout probability used by the embedding, attention and MLP dropout layers
  pad_token_id: int  # token id treated as padding: its embedding is zeroed and it is masked out of attention



"""
Self-attention: part 1 of the transformer.
Q, K, V = query, key, value. They help use the two embeddings to learn different meanings for words and give them different vectors, even when it is the same word.
Below is the theory; the class is the optimized version. It condenses the three projections into one large projection and then splits the result. Other than that it is nearly identical, just more efficient.
"""

"""
#learnable components
q_prog = nn.Linear(C, C, bias =False)
k_prog = nn.Linear(C, C, bias =False)
v_prog = nn.Linear(C, C, bias =False)

#weights (one random matrix per projection)
q_prog.weight.data = torch.randn(C,C)
k_prog.weight.data = torch.randn(C,C)
v_prog.weight.data = torch.randn(C,C)

#perform projection
q = q_prog(x)
k = k_prog(x)
v = v_prog(x)

scores = q @ k.transpose(-2,-1)
print("scores",scores)



Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V

d_k = k.size(-1)  # last dimension of k
scaled_scores = scores / math.sqrt(d_k)
attention_weights = F.softmax(scaled_scores, dim=-1)  # normalize over the keys (last dim), same as the class below
print("scaled scores", scaled_scores)
print("scaled scores -> percentages", attention_weights)

# aggregation: last part of attention!
output = attention_weights @ v
print("output!:",output)

"""

# Core logic for MultiHead
# Core logic for MultiHead
class CausalSelfAttention(nn.Module):
  """Multi-head self-attention in which a position only sees itself and earlier positions.

  Q, K and V come from a single fused linear layer. A lower-triangular
  buffer named "bias" holds the causal mask for up to block_size positions.
  """

  def __init__(self, config: config):
    super().__init__()
    assert config.n_embd % config.n_head == 0
    self.n_head = config.n_head
    self.n_embd = config.n_embd
    # One matmul yields Q, K and V side by side along the last dimension.
    self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=False)
    self.attn_drop = nn.Dropout(config.dropout)
    # 1 where attention is allowed (key index <= query index), 0 elsewhere.
    lower_triangle = torch.ones(config.block_size, config.block_size).tril()
    self.register_buffer(
        "bias",
        lower_triangle.view(1, 1, config.block_size, config.block_size),
    )
    self.c_proj = nn.Linear(config.n_embd, config.n_embd)

  def forward(self, x, pad_mask=None):
    """Apply masked multi-head attention.

    x is unpacked as (B, T, C). pad_mask, when given, is indexed as (B, T);
    key positions where it equals 0 are excluded from attention.
    Returns a tensor viewed as (B, T, C).
    """
    B, T, C = x.size()
    head_dim = C // self.n_head

    def split_heads(t):
      # (B, T, C) -> (B, n_head, T, head_dim)
      return t.view(B, T, self.n_head, head_dim).transpose(1, 2)

    q, k, v = (split_heads(part) for part in self.c_attn(x).split(C, dim=2))

    # Scaled dot-product scores: (B, n_head, T, T).
    att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(head_dim))

    minus_inf = float("-inf")
    # Hide future positions.
    att = att.masked_fill(self.bias[:, :, :T, :T] == 0, minus_inf)
    # Hide padded key positions; must happen before the softmax.
    if pad_mask is not None:
      att = att.masked_fill(pad_mask[:, None, None, :T] == 0, minus_inf)

    att = self.attn_drop(F.softmax(att, dim=-1))

    # Weighted sum of values, then merge the heads back into C channels.
    merged = (att @ v).transpose(1, 2).contiguous().view(B, T, C)
    return self.c_proj(merged)


class MLP(nn.Module):
  """Position-wise feed-forward network: expand to 4x width, GELU, project back, dropout."""

  def __init__(self, config: config):
    super().__init__()
    hidden = 4 * config.n_embd
    self.fc = nn.Linear(config.n_embd, hidden)    # widen: room to combine features
    self.proj = nn.Linear(hidden, config.n_embd)  # narrow back so the result can be added to the residual
    self.drop = nn.Dropout(config.dropout)

  def forward(self, x):
    # The GELU between the two linear layers keeps them from collapsing
    # into a single linear map.
    return self.drop(self.proj(F.gelu(self.fc(x))))

class Block(nn.Module):
  """One transformer layer: pre-LayerNorm attention and MLP, each wrapped in a residual connection.

  Every sub-layer computes x + f(norm(x)) rather than f(x), so what earlier
  layers produced is carried forward and each sub-layer only adds to it.
  """

  def __init__(self, config: config):
    super().__init__()
    self.ln_1 = nn.LayerNorm(config.n_embd)
    self.attn = CausalSelfAttention(config)
    self.ln_2 = nn.LayerNorm(config.n_embd)
    self.mlp = MLP(config)

  def forward(self, x, pad_mask=None):
    # pad_mask is handed to the attention sub-layer unchanged.
    attended = self.attn(self.ln_1(x), pad_mask=pad_mask)
    x = x + attended
    transformed = self.mlp(self.ln_2(x))
    return x + transformed

In [4]:

class NLP(nn.Module):
  """Decoder-only transformer language model.

  Token and position embeddings feed a stack of Blocks, followed by a final
  LayerNorm and a linear head that shares its weight with the token embedding.
  """

  def __init__(self, config: config):
    super().__init__()
    # Input
    self.wte = nn.Embedding(config.vocab_size, config.n_embd)  # token embedding
    self.wpe = nn.Embedding(config.block_size, config.n_embd)  # position embedding
    self.drop = nn.Dropout(config.dropout)
    self.config = config
    self.pad_token_id = config.pad_token_id

    # Processing: n_layer Blocks applied one after another.
    self.h = nn.ModuleList([Block(config) for _ in range(config.n_layer)])

    # Output layers
    self.ln_f = nn.LayerNorm(config.n_embd)  # final layer norm
    # Language-model head: one raw score (logit) per vocabulary entry at
    # every position.
    self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

    # Weight tying: the head and the token embedding share one matrix.
    self.lm_head.weight = self.wte.weight

    self.apply(self._init_weights)

  def _init_weights(self, module):
    """Initialise Linear and Embedding weights from N(0, 0.02); zero Linear biases."""
    if isinstance(module, nn.Linear):
        nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if module.bias is not None:
            nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
        nn.init.normal_(module.weight, mean=0.0, std=0.02)

  def forward(self, idx, targets=None):
    """Compute logits and, when targets are given, the next-token loss.

    Args:
      idx: token ids, unpacked as (B, T) with T <= block_size.
      targets: optional labels with the same shape as idx and aligned with
        it position by position (targets[:, t] labels idx[:, t]). Entries
        equal to -100 are ignored. The one-position shift needed for
        next-token prediction is applied here, so callers must not shift.

    Returns:
      (logits, loss). logits has one score per vocabulary entry for every
      input position; loss is None when targets is None.
    """
    B, T = idx.size()

    assert T <= self.config.block_size, f"Sequence length {T} exceeds block_size {self.config.block_size}"

    is_pad = idx == self.pad_token_id

    # Zero the embedding of padding tokens (out-of-place, so the embedding
    # output is not modified in place).
    tok_emb = self.wte(idx).masked_fill(is_pad.unsqueeze(-1), 0.0)
    pos = torch.arange(T, device=idx.device).unsqueeze(0)
    pos_emb = self.wpe(pos)
    x = self.drop(tok_emb + pos_emb)

    pad_mask = (~is_pad).float()  # 1 for real tokens, 0 for padding

    # Process through transformer blocks
    for block in self.h:
        x = block(x, pad_mask=pad_mask)

    # Final layer norm
    x = self.ln_f(x)

    # Output logits
    logits = self.lm_head(x)

    loss = None

    if targets is not None:
      # The logits at position t must predict the token at position t + 1.
      # Scoring unshifted logits against unshifted targets (as this method
      # previously did) trains the model to reproduce the token it is
      # currently reading instead of predicting the next one.
      shifted_logits = logits[:, :-1, :]
      shifted_targets = targets[:, 1:]

      loss = F.cross_entropy(
          shifted_logits.reshape(-1, shifted_logits.size(-1)),
          shifted_targets.reshape(-1),
          ignore_index=-100,
      )
      # No per-step print here: callers already have the loss tensor and
      # can log it at whatever cadence they want.

    return logits, loss

  @torch.no_grad()
  def summarize(self, idx, max_new_tokens, temperature=0.7, top_k=50):
    """Sample max_new_tokens tokens autoregressively and append them to idx.

    Args:
      idx: (B, T) prompt token ids.
      max_new_tokens: number of tokens to sample.
      temperature: divides the logits before the softmax. Values below 1
        sharpen the distribution (likely tokens become even more likely);
        values above 1 flatten it (rare tokens get more chances). Must be > 0.
      top_k: sample only among the k highest-scoring tokens; None or 0
        disables the filter. Values above the vocabulary size are clamped.

    Returns:
      idx with the sampled tokens concatenated along dim 1.

    Raises:
      ValueError: if temperature is not positive.
    """
    if temperature <= 0:
      raise ValueError(f"temperature must be > 0, got {temperature}")

    for _ in range(max_new_tokens):
        # Keep only the most recent block_size tokens as context; the slice
        # is a no-op while the sequence is still shorter than that.
        idx_cond = idx[:, -self.config.block_size:]

        logits, _ = self(idx_cond)
        logits = logits[:, -1, :] / temperature

        if top_k is not None and top_k > 0:
            # torch.topk raises when k exceeds the size of the dimension.
            k = min(top_k, logits.size(-1))
            kth_best, _ = torch.topk(logits, k)
            logits = logits.masked_fill(logits < kth_best[:, [-1]], float("-inf"))

        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)

        idx = torch.cat((idx, next_token), dim=1)

    return idx

In [None]:
# =======================
# FULL TRAINING BLOCK
# =======================

import json
import torch
import joblib
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2TokenizerFast
from tqdm import tqdm
from google.colab import drive
import os


# -------- Drive --------
drive.mount("/content/drive", force_remount=False)

# -------- Device --------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# -------- Paths --------
DATA_PATH = "/content/drive/MyDrive/readme_training/summaries_clean.json"#summaries_final.json"
MODEL_SAVE_PATH = "/content/drive/MyDrive/readme_model_state.pt"
TOKENIZER_SAVE_PATH = "/content/drive/MyDrive/readme_tokenizer.joblib"

# -------- Tokenizer --------
# GPT-2 vocabulary plus two extra tokens: one for padding and one that
# separates the README from its summary.
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
tokenizer.add_special_tokens({
    "pad_token": "<|pad|>",
    "additional_special_tokens": ["<|sep|>"]
})

PAD_ID = tokenizer.pad_token_id
SEP_ID = tokenizer.convert_tokens_to_ids("<|sep|>")

# len(tokenizer) already counts the two added tokens, so the embedding
# table is created at the right size from the start.
cfg = config(
    vocab_size=len(tokenizer),
    block_size=256,
    n_layer=6,
    n_head=8,
    n_embd=512,
    dropout=0.1,
    pad_token_id=tokenizer.pad_token_id
)

# -------- Model --------
# The embedding must NOT be replaced after construction: a fresh
# nn.Embedding is initialised from N(0, 1), which throws away the std=0.02
# initialisation applied in NLP.__init__ and, because lm_head shares the
# same weight, makes the initial logits far too large. The config above
# already provides the final vocabulary size and pad id.
model = NLP(cfg).to(DEVICE)

# -------- Dataset --------
class ReadmeSummaryDataset(Dataset):
    """README -> summary pairs encoded as fixed-length language-model samples.

    Each sample is ``README <sep> SUMMARY`` truncated / right-padded to
    ``block_size``. The labels mirror the input position by position, with
    -100 (the ignore index used by the loss) everywhere except the summary
    tokens, so only the summary contributes to the loss.

    Args:
        path: JSON file holding a list of objects with "readme" and
            "summary" keys.
        tokenizer: object with ``encode(text) -> list[int]``; also
            ``pad_token_id`` and ``convert_tokens_to_ids`` when ``pad_id`` /
            ``sep_id`` are not passed explicitly.
        block_size: length of every returned sample.
        min_summary_tokens: records whose summary encodes to fewer tokens
            are dropped.
        sep_id: id of the separator token; defaults to the tokenizer's id
            for "<|sep|>".
        pad_id: id used for right padding; defaults to
            ``tokenizer.pad_token_id``.
    """

    def __init__(self, path, tokenizer, block_size, min_summary_tokens=20,
                 sep_id=None, pad_id=None):
        with open(path, "r", encoding="utf-8") as f:
            raw_data = json.load(f)

        self.tokenizer = tokenizer
        self.block_size = block_size
        # Resolve the special ids from the tokenizer instead of relying on
        # module-level globals being defined before the class is used.
        self.sep_id = tokenizer.convert_tokens_to_ids("<|sep|>") if sep_id is None else sep_id
        self.pad_id = tokenizer.pad_token_id if pad_id is None else pad_id
        self.data = []       # kept records (original dicts)
        self._encoded = []   # (readme_ids, summary_ids) for each kept record

        dropped = 0

        for item in raw_data:
            # `or ""` also covers records where the value is null.
            readme = item.get("readme") or ""
            summary = item.get("summary") or ""

            if not readme.strip() or not summary.strip():
                dropped += 1
                continue

            # Encode exactly the text the sample is built from, once, so the
            # length checks below describe the real sample and __getitem__
            # does not have to tokenize again on every access.
            readme_ids = tokenizer.encode(readme)
            summary_ids = tokenizer.encode(summary)

            if len(summary_ids) < min_summary_tokens:
                dropped += 1
                continue

            # ensure at least one summary token fits after README + SEP
            if len(readme_ids) + 1 >= block_size:
                dropped += 1
                continue

            self.data.append(item)
            self._encoded.append((readme_ids, summary_ids))

        print(f"üßπ Dataset cleanup:")
        print(f"   Loaded: {len(raw_data)}")
        print(f"   Kept:   {len(self.data)}")
        print(f"   Dropped:{dropped}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(input_ids, targets)`` as two long tensors of length block_size."""
        readme_ids, summary_ids = self._encoded[idx]

        # Input: README <SEP> SUMMARY (truncated to the context window).
        # Concatenation and slicing build new lists; the cache is untouched.
        input_ids = (readme_ids + [self.sep_id] + summary_ids)[:self.block_size]

        # Target: ignore README and SEP, predict SUMMARY.
        targets = ([-100] * (len(readme_ids) + 1) + summary_ids)[:self.block_size]

        # Right-pad both to block_size; padded labels are ignored too.
        pad_len = self.block_size - len(input_ids)
        input_ids = input_ids + [self.pad_id] * pad_len
        targets = targets + [-100] * pad_len

        return (
            torch.tensor(input_ids, dtype=torch.long),
            torch.tensor(targets, dtype=torch.long),
        )


# -------- DataLoader --------
# Every sample is already truncated/padded to block_size by the dataset, so
# the default collate can stack them directly.
dataset = ReadmeSummaryDataset(
    DATA_PATH,
    tokenizer,
    model.config.block_size
)

loader = DataLoader(
    dataset,
    batch_size=8,
    shuffle=True,
    pin_memory=True
)

# -------- Optimizer --------
# weight_decay is applied to everything returned by model.parameters().
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=5e-5,
    betas=(0.9, 0.95),
    weight_decay=0.1
)

# -------- Training --------
model.train()  # enables the dropout layers
EPOCHS = 3
GRAD_CLIP = 1.0  # max gradient norm passed to clip_grad_norm_

for epoch in range(EPOCHS):
    pbar = tqdm(loader, desc=f"Epoch {epoch}")
    for idx, targets in pbar:
        idx = idx.to(DEVICE)
        targets = targets.to(DEVICE)

        # The model returns (logits, loss) when targets are supplied.
        logits, loss = model(idx, targets)

        # Clear the previous step's gradients before backpropagating.
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP)
        optimizer.step()

        # Show the current batch loss on the progress bar.
        pbar.set_postfix(loss=f"{loss.item():.4f}")

def evaluate(model, tokenizer, path, block_size):
    """Return the mean per-batch loss of ``model`` on the records stored at ``path``.

    Args:
        model: module whose call ``model(idx, targets)`` returns ``(logits, loss)``.
        tokenizer: tokenizer handed to ReadmeSummaryDataset.
        path: JSON file in the format ReadmeSummaryDataset expects.
        block_size: sample length handed to ReadmeSummaryDataset.

    Returns:
        The average of the batch losses as a float (an unweighted mean over
        batches), or 0.0 when the dataset yields no batches.

    The model is switched to eval mode for the duration of the call and put
    back into whatever mode it was in before, even if evaluation raises.
    """
    was_training = model.training
    model.eval()
    try:
        dataset = ReadmeSummaryDataset(path, tokenizer, block_size)
        loader = DataLoader(dataset, batch_size=4)

        # Run on the device the model actually lives on rather than on a
        # module-level constant that may not match it.
        device = next(model.parameters()).device

        total_loss = 0.0
        steps = 0

        with torch.no_grad():
            for idx, targets in loader:
                idx = idx.to(device)
                targets = targets.to(device)

                _, loss = model(idx, targets)
                total_loss += loss.item()
                steps += 1
    finally:
        # Restore the caller's mode instead of unconditionally forcing
        # training mode.
        model.train(was_training)

    return total_loss / max(steps, 1)


# Optional held-out set: evaluation runs once, after training, and only if
# this file exists on Drive.
EVAL_PATH = "/content/drive/MyDrive/readme_training/eval_samples.json"

if os.path.exists(EVAL_PATH):
    eval_loss = evaluate(model, tokenizer, EVAL_PATH, model.config.block_size)
    print(f"üß™ Eval loss: {eval_loss:.4f}")
else:
    print("‚ö†Ô∏è No eval file found, skipping evaluation")



# -------- Save model + tokenizer separately --------
# Only the weights (state_dict) are stored, so loading them back requires
# rebuilding the model with the same config first.
torch.save(model.state_dict(), MODEL_SAVE_PATH)
# The tokenizer object, including the added special tokens, is serialized
# with joblib.
joblib.dump(tokenizer, TOKENIZER_SAVE_PATH)

print("‚úÖ Training complete")
print(f"‚úÖ Model saved to {MODEL_SAVE_PATH}")
print(f"‚úÖ Tokenizer saved to {TOKENIZER_SAVE_PATH}")



In [None]:
import joblib  # needed by joblib.load below; this cell must not rely on the training cell's imports
import torch
from transformers import GPT2Tokenizer
from google.colab import drive

drive.mount('/content/drive')

FINAL_MODEL_PATH = '/content/drive/MyDrive/readme_model_state.pt'
TOKENIZER_SAVE_PATH = "/content/drive/MyDrive/readme_tokenizer.joblib"

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# NOTE: joblib.load unpickles the file, which can execute arbitrary code.
# Only load a tokenizer file that this notebook wrote itself.
tokenizer = joblib.load(TOKENIZER_SAVE_PATH)

# Separator between README and summary, looked up from the loaded tokenizer
# so this cell also works when the training cell has not been run.
SEP_ID = tokenizer.convert_tokens_to_ids("<|sep|>")

# Must match the architecture used for training; dropout is set to 0 for
# inference.
cfg = config(
    vocab_size=len(tokenizer),
    block_size=256,
    n_layer=6,
    n_head=8,
    n_embd=512,
    dropout=0.0,
    pad_token_id=tokenizer.pad_token_id
)

model = NLP(cfg).to(DEVICE)
# Read the weights from disk once and reuse them.
checkpoint = torch.load(FINAL_MODEL_PATH, map_location=DEVICE)
model.load_state_dict(checkpoint)
model.eval()

# Test README
test_readme = """
 React UI Components

A comprehensive React component library for building modern web applications with ease.

Features

Pre-built Components: Includes buttons, forms, modals, tooltips, and navigation components
TypeScript Support: Fully typed components with IntelliSense support
Dark Mode: Built-in theming system with automatic dark mode detection
Responsive Design: Mobile-first components that adapt to any screen size
Accessibility: WCAG 2.1 compliant with proper ARIA labels

Installation


 Quick Start


How It Works

The library uses React hooks and context API for state management. Each component is built with styled-components for CSS-in-JS styling. The theming system uses CSS variables that can be toggled via a ThemeProvider wrapper. All components are tree-shakeable to minimize bundle size.

Documentation

Visit our docs at https://docs.example.com
"""

# Same layout as the training samples: README tokens followed by <|sep|>;
# the model continues with the summary.
prompt_ids = tokenizer.encode(test_readme) + [SEP_ID]
tokens = torch.tensor([prompt_ids]).to(DEVICE)

# Generate
with torch.no_grad():
    generated = model.summarize(tokens, max_new_tokens=100, temperature=0.7, top_k=50)

# Decode only the generated part (after SEP)
result = tokenizer.decode(generated[0][len(prompt_ids):], skip_special_tokens=True)
print(f"Summary: {result}")
