In [None]:
# Install dependencies (restart environment after installation)
%pip install transformers
# %pip install matplotlib
%pip install torch --index-url https://download.pytorch.org/whl/cu124
%pip install accelerate
%pip install pandas
%pip install datasets
%pip install numpy
# %pip install evaluate

In [None]:
# import dependencies
from transformers import GPT2Tokenizer, GPT2LMHeadModel, TrainingArguments, Trainer, GPT2Config
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset

In [None]:
# Report whether PyTorch can see a CUDA-capable GPU
print(f"[!] GPU Available: {torch.cuda.is_available()}")

In [None]:
# load gpt-2 model and tokenizer
def load_model_and_tokenizer(local_path="./model", model_name="gpt2"):
    """Load a GPT-2 language model together with its tokenizer.

    If ``local_path`` is an existing directory the model is loaded from it;
    otherwise ``model_name`` is loaded from the Hugging Face hub.

    Args:
        local_path: directory holding a saved model (and optionally a tokenizer).
        model_name: hub identifier used when ``local_path`` does not exist, and
            as the tokenizer fallback when ``local_path`` contains no tokenizer.

    Returns:
        ``(model, tokenizer)``. The model is moved to the GPU when one is
        available, otherwise it stays on the CPU. The tokenizer always has a
        pad token: when none is defined, the EOS token is reused.
    """
    import os  # local import: the notebook only imports `os` in a later cell

    # Fall back to CPU instead of crashing on machines without CUDA.
    device = "cuda" if torch.cuda.is_available() else "cpu"

    if os.path.isdir(local_path):
        print("[+] loading model from local directory.")
        model = GPT2LMHeadModel.from_pretrained(local_path).to(device)
        try:
            tokenizer = GPT2Tokenizer.from_pretrained(local_path)
        except Exception:
            # The directory may hold model weights only (no tokenizer files);
            # use the stock tokenizer in that case.
            tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    else:
        print("[+] loading model from Hugging-Face hub")

        config = GPT2Config.from_pretrained(model_name)

        # GPT-2's dropout fields are resid_pdrop / embd_pdrop / attn_pdrop;
        # BERT-style names (hidden_dropout_prob, ...) are ignored by the model.
        config.resid_pdrop = 0.1
        config.embd_pdrop = 0.1
        config.attn_pdrop = 0.1

        model = GPT2LMHeadModel.from_pretrained(model_name, config=config).to(device)

        tokenizer = GPT2Tokenizer.from_pretrained(model_name)

    # GPT-2 ships without a pad token; batching with padding needs one.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    return model, tokenizer

In [None]:
# Pre-process "All-seasons.csv" (south park dialog dataset)
def preprocess_south_park_dialog_data(tokenizer, file_path="./data/All-seasons.csv"):
    """Turn the CSV's ``Line`` column into <USER>/<BOT> training strings.

    Non-null lines are paired consecutively (1st with 2nd, 3rd with 4th, ...);
    a trailing unpaired line is ignored. Each pair becomes one string with
    ``tokenizer.eos_token`` closing both turns.

    Args:
        tokenizer: object exposing ``eos_token``.
        file_path: path of the CSV file; it must contain a ``Line`` column.

    Returns:
        List of formatted dialog strings.
    """
    spoken = pd.read_csv(file_path, encoding="utf-8")["Line"].dropna().values

    # Even positions are the "user" turns, odd positions the "bot" replies;
    # zip stops at the shorter slice, so an odd leftover line is dropped.
    dialog_data = [
        f"<USER>: {prompt} {tokenizer.eos_token}\n<BOT>: {reply}{tokenizer.eos_token}\n"
        for prompt, reply in zip(spoken[0::2], spoken[1::2])
    ]

    # Visualize data - printing the first 2 elements
    print("\n[+] Visualizing pre-processed south park dataset.")
    print(dialog_data[:2])

    return dialog_data

