<a href="https://colab.research.google.com/github/Azlan-Qaisrani/my-first/blob/main/LLM_Finetuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install and import MIT Deep Learning utilities
!pip install mitdeeplearning > /dev/null 2>&1
import mitdeeplearning as mdl


In [None]:
import os
import json
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader

from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
from lion_pytorch import Lion

In [None]:

# Basic question-answer template
# The prompt-only form stops right after the model turn is opened, so whatever
# is generated next reads as the model's reply. The full form appends the
# answer and closes the turn; it is the text format used for fine-tuning.
template_without_answer = "<start_of_turn>user\n{question}<end_of_turn>\n<start_of_turn>model\n"
template_with_answer = template_without_answer + "{answer}<end_of_turn>\n"

# Show one fully rendered question/answer exchange.
print(template_with_answer.format(question="What is your name?", answer="My name is Gemma!"))


In [None]:
# Checkpoint id used for both the tokenizer and the model; the trailing
# comment keeps an alternative id for the same model family.
model_id="unsloth/gemma-2-2b-it" #"google/gemma-2-2b-it"
# Tokenizer matching the chosen checkpoint.
tokenizer=AutoTokenizer.from_pretrained(model_id)

print(f"Tokenizer vocab size: {len(tokenizer.get_vocab())}")

In [None]:
# Round-trip a short string through the tokenizer to inspect its token ids.
text="Here is some sample text"
print(f"Orignal text:{text}")

# return_tensors="pt" requests a PyTorch tensor; tokens[0] below picks the
# single encoded sequence out of it.
tokens=tokenizer.encode(text,return_tensors="pt")
print(f"Encoded tokens: {tokens}")
# Map the ids back to text, leaving special tokens out of the result.
decoded_text=tokenizer.decode(tokens[0],skip_special_tokens=True)
print(f"Decoded text: {decoded_text}")

In [None]:
# Render a question into the prompt-only template (no answer filled in yet).
prompt=template_without_answer.format(question="What is the capital of Pakistan? Use one word.")
print(prompt)

In [None]:
# Load the causal language model for the same checkpoint as the tokenizer.
# NOTE(review): device_map="auto" delegates weight placement to the loader —
# confirm the model ends up on the GPU in the target runtime.
model=AutoModelForCausalLM.from_pretrained(model_id,device_map="auto")

In [None]:
# Predict the single most likely next token for a prompt (one greedy step).
# The question text matches the prompt shown earlier in the notebook; the
# previous wording was misspelled, which changes what the model is asked.
question="What is the capital of Pakistan? Use one word."
prompt=template_without_answer.format(question=question)

tokens=tokenizer.encode(prompt,return_tensors="pt").to(model.device)

# Inference only, so no autograd bookkeeping is needed.
with torch.no_grad():
  output=model(tokens)
  probs=F.softmax(output.logits,dim=-1)

# probs[0,-1,:] is the distribution at the last position of the first
# sequence; .item() already yields a plain Python int, so no device move is
# needed before it.
next_token=torch.argmax(probs[0,-1,:]).item()

next_token_text=tokenizer.decode(next_token)
print(prompt)
print(next_token_text)

In [None]:
# Let the model continue the prompt for up to 20 new tokens and print the
# full decoded sequence (prompt included, special tokens kept).
prompt=template_without_answer.format(question="What does NASA stands for?")
tokens=tokenizer.encode(prompt,return_tensors="pt").to(model.device)
output=model.generate(tokens,max_new_tokens=20)
print(tokenizer.decode(output[0]))

In [None]:
# Build the train/test loaders for the "leprechaun" style from the course helper.
train_loader,test_loader=mdl.lab3.create_dataloader(style="leprechaun")

# Inspect one training example: each sample exposes the question, the plain
# answer, and the same answer rewritten in the target style.
sample=train_loader.dataset[44]
question=sample["instruction"]
answer=sample["response"]
answer_style=sample["response_style"]

print(f"Question: {question}\n\n"+
      f"Orignal Answer: {answer}\n\n"+
      f"Answer Style: {answer_style}")

In [None]:
def chat(question, max_new_tokens=32, temperature=0.7, only_answer=False):
    """Sample a reply to ``question`` from the notebook-level ``model``.

    Args:
        question: Text placed in the user turn of the chat template.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature passed to ``model.generate``.
        only_answer: When True, the prompt tokens are cut off and only the
            newly generated part is decoded.

    Returns:
        The decoded text with special tokens removed.
    """
    encoded = tokenizer(
        template_without_answer.format(question=question),
        return_tensors="pt",
    ).to(model.device)

    # Generation is inference only; no gradients are needed.
    with torch.no_grad():
        generated = model.generate(
            **encoded,
            do_sample=True,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
        )

    # The generated sequence starts with the prompt tokens; skip them on request.
    prompt_length = encoded['input_ids'].shape[1]
    first_kept = prompt_length if only_answer else 0
    return tokenizer.decode(generated[0][first_kept:], skip_special_tokens=True)

