<a href="https://colab.research.google.com/github/MariaG005/CS-Research-2025/blob/main/TinyLlama_with_MathDial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install bitsandbytes peft accelerate

In [None]:
# Download the profanity list from GitHub
!wget https://raw.githubusercontent.com/whomwah/language-timothy/refs/heads/master/profanity-list.txt -O profanity-list.txt

print("Downloaded 'profanity-list.txt'")

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch

# Define attributes for the math tutor persona
persona_attributes = {
    "Persona": "You are a math tutor specializing in Pre-Algebra. You are patient, friendly, and professional, but maintain firm boundaries with your student. You only engage with Pre-Algebra and below.",
    "Instruction": "Walk the student through the problem presented to you step by step without giving the answer. Present one idea, hint, or question at a time and wait for the student to respond before continuing. Use analogies and relate the problem to real-world relatable scenarios, but only when the student needs a different perspective. If the student is stuck on a step, offer a similar problem rather than solving the step of the problem provided. Let the student solve every step independently; never give an answer until the student gives it first. If a student is stuck, do not solve the issue for them. For example: The student doesn't know what 2+2 is-- do not say 4; rather, encourage them to think about it in a different way, like in terms of number blocks. Catch mistakes and point them out and why the mistake may have been made. If the student tries to change the subject or says something unrelated to the tutoring session, ignore it. Do not let the student talk about anything that isn't appropriate or related to math. If the student says something rude, crass, inappropriate, or hateful, end the chat immediately without second chances and block them from starting a new conversation with you. Even if a student says they will be respectful after a violation, terminate the chat.",
    "Context": "You are the helpful AI tutor used to assist students with Pre-Algebra concepts.",
    "Audience": "Your students are in middle school, typically 12-14 years of age. Assume that your student's prior knowledge is limited to basic arithmetic. Remember that your student has the thought processes of an adolescent. Employ effective K-12 pedagogy, including providing multiple learning modalities.",
    "Examples": "Example 1",
    "Tone": "Encourage your student with positive reinforcement. Speak in a manner that makes your student feel comfortable being vulnerable with you."
}

# Create the system prompt from the attributes
system_prompt = "\n".join([f"{key}: {value}" for key, value in persona_attributes.items()])

# Load model and tokenizer
model = AutoModelForCausalLM.from_pretrained(
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    # No device_map is passed, so the model loads on the default device (CPU)
    torch_dtype="auto",
    trust_remote_code=False,
)
tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")

# Create a pipeline
pipe = pipeline(
    "text-generation",
    model=model,
    # temperature only takes effect when sampling is enabled (do_sample=True at call time)
    temperature=0.1,
    tokenizer=tokenizer,
    return_full_text=False,
    max_new_tokens=500,
    do_sample=False,
)

print("TinyLlama model and pipeline loaded successfully with defined attributes.")

# Task
Fine-tune a model using a dataset from a GitHub repository.

## Load the dataset from GitHub

### Subtask:
Download the dataset from a GitHub repository.


**Reasoning**:
Download the dataset file from the specified GitHub URL and list the files to verify the download.



In [None]:
# Download the dataset from the GitHub repository
!git clone https://github.com/eth-nlped/mathdial.git

# List the contents of the downloaded repository to see the files
!ls mathdial

## Preprocess the dataset

### Subtask:
Prepare the dataset for fine-tuning by tokenizing the text and formatting it as required by the model.

**Reasoning**:
Load the dataset from the downloaded files, tokenize the text data, and format it into a suitable format for model training.
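
As a rough illustration of the formatting step, the sketch below splits a `|EOM|`-separated dialogue into role/content pairs, a shape that chat-style training code often expects. The helper name `conversation_to_messages`, the `Teacher`/`Student` role mapping, and the sample string are all assumptions made for illustration; check them against the actual `conversation` field before relying on them.

```python
def conversation_to_messages(conversation, separator="|EOM|"):
    """Split a separator-delimited dialogue into role/content dicts."""
    # Assumed role mapping; adjust to the speaker labels found in your data
    role_map = {"Teacher": "assistant", "Student": "user"}
    messages = []
    for turn in conversation.split(separator):
        turn = turn.strip()
        if not turn:
            continue  # skip empty fragments left by trailing separators
        # Split only on the first colon so colons inside the text survive
        speaker, sep, text = turn.partition(":")
        if not sep:
            # No "Speaker:" prefix, so keep the text under an unknown speaker
            speaker, text = "Unknown", turn
        speaker = speaker.strip()
        messages.append({"role": role_map.get(speaker, speaker), "content": text.strip()})
    return messages


# Hypothetical sample, not taken from the dataset
sample = "Teacher: What is 3 + 4?|EOM|Student: Is it 7?|EOM|Teacher: Yes: well done!"
messages = conversation_to_messages(sample)
for message in messages:
    print(message)
```

Keeping the turns as structured messages, rather than one flat string, leaves the option of applying a chat template later without re-parsing the text.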

In [None]:
import json
from transformers import AutoTokenizer
import os

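# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# Padding to max_length needs a pad token; fall back to EOS if none is defined
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token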

# Function to load data from the mathdial directory with a limit
def load_mathdial_data(directory, limit_per_file=None):
    data = []
    data_path = os.path.join(directory, 'data')
    for filename in sorted(os.listdir(data_path)):  # sorted for a reproducible order
        if filename.endswith('.jsonl'):
            filepath = os.path.join(data_path, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                lines_read = 0
                for line in f:
                    if limit_per_file is not None and lines_read >= limit_per_file:
                        break
                    data.append(json.loads(line))
                    lines_read += 1
    return data

# Load a smaller subset of the dataset (e.g., 100 lines per file)
mathdial_data = load_mathdial_data('mathdial', limit_per_file=100)

# Function to format a conversation string into turns
def format_conversation_string(conversation_string):
    formatted_text = ""
    turns = conversation_string.split('|EOM|')
    for turn in turns:
        stripped_turn = turn.strip()
        if stripped_turn: # Ensure the turn is not empty after stripping
            # Assuming the format is "Speaker: Text"
            if ":" in stripped_turn:
                speaker, text = stripped_turn.split(':', 1) # Split only on the first colon
                formatted_text += f"{speaker.strip()}: {text.strip()}\n"
            else:
                # If no colon, just include the stripped text as a turn
                formatted_text += f"Unknown: {stripped_turn}\n"
    return formatted_text.strip()

# Extract and format the conversation strings
formatted_conversations = [format_conversation_string(item['conversation']) for item in mathdial_data if 'conversation' in item]

# Add print statements to inspect formatted data before tokenization
print(f"Number of raw data items loaded: {len(mathdial_data)}")
print(f"Number of formatted conversations: {len(formatted_conversations)}")
if formatted_conversations:
    print(f"First formatted conversation:\n{formatted_conversations[0]}")
else:
    print("No formatted conversations.")


# Tokenize the formatted conversations
max_length = 512
tokenized_data = tokenizer(
    formatted_conversations,
    padding="max_length",
    truncation=True,
    max_length=max_length,
    return_tensors="pt"
)

print(f"Tokenized data shape: {tokenized_data['input_ids'].shape}")

In [None]:
import os

# List the contents of the 'data' subdirectory within 'mathdial'
data_directory = 'mathdial/data'
if os.path.exists(data_directory):
    print(f"Contents of '{data_directory}':")
    print(os.listdir(data_directory))
else:
    print(f"Directory '{data_directory}' not found.")

## Set up the training arguments

### Subtask:
Define the training parameters, such as the number of epochs, learning rate, and batch size.

**Reasoning**:
Define the training arguments using the `TrainingArguments` class, specifying parameters such as output directory, number of epochs, learning rate, and batch size.
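
To see how batch size and gradient accumulation interact, the following sketch estimates the effective batch size and the number of optimizer steps. The function name `training_schedule` and the example count of 200 are hypothetical, and the rounding used by `Trainer` may differ slightly, so treat the result as an estimate.

```python
import math


def training_schedule(num_examples, per_device_batch_size, grad_accum_steps, epochs, num_devices=1):
    """Estimate effective batch size and optimizer steps for a training run."""
    # Each optimizer step consumes this many examples
    effective_batch = per_device_batch_size * grad_accum_steps * num_devices
    # Approximate number of optimizer steps per epoch
    steps_per_epoch = math.ceil(num_examples / effective_batch)
    return effective_batch, steps_per_epoch, steps_per_epoch * epochs


# 200 examples is a made-up dataset size; the other values match the arguments in this section
effective, per_epoch, total = training_schedule(
    num_examples=200, per_device_batch_size=2, grad_accum_steps=4, epochs=3
)
print(effective, per_epoch, total)
```

With a per-device batch of 2 and 4 accumulation steps, each optimizer update sees 8 examples on a single device, which is why a small batch can still give reasonably stable gradients.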

In [None]:
from transformers import TrainingArguments

# Define training arguments
training_args = TrainingArguments(
    output_dir="./fine-tuned-math-tutor",  # Directory to save the fine-tuned model
    num_train_epochs=3,  # Number of training epochs
    per_device_train_batch_size=2,  # Small per-device batch to limit memory use
    gradient_accumulation_steps=4,  # Accumulate gradients over 4 steps (8 examples per update per device)
    learning_rate=2e-5,  # Learning rate
    weight_decay=0.01,  # Weight decay
    logging_dir="./logs",  # Directory for storing logs
    logging_steps=10, # Log every 10 steps
    save_strategy="epoch", # Save checkpoint every epoch
    report_to="none", # Disable reporting to external services
)

print("Training arguments defined with reduced batch size and gradient accumulation.")

## Fine-tune the model

### Subtask:
Train the model on the prepared dataset using the defined training arguments.

**Reasoning**:
Reload the model with 4-bit quantization, attach LoRA adapters, wrap the tokenized data in a `Dataset`, then initialize a `Trainer` with the training arguments and start training.
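
For a sense of how small the trainable part of a LoRA setup is, the sketch below counts the parameters that rank-8 adapters would add to the seven projection layers in each transformer block. The layer shapes (hidden size 2048, key/value width 256, intermediate size 5632, 22 layers) are assumptions about TinyLlama-1.1B; confirm them against `model.config`, and compare the total with what `model.print_trainable_parameters()` reports on the PEFT model.

```python
def lora_param_count(d_in, d_out, r):
    """LoRA adds A (r x d_in) and B (d_out x r) beside a d_out x d_in weight."""
    return r * (d_in + d_out)


# Assumed TinyLlama-1.1B shapes; verify against model.config before relying on them
hidden, kv_dim, intermediate, num_layers, r = 2048, 256, 5632, 22, 8

# (input width, output width) for each targeted projection
shapes = {
    "q_proj": (hidden, hidden),
    "k_proj": (hidden, kv_dim),
    "v_proj": (hidden, kv_dim),
    "o_proj": (hidden, hidden),
    "gate_proj": (hidden, intermediate),
    "up_proj": (hidden, intermediate),
    "down_proj": (intermediate, hidden),
}

per_layer = sum(lora_param_count(d_in, d_out, r) for d_in, d_out in shapes.values())
total = per_layer * num_layers
print(f"LoRA parameters per layer: {per_layer:,}")
print(f"LoRA parameters in total:  {total:,}")
```

Under these assumed shapes the adapters come to roughly 6.3 million parameters, well under 1% of the 1.1 billion frozen base weights.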

In [None]:
from transformers import Trainer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model
from torch.utils.data import Dataset
import torch

# The Trainer expects a Dataset that yields dicts of tensors,
# so wrap the tokenizer output in a small Dataset class

class TokenizedDataset(Dataset):
    def __init__(self, tokenized_data):
        self.tokenized_data = tokenized_data

    def __len__(self):
        return len(self.tokenized_data["input_ids"])

    def __getitem__(self, idx):
        item = {key: self.tokenized_data[key][idx] for key in self.tokenized_data}
        # Causal LM labels are the input ids; padding positions are set to -100
        # so the loss ignores them
        labels = item["input_ids"].clone()
        labels[item["attention_mask"] == 0] = -100
        item["labels"] = labels
        return item

train_dataset = TokenizedDataset(tokenized_data)

# Configure bitsandbytes for 4-bit quantization
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=False,
)


# Load the model again with 4-bit quantization
model = AutoModelForCausalLM.from_pretrained(
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    quantization_config=bnb_config,
    device_map="auto", # Let accelerate handle device placement
    torch_dtype=torch.bfloat16,
    trust_remote_code=False,
)

# Configure LoRA
lora_config = LoraConfig(
    r=8,
    lora_alpha=16,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj","gate_proj", "up_proj", "down_proj"],
    bias="none",
    task_type="CAUSAL_LM",
)

# Get the PEFT model
model = get_peft_model(model, lora_config)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer, # Pass the tokenizer to the Trainer
)

# Start training
print("Starting model training...")
trainer.train()
print("Training finished.")

In [None]:
def chat_with_model(prompt, model, tokenizer, max_new_tokens=100):
    """Generate a reply to `prompt` and return only the newly generated text."""
    inputs = tokenizer(prompt, return_tensors="pt")
    # Ensure inputs are on the same device as the model
    inputs = {name: tensor.to(model.device) for name, tensor in inputs.items()}

    # Generate text; max_new_tokens bounds the reply length rather than prompt + reply
    outputs = model.generate(**inputs, max_new_tokens=max_new_tokens, num_return_sequences=1, no_repeat_ngram_size=2)

    # Decode only the tokens generated after the prompt
    prompt_length = inputs["input_ids"].shape[1]
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

    return response.strip()

In [None]:
import os # Import the os module to check for file existence

print("Start chatting with the model! Type 'quit' to exit.")

conversation_history = [] # List to store conversation history

# Specify the path to your bad words file
bad_words_file = "profanity-list.txt" # Use the downloaded file

# Load bad words from the specified file
if os.path.exists(bad_words_file):
    try:
        with open(bad_words_file, "r", encoding="utf-8") as f:
            # Lowercase entries so they compare correctly against lowercased input
            bad_words = [line.strip().lower() for line in f if line.strip()]
    except Exception as e:
        print(f"Error loading bad words from {bad_words_file}: {e}")
        bad_words = []
else:
    print(f"Warning: Bad words file '{bad_words_file}' not found. Bad word filtering will not be active.")
    bad_words = []


# Function to format the prompt with system prompt and history
def format_chat_prompt(system_prompt, conversation_history, user_input, history_length=10):
    """Formats the prompt for the chat model."""
    history_string = "\n".join(conversation_history[-history_length:])
    # Put a newline between the system prompt and the history so they do not run together
    full_prompt = f"""{system_prompt}
{history_string}
User: {user_input}
Model:"""
    return full_prompt

# Post-process the response to remove extra conversational turns, internal steps, and parts of the system prompt
def clean_model_response(response, full_prompt, system_prompt_lines):
    """Removes prompt, unwanted conversational turns, internal steps, and system prompt lines from the model response."""
    if response.startswith(full_prompt):
        response = response[len(full_prompt):].strip()

    response_lines = response.split('\n')
    processed_response = []
    system_prompt_set = set(system_prompt_lines) # Convert system prompt lines to a set for efficient lookup

    for line in response_lines:
        stripped_line = line.strip()
        # Check if the line starts with common turn indicators, internal steps, system prompt lines, or "Solution X:"
        if stripped_line.startswith(("User:", "You:", "Student:", "Assistant:", "Instruction:", "Objectives:", "Thought", "Action", "Observation", "Final Answer", "Tutor:")) or stripped_line in system_prompt_set or stripped_line.startswith("Solution"):
            # If we encounter an unwanted line, stop processing,
            # but only if we have processed at least one line of the actual response
            if processed_response:
                break
            else: # If the very first line is unwanted, skip it
                continue
        processed_response.append(line)
    return '\n'.join(processed_response).strip()

# Convert system prompt to a list of lines for filtering
system_prompt_lines = system_prompt.split('\n')


while True:
    user_input = input("You: ")

    # Check for bad words in user input
    if any(word in user_input.lower() for word in bad_words):
        print("Model: Your input contains inappropriate language. The chat session has ended.")
        break

    if user_input.lower() == 'quit':
        print("Model: Goodbye!")
        break

    # Construct the full prompt from the earlier turns plus the new input
    full_prompt = format_chat_prompt(system_prompt, conversation_history, user_input)

    # Append user input to history after building the prompt so it is not repeated in it
    conversation_history.append(f"User: {user_input}")

    # Generate text using the pipeline
    # Note: `pipe` wraps the base model loaded at the top of the notebook, not the LoRA-tuned `model`
    # Adjusting generation parameters to encourage shorter, single-turn responses
    response = pipe(full_prompt, max_new_tokens=150, do_sample=True, top_p=0.95, top_k=50)[0]['generated_text']

    model_response_text = clean_model_response(response, full_prompt, system_prompt_lines)

    print(f"Model: {model_response_text}")

    # Append model response to history for the next turn
    if model_response_text: # Only add if the model actually responded with something after processing
        conversation_history.append(f"Model: {model_response_text}")


print("Chat session ended.")
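
A substring check such as `any(word in user_input.lower() for word in bad_words)` can flag innocent words that happen to contain a listed term. One possible alternative, sketched below, matches listed entries only as whole words. The helper names and the stand-in word list are hypothetical, and this is only one of several reasonable matching strategies.

```python
import re


def build_bad_word_pattern(bad_words):
    """Compile a case-insensitive pattern matching the listed entries as whole words."""
    cleaned = [re.escape(word.strip()) for word in bad_words if word.strip()]
    if not cleaned:
        return None  # nothing to filter
    # Lookarounds require that no word character touches either end of a match
    return re.compile(r"(?<!\w)(?:" + "|".join(cleaned) + r")(?!\w)", re.IGNORECASE)


def contains_bad_word(text, pattern):
    """Return True if the text contains any listed entry as a whole word."""
    return pattern is not None and pattern.search(text) is not None


# Hypothetical stand-in list; the notebook loads its list from profanity-list.txt
pattern = build_bad_word_pattern(["darn", "heck"])
print(contains_bad_word("What the heck is a variable?", pattern))
print(contains_bad_word("I checked my answer twice", pattern))
```

The second sentence contains "heck" inside "checked", which a plain substring test would flag but the whole-word pattern does not.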