In [None]:
# Tokenize dialog data
def tokenize_data(data, tokenizer, max_length=1024):
    """Call ``tokenizer`` on ``data`` with padding and truncation enabled.

    Args:
        data: the texts to encode (passed to the tokenizer unchanged).
        tokenizer: callable tokenizer.
        max_length: forwarded as the tokenizer's ``max_length``.

    Returns:
        Whatever the tokenizer returns for ``return_tensors="pt"``.
    """
    encode_options = {
        "return_tensors": "pt",
        "padding": True,
        "truncation": True,
        "max_length": max_length,
    }
    return tokenizer(data, **encode_options)

In [None]:
def data_collator(data):
    """Stack dataset items into a causal-LM training batch.

    Args:
        data: sequence of ``(input_ids, attention_mask)`` tensor pairs, as
            produced by the ``TensorDataset`` objects built in
            ``get_train_eval_data``.

    Returns:
        Dict with ``input_ids``, ``attention_mask`` and ``labels``. ``labels``
        is a copy of ``input_ids`` with padded positions set to ``-100``.
    """
    input_ids = torch.stack([example[0] for example in data])
    attention_mask = torch.stack([example[1] for example in data])

    labels = input_ids.clone()
    # Mask padding via the attention mask, not via the pad token id: the pad
    # token is the EOS token in this notebook, so matching on the id would also
    # hide every genuine end-of-turn EOS from the loss.
    labels[attention_mask == 0] = -100

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": labels
    }

In [None]:
import re

# Function to check for english text
def is_english(text):
    """Return True when ``text`` contains only allow-listed characters.

    The allow-list is ASCII letters and digits, whitespace, and the
    punctuation/symbols spelled out in the pattern. The empty string passes.
    Note that ``+-=`` inside the character class is a range (``+`` through
    ``=``), which is how ``-`` ends up being accepted.
    """
    allowed = r'^[A-Za-z0-9\s.,!?\'":;()&*%#$@^_+-=<>{}[\]\\/|]*$'
    return re.match(allowed, text) is not None

In [None]:
def get_train_eval_data(tokenized_data, test_size=0.2, random_state=42):
    """Split tokenized tensors into training and evaluation datasets.

    Args:
        tokenized_data: mapping with ``input_ids`` and ``attention_mask``.
        test_size: forwarded to ``train_test_split``.
        random_state: forwarded to ``train_test_split``.

    Returns:
        ``(train_dataset, eval_dataset)``; each is a ``TensorDataset`` of
        ``(input_ids, attention_mask)``.
    """
    # Both arrays are split in one call so ids and masks stay row-aligned.
    # Result order: ids_train, ids_eval, mask_train, mask_eval.
    ids_train, ids_eval, mask_train, mask_eval = train_test_split(
        tokenized_data["input_ids"],
        tokenized_data["attention_mask"],
        test_size=test_size,
        random_state=random_state,
    )

    return TensorDataset(ids_train, mask_train), TensorDataset(ids_eval, mask_eval)

In [None]:
def save_model_and_tokenizer(model, tokenizer, local_path="/content/drive/MyDrive/gpt2-finetuning-new/final-save"):
    """Save ``model`` and then ``tokenizer`` into ``local_path``.

    Both objects must provide ``save_pretrained(path)``. Returns ``None``.
    """
    print(f"\n[+] saving model & tokenizer to: {local_path}")
    # Model first, tokenizer second, both into the same directory.
    for artifact in (model, tokenizer):
        artifact.save_pretrained(local_path)