In [None]:
# Try the chat helper on a short prompt: sample up to 500 new tokens at
# temperature 1 and keep only the generated part.
answer = chat(
    "deep learning?",
    only_answer=True,
    max_new_tokens=500,
    temperature=1
)

print(answer)


In [None]:
def apply_lora(model):
  """Attach rank-8 LoRA adapters to the attention and MLP projections.

  The function is idempotent: a model that already carries a PEFT
  configuration is returned unchanged. This matters because it is called
  both at notebook level and from the training routine, and re-running a
  cell must not wrap the same model again.

  Args:
    model: Causal language model to adapt.

  Returns:
    The PEFT-wrapped model, or ``model`` itself if it was already wrapped.
  """
  # A non-empty peft_config means adapters are already installed.
  if getattr(model, "peft_config", None):
    return model

  lora_config=LoraConfig(
      r=8,
      task_type="CAUSAL_LM",
      target_modules=[
            "q_proj", "o_proj", "k_proj", "v_proj", "gate_proj", "up_proj", "down_proj"
        ],
  )
  return get_peft_model(model,lora_config)
# Attach the LoRA adapters to the loaded model.
model=apply_lora(model)
# Compare parameters that will receive gradients with the full parameter count.
trainable_params=sum(p.numel() for p in model.parameters() if p.requires_grad)
total_params=sum(p.numel() for p in model.parameters())
print(f"number of trainable parameters: {trainable_params}")
print(f"total parameters: {total_params}")
print(f"percentage of trainable parameters: {trainable_params / total_params * 100:.2f}%")

In [None]:
def forward_and_compute_loss(model,tokens,mask,context_length=512):
  """Next-token cross-entropy averaged over the positions selected by ``mask``.

  Args:
    model: Callable that takes a (batch, seq) tensor of token ids and returns
      an object whose ``.logits`` has one score vector per input position.
    tokens: (batch, seq) tensor of token ids.
    mask: (batch, seq) boolean tensor; True marks tokens whose prediction
      should contribute to the loss.
    context_length: Sequences are truncated to this many tokens first.

  Returns:
    Scalar loss tensor. If no position is selected (for example the masked
    span was truncated away) a zero that is still attached to the graph is
    returned instead of NaN.
  """
  tokens=tokens[:, :context_length]
  mask=mask[:, :context_length]

  # Shift by one: the model sees tokens[:-1] and must predict tokens[1:].
  x=tokens[:, :-1]
  y=tokens[:, 1:]
  mask=mask[:,1:]

  logits=model(x).logits

  # reshape (not view): the shifted slices are non-contiguous for batch > 1.
  loss=F.cross_entropy(
      logits.reshape(-1,logits.size(-1)),
      y.reshape(-1),
      reduction="none"
  )
  selected=loss[mask.reshape(-1)]

  # The mean of an empty selection is NaN and would poison the optimizer step.
  if selected.numel() == 0:
    return logits.sum() * 0.0

  return selected.mean()


In [None]:
def train(model,dataloader,tokenizer,max_steps=100,context_length=512,learning_rate=1e-4):
  """Fine-tune ``model`` with LoRA on styled answers, one example per step.

  Args:
    model: Causal language model; LoRA adapters are added if it has none yet.
    dataloader: Yields batches with "instruction" and "response_style" lists;
      only the first element of each batch is used.
    tokenizer: Tokenizer able to return character offsets for each token.
    max_steps: Exact number of optimizer steps to perform (0 performs none).
    context_length: Maximum number of tokens used per example.
    learning_rate: Learning rate for the Lion optimizer.

  Returns:
    The (LoRA-wrapped) model after training.
  """
  losses=[]

  # Wrap only once: the notebook already applies LoRA before calling train(),
  # and train() itself may be called repeatedly on the same model.
  if not getattr(model, "peft_config", None):
    model=apply_lora(model)

  optimizer=Lion(model.parameters(),lr=learning_rate)

  for step,batch in enumerate(dataloader):
    # Stop before the step runs, so exactly max_steps updates are made.
    if step >= max_steps:
      break

    question=batch["instruction"][0]
    answer=batch["response_style"][0]

    prompt=template_without_answer.format(question=question)
    text=template_with_answer.format(question=question,answer=answer)

    ids=tokenizer(text,return_tensors="pt",return_offsets_mapping=True).to(model.device)
    # Train only on tokens that start at or after the end of the prompt. The
    # full template is the prompt template plus the answer, so len(prompt) is
    # the answer's character offset even if the same text occurs in the question.
    mask = ids["offset_mapping"][:,:,0] >= len(prompt)

    loss=forward_and_compute_loss(
        model=model,
        tokens=ids["input_ids"],
        mask=mask,
        context_length=context_length,
    )
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    losses.append(loss.item())
    if step % 10 == 0:
      # NOTE(review): chat() reads the notebook-level `model`, not this
      # function's argument — confirm both refer to the same weights.
      print(chat("What is the capital of France?", only_answer=True))
      # Report the mean loss since the previous report, then reset the window.
      print(f"step {step} loss: {torch.mean(torch.tensor(losses)).item()}")
      losses = []

  return model



