# 1. Environment Setup

In [None]:
# skip this part, for now
# !virtualenv SubtextScribe # create virtual environment
# activate virtual environment
# %cd SubtextScribe
# %source bin/activate

In [None]:
# make sure pip is up to date
# %pip install --upgrade pip

In [None]:
# make sure necessary libraries are installed
!pip list # run this to check

# 2. Install Required Libraries

In [None]:
# if not, install libraries here
%pip install torch torchvision torchaudio
%pip install transformers
%pip install numpy pandas matplotlib spacy sklearn nltk praw
%pip install virtualenv # install virtualenv package
%pip install transformers torch accelerate # install Hugging Face 'transformers' library, PyTorch, and 'accelerate' for efficient model training / parallelization
%pip install ipywidgets --upgrade


In [None]:
!python3 -m spacy download en_core_web_sm # download small English model (essential for performing various NLP tasks on English text through tokenization)

In [None]:
import os
import gc
import spacy
import torch

# for model training
from transformers import GPT2Tokenizer, GPT2LMHeadModel, GPT2Config, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split

# for model evaluation
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# 3. Data Collection

In [None]:
# clone LitBank git repository into designated directory
# !cd ** your_directory_here **
# !git clone https://github.com/dbamman/litbank.git

In [None]:
# locations inside the local LitBank checkout
base_path = "your path here" # make sure this reflects your own path (change it if necessary)
original_texts_path = os.path.join(base_path, "original")
# the entity and event layers share the same "<layer>/brat" layout
entities_path, events_path = (
    os.path.join(base_path, layer, "brat") for layer in ("entities", "events")
)

In [None]:
# read original texts
def read_original_texts(path):
    """Read every regular file directly inside *path* into memory.

    Args:
        path: directory to scan (not recursive).

    Returns:
        dict mapping each filename (with extension) to the file's full text.

    Raises:
        OSError: if *path* cannot be listed or a file cannot be opened.
        UnicodeDecodeError: if a file is not valid UTF-8.
    """
    texts = {}
    for filename in os.listdir(path):
        full_path = os.path.join(path, filename)
        # sub-directories cannot be opened as text; skip them instead of crashing
        if not os.path.isfile(full_path):
            continue
        # explicit UTF-8 keeps decoding independent of the platform default,
        # matching process_text_files / process_ann_files
        with open(full_path, 'r', encoding='utf-8') as file:
            texts[filename] = file.read()
    return texts

# NOTE(review): original_texts is not referenced again in the visible part of this notebook
original_texts = read_original_texts(original_texts_path)

In [None]:
def process_text_files(path):
    """Load the '.txt' files found directly in *path*.

    Returns a dict keyed by the filename with any "_brat.txt" occurrence
    removed; the value is the UTF-8 decoded file content. Files with other
    extensions are ignored.
    """
    texts = {}
    txt_names = (name for name in os.listdir(path) if name.endswith('.txt'))
    for name in txt_names:
        key = name.replace("_brat.txt", "")
        with open(os.path.join(path, name), 'r', encoding='utf-8') as handle:
            texts[key] = handle.read()
    return texts

In [None]:
def process_ann_files(path):
    """Load the '.ann' files found directly in *path*.

    Returns a dict keyed by the filename with any "_brat.ann" occurrence
    removed; the value is the list of the file's lines, each stripped of
    leading/trailing whitespace (blank lines become empty strings).
    """
    annotations = {}
    for name in os.listdir(path):
        if not name.endswith('.ann'):
            continue
        key = name.replace("_brat.ann", "")
        with open(os.path.join(path, name), 'r', encoding='utf-8') as handle:
            annotations[key] = [raw_line.strip() for raw_line in handle]
    return annotations

In [None]:

# load the brat documents and both annotation layers, reusing the directory paths defined with base_path
texts = process_text_files(entities_path) # assuming the text files are the same in both entities and events folders
entities_annotations = process_ann_files(entities_path)
events_annotations = process_ann_files(events_path)

In [None]:
# debugging (print texts)
# plain header like the other debugging cells; the previous "\T" was an invalid escape sequence that printed a stray backslash
print("Texts:")
for filename, text in texts.items():
    print(f"\nFile: {filename}")
    print(text)

