In [None]:
import json

import torch
import tqdm
from torch.utils.data import DataLoader, Dataset
from transformers import OpenAIGPTTokenizer, OpenAIGPTLMHeadModel, AutoModelForCausalLM, AutoTokenizer

In [None]:
# Select the compute device: CUDA when torch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

In [None]:
# Placeholder (the literal Ellipsis) -- presumably meant to be replaced with a
# real user identifier before running. The value is interpolated into the
# messages path ("../messages/user_messages/{username}.jsonl") and into the
# model/tokenizer save paths ("models/gpt/{username}/...") in later cells.
username = ...

# Dataset and Tokenizer Prep

This section loads the dataset, adds the new tokens to the tokenizer's vocabulary, and creates the `Dataset` object.

In [None]:
def read_jsonl(filename : str) -> list:
    """
    Reads a .jsonl (JSON Lines) file into a list.

    Every non-blank line is parsed as an independent JSON document. Blank or
    whitespace-only lines (e.g. a stray empty line at the end of the file)
    are skipped instead of raising a decode error.

    Args:
        filename: Path of the .jsonl file to read.

    Returns:
        A list holding one parsed JSON value per non-blank line, in file order.
        An empty file yields an empty list.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """

    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default encoding, and stream the file line by line rather
    # than materialising it with readlines().
    with open(filename, 'r', encoding='utf-8') as fp:
        return [json.loads(line) for line in fp if line.strip()]

In [None]:
# Load emotes file: one emote name per line.
# The file is decoded as UTF-8 explicitly (not the platform default), and blank
# lines are dropped so an empty string can never end up in the new vocabulary.
with open("../custom/emotes.txt", 'r', encoding='utf-8') as fp:
    VALID_EMOTES = [line.strip() for line in fp if line.strip()]

# Placeholder tokens -- presumably stand-ins for images, GIFs and links in the
# message text (TODO confirm against the preprocessing that writes the dataset).
IMG_TOKEN = "[IMG]"
GIF_TOKEN = "[GIF]"
LINK_TOKEN = "[LINK]"

# Everything that should be added to the tokenizer as a regular (non-special) token.
new_vocab = [IMG_TOKEN, GIF_TOKEN, LINK_TOKEN] + VALID_EMOTES

# Structural markers. The same four strings are registered as the tokenizer's
# bos/sep/eos/pad special tokens in the tokenizer cell.
SPECIAL_TOKENS = ["[BOS]", "[SEP]", "[EOS]", "[PAD]"]

In [None]:
tokenizer = AutoTokenizer.from_pretrained('openai-gpt')

# https://stackoverflow.com/questions/76198051/how-to-add-new-tokens-to-an-existing-huggingface-tokenizer
# Add only the tokens the pretrained vocabulary does not already contain.
# dict.fromkeys() de-duplicates while keeping the order of `new_vocab`, so the
# ids assigned to the added tokens are identical on every run. Going through a
# set would make the order (and therefore the ids) vary between kernel sessions.
existing_vocab = tokenizer.get_vocab()
new_tokens = [tok for tok in dict.fromkeys(new_vocab) if tok and tok not in existing_vocab]
tokenizer.add_tokens(new_tokens)

# We can add these special tokens to the vocabulary and the embeddings of the model:
tokenizer.add_special_tokens({
    'pad_token': '[PAD]', 
    'sep_token' : "[SEP]", 
    'bos_token' : "[BOS]",
    'eos_token' : "[EOS]"
})

