In [None]:
# pip installs

!pip install -q --upgrade torch==2.5.1+cu124 torchvision==0.20.1+cu124 torchaudio==2.5.1+cu124 --index-url https://download.pytorch.org/whl/cu124
!pip install -q --upgrade requests==2.32.3 bitsandbytes==0.46.0 transformers==4.48.3 accelerate==1.3.0 datasets==3.2.0 peft==0.14.0 trl==0.14.0 matplotlib wandb

In [None]:
# imports

import os
import re
import math
from tqdm import tqdm
import numpy as np
from google.colab import userdata
from huggingface_hub import login
import torch
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error
import torch.nn.functional as F
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, set_seed
from datasets import load_dataset, Dataset, DatasetDict
from datetime import datetime
from peft import PeftModel
import matplotlib.pyplot as plt

In [None]:
# Constants
# Identifiers for the base model and the fine-tuned adapter that is loaded on
# top of it. FINETUNED_MODEL is assembled as "<HF_USER>/<PROJECT_NAME>-<RUN_NAME>".

BASE_MODEL = "meta-llama/Meta-Llama-3.1-8B"
PROJECT_NAME = "pricer"
HF_USER = "ed-donner"
RUN_NAME = "2024-09-13_13.04.39"
PROJECT_RUN_NAME = f"{PROJECT_NAME}-{RUN_NAME}"
# Passed as `revision=` when the adapter is loaded; a falsy value makes the
# loading cell omit the argument.
REVISION = "e8d637df551603dc86cd7a1598a8f44af4d7ae36"
FINETUNED_MODEL = f"{HF_USER}/{PROJECT_RUN_NAME}"


# Dataset repository; it follows HF_USER, so change HF_USER to point at your own copy.
DATASET_NAME = f"{HF_USER}/pricer-data"
# Or hard-code the dataset uploaded under the "ed-donner" account
# (identical to the line above while HF_USER is "ed-donner"):
# DATASET_NAME = "ed-donner/pricer-data"

# Inference settings
# QUANT_4_BIT: True selects the 4-bit quantization config, False the 8-bit one.
# top_K: how many of the highest-probability next tokens improved_model_predict
#        considers when computing its weighted price.

QUANT_4_BIT = True
top_K = 6

%matplotlib inline

# Used for writing to output in color
# ANSI escape sequences; COLOR_MAP translates the color names produced by
# Tester.color_for into the sequence used when printing each result line.

GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
COLOR_MAP = {"red":RED, "orange": YELLOW, "green": GREEN}

In [None]:
# Log in to HuggingFace
# The token is looked up under the key 'HF_TOKEN' through google.colab's
# `userdata` helper, so the secret itself never appears in the notebook.

hf_token = userdata.get('HF_TOKEN')
# NOTE(review): add_to_git_credential=True presumably also stores the token for
# git operations — confirm against the huggingface_hub documentation.
login(hf_token, add_to_git_credential=True)

In [None]:
# Load both splits, then keep only the first TRAIN_SIZE / TEST_SIZE rows of each.
dataset = load_dataset(DATASET_NAME)
train_full = dataset['train']
test_full = dataset['test']

# Row caps for a quick run. Set either one to None (or to a number at least as
# large as the split) to evaluate on the whole split.
TRAIN_SIZE = 8000  # Very small for testing
TEST_SIZE = 2000    # Very small for testing


def _take_first(split, limit):
    """Return the first `limit` rows of `split`; all rows when `limit` is None or exceeds its length."""
    # Previously `min(None, len(split))` raised TypeError, even though the
    # message printed below recommends None for a full run.
    row_count = len(split) if limit is None else min(limit, len(split))
    return split.select(range(row_count))


train = _take_first(train_full, TRAIN_SIZE)
test = _take_first(test_full, TEST_SIZE)

print("Using small test dataset:")
print(f"  Train samples: {len(train)} (full dataset has {len(train_full)})")
print(f"  Test samples: {len(test)} (full dataset has {len(test_full)})")
print("\nTo use full dataset, set TRAIN_SIZE and TEST_SIZE to None or large numbers")

In [None]:
# Quantization settings handed to the model loader: 4-bit NF4 with double
# quantization and bfloat16 compute when QUANT_4_BIT is set, plain 8-bit otherwise.
quant_config = (
    BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_quant_type="nf4",
    )
    if QUANT_4_BIT
    else BitsAndBytesConfig(load_in_8bit=True)
)