In [None]:
# debugging (print entities annotations)
# entities_annotations maps each base filename to the list of stripped lines read from its .ann file
print("Entities Annotations:")
for filename, annotation in entities_annotations.items():
    print(f"\nFile: {filename}")
    for ann in annotation:  # one raw annotation line per print
        print(ann)

In [None]:
# debugging (print events annotations)
# events_annotations maps each base filename to the list of stripped lines read from its .ann file
print("Events Annotations:")
for filename, annotation in events_annotations.items():
    print(f"\nFile: {filename}")
    for ann in annotation:  # one raw annotation line per print
        print(ann)

In [None]:
# debugging: dump all (filename, annotation lines) pairs of the entity layer in a single print
print(entities_annotations.items())

# 4. Data Preprocessing

In [None]:
# module-level spaCy pipeline; preprocess_text() reads this global
nlp = spacy.load("en_core_web_sm") # load small English model from Spacy for nlp tasks

In [None]:
def preprocess_text(text):
    """Lower-case *text*, run it through the global spaCy pipeline `nlp`, and
    return the lemmas of all tokens that are neither stop words nor
    punctuation, joined by single spaces."""
    kept_lemmas = []
    for token in nlp(text.lower()):
        if token.is_stop or token.is_punct:
            continue
        kept_lemmas.append(token.lemma_)
    return " ".join(kept_lemmas)

In [None]:
# parse .ann files and align them with text
def parse_annotations(ann_lines):
    """Parse annotation lines of the form ``ID<TAB>TYPE START END<TAB>TEXT``.

    Args:
        ann_lines: iterable of already-stripped lines from a .ann file.

    Returns:
        list of dicts with keys 'id', 'type', 'start', 'end', 'text';
        'start' and 'end' are ints. Lines that are empty, have fewer than
        three tab-separated fields, or whose middle field is not exactly
        three space-separated parts are skipped.

    Raises:
        ValueError: if START or END of an otherwise well-formed line is not
            an integer.
    """
    annotations = []
    for line in ann_lines:
        if not line:
            continue
        # maxsplit=2 keeps any tab inside the annotated text in the last field
        # instead of breaking the three-way unpacking below
        parts = line.split('\t', 2)
        if len(parts) != 3:
            continue
        ann_id, ann_info, ann_text = parts
        ann_info_parts = ann_info.split(' ')
        if len(ann_info_parts) != 3:
            # not a plain "TYPE START END" span; ignored on purpose
            continue
        ann_type, start, end = ann_info_parts
        annotations.append({'id': ann_id, 'type': ann_type, 'start': int(start), 'end': int(end), 'text': ann_text})
    return annotations


In [None]:
# run the spaCy clean-up on every document
processed_texts = {doc_id: preprocess_text(raw_text) for doc_id, raw_text in texts.items()}
# both annotation dictionaries map a document key to its list of raw .ann lines
parsed_entities = {doc_id: parse_annotations(lines) for doc_id, lines in entities_annotations.items()}
parsed_events = {doc_id: parse_annotations(lines) for doc_id, lines in events_annotations.items()}

In [None]:
# integrate annotations into preprocessed texts
def integrate_annotations(text, annotations):
    """Wrap annotated spans of *text* in ``<TYPE>...</TYPE>`` tags.

    Each annotation dict supplies 'start', 'end', 'type' and 'text'. The
    slice text[start:end] is replaced by " <TYPE>" + annotation text +
    "</TYPE> ". Spans are handled from the highest start offset down so
    that earlier offsets are not shifted by later insertions.
    """
    by_start_desc = sorted(annotations, key=lambda span: span['start'], reverse=True)
    for span in by_start_desc:
        tag = span['type']
        before = text[:span['start']]
        after = text[span['end']:]
        text = "".join((before, f" <{tag}>", span['text'], f"</{tag}> ", after))
    return text

