In [None]:
# Install the fine-tuning dependencies quietly (-q). peft is pinned to 0.11.1
# and trl is capped below 0.9.0; the remaining packages are left unpinned.
!pip install -q transformers peft==0.11.1 bitsandbytes accelerate "trl<0.9.0" datasets

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive (force_remount=True requests a fresh
# mount); the scripts below read their data from and save their output to a
# folder under this mount point.
drive.mount('/content/drive', force_remount=True)

In [None]:
# Create a local output directory (and any missing parents) on the Colab VM.
# NOTE(review): the scripts in this notebook save under --drive-path rather
# than here - confirm this directory is still needed.
!mkdir -p /content/model/fine-tuned

Insert your Hugging Face access token below (replace the `*huggingface_token*` placeholder).

In [None]:
from huggingface_hub import login

# Authenticate with the Hugging Face Hub. Replace the placeholder with a
# personal access token created at https://huggingface.co/settings/tokens
login(token="*huggingface_token*")

In [None]:
%%writefile /content/finetune_model.py
import os
import argparse
import warnings
import json
import torch
import gc
import transformers
import bitsandbytes as bnb
from datasets import Dataset
from peft import prepare_model_for_kbit_training, LoraConfig, get_peft_model
from trl import SFTConfig, SFTTrainer, DataCollatorForCompletionOnlyLM
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, EarlyStoppingCallback


# Module-level state read by format_example(). Both names are assigned in the
# __main__ block before any dataset is formatted.
chosen_user = None  # user name interpolated into the system prompt (set from --user)
tokenizer = None  # tokenizer whose chat template renders each example


def format_example(example):
    """Render one conversation record as a single chat-template string.

    The record's ``context`` entries become user turns of the form
    ``[speaker]: message`` and its ``target_response`` becomes the final
    assistant turn. A system prompt naming the module-level ``chosen_user``
    is placed first.

    Args:
        example: Mapping with a ``context`` list (items carrying ``speaker``
            and ``message``) and a ``target_response`` string.

    Returns:
        A dict with a single ``text`` key holding the rendered (untokenized)
        conversation, produced by the module-level ``tokenizer``.
    """
    system_prompt = f'You are an assistant that mimics {chosen_user} in a WhatsApp group chat with his friends that switch between English and Italian. Respond naturally in {chosen_user}\'s style to the conversation below. The message should take the context into account so that it is coherent and flows naturally with the conversation. When {chosen_user} is mentioned in any of the messages in the conversation, you should pay more attention to that message when replying.'
    system_turn = {'role': 'system', 'content': system_prompt}

    # Every context message is presented to the model as a user turn,
    # prefixed with the speaker's name in square brackets.
    context_turns = [
        {'role': 'user', 'content': f"[{turn['speaker']}]: {turn['message']}"}
        for turn in example["context"]
    ]

    reply_turn = {'role': 'assistant', 'content': example['target_response']}

    chat = [system_turn, *context_turns, reply_turn]
    rendered = tokenizer.apply_chat_template(chat, tokenize=False)
    return {'text': rendered}


def find_all_linear_names(model, bits):
    """Return the distinct leaf names of the model's linear layers.

    Args:
        model: Module whose ``named_modules()`` are scanned.
        bits: Selects which layer class counts as "linear": 4 picks the
            bitsandbytes 4-bit linear, 8 the bitsandbytes 8-bit linear, and
            any other value the plain ``torch.nn.Linear``.

    Returns:
        A list (in arbitrary order) with the last dotted component of every
        matching module's qualified name, without duplicates.
    """
    if bits == 4:
        linear_cls = bnb.nn.Linear4bit
    elif bits == 8:
        linear_cls = bnb.nn.Linear8bitLt
    else:
        linear_cls = torch.nn.Linear

    # Only the final path component is kept, so "layers.0.attn.q_proj" and
    # "layers.1.attn.q_proj" collapse into a single "q_proj" entry.
    leaf_names = {
        qualified_name.split('.')[-1]
        for qualified_name, submodule in model.named_modules()
        if isinstance(submodule, linear_cls)
    }
    return list(leaf_names)


if __name__ == '__main__':
    # Script entry point: parse the CLI options, load the base model in 4-bit,
    # attach LoRA adapters, build the chat-formatted datasets, fine-tune with
    # SFTTrainer and save the adapter and tokenizer under --drive-path.
    parser = argparse.ArgumentParser(description='Fine-tune a language model with QLoRA')
    parser.add_argument('--model-name', type=str, default='meta-llama/Meta-Llama-3-8B-Instruct',
                        help='Model name to load (default: "meta-llama/Meta-Llama-3-8B-Instruct")')
    # Required: the lower-cased user name selects the dataset files, so a
    # missing value must be rejected here instead of crashing after the model
    # has already been loaded.
    parser.add_argument('--user', type=str, required=True,
                        help='User name to load specific train/val files (e.g., "Paolo")')
    parser.add_argument('--min-msg-length', type=int, default=0,
                        help='Minimum number of words in messages (default: 0)')
    parser.add_argument('--epochs', type=int, default=2,
                        help='Number of training epochs (default: 2)')
    parser.add_argument('--train-batch-size', type=int, default=8,
                        help='Per device batch size for training (default: 8)')
    parser.add_argument('--eval-batch-size', type=int, default=4,
                        help='Per device batch size for evaluation (default: 4)')
    parser.add_argument('--drive-path', type=str, default='/content/drive/MyDrive/qlora',
                        help='Path in Google Drive for data and model output')
    parser.add_argument('--patience', type=int, default=3,
                        help='Early stopping patience. Number of evaluation calls with no improvement after which training will stop (default: 3)')
    parser.add_argument('--early-stopping', action='store_true',
                        help='Enable early stopping based on validation loss')
    # New arguments for configurable parameters
    parser.add_argument('--lora-rank', type=int, default=4,
                        help='Rank for LoRA adaptation (default: 4)')
    parser.add_argument('--lora-alpha', type=float, default=None,
                        help='Alpha for LoRA adaptation (default: 2 * rank)')
    parser.add_argument('--lora-dropout', type=float, default=0.05,
                        help='Dropout for LoRA adaptation (default: 0.05)')
    parser.add_argument('--eval-steps', type=float, default=500,
                        help='Number of steps between evaluations (default: 500, can also use fractions like 0.2)')
    parser.add_argument('--save-steps', type=float, default=500,
                        help='Number of steps between saving checkpoints (default: 500, can also use fractions like 0.2)')
    parser.add_argument('--save-total-limit', type=int, default=2,
                        help='Maximum number of checkpoints to keep (default: 2)')
    parser.add_argument('--learning-rate', type=float, default=1e-5,
                        help='Learning rate for fine-tuning (default: 1e-5)')
    parser.add_argument('--logging-steps', type=int, default=50,
                        help='Number of steps between logging updates (default: 50)')

    args = parser.parse_args()

    # Resolve the dataset locations once and check them before any expensive
    # work (model download / quantized load) is started.
    train_dataset_path = f'{args.drive_path}/data/processed/train_conversations_{args.user.lower()}.json'
    val_dataset_path = f'{args.drive_path}/data/processed/val_conversations_{args.user.lower()}.json'
    for dataset_path in (train_dataset_path, val_dataset_path):
        if not os.path.isfile(dataset_path):
            parser.error(f'Dataset file not found: {dataset_path}')

    torch.cuda.empty_cache()
    gc.collect()

    # format_example() reads this module-level name for the system prompt.
    chosen_user = args.user

    warnings.filterwarnings('ignore')

    rank = args.lora_rank
    # Alpha falls back to twice the rank when it is not given explicitly.
    lora_alpha = args.lora_alpha if args.lora_alpha is not None else rank * 2

    # 4-bit NF4 quantization with double quantization and bfloat16 compute.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type='nf4',
        bnb_4bit_compute_dtype=torch.bfloat16
    )

    model = AutoModelForCausalLM.from_pretrained(
        args.model_name,
        device_map='auto',
        trust_remote_code=True,
        quantization_config=bnb_config
    )

    # format_example() also reads this module-level name (chat template).
    tokenizer = AutoTokenizer.from_pretrained(
        args.model_name
    )

    PAD_TOKEN = '<|pad|>'

    # Register a dedicated pad token plus one special token per speaker tag.
    special_tokens = ['[Federico]:', '[Paolo]:', '[Riccardo Santini]:', '[Guglielmone]:']
    tokenizer.add_special_tokens({'pad_token': PAD_TOKEN, 'additional_special_tokens': special_tokens})

    tokenizer.padding_side = 'right'

    # The embedding matrix must grow to cover the tokens added above.
    model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=8)

    model.gradient_checkpointing_enable()
    model = prepare_model_for_kbit_training(model)

    target_modules = [
        'gate_proj',
        'q_proj',
        'k_proj',
        'v_proj',
        'o_proj',
        'down_proj',
        'up_proj',
        'lm_head'
    ]

    config = LoraConfig(
        r=rank,
        lora_alpha=lora_alpha,
        target_modules=target_modules,
        lora_dropout=args.lora_dropout,
        bias='none',
        task_type='CAUSAL_LM'
    )

    model = get_peft_model(model, config)

    with open(train_dataset_path, 'r', encoding='utf-8') as f:
        raw_training_data = json.load(f)

    with open(val_dataset_path, 'r', encoding='utf-8') as f:
        raw_validation_data = json.load(f)

    def _build_dataset(raw_conversations):
        # Drop conversations whose target reply has fewer than
        # --min-msg-length whitespace-separated words, then render the rest
        # into a single 'text' column with format_example().
        kept = [
            conversation for conversation in raw_conversations
            if len(conversation['target_response'].split()) >= args.min_msg_length
        ]
        dataset = Dataset.from_list(kept)
        return dataset.map(format_example, remove_columns=dataset.column_names)

    print('Formatting training dataset...')
    train_dataset = _build_dataset(raw_training_data)

    print('Formatting validation dataset...')
    val_dataset = _build_dataset(raw_validation_data)

    print(f'Training dataset size: {len(train_dataset)}')
    print(f'Validation dataset size: {len(val_dataset)}')

    # The collator is given this marker as the start of the response, so the
    # loss is meant to cover only the assistant reply that follows it.
    response_template = '<|end_header_id|>'

    data_collator = DataCollatorForCompletionOnlyLM(
        response_template=response_template,
        tokenizer=tokenizer
    )

    print("LoRA Configuration:")
    print(f"  - Rank: {rank}")
    print(f"  - Alpha: {lora_alpha}")
    print(f"  - Dropout: {args.lora_dropout}")
    print("Training Parameters:")
    print(f"  - Learning rate: {args.learning_rate}")
    print(f"  - Evaluation steps: {args.eval_steps}")
    print(f"  - Save steps: {args.save_steps}")
    print(f"  - Save total limit: {args.save_total_limit}")

    # NOTE(review): fp16 mixed precision is combined with a bfloat16 4-bit
    # compute dtype above - confirm this pairing is intended for the target GPU.
    training_args = SFTConfig(
        dataset_text_field='text',  # this is the default column name
        max_seq_length=512,
        per_device_train_batch_size=args.train_batch_size,
        per_device_eval_batch_size=args.eval_batch_size,
        gradient_accumulation_steps=4,
        num_train_epochs=args.epochs,
        warmup_steps=4,
        learning_rate=args.learning_rate,
        fp16=True,
        evaluation_strategy='steps',
        eval_steps=args.eval_steps,
        save_safetensors=True,
        save_steps=args.save_steps,
        save_strategy='steps',
        save_total_limit=args.save_total_limit,
        logging_steps=args.logging_steps,
        output_dir=f'{args.drive_path}/model/training',
        optim='paged_adamw_8bit',
        run_name=f'finetune-{args.user}-{args.model_name.split("/")[-1]}',
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
    )

    callbacks = []
    if args.early_stopping:
        print(f"Early stopping enabled with patience {args.patience}")
        callbacks.append(EarlyStoppingCallback(early_stopping_patience=args.patience))

    # Pass the customised tokenizer explicitly. Without it SFTTrainer loads a
    # fresh tokenizer from the base checkpoint, so the datasets would be
    # tokenized without the pad and speaker tokens registered above.
    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        args=training_args,
        data_collator=data_collator,
        callbacks=callbacks,
    )

    # The generation KV cache is switched off for training.
    model.config.use_cache = False

    trainer.train()

    # Save the best model (load_best_model_at_end=True restores it after training)
    print("Saving the best model...")
    model.save_pretrained(f'{args.drive_path}/model/fine-tuned')
    tokenizer.save_pretrained(f'{args.drive_path}/model/fine-tuned')

    print('Model fine-tuning complete.')
    print(f'Model saved to {args.drive_path}/model/fine-tuned')
    print(f'Tokenizer saved to {args.drive_path}/model/fine-tuned')

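Upload the dataset files to Google Drive under the `--drive-path` folder (default `/content/drive/MyDrive/qlora`). The file names must end with the lower-cased `--user` value, e.g. `paolo`:
/content/drive/MyDrive/qlora/data/processed/train_conversations_<user>.json
/content/drive/MyDrive/qlora/data/processed/val_conversations_<user>.json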

In [None]:
!python /content/finetune_model.py \
  --model-name "meta-llama/Meta-Llama-3-8B-Instruct" \
  --user "paolo" \
  --min-msg-length 0 \
  --learning-rate 7e-5 \
  --epochs 5 \
  --early-stopping \
  --patience 3 \
  --lora-rank 4 \
  --lora-alpha 8 \
  --lora-dropout 0.05 \
  --eval-steps 100 \
  --save-steps 100 \
  --save-total-limit 3

from google.colab import runtime
runtime.unassign()

After training, download the adapter weights from the `model/fine-tuned` folder (under the `--drive-path` directory) to your local machine.

# TESTING

In [None]:
%%writefile /content/test_model.py
import torch
import gc
import argparse
import os
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline
)
from peft import PeftModel

# User name used in the system prompt; overwritten from --user in the
# __main__ block.
chosen_user = None

def load_fine_tuned_model(model_path, base_model_name):
    """Load the 4-bit base model, extend its vocabulary and attach the adapter.

    Args:
        model_path: Location of the fine-tuned LoRA adapter.
        base_model_name: Name of the base model the adapter was trained on.

    Returns:
        A ``(model, tokenizer)`` tuple: the adapter-wrapped model and the
        tokenizer extended with the pad and speaker special tokens.
    """
    # Release cached GPU memory and collect garbage before the large loads.
    torch.cuda.empty_cache()
    gc.collect()

    # 4-bit NF4 quantization with double quantization and bfloat16 compute.
    bnb_cfg = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
    )

    print(f"Loading base model: {base_model_name}")
    backbone = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        quantization_config=bnb_cfg,
        device_map="auto",
        trust_remote_code=True,
        low_cpu_mem_usage=True,
    )

    print("Loading tokenizer...")
    tok = AutoTokenizer.from_pretrained(base_model_name)

    # Register a pad token and one special token per speaker tag, then pad
    # on the right.
    vocabulary_additions = {
        'pad_token': '<|pad|>',
        'additional_special_tokens': ['[Federico]:', '[Paolo]:', '[Riccardo Santini]:', '[Guglielmone]:'],
    }
    tok.add_special_tokens(vocabulary_additions)
    tok.padding_side = 'right'

    # The embedding matrix has to cover the enlarged vocabulary before the
    # adapter is attached.
    backbone.resize_token_embeddings(len(tok), pad_to_multiple_of=8)

    print(f"Loading adapter from: {model_path}")
    adapted = PeftModel.from_pretrained(backbone, model_path, device_map="auto")

    return adapted, tok


def generate_response(model, tokenizer, prompt, max_length=100, temperature=0.7, top_p=0.9, repetition_penalty=1.2):
    """Sample a continuation for ``prompt`` and return it as cleaned text.

    Args:
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        prompt: Fully formatted prompt string.
        max_length: Maximum number of newly generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        repetition_penalty: Penalty applied to repeated tokens.

    Returns:
        The generated text only (prompt tokens excluded), stripped of
        surrounding whitespace and of a leading "assistant" word if present.
    """
    # Encode the prompt (truncated to at most 4096 tokens) and move the
    # tensors to the model's device.
    encoded = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=4096,
        return_attention_mask=True,
    ).to(model.device)

    # Inference only: no gradients are needed while sampling.
    with torch.no_grad():
        generated = model.generate(
            input_ids=encoded.input_ids,
            attention_mask=encoded.attention_mask,
            max_new_tokens=max_length,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
        )

    # The output sequence starts with the prompt; keep only what follows it.
    prompt_len = encoded.input_ids.shape[1]
    reply = tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True).strip()

    # Drop a leading "assistant" role word (case-insensitive) if one is present.
    role_word = "assistant"
    if reply.lower().startswith(role_word):
        reply = reply[len(role_word):].strip()

    return reply


def parse_conversation_input(user_input):
    """Split a ``Speaker: message | Speaker: message`` string into records.

    Segments are separated by ``|``. Within a segment, the text before the
    first ``:`` is the speaker and the rest is the message; both are
    stripped of surrounding whitespace. Segments without a ``:`` are skipped.

    Args:
        user_input: Raw conversation string.

    Returns:
        A list of ``{"speaker": ..., "message": ...}`` dicts in input order.
    """
    parsed = []
    for chunk in user_input.split("|"):
        speaker, colon, text = chunk.strip().partition(":")
        if not colon:
            # No "speaker: message" separator in this segment; ignore it.
            continue
        parsed.append({"speaker": speaker.strip(), "message": text.strip()})
    return parsed


def format_conversation(messages, chosen_user):
    """Build the generation prompt for a parsed conversation.

    A system prompt naming ``chosen_user`` is followed by one user turn per
    message, formatted as ``[speaker]: message``. The result is rendered with
    the chat template of the module-level ``tokenizer``, which is assigned in
    the ``__main__`` block before this function is called.

    Args:
        messages: List of dicts with ``speaker`` and ``message`` keys.
        chosen_user: Name of the user whose style the model should mimic.

    Returns:
        The rendered (untokenized) prompt string, ending with the assistant
        header so that generation starts directly with the reply.
    """
    system_message = {
        'role': 'system', 
        'content': f'You are an assistant that mimics {chosen_user} in a WhatsApp group chat with his friends that switch between English and Italian. Respond naturally in {chosen_user}\'s style to the conversation below. The message should take the context into account so that it is coherent and flows naturally with the conversation. When {chosen_user} is mentioned in any of the messages in the conversation, you should pay more attention to that message when replying.'
    }
    
    user_messages = [
        {'role': 'user', 'content': f"[{message['speaker']}]: {message['message']}"} 
        for message in messages
    ]
    
    # add_generation_prompt=True appends the assistant header after the last
    # user turn. Without it the prompt stops at the user message and the model
    # has to produce the role header itself before the actual reply.
    formatted_conversation = tokenizer.apply_chat_template(
        [system_message] + user_messages,
        tokenize=False,
        add_generation_prompt=True
    )
    
    return formatted_conversation


def interactive_mode(model, tokenizer, chosen_user, **generation_kwargs):
    """Run a read-generate-print loop on standard input.

    Each entered line is parsed with ``parse_conversation_input``, formatted
    with ``format_conversation`` and answered with ``generate_response``.
    The loop ends when the user types ``exit`` or standard input is closed.

    Args:
        model: Model passed through to ``generate_response``.
        tokenizer: Tokenizer passed through to ``generate_response``.
        chosen_user: Name used in the system prompt and in the printed reply.
        **generation_kwargs: Optional keyword arguments forwarded unchanged to
            ``generate_response`` (e.g. ``max_length``, ``temperature``,
            ``top_p``, ``repetition_penalty``). When omitted, that function's
            own defaults apply.
    """
    print("\n===== Interactive Mode =====")
    print("Type 'exit' to quit")
    print("Format your input as: 'Guglielmone: Message1 | Paolo: Message2 | ...'")
    
    while True:
        try:
            user_input = input("\nEnter conversation: ")
        except EOFError:
            # Standard input was closed; leave the loop instead of crashing.
            break
        if user_input.strip().lower() == 'exit':
            break
        
        messages = parse_conversation_input(user_input)
        if not messages:
            # Nothing matched the "Speaker: message" format, so there is no
            # conversation to answer.
            print("No 'Speaker: message' entries found; please try again.")
            continue
        conversation = format_conversation(messages, chosen_user)
        
        print("\nFormatted prompt:")
        print(conversation)
        print("\nGenerating response...")
        
        completion = generate_response(
            model, 
            tokenizer, 
            conversation,
            **generation_kwargs
        )
        
        print(f"\n[{chosen_user}]: {completion}")


if __name__ == "__main__":
    # Script entry point: parse the CLI options, locate the adapter, load the
    # model and answer either a single --prompt or an interactive session.
    parser = argparse.ArgumentParser(description="Test a fine-tuned model")
    parser.add_argument('--user', type=str, default="Paolo",
                        help='User name to be used in the prompt.')
    parser.add_argument("--model-path", type=str, default=None, 
                        help="Path to the fine-tuned model (default: <drive-path>/model/fine-tuned)")
    parser.add_argument("--base-model", type=str, default="meta-llama/Meta-Llama-3-8B-Instruct", 
                        help="Name of the base model")
    parser.add_argument("--prompt", type=str, 
                        help="Single prompt to test (if not provided, interactive mode will start)")
    parser.add_argument("--max-length", type=int, default=100, 
                        help="Maximum length of generated text")
    parser.add_argument("--temperature", type=float, default=0.7, 
                        help="Sampling temperature")
    parser.add_argument("--top-p", type=float, default=0.9, 
                        help="Top-p sampling parameter")
    parser.add_argument("--repetition-penalty", type=float, default=1.2, 
                        help="Repetition penalty")
    parser.add_argument("--drive-path", type=str, default="/content/drive/MyDrive/qlora",
                        help="Path in Google Drive for data and model")
    
    args = parser.parse_args()

    chosen_user = args.user
    
    # An explicit --model-path always wins; otherwise the adapter location is
    # derived from --drive-path, so changing the drive path alone is enough.
    if args.model_path is None:
        args.model_path = os.path.join(args.drive_path, "model/fine-tuned")
    
    # Check if model path exists
    if not os.path.exists(args.model_path):
        print(f"Warning: Model path {args.model_path} does not exist.")
        print("Checking Google Drive path...")
        # Fall back to the default adapter directory itself (not its parent).
        fallback_dir = "/content/drive/MyDrive/qlora/model/fine-tuned"
        if os.path.isdir(fallback_dir):
            args.model_path = fallback_dir
            print(f"Found model at: {args.model_path}")
    
    # Load the model. The tokenizer must be bound to the module-level name
    # `tokenizer`, because format_conversation() reads that global.
    model, tokenizer = load_fine_tuned_model(args.model_path, args.base_model)

    # The same sampling options apply to single-prompt and interactive mode.
    generation_kwargs = {
        "max_length": args.max_length,
        "temperature": args.temperature,
        "top_p": args.top_p,
        "repetition_penalty": args.repetition_penalty,
    }
    
    if args.prompt:
        # Single prompt mode
        print(f"\nPrompt: {args.prompt}")

        messages = parse_conversation_input(args.prompt)
        formatted_prompt = format_conversation(messages, chosen_user)

        print(f"\nFormatted prompt: {formatted_prompt}")
        
        completion = generate_response(
            model, 
            tokenizer, 
            formatted_prompt,
            **generation_kwargs
        )
        print(f"[{chosen_user}]: {completion}")
    else:
        # Interactive mode
        interactive_mode(model, tokenizer, chosen_user, **generation_kwargs)

In [None]:
from huggingface_hub import login

# Authenticate once. Replace the placeholder with a personal access token
# created at https://huggingface.co/settings/tokens. The earlier nested
# login(login(...)) called login() a second time with None.
login(token="*huggingface_token*")

In [None]:
# Run the test script for user "Paolo". No --prompt is given, so the script
# starts its interactive mode.
!python /content/test_model.py \
  --user Paolo