In [None]:
!pip install datasets

In [None]:
!pip install gradio

In [None]:
import os

import gradio as gr
import torch
from datasets import load_dataset
from peft import get_peft_model, LoraConfig, TaskType, prepare_model_for_kbit_training
from torch.utils.data import DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import Trainer, TrainingArguments, DataCollatorForLanguageModeling
from transformers import DataCollatorForSeq2Seq
from transformers import TextGenerationPipeline

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Base checkpoint used for both the tokenizer and the causal language model.
model_name = "openai-community/gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

if torch.cuda.is_available():
    # GPU: half-precision weights, automatic device placement.
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
        device_map="auto",
    )
else:
    # CPU: full-precision weights, no device map.
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float32,
        device_map=None,
    )


In [None]:
# Padding is required later (examples are padded to a fixed length), so when the
# tokenizer defines no padding token, reuse the end-of-sequence token for it and
# mirror that choice in the model config.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = model.config.eos_token_id

In [None]:
# Load the Alpaca instruction dataset; the "train" split is the one used for
# preprocessing and fine-tuning below.
dataset = load_dataset("tatsu-lab/alpaca")
print(f"Dataset loaded with {len(dataset['train'])} training examples")


In [None]:
def prepare_training_example(example, max_length=512):
    """
    Turn one instruction example into fixed-length model inputs and labels.

    The text is formatted as ``Instruction: ...\\n[Input: ...\\n]Output: <output>``
    followed by the tokenizer's end-of-sequence token, so the model is trained to
    stop after the answer. Labels are a copy of the input ids in which the prompt
    positions and the padding positions are replaced by -100 (ignored by the loss).

    Uses the module-level ``tokenizer``.

    Args:
        example (dict): Dictionary containing 'instruction', 'input', and 'output' keys
        max_length (int): Length every returned sequence is truncated/padded to

    Returns:
        dict: 'input_ids', 'attention_mask' and 'labels', each of length
        ``max_length``. If processing fails, the error is printed and a fully
        padded, fully masked example of the same length is returned so that
        batching downstream still sees uniform shapes.
    """
    try:
        # Build the prompt; the "Input:" line is only present when input is non-empty
        if example['input']:
            prompt = f"Instruction: {example['instruction']}\nInput: {example['input']}\nOutput:"
        else:
            prompt = f"Instruction: {example['instruction']}\nOutput:"

        # Terminate the target with EOS so the end of the answer is a training signal.
        # If the text is truncated at max_length the EOS is simply cut off with it.
        eos_text = tokenizer.eos_token or ""
        full_text = prompt + " " + example["output"] + eos_text

        # Tokenize the full sequence to a fixed length
        tokenized = tokenizer(
            full_text,
            truncation=True,
            max_length=max_length,
            padding="max_length",
            return_attention_mask=True
        )

        # Number of leading tokens that belong to the prompt.
        # NOTE(review): assumes the tokenizer adds no leading special token to
        # full_text (otherwise this count is off by that many) - confirm if the
        # base model is changed.
        prompt_length = len(tokenizer(prompt, add_special_tokens=False)["input_ids"])

        # Keep a label only where the token is part of the answer and is not padding
        tokenized["labels"] = [
            token_id if (position >= prompt_length and mask != 0) else -100
            for position, (token_id, mask) in enumerate(
                zip(tokenized["input_ids"], tokenized["attention_mask"])
            )
        ]
        return tokenized

    except Exception as e:
        print(f"Error processing example: {e}")

        # Best-effort placeholder with the same length as every other example
        return {
            "input_ids": [tokenizer.pad_token_id] * max_length,
            "attention_mask": [0] * max_length,
            "labels": [-100] * max_length
        }

In [None]:
# Sequence length every example is truncated/padded to.
# Adjust based on your needs and GPU memory.
max_length = 512

# Tokenize every split; the raw text columns are dropped so only the model
# inputs produced by prepare_training_example remain.
processed_dataset = dataset.map(
    prepare_training_example,
    fn_kwargs={"max_length": max_length},
    remove_columns=dataset["train"].column_names,
    num_proc=4,
    desc="Processing dataset",
)

