<a href="https://colab.research.google.com/github/Sourabh92133/pricer_llm_fine_tuning/blob/main/notebooks/inference_and_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip installs

!pip install -q --upgrade torch==2.5.1+cu124 torchvision==0.20.1+cu124 torchaudio==2.5.1+cu124 --index-url https://download.pytorch.org/whl/cu124
!pip install -q --upgrade requests==2.32.3 bitsandbytes==0.46.0 transformers==4.48.3 accelerate==1.3.0 datasets==3.2.0 peft==0.14.0 trl==0.14.0 matplotlib wandb

In [None]:
import os
import re
import math
from tqdm import tqdm
from google.colab import userdata
from huggingface_hub import login
import torch
import torch.nn.functional as F
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, set_seed
from datasets import Dataset, load_dataset,DatasetDict
from datetime import datetime
from peft import PeftModel
import matplotlib.pyplot as plt


In [None]:
# constants
BASE_MODEL = "meta-llama/Meta-Llama-3.1-8B"
Project_Name="pricer"
HF_USER="sourabh004"         # you use you HF-USER name

# the run
Run_Name="25-12-28_07.13.56"
Project_Run_Name=f"{Project_Name}-{Run_Name}"      # or directly use this "pricer-25-12-28_07.13.56"
Fine_Tuned_Model_Name=f"{HF_USER}/{Project_Run_Name}"    # or directly use this from hugging face "sourabh004/pricer-25-12-28_07.13.56"
# Revision="d7e54ea4678868601675cb7072a79deb5c8a2786"     # checkpoint that you want to run
Revision=None      # if you want to run latest checkpoint

# dataset
DATASET_NAME = "ed-donner/pricer-data"      # i used this data but you can use your own dataset
MAX_SEQUENCE_LENGTH=182

# hyperparameter for Qlora
Quant_4bit=True
%matplotlib inline     # this is only for notebook

# Used for writing to output in color

GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
COLOR_MAP = {"red":RED, "orange": YELLOW, "green": GREEN}

In [None]:
# Hugging Face authentication
#
# The token is read from the HF_TOKEN environment variable.
# On Google Colab you can populate it from the notebook secrets first:
#
#   os.environ["HF_TOKEN"] = userdata.get("HF_TOKEN")
#
# (equivalently: hf_token = userdata.get('HF_TOKEN'); login(hf_token, add_to_git_credential=True))

HF_TOKEN = os.environ.get("HF_TOKEN")

# Fail early with a clear message instead of attempting a login without a token.
if HF_TOKEN is None:
    missing_token_message = (
        "HF_TOKEN environment variable not set. "
        "Please set it via environment variables or Colab userdata."
    )
    raise ValueError(missing_token_message)

login(HF_TOKEN, add_to_git_credential=True)


In [None]:
# Load the dataset and keep handles to the two splits used below.
dataset=load_dataset(DATASET_NAME)
test=dataset["test"]
train=dataset["train"]
# Only the last expression of a cell is displayed, so the summary goes at the end.
dataset

In [None]:
# Quantization config passed to the base model loader.
if Quant_4bit:
  # 4-bit NF4 with double quantization; matmuls are computed in bfloat16.
  quant_config=BitsAndBytesConfig(
      load_in_4bit=True,
      bnb_4bit_use_double_quant=True,
      bnb_4bit_compute_dtype=torch.bfloat16,
      bnb_4bit_quant_type="nf4"
  )
else:
  # 8-bit loading. BitsAndBytesConfig has no `bnb_8bit_compute_dtype` option
  # (compute dtype is only configurable for 4-bit), so that unused kwarg is dropped.
  quant_config=BitsAndBytesConfig(
      load_in_8bit=True
  )

In [None]:
# loading Tokenizer
tokenizer=AutoTokenizer.from_pretrained(BASE_MODEL,trust_remote_code=True)
tokenizer.pad_token=tokenizer.eos_token      # reuse EOS as the padding token
# The tokenizer attribute is `padding_side`; assigning `pad_side` only creates
# an unrelated attribute and leaves the padding behaviour untouched.
tokenizer.padding_side="right"
# loading model
base_model=AutoModelForCausalLM.from_pretrained(BASE_MODEL,quantization_config=quant_config,device_map="auto")
base_model.generation_config.pad_token_id=tokenizer.pad_token_id