In [None]:
def get_training_arguments(learning_rate=3e-5, epochs=5):
    """Build the ``TrainingArguments`` used for fine-tuning.

    Args:
        learning_rate: passed through as ``learning_rate``.
        epochs: used for both ``num_train_epochs`` and ``save_total_limit``.

    Returns:
        A configured ``TrainingArguments`` instance.
    """
    # Define parameters for training
    settings = dict(
        # Where checkpoints are written
        output_dir="/content/drive/MyDrive/gpt2-finetuning-new",
        # overwrite_output_dir=True,
        save_steps=500,
        save_total_limit=epochs,
        # NOTE(review): the TrainingArguments docs describe this field as not
        # consumed by Trainer itself; confirm whether
        # trainer.train(resume_from_checkpoint=...) was intended.
        resume_from_checkpoint=True,
        # Batch sizes and device
        per_device_train_batch_size=32,
        per_device_eval_batch_size=32,
        use_cpu=False,
        # Evaluation and logging cadence
        eval_strategy="steps",
        eval_steps=100,
        logging_dir="./logs",
        logging_steps=10,
        # Optimisation schedule
        num_train_epochs=epochs,
        learning_rate=learning_rate,
        warmup_steps=500,
        weight_decay=0.01,
    )

    return TrainingArguments(**settings)

In [None]:
def train_model(model, train_data, eval_data, tokenizer, training_args):
    """Fine-tune ``model`` with ``Trainer`` and save the result.

    Args:
        model: model to train.
        train_data: training dataset.
        eval_data: evaluation dataset.
        tokenizer: saved alongside the model once training ends.
        training_args: ``TrainingArguments`` for the run.

    Batches are assembled by the notebook's ``data_collator``. After training,
    model and tokenizer are written via ``save_model_and_tokenizer`` using its
    default path.
    """
    # Build the trainer and start training
    Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=eval_data,
        data_collator=data_collator,
    ).train()

    # Save model after training
    save_model_and_tokenizer(model, tokenizer)

In [None]:
# End-to-end run on the South Park dataset: load, format, tokenize, split, train.
model, tokenizer = load_model_and_tokenizer()
# Build the <USER>/<BOT> strings from ./data/All-seasons.csv (the function's default path)
dialog_data = preprocess_south_park_dialog_data(tokenizer)

# Encode every sample in a single tokenizer call
tokenized_data = tokenize_data(dialog_data, tokenizer)

# Default split: test_size=0.2, random_state=42
train_data, eval_data = get_train_eval_data(tokenized_data)

# Train with the default arguments (learning_rate=3e-5, epochs=5)
train_model(model, train_data, eval_data, tokenizer, get_training_arguments())


# Training on New Dataset
Fine-tuning on a new dialog dataset, "casual_data_windows.csv", built from Reddit conversations.

In [None]:
import os
# Load model and tokenizer
# model, tokenizer = load_model_and_tokenizer("/content/drive/MyDrive/gpt2-finetuning-new/checkpoint-4500")
# Bind the result to `model` (it was misspelled `mode`), so the training cell
# further down fine-tunes the model loaded here instead of whatever `model`
# happened to be left in the kernel.
model, tokenizer = load_model_and_tokenizer()

In [None]:
# Mount Google Drive (Colab only) so that paths under /content/drive/ exist.
# NOTE(review): the training output_dir and final save path used earlier in this
# notebook live under /content/drive/MyDrive, so this mount probably has to
# run before the first training cell; confirm the intended cell order.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# Pre-process new dataset: "casual_data_windows.csv"
# Load and filter in a single chained pipeline.
df = (
    pd.read_csv("/content/data/casual_data_windows.csv", encoding="utf-8")
    # Drop unused column
    .drop(columns=["2"])
    # Discard rows whose "0" and "1" cells hold the same text
    .loc[lambda frame: frame["0"] != frame["1"]]
    # Keep only the first occurrence of each ("0", "1") pair
    .drop_duplicates(subset=["0", "1"])
    # Drop rows with any empty cells
    .dropna()
)

In [None]:
# Visualize dataset: show the first 30 rows after filtering
df.head(30)

In [None]:
import html
import re