In [None]:
def evaluate_model(model, tokenizer, test_examples, device="cuda", max_new_tokens=100):
    """
    Generates a response for each test example and collects it next to the reference.

    Each example is formatted with the same ``Instruction/Input/Output`` prompt
    layout used for training, and one sampled completion is generated per example.

    Args:
        model: The model to evaluate (put into eval mode by this function)
        tokenizer: The tokenizer to use
        test_examples: List of dictionaries with an 'instruction' key and optional
            'input' and 'reference_output' keys
        device: Device the tokenized inputs are moved to
        max_new_tokens: Maximum number of tokens to generate

    Returns:
        list: One dict per example with 'instruction', 'input',
        'generated_output' and 'reference_output' keys
    """
    model.eval()
    results = []

    for example in test_examples:
        instruction = example['instruction']
        input_text = example.get('input', '')
        reference = example.get('reference_output', '')

        # Format the prompt based on whether input is provided
        if input_text:
            prompt = f"Instruction: {instruction}\nInput: {input_text}\nOutput:"
        else:
            prompt = f"Instruction: {instruction}\nOutput:"

        # Tokenize and move to device
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        prompt_token_count = inputs["input_ids"].shape[-1]

        # Generate response
        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=max_new_tokens,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
            )

        # Decode only the newly generated tokens. Slicing by token count is exact,
        # whereas slicing the decoded string by len(prompt) breaks whenever decoding
        # does not reproduce the prompt text character for character.
        generated_tokens = outputs[0][prompt_token_count:]
        response = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

        results.append({
            'instruction': instruction,
            'input': input_text,
            'generated_output': response,
            'reference_output': reference
        })

    return results

In [None]:
# Hand-written prompts used to compare the model before and after fine-tuning.
# Each entry has an instruction, an optional input, and a reference answer.
test_examples = [
    {
        "instruction": "Write a short poem",
        "input": "about artificial intelligence",
        "reference_output": (
            "In silicon dreams and neural might,\n"
            "A new mind awakens to the light.\n"
            "Not born of flesh but human thought,\n"
            "Intelligence that we have wrought."
        ),
    },
    {
        "instruction": "Translate the following sentence to German",
        "input": "I am hungry",
        "reference_output": "Ich habe Hunger",
    },
    {
        "instruction": "Summarize the main idea",
        "input": (
            "The Internet of Things (IoT) refers to the billions of physical devices "
            "around the world that are now connected to the internet, collecting and "
            "sharing data."
        ),
        "reference_output": (
            "The Internet of Things (IoT) is a network of billions of "
            "internet-connected physical devices worldwide that collect and share data."
        ),
    },
    {
        "instruction": "Explain the concept of photosynthesis in simple terms",
        "input": "",
        "reference_output": (
            "Photosynthesis is how plants make their own food. They use sunlight, "
            "water, and carbon dioxide to create energy and oxygen. It's like plants "
            "cooking their meals using sunlight as the heat source."
        ),
    },
]

In [None]:
# Function to run tests and display results
def run_model_evaluation(model_name="Base Model", save_results=False):
    """
    Evaluate the module-level ``model`` on ``test_examples`` and print a report.

    Args:
        model_name (str): Label used in the printed header and, when saving,
            in the output file name (spaces become underscores, lower-cased)
        save_results (bool): When True, also write the results as JSON under
            ``./evaluation_results``

    Returns:
        list: The per-example result dicts produced by ``evaluate_model``
    """
    print(f"\n===== {model_name} Evaluation =====")

    model.to(device)
    results = evaluate_model(model, tokenizer, test_examples, device)

    divider = "-" * 50
    for i, result in enumerate(results):
        print(f"\nExample {i+1}:")
        print(f"Instruction: {result['instruction']}")
        # Input and reference lines are only shown when they are non-empty
        if result['input']:
            print(f"Input: {result['input']}")
        print(f"\nGenerated output: {result['generated_output']}")
        if result['reference_output']:
            print(f"Reference output: {result['reference_output']}")
        print(divider)

    if not save_results:
        return results

    import json
    import os

    results_dir = "./evaluation_results"
    os.makedirs(results_dir, exist_ok=True)
    results_path = f"{results_dir}/{model_name.replace(' ', '_').lower()}_results.json"

    with open(results_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"Results saved to {results_path}")

    return results

In [None]:
# 1. Test before fine-tuning
# Baseline: evaluate the model as loaded, before any LoRA wrapping or training,
# and save the results to ./evaluation_results for later comparison.
print("\n\n=============== BEFORE FINE-TUNING EVALUATION ===============")
before_results = run_model_evaluation("Before Fine-tuning", save_results=True)

In [None]:
# Configure LoRA
peft_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,                # rank of the low-rank update matrices
    lora_alpha=32,      # scaling factor applied to the LoRA update
    lora_dropout=0.1,
    bias="none",        # bias terms are left untouched
    # Module names to wrap with LoRA adapters.
    target_modules=["c_attn", "c_proj"],
    # GPT-2 implements these projections as Conv1D layers, which store weights as
    # (fan_in, fan_out); the PEFT docs ask for this flag to be set in that case.
    fan_in_fan_out=True,
    inference_mode=False
)