# load fine tuned model
# Only pass `revision` when a specific checkpoint was requested; otherwise let
# the loader pick its default (latest) revision.
adapter_kwargs={"revision":Revision} if Revision else {}
fine_tuned_model=PeftModel.from_pretrained(base_model,Fine_Tuned_Model_Name,**adapter_kwargs)


print(f"Memory footprint: {fine_tuned_model.get_memory_footprint() / 1e6:.1f} MB")

In [None]:
# Display the loaded fine-tuned model (last expression of the cell is rendered as output)
fine_tuned_model


In [None]:
def extract_price(s):
    """Pull the first number that follows "Price is $" out of a piece of text.

    Commas are stripped before searching, so "1,234.50" is read as 1234.5.
    Returns the number as a float, or 0 when the marker is absent or no
    digits follow it.
    """
    marker = "Price is $"
    if marker not in s:
        return 0
    # Text after the first occurrence of the marker, with thousands separators removed.
    tail = s.split(marker)[1].replace(',', '')
    found = re.search(r"[-+]?\d*\.\d+|\d+", tail)
    if found is None:
        return 0
    return float(found.group())

In [None]:
# Sanity check: the first number after the "Price is $" marker is 899.99, despite the surrounding noise
extract_price("Price is $a fabulous 899.99$ or so")

In [None]:
# Original prediction function: generate a few tokens and parse the price out of the text
def model_predict(prompt, device="cuda"):
  """Predict a price by letting the fine-tuned model continue `prompt`.

  Args:
    prompt: the full prompt text (expected to end with the "Price is $" marker
      that `extract_price` looks for).
    device: device the input tensors are moved to (defaults to "cuda").

  Returns:
    The price parsed from the decoded output by `extract_price` (0 if none found).

  NOTE(review): whether `generate` decodes greedily or samples depends on the
  model's generation config, which is not visible here. The seed is fixed so
  repeated evaluations give the same result either way.
  """
  set_seed(42)
  inputs=tokenizer.encode(prompt,return_tensors="pt").to(device)    # token ids as a PyTorch tensor on the target device
  attention_mask=torch.ones(inputs.shape,device=device)     # single unpadded sequence: attend to every token (also silences the warning)
  outputs=fine_tuned_model.generate(inputs,attention_mask=attention_mask,max_new_tokens=3,num_return_sequences=1)
  output=tokenizer.decode(outputs[0])     # prompt + continuation; extract_price reads what follows the marker
  return extract_price(output)

In [None]:
# An improved prediction function takes a weighted average of the top 3 choices
# This code would be more complex if we couldn't take advantage of the fact
# That Llama generates 1 token for any 3 digit number

top_K = 3

def improved_model_predict(prompt, device="cuda"):
    """Predict a price as the probability-weighted mean of the top next tokens.

    A single forward pass gives the next-token distribution. The `top_K` most
    likely tokens are decoded; those that parse as a positive, finite number
    are averaged, weighted by their (renormalised) probabilities.

    Args:
        prompt: the full prompt text.
        device: device the input tensors are moved to (defaults to "cuda").

    Returns:
        The weighted price as a float, or 0.0 when none of the top tokens is
        a usable number. Always a single float, so callers can do arithmetic
        on the result.
    """
    set_seed(42)
    inputs = tokenizer.encode(prompt, return_tensors="pt").to(device)
    attention_mask = torch.ones(inputs.shape, device=device)

    with torch.no_grad():
        outputs = fine_tuned_model(inputs, attention_mask=attention_mask)
        # Logits for the position right after the prompt.
        next_token_logits = outputs.logits[:, -1, :].to('cpu')

    next_token_probs = F.softmax(next_token_logits, dim=-1)
    top_prob, top_token_id = next_token_probs.topk(top_K)

    prices, weights = [], []
    for token_id, probability in zip(top_token_id[0], top_prob[0]):
        try:
            candidate = float(tokenizer.decode(token_id))
        except ValueError:
            # Token is not numeric text; it does not take part in the average.
            continue
        # float() also accepts strings such as "inf"/"nan"; those are not prices.
        if candidate > 0 and math.isfinite(candidate):
            prices.append(candidate)
            weights.append(probability.item())

    if not prices:
        return 0.0

    # Renormalise over the tokens that were kept, in plain Python floats.
    total = sum(weights)
    return sum(price * weight for price, weight in zip(prices, weights)) / total

