In [1]:
import torch
import torch.nn as nn
import json
from transformers import Trainer, TrainingArguments
import torch.nn.functional as F

  from .autonotebook import tqdm as notebook_tqdm





In [2]:
from capas_gpt import TransformerBlock, LayerNorm

In [3]:
class GPTModel(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, input_ids, attention_mask=None, labels=None):
        batch_size, seq_len = input_ids.shape
        tok_embeds = self.tok_emb(input_ids)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=input_ids.device))
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits.view(-1, logits.size(-1)), labels.view(-1))

        return {"loss": loss, "logits": logits}

In [4]:
with open("config_gpt.json", "r") as f:
    cfg = json.load(f)

In [5]:
model_path = "modelo_gpt_custom.pth"

model = GPTModel(cfg)
model.load_state_dict(torch.load(model_path, map_location="cpu"))
model.eval()

GPTModel(
  (tok_emb): Embedding(50257, 1280)
  (pos_emb): Embedding(1024, 1280)
  (drop_emb): Dropout(p=0.0, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(in_features=1280, out_features=1280, bias=True)
        (W_key): Linear(in_features=1280, out_features=1280, bias=True)
        (W_value): Linear(in_features=1280, out_features=1280, bias=True)
        (out_proj): Linear(in_features=1280, out_features=1280, bias=True)
        (dropout): Dropout(p=0.0, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=1280, out_features=5120, bias=True)
          (1): GELU()
          (2): Linear(in_features=5120, out_features=1280, bias=True)
        )
      )
      (norm1): LayerNorm()
      (norm2): LayerNorm()
      (drop_shortcut): Dropout(p=0.0, inplace=False)
    )
    (1): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): Linear(i

In [6]:
import random

def generate_text(model,tokenizer,prompt,seed=42,max_new_tokens=50,temperature=0.9,top_k=50,top_p=0.95,repetition_penalty=1.1):
    device="cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    model.eval()

    if seed is not None:
        torch.manual_seed(seed)
        random.seed(seed)

    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    generated_ids = input_ids.clone()

    for _ in range(max_new_tokens):
        input_ids_cropped = generated_ids[:, -cfg["context_length"]:]

        with torch.no_grad():
            outputs = model(input_ids=input_ids_cropped)
            logits = outputs["logits"][:, -1, :]

        for token_id in set(generated_ids[0].tolist()):
            logits[0, token_id] /= repetition_penalty

        logits = logits / temperature

        if top_k > 0:
            values, _ = torch.topk(logits, top_k)
            threshold = values[:, -1].unsqueeze(-1)
            logits[logits < threshold] = -float("Inf")

        if top_p < 1.0:
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            probs = F.softmax(sorted_logits, dim=-1)
            cumulative_probs = torch.cumsum(probs, dim=-1)

            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone()
            sorted_indices_to_remove[:, 0] = False

            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            logits[0, indices_to_remove] = -float("Inf")
        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        generated_ids = torch.cat((generated_ids, next_token), dim=1)

        if next_token.item() == tokenizer.eos_token_id:
            break

    output_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)

    return output_text

In [7]:
from transformers import GPT2Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

In [8]:
prompt = "I was walking in the park when I saw"
output = generate_text(model,tokenizer,prompt,repetition_penalty=1.2)
print(output)

I was walking in the park when I saw someone walk past me who said, 'Hey man you look like a big dude.'

"Well my son says to him, 'Dude you don't know what's going on with your mother.' And he goes and tells his friend where


In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device);

In [10]:
import math

class LoRALayer(torch.nn.Module):
    def __init__(self, in_dim, out_dim, rank, alpha):
        super().__init__()
        self.A = torch.nn.Parameter(torch.empty(in_dim, rank))
        torch.nn.init.kaiming_uniform_(self.A, a=math.sqrt(5))
        self.B = torch.nn.Parameter(torch.zeros(rank, out_dim))
        self.alpha = alpha

    def forward(self, x):
        x = self.alpha * (x @ self.A @ self.B)
        return x