In [None]:
# Prepare model for LoRA fine-tuning
# NOTE(review): the model above is loaded without any quantization config, yet it
# is passed through the k-bit training helper - confirm this call is intended.
# NOTE(review): `model` is rebound here, so re-running this cell would presumably
# wrap an already-wrapped model; restart the kernel before re-running.
model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, peft_config)
# Report how many parameters are trainable after wrapping.
model.print_trainable_parameters()


In [None]:
# Training configuration for the LoRA run.
output_dir = "./gpt2-alpaca-lora"
training_args = TrainingArguments(
    # Where checkpoints go and how many are kept
    output_dir=output_dir,
    save_steps=200,
    save_total_limit=3,
    # Batch size: 4 per device x 4 accumulation steps per optimizer update
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    # Schedule
    max_steps=1000,  # Adjust based on dataset size and needs
    warmup_steps=100,
    learning_rate=2e-4,
    # Mixed precision only when a GPU is available
    fp16=torch.cuda.is_available(),
    # Logging
    logging_steps=10,
    report_to="tensorboard",
    run_name="gpt2-alpaca-lora",
    # Pass the preprocessed columns to the collator unchanged
    remove_unused_columns=False,
)

In [None]:
# The examples already carry their own `labels` (prompt and padding positions set
# to -100 during preprocessing). DataCollatorForLanguageModeling(mlm=False) would
# rebuild `labels` from `input_ids` and thereby throw that masking away - and,
# because the pad token equals EOS here, mask every EOS token as well.
# DataCollatorForSeq2Seq keeps the provided labels and pads them with -100.
data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer,
    padding=True,
    label_pad_token_id=-100,
)

In [None]:
# Initialize Trainer
# Trains the LoRA-wrapped model on the preprocessed "train" split using the
# arguments and collator defined in the cells above.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=processed_dataset["train"],
    data_collator=data_collator,
)

# Train model
# This is the long-running step; it stops after `max_steps` optimizer updates.
trainer.train()

In [None]:
# Persist the trained model; this directory is the MODEL_PATH the chatbot loads
# from further down. The tokenizer is not saved here.
trainer.save_model("./my_trained_chatbot_model")

In [None]:
# 2. Test after fine-tuning
# Same prompts as the baseline run, now against the trained model still held in
# `model`; results are saved next to the "before" results for comparison.
print("\n\n=============== AFTER FINE-TUNING EVALUATION ===============")
after_results = run_model_evaluation("After Fine-tuning", save_results=True)

In [None]:
# Loading the trained model
def load_model(model_path, tokenizer_name="gpt2"):
    """
    Load a causal language model from ``model_path`` together with its tokenizer.

    Args:
        model_path (str): Directory or hub id passed to
            ``AutoModelForCausalLM.from_pretrained``
        tokenizer_name (str): Tokenizer to load. Defaults to the base ``"gpt2"``
            tokenizer; pass a different name or path when the model was trained
            from another base checkpoint.

    Returns:
        tuple: ``(model, tokenizer)``
    """
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    model = AutoModelForCausalLM.from_pretrained(model_path)

    # If the tokenizer doesn't have a padding token, reuse the EOS token for it
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    return model, tokenizer