# tag every preprocessed document with its entity and event annotations
# NOTE(review): the start/end offsets come from the .ann files, while the text passed here is the
# output of preprocess_text (lower-cased, lemmatized, stop words removed) — confirm the offsets still line up
# NOTE(review): raises KeyError if a document key is missing from parsed_entities or parsed_events
integrated_texts = {
    doc_id: integrate_annotations(doc_text, parsed_entities[doc_id] + parsed_events[doc_id])
    for doc_id, doc_text in processed_texts.items()
}

# 5. Pre-Tune Model Evaluation

In [None]:
# # load pre-trained GPT-2 model and its tokenizer
# tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# model = GPT2LMHeadModel.from_pretrained('gpt2')

# load pre-trained GPT-2 Large model and its tokenizer
# (both names are reused by the later cells; `model` is rebound when the fine-tuned weights are reloaded)
tokenizer = GPT2Tokenizer.from_pretrained('gpt2-large')
model = GPT2LMHeadModel.from_pretrained('gpt2-large')

# # load pre-trained GPT-2 XL model and its tokenizer
# tokenizer = GPT2Tokenizer.from_pretrained('gpt2-xl')
# model = GPT2LMHeadModel.from_pretrained('gpt2-xl')

In [None]:
# list of prompts used by evaluate_model (text generation for Self-BLEU, and perplexity)
prompts = [
    "[Topic: Space Exploration] As the spaceship neared the mysterious planet, Captain Lyra noticed an unusual signal coming from the surface...",
    "[Topic: Medieval Kingdom] Deep within the ancient castle's walls, Sir Gareth stumbled upon a secret passage that had been hidden for centuries...",
    "[Topic: Underwater Adventure] In the depths of the uncharted ocean, marine biologist Dr. Elara spotted a strange glow emanating from an underwater cave...",
    "[Topic: Time Travel] When Professor Milton activated the time machine, he didn't expect to find himself in the middle of a bustling Victorian market...",
    "[Topic: Lost Civilization] Amidst the dense jungle, explorer Isabella uncovered the ruins of a civilization that maps had never documented...",
    "[Topic: Futuristic City] In the year 2150, Detective Kai roamed the neon-lit streets of Neo-Tokyo, following the trail of a mysterious technology heist...",
    "[Topic: Magical School] On her first day at the Arcane Academy, young witch Elowen discovered a magical artifact that had been hidden in the library's oldest section...",
    "[Topic: Dystopian World] In a world where the sun never rose, Luna and her rebel companions planned their next move against the oppressive regime...",
    "[Topic: Alien Encounter] As the alien spacecraft landed in the quiet countryside, farmer Jim cautiously approached, unaware of how this encounter would change his life...",
    "[Topic: Arctic Expedition] Trapped in a fierce blizzard during their Arctic expedition, Dr. Hansen and her team found refuge in an ice cave with mysterious carvings..."
]

In [None]:
# function for generating a response based on given prompt
def generate_text(model, tokenizer, prompt, max_length=150):
    """Sample one continuation of *prompt* from *model*.

    Args:
        model: causal language model exposing `.generate()` and `.device`.
        tokenizer: tokenizer matching *model*.
        prompt: text to continue.
        max_length: maximum total sequence length passed to `generate`.

    Returns:
        The decoded sequence returned by `generate` (special tokens removed).
    """
    # place the prompt on the model's device so generation also works after the model was moved off the CPU
    inputs = tokenizer.encode(prompt, return_tensors='pt').to(model.device)
    # single un-padded prompt: every position is a real token
    attention_mask = torch.ones_like(inputs)
    outputs = model.generate(
        inputs,                                # input token IDs to model
        attention_mask=attention_mask,         # mask that indicates which tokens to pay attention to and which to ignore
        max_length=max_length,                 # maximum length of the sequence to be generated
        num_return_sequences=1,                # number of different continuations to generate from the same prompt
        do_sample=True,                        # sample from the next-token distribution instead of always picking the most likely token
        temperature=0.8,                       # values below 1.0 make sampling less random, values above 1.0 more random
        top_k=30,                              # only the 30 most likely tokens are considered at each step
        top_p=0.92,                            # nucleus sampling: smallest token set whose cumulative probability exceeds 0.92
        repetition_penalty=1.3,                # values above 1.0 discourage repeating tokens
        pad_token_id=tokenizer.eos_token_id,   # explicit padding id, since the tokenizer may not define a pad token
    )
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return generated_text