In [None]:
class MimicDataset(Dataset):
    """
    Torch dataset that tokenizes raw training strings on access.

    Each item is padded/truncated to a fixed length and returned as a dict of
    1-D tensors ready to be batched by a DataLoader.

    Args:
        train_texts: Indexable collection of raw training strings.
        tokenizer: Callable Hugging Face-style tokenizer; its result must
            expose `input_ids` and `attention_mask` tensors.
        max_length: Length every example is padded/truncated to (default 512,
            the value that was previously hard-coded).
    """

    def __init__(self, train_texts, tokenizer, max_length=512):
        self.raw_strings = train_texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of raw training strings."""
        return len(self.raw_strings)

    def __getitem__(self, idx):
        """
        Tokenize the idx-th string.

        Returns:
            A dict with:
              "input_ids":      token ids, padded/truncated to `max_length`.
              "attention_mask": 1 for real tokens, 0 for padding.
              "labels":         a copy of `input_ids` with padding positions
                                set to -100.
        """
        train_text = self.raw_strings[idx]
        tokenized = self.tokenizer(
            train_text,
            return_tensors="pt",
            padding='max_length',
            max_length=self.max_length,
            truncation=True,
            return_attention_mask=True,
        )
        # squeeze(0) removes only the batch dimension added by return_tensors="pt";
        # a bare squeeze() would also collapse any other size-1 dimension.
        input_ids = tokenized.input_ids.squeeze(0)
        attention_mask = tokenized.attention_mask.squeeze(0)

        # Hugging Face causal-LM heads shift the labels internally, so labels
        # are the input ids themselves. Padding is set to -100, the index the
        # loss ignores, so the model is not trained to predict the pad token.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels" : labels
        }

In [None]:
# Pull the "train" field out of every record in this user's message file.
train_data = [
    record["train"]
    for record in read_jsonl(f"../messages/user_messages/{username}.jsonl")
]

# Wrap the raw strings in the tokenizing dataset and serve them in shuffled
# mini-batches of 8.
user_dataset = MimicDataset(train_texts=train_data, tokenizer=tokenizer)
data_loader = DataLoader(dataset=user_dataset, batch_size=8, shuffle=True)

# Model Training


In [None]:
# Start from the pretrained OpenAI GPT language-model checkpoint.
model = OpenAIGPTLMHeadModel.from_pretrained("openai-gpt")
# The tokenizer was extended with extra tokens, so resize the embedding matrix
# to the tokenizer's current length before training.
model.resize_token_embeddings(len(tokenizer))
# Move the weights to the selected device. As the cell's last expression this
# also displays the model's repr.
model.to(device)

In [None]:
# Fine-tune the language model on the user's messages.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
num_epochs = 3
pad_token_id = tokenizer.pad_token_id

model.train()
for epoch in range(num_epochs):
    total_loss = 0.0
    for batch in tqdm.tqdm(data_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        input_ids = batch["input_ids"].to(device)

        # Every example is padded to a fixed length, so most positions are
        # [PAD]. Mask them out of both attention and the loss: labels equal to
        # -100 are ignored by the model's cross-entropy. The model shifts the
        # labels by one position internally, so the inputs are passed unshifted.
        is_real_token = input_ids != pad_token_id
        labels = input_ids.masked_fill(~is_real_token, -100)

        optimizer.zero_grad()

        # Forward pass; the model computes the loss itself when given labels.
        outputs = model(input_ids, attention_mask=is_real_token.long(), labels=labels)

        loss = outputs.loss
        total_loss += loss.item()

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches.
    average_loss = total_loss / max(len(data_loader), 1)
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss}")

# Save the trained model and the extended tokenizer side by side, so the added
# token ids stay consistent with the resized embedding matrix when reloaded.
model.save_pretrained(f"models/gpt/{username}/model")
tokenizer.save_pretrained(f"models/gpt/{username}/tokenizer")

# Testing Output

In [None]:
# Reload the tokenizer and model from the directories the training cell saved
# them to. This rebinds the notebook-level `tokenizer` and `model` names.
# Note: the reloaded model is not moved to `device` in this cell.
tokenizer = AutoTokenizer.from_pretrained(f"models/gpt/{username}/tokenizer")
model = AutoModelForCausalLM.from_pretrained(f"models/gpt/{username}/model")

In [None]:
# Sample a reply to a fixed prompt, framed with the same [BOS] ... [SEP]
# markers that are registered as special tokens on the tokenizer.
prompt = "hello. how are you?"
encoded = tokenizer(f"[BOS] {prompt} [SEP]", return_tensors="pt")
# Keep the inputs on whatever device the model lives on.
inputs = encoded.input_ids.to(model.device)
outputs = model.generate(inputs, 
                         attention_mask=encoded.attention_mask.to(model.device),
                         max_new_tokens=200, 
                         do_sample=True, 
                         top_p=0.97, 
                         temperature=1.0,
                         # Pass the custom [EOS]/[PAD] ids explicitly so sampling
                         # can stop at [EOS] instead of always running for all
                         # max_new_tokens.
                         eos_token_id=tokenizer.eos_token_id,
                         pad_token_id=tokenizer.pad_token_id)
# Special tokens are kept in the decoded text so the [SEP]/[EOS] boundaries stay visible.
tokenizer.batch_decode(outputs, skip_special_tokens=False)