In [11]:
class LinearWithLoRA(torch.nn.Module):
    def __init__(self, linear, rank, alpha):
        super().__init__()
        self.linear = linear
        self.lora = LoRALayer(
            linear.in_features, linear.out_features, rank, alpha
        )

    def forward(self, x):
        return self.linear(x) + self.lora(x)

In [12]:
print(type(model.out_head))

<class 'torch.nn.modules.linear.Linear'>


In [13]:
for name, module in model.named_modules():
    print(f"{name}: {module.__class__.__name__}")

: GPTModel
tok_emb: Embedding
pos_emb: Embedding
drop_emb: Dropout
trf_blocks: Sequential
trf_blocks.0: TransformerBlock
trf_blocks.0.att: MultiHeadAttention
trf_blocks.0.att.W_query: Linear
trf_blocks.0.att.W_key: Linear
trf_blocks.0.att.W_value: Linear
trf_blocks.0.att.out_proj: Linear
trf_blocks.0.att.dropout: Dropout
trf_blocks.0.ff: FeedForward
trf_blocks.0.ff.layers: Sequential
trf_blocks.0.ff.layers.0: Linear
trf_blocks.0.ff.layers.1: GELU
trf_blocks.0.ff.layers.2: Linear
trf_blocks.0.norm1: LayerNorm
trf_blocks.0.norm2: LayerNorm
trf_blocks.0.drop_shortcut: Dropout
trf_blocks.1: TransformerBlock
trf_blocks.1.att: MultiHeadAttention
trf_blocks.1.att.W_query: Linear
trf_blocks.1.att.W_key: Linear
trf_blocks.1.att.W_value: Linear
trf_blocks.1.att.out_proj: Linear
trf_blocks.1.att.dropout: Dropout
trf_blocks.1.ff: FeedForward
trf_blocks.1.ff.layers: Sequential
trf_blocks.1.ff.layers.0: Linear
trf_blocks.1.ff.layers.1: GELU
trf_blocks.1.ff.layers.2: Linear
trf_blocks.1.norm1: LayerN

In [14]:
def replace_linear_with_lora(model, rank, alpha):
    for name, module in model.named_children():
        if isinstance(module, torch.nn.Linear) and any(x in name.lower() for x in ["q", "k", "v", "proj", "fc"]):
            setattr(model, name, LinearWithLoRA(module, rank, alpha))
        else:
            replace_linear_with_lora(module, rank, alpha)

In [15]:
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters before: {total_params:,}")

for param in model.parameters():
    param.requires_grad = False

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters after: {total_params:,}")

Total trainable parameters before: 838,359,040
Total trainable parameters after: 0


In [16]:
replace_linear_with_lora(model, rank=16, alpha=16)

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable LoRA parameters: {total_params:,}")

Total trainable LoRA parameters: 5,898,240


In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
torch.manual_seed(123)

print(model)