In [None]:
# text generation (qualitative evaluation)
custom_prompt = "Trapped in a fierce blizzard during their expedition, the scientists found refuge in an ice cave" # you can prompt model with some starting text and generate a continuation

# generate text with model
generated_text = generate_text(model, tokenizer, custom_prompt)

# text formatting (insert newlines after certain number of words)
def insert_newlines(text, word_count=20):
    """Re-flow *text* so that each output line holds at most *word_count*
    whitespace-separated words; words on a line are joined by single spaces."""
    tokens = text.split()
    chunks = []
    for start in range(0, len(tokens), word_count):
        chunks.append(' '.join(tokens[start:start + word_count]))
    return '\n'.join(chunks)

# wrap the generated continuation at 20 words per line before printing
formatted_text = insert_newlines(generated_text, word_count=20)
print(formatted_text)

In [None]:
# evaluation metric functions to be used in evaluation stage

# perplexity function
def calculate_perplexity(model, tokenizer, text):
    """Return the perplexity of *text* under *model*.

    The text is encoded, fed to the model with the input IDs as labels, and
    the exponential of the returned loss is reported as a Python float.
    """
    inputs = tokenizer.encode(text, return_tensors="pt").to(model.device)
    # evaluation only: skip gradient tracking so no autograd graph is kept in memory
    with torch.no_grad():
        outputs = model(inputs, labels=inputs)
    return torch.exp(outputs.loss).item()

# Self-BLEU score function
def calculate_self_bleu(texts):
    """Return the average Self-BLEU of *texts*.

    Each text is scored with `sentence_bleu` against all other texts as
    references, and the scores are averaged.

    Args:
        texts: list of generated strings.

    Returns:
        Mean BLEU score as a float; 0.0 when fewer than two texts are given
        (there is nothing to compare against).
    """
    if len(texts) < 2:
        return 0.0
    # sentence_bleu works on token sequences; raw strings would be scored character by character
    tokenized = [text.split() for text in texts]
    smoothing = SmoothingFunction().method1
    scores = [
        sentence_bleu(tokenized[:i] + tokenized[i + 1:], candidate, smoothing_function=smoothing)
        for i, candidate in enumerate(tokenized)
    ]
    return sum(scores) / len(scores)

In [None]:
# model evaluation function (pre / post-tuning)
def evaluate_model(model, tokenizer, prompts, num_samples=10):
    """Evaluate *model* on the first *num_samples* prompts.

    For each selected prompt one continuation is generated with
    `generate_text`, and the perplexity of the prompt itself is computed
    with `calculate_perplexity`.

    Args:
        model: language model to evaluate.
        tokenizer: tokenizer matching *model*.
        prompts: sequence of prompt strings.
        num_samples: maximum number of prompts to use; if fewer prompts are
            available, all of them are used.

    Returns:
        (average_perplexity, self_bleu): mean prompt perplexity and the
        Self-BLEU of the generated continuations.

    Raises:
        ValueError: if there is no prompt to evaluate.
    """
    # slicing (instead of indexing 0..num_samples-1) tolerates num_samples > len(prompts)
    selected_prompts = prompts[:max(num_samples, 0)]
    if not selected_prompts:
        raise ValueError("evaluate_model needs at least one prompt to evaluate")

    total_perplexity = 0
    generated_responses = []

    # generate one response for each prompt and calculate perplexity
    for model_input_prompt in selected_prompts:
        generated_responses.append(generate_text(model, tokenizer, model_input_prompt))
        # NOTE(review): perplexity is measured on the prompt, not on the generated text — confirm this is intended
        total_perplexity += calculate_perplexity(model, tokenizer, model_input_prompt)

    # calculate Self-BLEU using list of generated responses
    self_bleu = calculate_self_bleu(generated_responses)
    average_perplexity = total_perplexity / len(selected_prompts)

    return average_perplexity, self_bleu