def clean_user_and_subreddit_mention(text):
    """Replace Reddit user and subreddit mentions with fixed placeholders.

    ``u/<name>`` becomes ``USER_MENTION`` and ``r/<name>`` becomes
    ``CHANNEL_MENTION``. A mention must start at a word boundary, so text such
    as ``menu/items`` is left alone.

    Args:
        text: input string.

    Returns:
        The string with mentions substituted.
    """
    # Usernames may contain hyphens; without "-" in the class, "u/foo-bar"
    # would be rewritten to "USER_MENTION-bar" and leak part of the name.
    text = re.sub(r"\bu/[a-zA-Z0-9_-]+\b", "USER_MENTION", text)
    text = re.sub(r"\br/[a-zA-Z0-9_]+\b", "CHANNEL_MENTION", text)
    return text

def clean_text(text):
    """Decode HTML entities and normalise whitespace.

    Every run of whitespace (including CR/LF) collapses to a single space and
    leading/trailing whitespace is removed.
    """
    decoded = html.unescape(text)
    # split() with no arguments trims the ends and breaks on any whitespace
    # run, so joining the pieces performs all of the normalisation.
    return " ".join(decoded.split())

def remove_emoticon(text):
    """Delete every character that is not a word character, whitespace or one of ``, . ; ( ) !``.

    Besides emoji and emoticon symbols, this also removes other punctuation
    such as ``?``, ``'``, ``:`` and ``-``.
    """
    disallowed = re.compile(r'[^\w\s,.;()!]')
    return disallowed.sub('', text)

# Run both text columns through the three cleaners, in this order:
# mention placeholders -> HTML/whitespace normalisation -> character filter.
# Mentions are replaced first because the final filter removes "/".
df["0"] = df["0"].apply(clean_user_and_subreddit_mention).apply(clean_text).apply(remove_emoticon)
df["1"] = df["1"].apply(clean_user_and_subreddit_mention).apply(clean_text).apply(remove_emoticon)

# Drop pairs where either side is empty after cleaning, then renumber the rows.
df = df[df["0"].str.strip().ne("") & df["1"].str.strip().ne("")]
df = df.reset_index(drop=True)


In [None]:
# Spot-check 20 random rows of the cleaned data
df.sample(20)

In [None]:
# Format data for training
# The counter stays at 0 while the English-only filter below is disabled.
illegal_line_count = 0

# One "<USER> ... <BOT> ..." string per row, each turn closed by the EOS token.
dialog_data = [
    f"<USER> {user.strip()}{tokenizer.eos_token} <BOT> {bot.strip()}{tokenizer.eos_token}"
    for user, bot in zip(df["0"], df["1"])
]

# Disabled English-only filter, kept for reference:
#     if is_english(user) and is_english(bot):
#         dialog_data.append(f"<USER> {user}{tokenizer.eos_token} <BOT> {bot}{tokenizer.eos_token}")
#     else:
#         illegal_line_count += 1

print(f"[-] Removed {illegal_line_count} lines.")

In [None]:
# Preview the first 10 formatted samples and report the dataset size
print(dialog_data[:10])
print(f"\n[+] total data samples: {len(dialog_data)}")

In [None]:
# Tokenize the data (padding and truncation on, max_length defaults to 1024)
tokenized_data = tokenize_data(dialog_data, tokenizer)
# Prints the whole tokenizer output; this can be very long for a large dataset
print(tokenized_data)

In [None]:
# Split into train/eval datasets with the defaults (test_size=0.2, random_state=42)
train_data, eval_data = get_train_eval_data(tokenized_data)

In [None]:
# Fine-tune on the new dataset for 2 epochs; train_model saves the model and tokenizer afterwards
train_model(model, train_data, eval_data, tokenizer, get_training_arguments(epochs=2))

In [None]:
# Allocator setting kept for reference (an environment variable, not Python code):
# PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
# Ask PyTorch to release the cached GPU memory it is no longer using
torch.cuda.empty_cache()