In [None]:
!pip install -r requirements.txt
!pip install --upgrade bitsandbytes

In [None]:
from huggingface_hub import login
import json

# Parse the local config.json; the file handle is only needed while reading.
with open("config.json", "r") as config_file:
    config = json.load(config_file)

# The Hugging Face access token is stored under the "HF_ACCESS_TOKEN" key.
access_token = config["HF_ACCESS_TOKEN"]

# Authenticate against the Hugging Face Hub with that token.
login(token=access_token)

## Data

In [None]:
from datasets import load_dataset
from datasets.arrow_dataset import Dataset

def format_sample(sample):
    """Format one Alpaca-style record into a single chat-template training string.

    Args:
        sample: Mapping with the keys 'instruction', 'input' and 'output'.
            'input' may be None or an empty string, in which case the
            "### Input:" section is left out of the prompt.

    Returns:
        str: The user turn (instruction and optional input) followed by the
        assistant turn holding the expected output.  Every sample is terminated
        with "<|eot_id|><|end_of_text|>", whether or not it has an input.
    """
    instruction = sample['instruction']
    input_text = sample['input']
    output_text = sample['output']

    # The optional section is only emitted when the record actually carries an input.
    has_input = input_text is not None and input_text != ""
    input_section = f"### Input:\n{input_text}\n\n" if has_input else ""

    # Both variants share the same header, response marker and terminator, so the
    # template is written once; only the "### Input:" section is conditional.
    return (
        "<|start_header_id|>user<|end_header_id|>\n\n"
        "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n\n"
        f"### Instruction:\n{instruction}\n\n"
        f"{input_section}"
        "### Response:\n<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
        f"{output_text}<|eot_id|><|end_of_text|>"
    )



def gen_train_input():
    """Yield the training portion of the dataset as Alpaca-formatted records.

    Streams the "train" split of iamtarun/python_code_instructions_18k_alpaca
    and stops after the first 16800 samples (the remaining samples are left
    for validation).

    Yields:
        dict: {'text': <formatted prompt>} for each training sample.
    """
    # Number of leading samples that make up the training set
    # (about 90% of the roughly 18.6k samples, per the original note).
    train_limit = 16800
    stream = load_dataset("iamtarun/python_code_instructions_18k_alpaca", streaming=True, split="train")
    for position, record in enumerate(stream):
        if position >= train_limit:
            return
        yield {'text': format_sample(record)}


def gen_val_input():
    """Yield the validation portion of the dataset as Alpaca-formatted records.

    Streams the "train" split of iamtarun/python_code_instructions_18k_alpaca,
    skips the first 16800 samples (those are used for training) and formats
    every sample after them.

    Yields:
        dict: {'text': <formatted prompt>} for each validation sample.
    """
    # Samples before this position belong to the training set.
    first_val_position = 16800
    stream = load_dataset("iamtarun/python_code_instructions_18k_alpaca", streaming=True, split="train")
    for position, record in enumerate(stream):
        if position >= first_val_position:
            yield {'text': format_sample(record)}

# Build the train / validation Dataset objects from the two generator functions above.
dataset_train = Dataset.from_generator(gen_train_input)
dataset_val=Dataset.from_generator(gen_val_input)

In [None]:
# Report how many formatted samples ended up in each split.
print(f"Train dataset size: {len(dataset_train)}")
print(f"Validation dataset size: {len(dataset_val)}")

# Show the first formatted training record as a sanity check of the template.
print(f"Sample train:\n{dataset_train[0]}")


## Model and tokenizer

In [None]:
import torch
from peft import LoraConfig, AutoPeftModelForCausalLM
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments,BitsAndBytesConfig
from trl import SFTTrainer