GPTModel(
  (tok_emb): Embedding(50257, 1280)
  (pos_emb): Embedding(1024, 1280)
  (drop_emb): Dropout(p=0.0, inplace=False)
  (trf_blocks): Sequential(
    (0): TransformerBlock(
      (att): MultiHeadAttention(
        (W_query): LinearWithLoRA(
          (linear): Linear(in_features=1280, out_features=1280, bias=True)
          (lora): LoRALayer()
        )
        (W_key): LinearWithLoRA(
          (linear): Linear(in_features=1280, out_features=1280, bias=True)
          (lora): LoRALayer()
        )
        (W_value): LinearWithLoRA(
          (linear): Linear(in_features=1280, out_features=1280, bias=True)
          (lora): LoRALayer()
        )
        (out_proj): LinearWithLoRA(
          (linear): Linear(in_features=1280, out_features=1280, bias=True)
          (lora): LoRALayer()
        )
        (dropout): Dropout(p=0.0, inplace=False)
      )
      (ff): FeedForward(
        (layers): Sequential(
          (0): Linear(in_features=1280, out_features=5120, bias=True)
      

In [18]:
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"Trainable: {name} --> shape: {param.shape}")

Trainable: trf_blocks.0.att.W_query.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.0.att.W_query.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.0.att.W_key.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.0.att.W_key.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.0.att.W_value.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.0.att.W_value.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.0.att.out_proj.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.0.att.out_proj.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.1.att.W_query.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.1.att.W_query.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.1.att.W_key.lora.A --> shape: torch.Size([1280, 16])
Trainable: trf_blocks.1.att.W_key.lora.B --> shape: torch.Size([16, 1280])
Trainable: trf_blocks.1.att.W_value.lora.A --> shape: torch.Size([1280, 16])
Train

In [19]:
import pandas as pd

df = pd.read_excel("creepypastas.xlsx")

print(df.columns)

Index(['story_name', 'average_rating', 'tags', 'body',
       'estimated_reading_time', 'publish_date', 'categories'],
      dtype='object')


In [20]:
len(df)

3510

In [21]:
df_filtered = df[df['body'].str.len() <= 4000]
df_filtered = df_filtered[df_filtered['body'].str.len() > 20]

In [22]:
len(df_filtered)

723

In [23]:
text_data = df_filtered["body"].dropna().tolist()

In [None]:
with open("creepypastas.txt", "w", encoding="utf-8") as f:
    for story in text_data:
        clean_story = story.replace("\n", " ").strip()
        f.write(clean_story + "\n")

In [25]:
pip install datasets

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [26]:
from datasets import load_dataset

dataset = load_dataset("text", data_files={"train": "creepypastas.txt"})

Generating train split: 723 examples [00:00, 32920.25 examples/s]


In [27]:
dataset

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 723
    })
})

In [None]:
def tokenize_function(example):
    encoding = tokenizer(
        example["text"],
        truncation=True,
        padding="max_length",
        max_length=cfg["context_length"] + 1
    )

    input_ids = encoding["input_ids"]
    if len(input_ids) < 2:
        return {}

    encoding["input_ids"] = input_ids[:-1]
    encoding["labels"] = input_ids[1:]

    if "attention_mask" in encoding:
        encoding["attention_mask"] = encoding["attention_mask"][:-1]

    return encoding

In [29]:
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=False,
)

Map: 100%|██████████| 723/723 [00:02<00:00, 272.05 examples/s]


In [30]:
tokenized_dataset = tokenized_dataset.remove_columns(["text"])

In [31]:
tokenized_dataset

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 723
    })
})

In [None]:
from transformers import default_data_collator

training_args = TrainingArguments(
    output_dir="./gpt2-lora-creepy",
    per_device_train_batch_size=1,
    num_train_epochs=1,
    gradient_accumulation_steps=2,
    logging_steps=10,
    save_strategy="epoch",
    learning_rate=2e-4,
    fp16=torch.cuda.is_available(),
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    tokenizer=tokenizer,
    data_collator=default_data_collator
)

  trainer = Trainer(


In [33]:
torch.cuda.empty_cache()

In [None]:
trainer.train()

Step,Training Loss
10,4.3334
20,1.5728
30,2.0095
40,1.6038
50,1.2404
60,1.617
70,1.2697
80,1.5303
90,1.5261
100,1.4007


TrainOutput(global_step=361, training_loss=1.495055221124369, metrics={'train_runtime': 7836.9827, 'train_samples_per_second': 0.092, 'train_steps_per_second': 0.046, 'total_flos': 0.0, 'train_loss': 1.495055221124369, 'epoch': 0.9986168741355463})

In [35]:
torch.save(model.state_dict(), "modelo_lora.pth")

In [41]:
model.load_state_dict(torch.load("modelo_lora.pth", map_location="cpu"))

<All keys matched successfully>

In [42]:
prompt = "I was walking in the park when I saw"
output = generate_text(model, tokenizer, prompt, repetition_penalty=1.2)
print(output)

I was walking in the park when I saw a man in his seventies stand up. He had been running for years now and today it started raining; he wasn’t even moving, just standing still on the sidewalk with his legs crossed to the side, as if that is what made