In [None]:
# Keep the trained model under the name every later cell uses; the result was
# previously bound to a misspelled variable and never read again.
model=train(model,train_loader,tokenizer,max_steps=50)

In [None]:
# Sample a longer reply (up to 200 new tokens) to inspect the style after training.
print(chat("What is a good story about tennis", only_answer=True, max_new_tokens=200))


In [None]:
# Switch to the "yoda" style data and fine-tune on it for 50 steps.
# These loaders replace the earlier ones; later cells read them by the same names.
train_loader,test_loader=mdl.lab3.create_dataloader(style="yoda")
model=train(model,train_loader,tokenizer,max_steps=50)

In [None]:
# Instructions for the judge LLM. {style} and {example} are filled in below;
# the doubled braces keep the JSON example literal through str.format.
system_prompt = """
You are an impartial judge that evaluates if text was written by {style}.

An example piece of text from {style} is:
{example}

Now, analyze some new text carefully and respond on if it follows the
same style of {style}. Be critical to identify any issues in the text.
Then convert your feedback into a number between 0 and 10: 10 if the text
is written exactly in the style of {style}, 5 if mixed faithfulness to the
style, or 0 if the text is not at all written in the style of {style}.

The format of the your response should be a JSON dictionary and nothing else:
{{"score": <score between 0 and 10>}}
"""
# Target style and one reference passage shown to the judge.
style="Yoda"
example = "The very Republic is threatened, if involved the Sith are. Hard to see, the dark side is. Discover who this assassin is, we must. With this Naboo queen you must stay, Qui-Gon. Protect her. May the Force be with you. A vergence, you say? But you do! Revealed your opinion is. Trained as a Jedi, you request for him? Good, good, young one."
# Fill the template in place; this cell redefines the template above, so it can be re-run.
system_prompt=system_prompt.format(style=style,example=example)
print("=== System prompt ===")
print(system_prompt)


In [None]:
# Read the key from the environment so the secret never lives in the notebook
# or its version history. Set OPENROUTER_API_KEY before starting the kernel
# (or via your platform's secrets mechanism). Any key that was previously
# committed in this cell should be treated as leaked and revoked.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
assert OPENROUTER_API_KEY != "", "You must set your OpenRouter API key before running this cell!"

# Model used as the judge.
model_name = "gpt-3.5-turbo"

llm = mdl.lab3.LLMClient(model=model_name, api_key=OPENROUTER_API_KEY)

In [None]:
from opik.evaluation.metrics import base_metric, score_result

class LLMJudgeEvaluator(base_metric.BaseMetric):
    """Metric that asks a judge LLM for a 0-10 score and rescales it to [0, 1]."""

    def __init__(self, judge: mdl.lab3.LLMClient = None, system_prompt: str = None):
        """Store the judge client, its system prompt, and the user-message template."""
        self.judge = judge
        self.system_prompt = system_prompt
        self.prompt_template = "Evaluate this text: {text}"

    def score(self, text: str, n_tries=20, **kwargs):
        """Ask the judge to rate ``text`` and return the normalized result.

        The request is retried up to ``n_tries`` times; any failure (request
        error, unparsable reply, missing key) triggers another attempt, and
        the exception from the final attempt is re-raised.

        Returns:
            A ``ScoreResult`` named "StyleScore" whose value lies in [0, 1].
        """
        closing_brace = "}"
        highest_raw_score = 10  # The judge is told to answer on a 0-10 scale

        for attempt in range(n_tries):
            is_last_attempt = attempt == n_tries - 1
            try:
                # Generation stops at the closing brace, so the reply is the
                # JSON object minus its final character.
                reply = self.judge.ask(
                    system=self.system_prompt,
                    user=self.prompt_template.format(text=text),
                    max_tokens=10,
                    stop=[closing_brace]
                )

                # Put the brace back before parsing.
                payload = json.loads(reply.choices[0].message.content + closing_brace)

                normalized = payload["score"] / highest_raw_score
                clipped = max(0.0, min(normalized, 1.0))

                return score_result.ScoreResult(name="StyleScore", value=clipped)

            except Exception as error:
                # Give up only after the final attempt; otherwise try again.
                if is_last_attempt:
                    raise error