# Hugging Face model id of the base checkpoint; read by create_and_prepare_model below.
model_name="meta-llama/Llama-3.2-1B-Instruct"
def create_and_prepare_model():
    """Load the 4-bit quantized base model, the LoRA config and the tokenizer.

    Reads the module-level `model_name` and `access_token`.

    Returns:
        tuple: (model, peft_config, tokenizer) where
            model       - the causal LM loaded with the 4-bit quantization config,
            peft_config - LoraConfig targeting the q/k/v attention projections,
            tokenizer   - tokenizer for `model_name`, padding on the right with
                          "<|end_of_text|>" as the pad token.
    """
    # 4-bit NF4 quantization with double quantization; compute runs in bfloat16.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=bnb_config,
        device_map="auto",
        token=access_token,
    )

    # LoRA adapters (rank 8, alpha 16) on the attention query/key/value projections.
    peft_config = LoraConfig(
        lora_alpha=16,
        r=8,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=['q_proj', 'k_proj', 'v_proj'],
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
    # Reuse an existing special token for padding, so no new token has to be added
    # to the tokenizer (and the embedding matrix does not need resizing).
    tokenizer.pad_token = "<|end_of_text|>"
    tokenizer.padding_side = "right"

    return model, peft_config, tokenizer

# Instantiate the quantized model, LoRA config and tokenizer used by the training cells.
model,peft_config,tokenizer=create_and_prepare_model()

## Train

In [None]:
from trl import SFTConfig, SFTTrainer

# Training configuration for the supervised fine-tuning run.
args=SFTConfig(
    output_dir="./llama32-python",
    num_train_epochs=5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=4,
    gradient_checkpointing=True, # to save memory
    optim="adamw_torch_fused",
    logging_steps=10,
    # Compute the validation loss during training.
    # `eval_strategy` is the current name of the deprecated `evaluation_strategy` argument.
    eval_strategy="steps",  # evaluate on a step schedule (see eval_steps)
    eval_steps=50,  # evaluate every 50 steps
    save_strategy="epoch",
    learning_rate=2e-4,
    bf16=True,
    tf32=False, # set True for faster speed on GPUs that support TF32
    max_grad_norm=0.3,
    warmup_ratio=0.03, # follow QLoRA paper
    lr_scheduler_type="cosine",
    report_to="tensorboard",
    gradient_checkpointing_kwargs={"use_reentrant": False},
    dataset_text_field="text", # column produced by the dataset generators
)

In [None]:
# Build the supervised fine-tuning trainer from the model, training args, datasets,
# LoRA config and tokenizer created in the cells above.
# NOTE(review): newer TRL releases appear to rename the `tokenizer` argument to
# `processing_class` — confirm against the TRL version pinned in requirements.txt.
trainer=SFTTrainer(
    model=model,
    args=args,
    train_dataset=dataset_train,
    eval_dataset=dataset_val,
    peft_config=peft_config,
    tokenizer=tokenizer
)
# Run the training loop.
trainer.train()

In [None]:
# Save the state dict of `model` (the object that was handed to SFTTrainer) to a .pth file.
# The loading cell further down reads the same file name.
model_file_name="LLAMA32_ft_python_code.pth"
torch.save(model.state_dict(), model_file_name)
print(f"Model saved as {model_file_name}")

## Load the fine-tuned model and generate text

In [None]:
from peft import PeftModel, LoraConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

def load_fine_tune_model(base_model_id, saved_weights):
    """Rebuild the LoRA-wrapped model and load the fine-tuned adapter weights.

    Args:
        base_model_id: Hugging Face id (or path) of the base checkpoint.
        saved_weights: Path to a state dict written with `torch.save`. It may be
            the state dict of the adapter-injected base model or of the
            PeftModel wrapper; only the LoRA tensors in it are used.

    Returns:
        tuple: (lora_model, tokenizer) with `lora_model` in evaluation mode.

    Raises:
        ValueError: If the checkpoint contains no LoRA tensors at all.
    """
    import warnings

    # Load tokenizer and base model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(base_model_id)
    base_model = AutoModelForCausalLM.from_pretrained(base_model_id)
    base_model.to(device)

    # LoRA config - these parameters must match the training configuration
    peft_config = LoraConfig(
        lora_alpha=16,
        r=8,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=['q_proj','k_proj','v_proj'],
    )

    # Wrap the base model so that it owns (freshly initialised) LoRA layers
    lora_model = PeftModel(base_model, peft_config)

    # Load the saved weights
    state_dict = torch.load(saved_weights, map_location=device)

    # PeftModel exposes the wrapped model's parameters as "base_model.model.<key>".
    # Keep only the LoRA tensors: the base weights already come from
    # `from_pretrained`, and a checkpoint taken from a quantized model stores them
    # in a packed layout that does not fit the unquantized model built here.
    peft_prefix = "base_model.model."
    adapter_state = {}
    for key, value in state_dict.items():
        if "lora_" not in key:
            continue
        # Keys saved from the PeftModel wrapper already carry the prefix.
        new_key = key if key.startswith("base_model.") else f"{peft_prefix}{key}"
        adapter_state[new_key] = value

    if not adapter_state:
        raise ValueError(
            f"No LoRA tensors found in '{saved_weights}'; the fine-tuned adapters cannot be restored."
        )

    # strict=False because the adapter-only dict intentionally omits the base weights
    load_result = lora_model.load_state_dict(adapter_state, strict=False)
    if load_result.unexpected_keys:
        # Unmatched adapter keys mean part of the fine-tuning would be silently dropped.
        warnings.warn(
            f"{len(load_result.unexpected_keys)} LoRA tensors did not match the model, "
            f"e.g. {load_result.unexpected_keys[:3]}"
        )

    # Set to evaluation mode
    lora_model = lora_model.eval()

    return lora_model, tokenizer

# Base checkpoint id and the weights file written by the save cell
base_model_id = "meta-llama/Llama-3.2-1B-Instruct"
lora_weights = "LLAMA32_ft_python_code.pth"
# Load the model; note that this rebinds the notebook-level `tokenizer` name
print("Loading fine-tuned model ...")
model_ft, tokenizer = load_fine_tune_model(base_model_id, lora_weights)
# Count all parameters and the subset that still requires gradients
total_params=sum(p.numel() for p in model_ft.parameters())
trainable_params=sum(p.numel() for p in model_ft.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

In [32]:
def generate_ft(model, prompt, tokenizer, max_new_tokens, context_size=256, temperature=0.0, top_k=1, eos_id=(128001, 128009)):
    """Generate a response for `prompt` with a token-by-token decoding loop.

    The instruction is wrapped in the same template that `format_sample` uses
    for training samples without an input section, up to and including the
    assistant header.

    Args:
        model: Causal LM; called as `model(input_ids=..., use_cache=False)` and
            expected to return an object with a `.logits` attribute.
        prompt: Instruction text inserted into the template.
        tokenizer: Object providing `encode(str)` and `decode(ids)`.
        max_new_tokens: Upper bound on the number of generated tokens.
        context_size: Number of most recent tokens fed to the model at each step.
        temperature: 0.0 selects greedy decoding; values > 0 sample from the
            temperature-scaled distribution.
        top_k: If set and > 0, only the `top_k` highest logits stay eligible.
        eos_id: Token id, or iterable of token ids, that stops generation
            (presumably the Llama 3 end-of-text / end-of-turn ids — confirm
            against the tokenizer in use).

    Returns:
        The decoded text of the newly generated tokens (prompt excluded).
    """
    # Only the device is taken from the model; dtypes follow the logits so that
    # models whose parameters are not floating point (e.g. quantized) also work.
    model_device = next(model.parameters()).device
    stop_ids = {eos_id} if isinstance(eos_id, int) else set(eos_id)

    formatted_prompt = (
        "<|start_header_id|>user<|end_header_id|>\n\n"
        "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n\n"
        # A blank line separates the instruction from the response marker,
        # exactly as in the training template.
        f"### Instruction:\n{prompt}\n\n"
        "### Response:\n<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
    )

    # Encode and prepare input
    idx = tokenizer.encode(formatted_prompt)
    idx = torch.tensor(idx, dtype=torch.long, device=model_device).unsqueeze(0)
    num_tokens = idx.shape[1]

    # Generation loop
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]   # conditioning context
        with torch.no_grad():
            outputs = model(input_ids=idx_cond, use_cache=False)

        # Focus on the last time step
        logits = outputs.logits[:, -1, :]

        # Top-k filtering: everything below the k-th largest logit becomes -inf
        if top_k is not None and top_k > 0:
            k = min(top_k, logits.size(-1))  # cannot keep more entries than the vocabulary has
            top_logits, _ignored = torch.topk(logits, k)
            min_val = top_logits[:, [-1]]
            logits = torch.where(
                logits < min_val,
                torch.tensor(float('-inf'), device=logits.device, dtype=logits.dtype),
                logits
            )

        # Sample with temperature, or pick the arg-max when temperature is 0
        if temperature > 0.0:
            probs = torch.softmax(logits / temperature, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        # Stop before appending an end-of-sequence token
        if idx_next.item() in stop_ids:
            break

        # Append new token
        idx = torch.cat((idx, idx_next), dim=1)

    # Decode only the newly generated part
    generated_ids = idx.squeeze(0)[num_tokens:]
    generated_text = tokenizer.decode(generated_ids)

    return generated_text


In [None]:
# Generate a completion for one example instruction with the fine-tuned model
prompt = "Program a Flask API that will convert an image to grayscale."
print(generate_ft(model_ft, prompt, tokenizer, max_new_tokens=256))