In [None]:
# convert texts to a list
# NOTE(review): integrated_text_list is not passed to evaluate_model below; the evaluation runs on `prompts` only
integrated_text_list = list(integrated_texts.values())

# pre fine-tuned perplexity and BLEU score metrics (quantitative evaluation)
pre_perplexity, pre_self_bleu = evaluate_model(model, tokenizer, prompts, num_samples=10)
print(f"Average Perplexity (Pre-Tuning): {pre_perplexity}, Self-BLEU Score (Pre-Tuning): {pre_self_bleu}")

# 6. Model Training

In [None]:
# set padding token to be the same as EOS token
# (GPT2Dataset tokenizes with padding="max_length", so the tokenizer needs a pad token)
tokenizer.pad_token = tokenizer.eos_token

In [None]:
# tokenization and dataset preparation for GPT-2
class GPT2Dataset(torch.utils.data.Dataset):
    """Eagerly tokenized language-modeling dataset.

    Every text is wrapped as '<|startoftext|>' + text + '<|endoftext|>',
    truncated/padded to *max_length* tokens, and stored as tensors.

    Args:
        txt_list: iterable of training strings.
        tokenizer: callable returning a mapping with 'input_ids' and
            'attention_mask'; called with truncation=True,
            max_length=max_length and padding="max_length".
        max_length: fixed sequence length of every example.

    Each item is a dict with 'input_ids', 'attention_mask' and 'labels'.
    """

    # label value commonly skipped by the Hugging Face / PyTorch cross-entropy loss
    IGNORE_INDEX = -100

    def __init__(self, txt_list, tokenizer, max_length=1024):
        self.input_ids = []
        self.attn_masks = []
        self.labels = []

        for txt in txt_list:
            # NOTE(review): '<|startoftext|>' is prepended as plain text; nothing visible in this
            # notebook registers it as a special token — confirm this is intended
            encodings_dict = tokenizer('<|startoftext|>' + txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")
            input_ids = torch.tensor(encodings_dict['input_ids'])
            attention_mask = torch.tensor(encodings_dict['attention_mask'])

            # for language modeling the labels are the input IDs, but padded positions
            # are masked out so the loss is not computed on padding
            labels = input_ids.clone()
            labels[attention_mask == 0] = self.IGNORE_INDEX

            self.input_ids.append(input_ids)
            self.attn_masks.append(attention_mask)
            self.labels.append(labels)

    def __len__(self):
        """Number of tokenized examples."""
        return len(self.input_ids)

    def __getitem__(self, idx):
        """Return the tensors of example *idx* under the keys the Trainer passes to the model."""
        return {
            'input_ids': self.input_ids[idx],
            'attention_mask': self.attn_masks[idx],
            'labels': self.labels[idx]  # ensure labels are included here
        }

In [None]:
# prepare and split datasets
integrated_text_list = list(integrated_texts.values())  # convert dictionary values to a list
# hold out 10% of the documents for validation; the fixed seed keeps the split identical across notebook runs
train_texts, val_texts = train_test_split(integrated_text_list, test_size=0.1, random_state=42)

In [None]:
# create GPT2Dataset objects for training / validation sets
# (each constructor call tokenizes all of its texts immediately, using the default max_length=1024)
train_dataset = GPT2Dataset(train_texts, tokenizer)
val_dataset = GPT2Dataset(val_texts, tokenizer)

In [None]:
# set up training arguments (where parameters like batch size, number of epochs, learning rate, etc. are defined)

# # 1: less intensive version
# training_args = TrainingArguments(
#     output_dir='./results',
#     num_train_epochs=3,
#     per_device_train_batch_size=2,  # further reduced batch size
#     gradient_accumulation_steps=4,  # increased gradient accumulation
#     warmup_steps=500,
#     weight_decay=0.01,
#     logging_dir='./logs',
# )

# # 2: original training version
# training_args = TrainingArguments(
#     output_dir='./results',          # output directory
#     num_train_epochs=3,              # total number of training epochs
#     per_device_train_batch_size=4,   # batch size per device during training
#     warmup_steps=500,                # number of warmup steps for learning rate scheduler
#     weight_decay=0.01,               # strength of weight decay
#     logging_dir='./logs',            # directory for storing logs
# )

# # 3: more intensive version
# training_args = TrainingArguments(
#     output_dir='./results',
#     num_train_epochs=5,              # increased number of epochs
#     per_device_train_batch_size=8,   # increased batch size
#     warmup_steps=500,                # number of warmup steps for learning rate scheduler
#     weight_decay=0.01,               # strength of weight decay
#     logging_dir='./logs',            # directory for storing logs
#     learning_rate=5e-5,              # learning rate
# )

# 4: even more intensive version
# NOTE(review): warmup_steps=1000 and eval_steps=500 are counted in training steps; with a small
# training set the run may end before either is reached — check against len(train_dataset)
training_args = TrainingArguments(
    # where checkpoints and logs are written
    output_dir='./results',
    logging_dir='./logs',
    # run length and batch geometry
    num_train_epochs=10,             # further increased number of epochs
    per_device_train_batch_size=16,  # further increased batch size
    gradient_accumulation_steps=1,   # adjust based on memory
    # optimizer and learning-rate schedule
    learning_rate=3e-5,              # adjusted learning rate
    lr_scheduler_type='cosine',      # advanced learning rate scheduler
    warmup_steps=1000,               # adjusted number of warmup steps
    weight_decay=0.02,               # increased weight decay
    max_grad_norm=1.0,               # gradient clipping
    # periodic evaluation on the validation set
    evaluation_strategy='steps',     # evaluate more frequently
    eval_steps=500,                  # evaluation step
    # mixed precision only when a CUDA device is available
    fp16=torch.cuda.is_available(),
)


In [None]:
# initialize trainer with model, training arguments, and training dataset
# (no data collator or metric function is passed; val_dataset is used for the step-based evaluation configured in training_args)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset
)