In [None]:
# Judge metric bound to the LLM client and the filled-in system prompt.
judge=LLMJudgeEvaluator(llm,system_prompt=system_prompt)

In [None]:
def scoring_fuction(text):
  """Return the judge's normalized style score for ``text`` as a bare value."""
  result=judge.score(text)
  return result.value
# Quick sanity check of the judge on three hand-written sentences; the last
# one is taken verbatim from the example passage given to the judge.
test_texts=[
    "Tennis is a fun sport. But you must concentrate.",
    "Fun sport, tennis is. But work hard, you must.",
    "Hard to see, the dark side is."

]
for text in test_texts:
  score=scoring_fuction(text)
  print(f"{text} ==> Score: {score}")

In [None]:
# Generate text from your model by asking it new questions.
def generate_samples_from_test(test_loader, num_samples):
    """Answer the first ``num_samples`` test questions with the current model.

    Args:
        test_loader: Iterable of batches with an "instruction" list; only the
            first instruction of each batch is used.
        num_samples: Number of answers to generate. Zero or a negative value
            yields an empty list without touching the loader.

    Returns:
        List of generated answers (prompt removed), at most ``num_samples`` long.
    """
    # Without this guard one sample was generated even when none were requested.
    if num_samples <= 0:
        return []

    samples = []
    for test_sample in tqdm(test_loader, total=num_samples):
        test_question = test_sample['instruction'][0]
        with torch.no_grad():
            generated = chat(test_question, only_answer=True, max_new_tokens=100)
        samples.append(generated)
        # Stop as soon as enough answers are collected instead of draining the loader.
        if len(samples) >= num_samples:
            break
    return samples

# Number of examples used for each group in the style evaluation below.
n_samples = 20
generated_samples = generate_samples_from_test(test_loader, num_samples=n_samples)


In [None]:
from itertools import islice

# Take only the first n_samples batches. Filtering on the index inside a
# comprehension walked the entire training loader (twice) just to keep a few
# items. Drawing both lists from the same batches also means the plain and
# styled answers being compared belong to the same questions.
_reference_batches = list(islice(train_loader, n_samples))
base_samples = [sample['response'][0] for sample in _reference_batches]
style_samples = [sample['response_style'][0] for sample in _reference_batches]

In [None]:
# Turn off tokenizer-level parallelism before worker processes are created.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from multiprocessing import Pool

def compute_scores_in_parallel(samples):
    """Score every text in ``samples`` with the judge using 10 worker processes.

    Returns the scores as a list in the same order as ``samples``.
    """
    workers = Pool(processes=10)
    with workers:
        return workers.map(scoring_fuction, samples)

# Compute and print the scores for the base-style text, generated text, and training-set text in Yoda-speak style
# Each line reports mean ± standard deviation of the judge scores for one group.
base_scores = compute_scores_in_parallel(base_samples)
print(f"Base: {np.mean(base_scores):.2f} ± {np.std(base_scores):.2f}")

generated_scores = compute_scores_in_parallel(generated_samples)
print(f"Gen: {np.mean(generated_scores):.2f} ± {np.std(generated_scores):.2f}")

style_scores = compute_scores_in_parallel(style_samples)
print(f"Train: {np.mean(style_scores):.2f} ± {np.std(style_scores):.2f}")

In [None]:
# Evaluate the current model on a held-out Yoda-style passage from the course helper.
yoda_test_text = mdl.lab3.yoda_test_text
tokens = tokenizer(yoda_test_text, return_tensors="pt").to(model.device)

# Get the loglikelihood from the model
# Next-token setup: the logits at position t are compared with the token at
# t+1, so the last logit and the first token are dropped.
with torch.no_grad():
    outputs = model(**tokens)
    logits = outputs.logits[:, :-1]
    targets = tokens.input_ids[:, 1:]
    # With its default reduction, cross_entropy averages over all tokens, so
    # `loss` is the mean negative log-likelihood per token (lower is better).
    loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)),
                            targets.reshape(-1))

print(f"Yoda test loglikelihood: {loss.item():.2f}")