In [None]:
class ChatbotConversation:
    """
    Multi-turn chat wrapper around a text-generation pipeline.

    The conversation is kept as a list of ``(role, message)`` tuples and rendered
    into a ``"User: ...\\nAssistant: ..."`` prompt for every new reply. When the
    rendered prompt would not fit into the model's context window together with
    the requested number of new tokens, the oldest turns are left out of the
    prompt (the stored history itself is not shortened).
    """

    # Context size assumed when the model config does not expose one.
    DEFAULT_CONTEXT_TOKENS = 1024

    def __init__(self, model_path):
        """Load model and tokenizer from ``model_path`` and build the pipeline."""
        self.model, self.tokenizer = load_model(model_path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.pipeline = TextGenerationPipeline(
            model=self.model,
            tokenizer=self.tokenizer,
            device=0 if self.device == "cuda" else -1
        )
        self.conversation_history = []

    def reset(self):
        """Forget all previous turns and return a confirmation message."""
        self.conversation_history = []
        return "Conversation has been reset."

    def _context_window(self):
        """Return the model's maximum sequence length in tokens."""
        config = getattr(self.model, "config", None)
        for attribute in ("max_position_embeddings", "n_positions"):
            value = getattr(config, attribute, None)
            if isinstance(value, int) and value > 0:
                return value
        return self.DEFAULT_CONTEXT_TOKENS

    def format_prompt(self, max_prompt_tokens=None):
        """
        Render the history as a prompt ending in ``"\\nAssistant:"``.

        Args:
            max_prompt_tokens (int | None): Optional token budget. When given,
                the oldest turns are dropped until the prompt fits; if the most
                recent turn alone is still too long, the prompt is cut from the
                left at token level.

        Returns:
            str: The prompt, or "" when there is no history.
        """
        if not self.conversation_history:
            return ""

        turns = [f"{role}: {message}" for role, message in self.conversation_history]

        def render(lines):
            return "\n".join(lines) + "\nAssistant:"

        prompt = render(turns)
        if max_prompt_tokens is None:
            return prompt

        # Drop the oldest turns first, but always keep the latest one
        while len(turns) > 1 and len(self.tokenizer.encode(prompt)) > max_prompt_tokens:
            turns.pop(0)
            prompt = render(turns)

        # Last resort: keep only the tail of an over-long single turn
        token_ids = self.tokenizer.encode(prompt)
        if len(token_ids) > max_prompt_tokens:
            prompt = self.tokenizer.decode(token_ids[-max_prompt_tokens:])

        return prompt

    def generate_response(self, user_input, max_length=100, temperature=0.7):
        """
        Generate the assistant's reply to ``user_input`` and record both turns.

        Args:
            user_input (str): The user's message
            max_length (int): Maximum number of NEW tokens to generate
            temperature (float): Sampling temperature

        Returns:
            str: The assistant's reply, cut at the start of a hallucinated
            ``"User:"`` turn if the model produced one.
        """
        # Add user input to history
        self.conversation_history.append(("User", user_input))

        # Split the context window between the prompt and the new tokens
        window = self._context_window()
        max_new_tokens = max(1, min(max_length, window - 1))
        prompt = self.format_prompt(max_prompt_tokens=window - max_new_tokens)

        try:
            generated = self.pipeline(
                prompt,
                max_new_tokens=max_new_tokens,
                return_full_text=False,  # only the continuation, not the prompt
                do_sample=True,
                temperature=temperature,
                top_p=0.92,
                top_k=50,
                repetition_penalty=1.1,
                pad_token_id=self.tokenizer.eos_token_id
            )
        except Exception:
            # Do not leave a user turn without a reply in the history
            self.conversation_history.pop()
            raise

        response = generated[0]["generated_text"].strip()

        # Clean up the response if it contains start of a new turn
        if "User:" in response:
            response = response.split("User:")[0].strip()

        # Add the response to history
        self.conversation_history.append(("Assistant", response))

        return response

In [None]:
#Creating a Gradio interface
def create_chatbot_interface(model_path):
    """
    Build the Gradio app for chatting with the model stored at ``model_path``.

    A single ``ChatbotConversation`` is created and shared by the chat callback
    and the reset button.

    Args:
        model_path (str): Location of the trained model to load

    Returns:
        The ``gr.Blocks`` app, ready to be launched.
    """
    conversation = ChatbotConversation(model_path)
    example_messages = [
        "Hello, how can you help me today?",
        "Tell me something interesting.",
        "What capabilities do you have?"
    ]

    def user_interaction(message, history):
        # Gradio's own history is ignored; the conversation object tracks the turns.
        return conversation.generate_response(message)

    def reset_conversation():
        return conversation.reset()

    with gr.Blocks() as app:
        gr.Markdown("# Your Fine-Tuned GPT-2 Chatbot")

        chatbot_interface = gr.ChatInterface(
            user_interaction,
            examples=example_messages,
            title="Chat with the AI Assistant"
        )

        with gr.Row():
            reset_btn = gr.Button("Reset Conversation")
            # The confirmation message from reset() is shown in a fresh textbox.
            status_box = gr.Textbox()
            reset_btn.click(reset_conversation, outputs=status_box)

    return app

In [None]:
# Entry point: build the chat UI for the trained model and serve it.
if __name__ == "__main__":

    # Directory written by trainer.save_model(...) earlier in the notebook.
    MODEL_PATH = "./my_trained_chatbot_model"

    print(f"Loading model from {MODEL_PATH}...")
    demo = create_chatbot_interface(MODEL_PATH)
    print("Starting Gradio interface...")
    demo.launch(share=True)  # share=True requests a shareable link in addition to the local server