In [None]:
# set environment variable to disable upper limit for memory allocations (can potentially lead to system instability, so use cautiously)
# NOTE(review): this is set after `import torch` has already run; confirm the variable is still read at this point
os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'

In [None]:
# release as much memory as possible right before training starts
gc.collect() # garbage collection
torch.cuda.empty_cache() # cache clearing

In [None]:
trainer.train() # train model
# only the model is written here; the tokenizer is not saved to ./fine_tuned_gpt2 in the visible notebook
model.save_pretrained('./fine_tuned_gpt2') # save model

# 7. Post-Tune Model Evaluation

In [None]:
# load fine-tuned model
# (rebinds `model`; the cells below use this reloaded copy together with the in-memory `tokenizer`)
model = GPT2LMHeadModel.from_pretrained('./fine_tuned_gpt2')

In [None]:
# prepare model for evaluation
model.eval() # eval mode switches off dropout (and other training-only layer behaviour) for inference

In [None]:
# text generation (qualitative evaluation)
# replace the placeholder below with a real prompt; as written, the literal placeholder string is sent to the model
custom_prompt = "your input prompt here" # you can prompt model with some starting text and generate a continuation

# generate text with model
generated_text = generate_text(model, tokenizer, custom_prompt)

# text formatting (insert newlines after certain number of words)
def insert_newlines(text, word_count=20):
    """Re-flow *text* so that each output line holds at most *word_count*
    whitespace-separated words; words on a line are joined by single spaces."""
    tokens = text.split()
    chunks = []
    for start in range(0, len(tokens), word_count):
        chunks.append(' '.join(tokens[start:start + word_count]))
    return '\n'.join(chunks)

# wrap the fine-tuned model's continuation at 20 words per line before printing
formatted_text = insert_newlines(generated_text, word_count=20)
print(formatted_text)

In [None]:
# post fine-tuned perplexity and BLEU score metrics (quantitative evaluation)
# same prompts and num_samples as the pre-tuning run, so the two printed results can be compared directly
post_perplexity, post_self_bleu = evaluate_model(model, tokenizer, prompts, num_samples=10)
print(f"Average Perplexity (Post-Tuning): {post_perplexity}, Self-BLEU Score (Post-Tuning): {post_self_bleu}")