In [None]:
# Load the Tokenizer and the Model

# The tokenizer reuses its end-of-sequence token for padding and pads on the right.
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

# Quantized base model; generation pads with the same token id as the tokenizer.
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL, quantization_config=quant_config, device_map="auto"
)
base_model.generation_config.pad_token_id = tokenizer.pad_token_id

# Load the fine-tuned model with PEFT; `revision` is only passed when REVISION is set.
adapter_kwargs = {"revision": REVISION} if REVISION else {}
fine_tuned_model = PeftModel.from_pretrained(base_model, FINETUNED_MODEL, **adapter_kwargs)

fine_tuned_model.eval()

In [None]:
def extract_price(s):
    """Parse the number that follows the marker "Price is $" in `s`.

    Commas are removed before matching, so "1,234.50" is read as 1234.5.
    Returns the matched number as a float, or 0 when the marker is missing
    or no digits follow it.
    """
    pieces = s.split("Price is $")
    if len(pieces) < 2:
        return 0
    # Only the text between the first marker and the next one (if any) is searched.
    tail = pieces[1].replace(",", "")
    found = re.search(r"[-+]?\d*\.\d+|\d+", tail)
    if found is None:
        return 0
    return float(found.group())

In [None]:
# Original prediction function: generates a few tokens and parses the price from the text

def model_predict(prompt, device="cuda"):
    """Generate up to three new tokens for `prompt` and parse a price from the decoded text.

    Args:
        prompt: Text passed to the tokenizer as-is.
        device: Device the input tensors are moved to. Defaults to "cuda",
            matching `improved_model_predict`.

    Returns:
        Whatever `extract_price` returns for the decoded prompt plus generated tokens.
    """
    # Re-seed on every call so that any sampling done by `generate` is repeatable.
    set_seed(42)
    inputs = tokenizer.encode(prompt, return_tensors="pt").to(device)
    attention_mask = torch.ones(inputs.shape, device=device)
    outputs = fine_tuned_model.generate(inputs, attention_mask=attention_mask, max_new_tokens=3, num_return_sequences=1)
    response = tokenizer.decode(outputs[0])
    return extract_price(response)

In [None]:
def improved_model_predict(prompt, device="cuda"):
    """Predict a price as the probability-weighted mean of the top next-token candidates.

    Runs one forward pass, takes the `top_K` most probable next tokens, keeps
    those whose decoded text parses as a positive finite number, and averages
    them weighted by their (renormalized) probabilities.

    Args:
        prompt: Text passed to the tokenizer as-is.
        device: Device the input tensors are moved to.

    Returns:
        The weighted price as a Python float, or 0.0 when none of the
        candidate tokens is a usable number.
    """
    set_seed(42)
    inputs = tokenizer.encode(prompt, return_tensors="pt").to(device)
    attention_mask = torch.ones(inputs.shape, device=device)

    with torch.no_grad():
        outputs = fine_tuned_model(inputs, attention_mask=attention_mask)
        next_token_logits = outputs.logits[:, -1, :].to('cpu')

    # Upcast so the softmax is computed in float32 whatever dtype the model emitted.
    next_token_probs = F.softmax(next_token_logits.float(), dim=-1)
    top_prob, top_token_id = next_token_probs.topk(top_K)

    prices, weights = [], []
    for probability, token_id in zip(top_prob[0], top_token_id[0]):
        try:
            value = float(tokenizer.decode(token_id))
        except ValueError:
            # Not a numeric token; it contributes nothing to the estimate.
            continue
        # float() also accepts "inf"/"nan"; those would poison the average.
        if value > 0 and math.isfinite(value):
            prices.append(value)
            weights.append(probability.item())

    if not prices:
        # A single float, like every other return path; callers subtract the
        # truth from this value, which fails on a tuple.
        return 0.0

    total = sum(weights)
    return sum(price * weight for price, weight in zip(prices, weights)) / total

In [None]:

class Tester:
    """Run a price predictor over a dataset, print each result in color, and report metrics.

    `predictor` is called with each datapoint's "text" and must return a number;
    each datapoint's "price" is the ground truth. Per-item results are collected
    in parallel lists (guesses, truths, errors, rel_errors, sles, colors) that
    `report` and `chart_all` read.
    """

    def __init__(self, predictor, data, title=None, show_progress=True):
        """Store the predictor and data and initialise the empty result lists.

        Args:
            predictor: Callable taking the datapoint text and returning a price.
            data: Indexable collection of mappings with "text" and "price" keys.
            title: Heading for the report; derived from the predictor's name when omitted.
            show_progress: Wrap the evaluation loop in a tqdm progress bar.
        """
        self.predictor = predictor
        self.data = data
        # Callables without __name__ (e.g. functools.partial) fall back to their class name.
        default_name = getattr(predictor, "__name__", predictor.__class__.__name__)
        self.title = title or default_name.replace("_", " ").title()
        self.size = len(data)
        self.guesses, self.truths, self.errors, self.rel_errors, self.sles, self.colors = [], [], [], [], [], []
        self.show_progress = show_progress

    def color_for(self, error, truth):
        """Grade an absolute error as "green", "orange" or "red".

        Green: error under 40 or under 20% of truth. Orange: error under 80 or
        under 40% of truth. Red otherwise. When truth is 0 the relative test
        cannot be evaluated, so only the absolute thresholds apply.
        """
        relative = error / truth if truth != 0 else math.inf
        if error < 40 or relative < 0.2:
            return "green"
        if error < 80 or relative < 0.4:
            return "orange"
        return "red"

    def run_datapoint(self, i):
        """Predict datapoint `i`, record its error metrics, and print a colored summary line."""
        datapoint = self.data[i]
        guess = self.predictor(datapoint["text"])
        truth = datapoint["price"]

        error = guess - truth
        abs_error = abs(error)
        # Relative error is reported as 0 when the true price is 0.
        rel_error = abs_error / truth if truth != 0 else 0
        log_error = math.log(truth + 1) - math.log(guess + 1)
        sle = log_error ** 2
        color = self.color_for(abs_error, truth)

        # Short label: first 20 characters of the second blank-line-separated
        # section of the text, or of the text itself when it has no such section.
        title = (datapoint["text"].split("\n\n")[1][:20] + "...") if "\n\n" in datapoint["text"] else datapoint["text"][:20]
        self.guesses.append(guess)
        self.truths.append(truth)
        self.errors.append(error)
        self.rel_errors.append(rel_error)
        self.sles.append(sle)
        self.colors.append(color)

        print(f"{COLOR_MAP[color]}{i+1}: Guess: ${guess:,.2f} Truth: ${truth:,.2f} "
              f"Error: ${abs_error:,.2f} RelErr: {rel_error*100:.1f}% SLE: {sle:,.2f} Item: {title}{RESET}")

    @staticmethod
    def _avg_rel_error_by_bin(truths, rel_errors, n_bins=5):
        """Average `rel_errors` within `n_bins` equal-width bins of `truths`.

        Args:
            truths: 1-D numpy array of true prices.
            rel_errors: 1-D numpy array aligned with `truths`.
            n_bins: Number of equal-width price ranges between min and max truth.

        Returns:
            (labels, averages): one "$lo–$hi" label and one float per bin;
            a bin with no members gets NaN.
        """
        edges = np.linspace(truths.min(), truths.max(), n_bins + 1)
        labels = [f"${edges[i]:.0f}–${edges[i+1]:.0f}" for i in range(n_bins)]
        # np.digitize assigns values equal to the top edge to an extra bin past
        # the end; clip so the maximum price lands in the last real bin.
        bin_index = np.clip(np.digitize(truths, edges) - 1, 0, n_bins - 1)
        averages = []
        for i in range(n_bins):
            members = rel_errors[bin_index == i]
            averages.append(float(members.mean()) if members.size else float("nan"))
        return labels, averages

    def chart_all(self, chart_title):
        """Compact version: 4 performance charts in one grid."""
        t, g = np.array(self.truths), np.array(self.guesses)
        rel_err, abs_err = np.array(self.rel_errors) * 100, np.abs(np.array(self.errors))

        fig, axs = plt.subplots(2, 2, figsize=(14, 10))
        fig.suptitle(f"Performance Dashboard — {chart_title}", fontsize=16, fontweight="bold")

        # Scatter plot, with the diagonal marking a perfect prediction
        max_val = max(t.max(), g.max()) * 1.05
        axs[1, 1].plot([0, max_val], [0, max_val], "b--", alpha=0.6)
        axs[1, 1].scatter(t, g, s=20, c=self.colors, alpha=0.6)
        axs[1, 1].set_title("Predictions vs Ground Truth")
        axs[1, 1].set_xlabel("True Price ($)")
        axs[1, 1].set_ylabel("Predicted ($)")

        # Accuracy by price range
        labels, avg_err = self._avg_rel_error_by_bin(t, rel_err)
        axs[0, 0].bar(labels, avg_err, color="seagreen", alpha=0.8)
        axs[0, 0].set_title("Avg Relative Error by Price Range")
        axs[0, 0].set_ylabel("Relative Error (%)")
        axs[0, 0].tick_params(axis="x", rotation=30)

        # Relative error distribution
        axs[0, 1].hist(rel_err, bins=25, color="mediumpurple", edgecolor="black", alpha=0.7)
        axs[0, 1].set_title("Relative Error Distribution (%)")
        axs[0, 1].set_xlabel("Relative Error (%)")

        # Absolute error distribution
        axs[1, 0].hist(abs_err, bins=25, color="steelblue", edgecolor="black", alpha=0.7)
        axs[1, 0].axvline(abs_err.mean(), color="red", linestyle="--", label=f"Mean={abs_err.mean():.2f}")
        axs[1, 0].set_title("Absolute Error Distribution")
        axs[1, 0].set_xlabel("Absolute Error ($)")
        axs[1, 0].legend()

        for ax in axs.ravel():
            ax.grid(alpha=0.3)

        plt.tight_layout(rect=[0, 0, 1, 0.95])
        plt.show()

    def report(self):
        """Print aggregate error/accuracy metrics for the recorded results and draw the charts."""
        count = len(self.truths)
        if count == 0:
            # Nothing was evaluated; the metrics below are undefined on empty input.
            print("No predictions to report.")
            return

        y_true = np.array(self.truths)
        y_pred = np.array(self.guesses)

        mae = mean_absolute_error(y_true, y_pred)
        rmse = math.sqrt(mean_squared_error(y_true, y_pred))
        rmsle = math.sqrt(sum(self.sles) / count)
        mape = mean_absolute_percentage_error(y_true, y_pred) * 100
        median_error = float(np.median(np.abs(y_true - y_pred)))
        r2 = r2_score(y_true, y_pred)

        hit_rate_green = sum(1 for c in self.colors if c == "green") / count * 100
        hit_rate_acceptable = sum(1 for c in self.colors if c in ("green", "orange")) / count * 100

        print(f"\n{'='*70}")
        print(f"FINAL REPORT: {self.title}")
        print(f"{'='*70}")
        print(f"Total Predictions: {count}")
        print("\n--- Error Metrics ---")
        print(f"Mean Absolute Error (MAE): ${mae:,.2f}")
        print(f"Median Error: ${median_error:,.2f}")
        print(f"Root Mean Squared Error (RMSE): ${rmse:,.2f}")
        print(f"Root Mean Squared Log Error (RMSLE): {rmsle:.4f}")
        print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")
        print("\n--- Accuracy Metrics ---")
        print(f"R² Score: {r2:.4f}")
        print(f"Hit Rate (Green): {hit_rate_green:.1f}%")
        print(f"Hit Rate (Green+Orange): {hit_rate_acceptable:.1f}%")
        print(f"{'='*70}\n")
        chart_title = f"{self.title} | MAE=${mae:,.2f} | RMSLE={rmsle:.3f} | R²={r2:.3f}"

        self.chart_all(chart_title)

    def run(self):
        """Evaluate every datapoint in order, then print the report."""
        iterator = tqdm(range(self.size), desc="Testing Model") if self.show_progress else range(self.size)
        for i in iterator:
            self.run_datapoint(i)
        self.report()

    @classmethod
    def test(cls, function, data, title=None):
        """Convenience entry point: build a Tester for `function` over `data` and run it."""
        cls(function, data, title=title).run()

In [None]:
# Evaluate the weighted top-K predictor on the test subset and print/plot the report.
Tester.test(improved_model_predict, test, title="ed-donner Fine-tuned [Base | Llama 3.1 8B] (Improved - Small Test Set)")