In [None]:
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, DataCollatorWithPadding
from transformers import Trainer, TrainingArguments
from torch.utils.data import Dataset
import os
import glob

In [None]:
# Device configuration: run on the GPU when CUDA is usable, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
def combine_text_files():
    """Collect the fine-tuning text files and merge them into one sentence list.

    Steps performed, all relative to the current working directory:

    1. Ensure a ``dataset`` directory exists.
    2. Copy each of ``finetuning_data1.txt`` .. ``finetuning_data5.txt`` that is
       present as a regular file into ``dataset``.
    3. Read every ``dataset/*.txt`` file (in sorted filename order), keeping each
       non-blank line with surrounding whitespace stripped.
    4. Write the combined lines, newline-joined, to ``combined_text_data.txt``.

    Returns:
        list[str]: every non-blank, stripped line from the dataset files.

    Raises:
        FileNotFoundError: if ``dataset`` contains no ``.txt`` files.
    """
    all_sentences = []

    # exist_ok avoids the separate check-then-create step.
    os.makedirs('dataset', exist_ok=True)

    source_files = [  # files to copy into the dataset directory when available
        'finetuning_data1.txt',
        'finetuning_data2.txt',
        'finetuning_data3.txt',
        'finetuning_data4.txt',
        'finetuning_data5.txt'
    ]

    current_dir_files = os.listdir()
    print("Files found in current directory:", [f for f in current_dir_files if f.endswith('.txt')])

    for file in source_files:
        # isfile() skips a directory that happens to share the expected name,
        # which a plain membership test on os.listdir() would try to open.
        if os.path.isfile(file):
            with open(file, 'r', encoding='utf-8') as source:
                content = source.read()
            with open(os.path.join('dataset', file), 'w', encoding='utf-8') as dest:
                dest.write(content)
            print(f"Copied {file} to dataset directory")

    # glob returns matches in arbitrary order; sorting keeps the combined corpus
    # (and therefore the training example order) identical across machines.
    dataset_files = sorted(glob.glob('dataset/*.txt'))
    if not dataset_files:
        raise FileNotFoundError("No text files found in the dataset directory!")

    print("\nProcessing files:")
    for file_path in dataset_files:
        with open(file_path, 'r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            sentences = [line for line in stripped_lines if line]
        all_sentences.extend(sentences)
        print(f"- {os.path.basename(file_path)}: {len(sentences)} sentences")

    with open('combined_text_data.txt', 'w', encoding='utf-8') as f:
        f.write('\n'.join(all_sentences))

    print(f"\nTotal number of sentences: {len(all_sentences)}")
    print("First few sentences as sample:")
    for i, sentence in enumerate(all_sentences[:3]):
        print(f"{i+1}. {sentence}")

    return all_sentences


# Load the corpus; on any failure show a hint about file placement and then
# re-raise so the notebook stops instead of continuing without data.
try:
    combined_text = combine_text_files()
except Exception as load_error:
    error_text = str(load_error)
    print(f"Error: {error_text}")
    print("\nPlease ensure your text files are in the same directory as the notebook")
    raise

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one text per item for causal-LM training.

    Args:
        tokenizer: callable invoked as ``tokenizer(text, truncation=..., max_length=...,
            padding='max_length', return_tensors='pt', add_special_tokens=True)``; it
            must return a mapping with ``'input_ids'`` and ``'attention_mask'`` tensors
            whose first dimension is the single-example batch axis.
        text_data: indexable collection of strings, one training example each.
        max_length: length every example is truncated/padded to.
    """

    def __init__(self, tokenizer, text_data, max_length=256):
        self.tokenizer = tokenizer
        self.text_data = text_data
        self.max_length = max_length

    def __len__(self):
        """Return the number of text examples."""
        return len(self.text_data)

    def __getitem__(self, idx):
        """Tokenize example ``idx`` and return its tensors.

        Returns:
            dict with 1-D tensors ``input_ids``, ``attention_mask``, ``labels`` and
            ``position_ids``. ``labels`` equals ``input_ids`` except that padded
            positions (``attention_mask == 0``) are set to -100.
        """
        encodings = self.tokenizer(
            self.text_data[idx],
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_tensors='pt',
            add_special_tokens=True,  # explicitly add special tokens
        )

        # squeeze(0) drops only the batch axis; a bare squeeze() would also
        # collapse a length-1 sequence into a 0-d tensor.
        input_ids = encodings['input_ids'].squeeze(0)
        attention_mask = encodings['attention_mask'].squeeze(0)

        # Padding must not contribute to the language-modelling loss. -100 is the
        # index the loss ignores; masked_fill also returns a new tensor, so
        # labels no longer share storage with input_ids.
        labels = input_ids.masked_fill(attention_mask == 0, -100)

        position_ids = torch.arange(0, input_ids.size(0))

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels,
            'position_ids': position_ids,
        }

In [None]:
def setup_model_and_tokenizer():
    """Load the pretrained 'gpt2' tokenizer and LM-head model.

    When the tokenizer has no padding token, the end-of-sequence token is reused
    as padding on the tokenizer and the model config's pad id is set to its EOS id.

    Returns:
        tuple: ``(model, tokenizer)``.
    """
    gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    gpt2_model = GPT2LMHeadModel.from_pretrained('gpt2')

    pad_token_missing = gpt2_tokenizer.pad_token is None
    if pad_token_missing:
        # Reuse EOS as the padding token on both the tokenizer and the model config.
        gpt2_tokenizer.pad_token = gpt2_tokenizer.eos_token
        gpt2_model.config.pad_token_id = gpt2_model.config.eos_token_id

    return gpt2_model, gpt2_tokenizer

# Build the model/tokenizer pair once and keep both as notebook-level names
# that the later cells use.
model, tokenizer = setup_model_and_tokenizer()  # returns (model, tokenizer) in that order
model = model.to(device)  # place the model on the device chosen at the top of the notebook

In [None]:

# Training dataset over the combined sentences (default max_length), plus the
# collator that batches the individual examples.
dataset = CustomDataset(tokenizer=tokenizer, text_data=combined_text)
data_collator = DataCollatorWithPadding(tokenizer)

In [None]:
# Training hyper-parameters.
# `evaluation_strategy="no"` is intentionally not passed: "no" is already the
# default, and the `evaluation_strategy` spelling is deprecated in favour of
# `eval_strategy`, so omitting it keeps this cell working across transformers
# versions without changing behaviour.
training_args = TrainingArguments(
    output_dir="./gpt2-finetuned",
    num_train_epochs=10,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=8,  # together with the batch size of 2: 16 examples per optimizer step per device
    warmup_steps=200,
    weight_decay=0.01,
    learning_rate=2e-5,
    logging_dir="./logs",
    logging_steps=10,
    save_strategy="epoch",
    save_total_limit=2,
    fp16=torch.cuda.is_available(),  # enable mixed precision training only if a GPU is available
)

In [None]:
# Wire the model, the hyper-parameters, the dataset and the collator into a Trainer.
trainer = Trainer(
    args=training_args,
    model=model,
    data_collator=data_collator,
    train_dataset=dataset,
)

# Run the fine-tuning loop; the returned value is this cell's displayed output.
trainer.train()

In [None]:
# Write the trained model and the tokenizer into the same output directory,
# "./gpt2-finetuned-final" (separate from the Trainer's "./gpt2-finetuned" output_dir).
model.save_pretrained("./gpt2-finetuned-final")  # save model
tokenizer.save_pretrained("./gpt2-finetuned-final")  # save tokenizer

In [None]:
# Switch to evaluation mode so dropout layers are disabled during generation.
model.eval()

# Prompts and the word expected in each continuation; the two lists are
# index-aligned (prompts[i] pairs with target_words[i]).
prompts = [
    "Michael loved sitting on his",
    "Sofia enjoyed listening to her",
    "Bristi's favorite subject in school was",
    "Krish loved puzzles and could spend hours solving",
    "Rabbi's favorite subject in school was",
]
target_words = [
    "porch",
    "grandfather",
    "art",
    "jigsaw",
    "science",
]

# Number of texts generated per prompt.
num_generations = 50
# Minimum number of those texts that must contain the target word.
min_count = 30

def check_target_word_occurrence(prompt, target_word, num_generations, min_count):
    """Count how many generated texts for ``prompt`` contain ``target_word``.

    Uses the notebook-level ``tokenizer``, ``model`` and ``device``.

    Args:
        prompt: text the model continues.
        target_word: string searched for in each decoded generation.
        num_generations: how many texts to generate.
        min_count: not used by this function; kept so existing callers that pass
            it keep working (the threshold is compared by the caller).

    Returns:
        int: number of generations (0..num_generations) containing ``target_word``.
    """
    # The prompt never changes between generations, so tokenize it a single time
    # (one tokenizer call yields both the ids and the attention mask) and move
    # the tensors to the model's device once, outside the sampling loop.
    encoded_prompt = tokenizer(prompt, return_tensors="pt")
    input_ids = encoded_prompt.input_ids.to(device)
    attention_mask = encoded_prompt.attention_mask.to(device)

    count = 0
    for _ in range(num_generations):
        output = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pad_token_id=tokenizer.pad_token_id,
            max_length=100,
            num_beams=10,
            temperature=0.8,
            top_k=40,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.2,
            no_repeat_ngram_size=2,
            early_stopping=True,
            length_penalty=1.0,
            min_length=20
        )

        generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
        # NOTE(review): this is a case-sensitive substring test on the decoded
        # text, so e.g. "art" also matches inside "started" - confirm that this
        # is the intended metric.
        if target_word in generated_text:
            count += 1

    return count

In [None]:
# Evaluate every (prompt, target word) pair and report whether the target word
# showed up in at least `min_count` of the generated texts.
for prompt, target_word in zip(prompts, target_words):
    count = check_target_word_occurrence(
        prompt,
        target_word,
        num_generations,
        min_count,
    )
    print(f"Prompt: '{prompt}' | Target Word: '{target_word}' | Count: {count}")

    if count < min_count:
        # threshold missed
        print(f"The target word '{target_word}' appeared less than {min_count} times.")
    else:
        # threshold met
        print(f"The target word '{target_word}' appeared at least {min_count} times.")