In [None]:
# Tester class: runs a predictor over the first `size` datapoints and reports the errors
class Tester:
    """Evaluate a price predictor against ground-truth datapoints.

    Each datapoint is indexed as `data[i]` and must provide a "text" prompt and
    a numeric "price". For every datapoint the absolute error and squared log
    error are recorded, a coloured line is printed, and `report` draws a
    scatter plot titled with the aggregate metrics.
    """

    def __init__(self, predictor, data, title=None, size=250):
        """
        Args:
            predictor: callable taking the datapoint text and returning a number.
            data: indexable, sized collection of datapoints.
            title: chart title prefix; derived from the predictor name if omitted.
            size: maximum number of datapoints to evaluate.
        """
        self.predictor = predictor
        self.data = data
        self.title = title or getattr(predictor, "__name__", "predictor").replace("_", " ").title()
        # Never index past the end of a dataset that is smaller than `size`.
        self.size = min(size, len(data))
        self.guesses = []
        self.truths = []
        self.errors = []
        self.sles = []
        self.colors = []

    def color_for(self, error, truth):
        """Classify an error as "green", "orange" or "red".

        Green: error under 40 or under 20% of the truth; orange: under 80 or
        under 40%; red otherwise. With a zero truth the relative test cannot
        pass, so only the absolute thresholds apply.
        """
        relative = error / truth if truth else float("inf")
        if error<40 or relative < 0.2:
            return "green"
        elif error<80 or relative < 0.4:
            return "orange"
        else:
            return "red"

    def run_datapoint(self, i):
        """Run the predictor on datapoint `i`, record the metrics and print one line."""
        datapoint = self.data[i]
        guess = self.predictor(datapoint["text"])
        truth = datapoint["price"]
        error = abs(guess - truth)
        # A negative guess is clamped to 0 inside the log so that one bad
        # prediction cannot abort the whole evaluation with a math domain error.
        log_error = math.log(truth+1) - math.log(max(guess, 0)+1)
        sle = log_error ** 2
        color = self.color_for(error, truth)
        # Short item label: the second blank-line-separated section of the
        # prompt when present, otherwise the start of the prompt.
        sections = datapoint["text"].split("\n\n")
        label_source = sections[1] if len(sections) > 1 else sections[0]
        title = label_source[:20] + "..."
        self.guesses.append(guess)
        self.truths.append(truth)
        self.errors.append(error)
        self.sles.append(sle)
        self.colors.append(color)
        print(f"{COLOR_MAP[color]}{i+1}: Guess: ${guess:,.2f} Truth: ${truth:,.2f} Error: ${error:,.2f} SLE: {sle:,.2f} Item: {title}{RESET}")

    def chart(self, title):
        """Scatter the guesses against the truths, with the y=x line for reference."""
        plt.figure(figsize=(12, 8))
        max_val = max(max(self.truths), max(self.guesses))
        plt.plot([0, max_val], [0, max_val], color='deepskyblue', lw=2, alpha=0.6)
        plt.scatter(self.truths, self.guesses, s=3, c=self.colors)
        plt.xlabel('Ground Truth')
        plt.ylabel('Model Estimate')
        plt.xlim(0, max_val)
        plt.ylim(0, max_val)
        plt.title(title)
        plt.show()

    def report(self):
        """Compute mean error, RMSLE and hit rate over the recorded datapoints and chart them."""
        # Average over what was actually evaluated, not over the requested size.
        count = len(self.errors)
        if count == 0:
            print(f"{self.title}: no datapoints were evaluated")
            return
        average_error = sum(self.errors) / count
        rmsle = math.sqrt(sum(self.sles) / count)
        hits = sum(1 for color in self.colors if color=="green")
        title = f"{self.title} Error=${average_error:,.2f} RMSLE={rmsle:,.2f} Hits={hits/count*100:.1f}%"
        self.chart(title)

    def run(self):
        """Evaluate the first `self.size` datapoints, then report."""
        for i in range(self.size):
            self.run_datapoint(i)
        self.report()

    @classmethod
    def test(cls, function, data):
        """Convenience entry point: build a Tester with defaults and run it."""
        cls(function, data).run()

In [None]:
# Evaluate the generate-and-parse predictor on the test split (Tester's default size is 250 datapoints)
Tester.test(model_predict, test)

In [None]:
# Evaluate the top-K weighted-average predictor on the test split (Tester's default size is 250 datapoints)
Tester.test(improved_model_predict, test)