# 1 Imports

In [None]:
! pip install -q datasets==2.21.0 requests torch peft bitsandbytes transformers==4.43.1 trl accelerate sentencepiece tiktoken matplotlib

In [None]:
# imports

import os
import re
import math
from tqdm import tqdm
from huggingface_hub import login
from dotenv import load_dotenv
import torch
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TrainingArguments, set_seed
from peft import LoraConfig, PeftModel
from datetime import datetime
from datasets import load_dataset, Dataset, DatasetDict
from datetime import datetime
import matplotlib.pyplot as plt

# 2 Connect to OpenAI & HuggingFace

In [None]:
# Load environment variables from a file called .env
load_dotenv()

# The conventional variable name for the OpenAI key is OPENAI_API_KEY.
# The previously used OPEN_API_KEY is still honored as a fallback so that
# existing .env files keep working.
api_key = os.getenv('OPENAI_API_KEY') or os.getenv('OPEN_API_KEY')

In [None]:
# Read HF_TOKEN from the .env file and use it to log in to the Hugging Face Hub,
# also storing the token as a git credential.
load_dotenv()
hf_token = os.environ.get('HF_TOKEN')
login(token=hf_token, add_to_git_credential=True)

# 3 Model Selection

In [None]:
# Model identifiers (only gemma_2 is used in the cells below)
qwen_2_5 = "Qwen/Qwen2.5-7B"
gemma_2 = "google/gemma-2-9b"
phi_3 = "microsoft/Phi-3-medium-4k-instruct"

# Constants
base_model = gemma_2  # model name used to load the tokenizer and the model
hf_user = 'filipespacheco'
dataset_name = f'{hf_user}/pricer-data'  # passed to load_dataset
max_sequence_length = 182  # passed as max_length when generating in model_predict
quant_4_bit = True  # True selects the 4-bit quantization config, False the 8-bit one

# ANSI escape sequences used for writing output in color
green = "\033[92m"
yellow = "\033[93m"
red = "\033[91m"
reset = "\033[0m"  # restores the default terminal color
# Lookup from color name to its escape sequence
color_map = {
    "green": green,
    "yellow": yellow,
    "red": red
}

%matplotlib inline

In [None]:
def investigate_tokenizer(model_name):
    """Print how the tokenizer of `model_name` encodes a few sample integers.

    The tokenizer is loaded with AutoTokenizer.from_pretrained and each sample
    number is encoded from its string form without special tokens, so the
    printed lists contain only the tokens for the digits themselves.
    """
    print("Investigating tokenizer for model:", model_name)
    tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    sample_numbers = (0, 1, 10, 100, 999, 1000, 10000)
    for value in sample_numbers:
        token_ids = tok.encode(str(value), add_special_tokens=False)
        print(f"The tokens for {value} are: {token_ids}")

In [None]:
# Show how the gemma_2 tokenizer splits the sample numbers into tokens
investigate_tokenizer(gemma_2)

## 3.1 Load Dataset

In [None]:
# Load the dataset named by dataset_name and pull out its two splits
dataset = load_dataset(dataset_name)
train, test = dataset['train'], dataset['test']

## 3.2 Load Tokenizer and Model

In [None]:
# Quantization settings handed to from_pretrained when the model is loaded.
if quant_4_bit:
    # 4-bit NF4 weights with double quantization, computing in bfloat16.
    quant_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
else:
    # 8-bit loading. BitsAndBytesConfig has no `bnb_8bit_compute_dtype`
    # option (the compute dtype setting exists only for 4-bit), so the
    # unsupported keyword that used to be passed here has been dropped.
    quant_config = BitsAndBytesConfig(load_in_8bit=True)

In [None]:
# Load the tokenizer while `base_model` still holds the model name (a string).
tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
# Reuse the end-of-sequence token as the padding token and pad on the right.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

# NOTE: this rebinds `base_model` from the model-name string to the loaded
# model object. Re-running this cell without first re-running the constants
# cell therefore passes a model object where a model name is expected.
base_model = AutoModelForCausalLM.from_pretrained(
    base_model,
    quantization_config=quant_config,
    device_map="auto",
    trust_remote_code=True,
)

# Use the tokenizer's end-of-sequence id as the pad id during generation.
base_model.generation_config.pad_token_id = tokenizer.eos_token_id

# get_memory_footprint() is divided by 1024**3 before being printed as GB.
print(f"Memory footprint of model: {base_model.get_memory_footprint() / 1024**3:.1f} GB")

## 3.3 Prediction preparation

In [None]:
def extract_price(s):
    """Return the first number that follows "Price is $" in `s`.

    Thousands separators and stray dollar signs in the text after the marker
    are removed before the number is searched for, so "Price is $1,234.56"
    yields 1234.56.

    Args:
        s: Text that may contain the marker "Price is $".

    Returns:
        The parsed price as a float, or 0 when the marker is absent or no
        number follows it.
    """
    marker = "Price is $"
    if marker not in s:
        return 0
    # Only the text right after the first marker is of interest.
    contents = s.split(marker)[1]
    # Strip formatting characters so "1,234.56" is matched as one number.
    contents = contents.replace(',', '').replace('$', '')
    match = re.search(r"[-+]?\d*\.\d+|\d+", contents)
    return float(match.group(0)) if match else 0

In [None]:
# Example usage: a price containing both a thousands separator and cents
extract_price("Price is $1,234.56")  # Example usage

In [1]:
def model_predict(prompt):
    """Generate a completion for `prompt` and return it as decoded text.

    The random seed is reset to 42 on every call. The prompt is encoded to a
    tensor on the CUDA device, paired with an all-ones attention mask of the
    same shape, and passed to base_model.generate with max_length set to
    max_sequence_length. The first (and only) returned sequence is decoded
    back to a string with the tokenizer.

    Args:
        prompt: The text to feed to the model.

    Returns:
        The decoded string for the generated sequence.
    """
    set_seed(42)
    device = 'cuda'
    prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    generated = base_model.generate(
        prompt_ids,
        attention_mask=torch.ones(prompt_ids.shape, device=device),
        max_length=max_sequence_length,
        num_return_sequences=1,
    )
    return tokenizer.decode(generated[0])

In [None]:
# Sanity check: run the model on the prompt text of the first test example
model_predict(test[0]['text'])

## 3.4 Evaluation Class

In [None]:
class Tester:
    """Run a price predictor over the first `size` datapoints and report accuracy.

    Each datapoint must support `datapoint["text"]` (the prompt given to the
    predictor) and `datapoint["price"]` (the true price). The predictor must
    return a number. For every datapoint the absolute error and the squared
    log error are recorded, a colored line is printed, and finally a scatter
    chart of truth versus guess is shown with summary metrics in its title.
    """

    # ANSI escape codes for console output, keyed by the color names that
    # color_for() returns. The same names are passed to plt.scatter as point
    # colors, which is why "orange" (not "yellow") is the key here.
    _ANSI_COLORS = {
        "green": "\033[92m",
        "orange": "\033[93m",
        "red": "\033[91m",
    }
    _ANSI_RESET = "\033[0m"

    def __init__(self, predictor, data, title=None, size=250):
        """Store the predictor and data and initialise empty result lists.

        Args:
            predictor: Callable taking the prompt text and returning a number.
            data: Indexable collection of datapoints.
            title: Chart title prefix; defaults to a prettified predictor name.
            size: Number of leading datapoints to evaluate.
        """
        self.predictor = predictor
        self.data = data
        # Callables without __name__ (e.g. functools.partial) get a generic title.
        default_name = getattr(predictor, "__name__", "predictor")
        self.title = title or default_name.replace("_", " ").title()
        self.size = size
        self.guesses = []
        self.truths = []
        self.errors = []
        self.sles = []
        self.colors = []

    def color_for(self, error, truth):
        """Classify an error as "green", "orange" or "red".

        Green means the error is under 40 or under 20% of the truth; orange
        means under 80 or under 40% of the truth; anything else is red. The
        relative checks are written as multiplications so a truth of 0 does
        not raise ZeroDivisionError.
        """
        if error < 40 or error < 0.2 * truth:
            return "green"
        if error < 80 or error < 0.4 * truth:
            return "orange"
        return "red"

    def run_datapoint(self, i):
        """Evaluate datapoint `i`, record its metrics and print a colored line."""
        datapoint = self.data[i]
        guess = self.predictor(datapoint["text"])
        truth = datapoint["price"]
        error = abs(guess - truth)
        log_error = math.log(truth + 1) - math.log(guess + 1)
        sle = log_error ** 2
        color = self.color_for(error, truth)
        # The item description is the second blank-line-separated section of
        # the prompt; fall back to the first section if there is only one.
        sections = datapoint["text"].split("\n\n")
        headline = sections[1] if len(sections) > 1 else sections[0]
        title = headline[:20] + "..."
        self.guesses.append(guess)
        self.truths.append(truth)
        self.errors.append(error)
        self.sles.append(sle)
        self.colors.append(color)
        print(f"{self._ANSI_COLORS[color]}{i+1}: Guess: ${guess:,.2f} Truth: ${truth:,.2f} Error: ${error:,.2f} SLE: {sle:,.2f} Item: {title}{self._ANSI_RESET}")

    def chart(self, title):
        """Show a scatter plot of truths against guesses with a y = x reference line."""
        plt.figure(figsize=(12, 8))
        max_val = max(max(self.truths), max(self.guesses))
        plt.plot([0, max_val], [0, max_val], color='deepskyblue', lw=2, alpha=0.6)
        plt.scatter(self.truths, self.guesses, s=3, c=self.colors)
        plt.xlabel('Ground Truth')
        plt.ylabel('Model Estimate')
        plt.xlim(0, max_val)
        plt.ylim(0, max_val)
        plt.title(title)
        plt.show()

    def report(self):
        """Compute mean error, RMSLE and hit rate, then draw the chart.

        Metrics are averaged over the datapoints actually evaluated. When
        nothing has been evaluated a message is printed and no chart is drawn.
        """
        count = len(self.errors)
        if count == 0:
            print(f"{self.title}: no datapoints were evaluated")
            return
        average_error = sum(self.errors) / count
        rmsle = math.sqrt(sum(self.sles) / count)
        hits = sum(1 for color in self.colors if color == "green")
        title = f"{self.title} Error=${average_error:,.2f} RMSLE={rmsle:,.2f} Hits={hits/count*100:.1f}%"
        self.chart(title)

    def run(self):
        """Evaluate up to `size` datapoints in order, then report."""
        # Kept for backward compatibility; not read anywhere in this class.
        self.error = 0
        try:
            # Never index past the end of the data.
            limit = min(self.size, len(self.data))
        except TypeError:
            # Data without len(): trust the requested size.
            limit = self.size
        for i in range(limit):
            self.run_datapoint(i)
        self.report()

    @classmethod
    def test(cls, function, data):
        """Build a Tester with default settings for `function` and run it."""
        cls(function, data).run()

In [None]:
# Tester does arithmetic on each guess, while model_predict returns the
# decoded text of the generation. Parse the price out of that text before
# handing the prediction to Tester.
def predict_price(prompt):
    """Return the price parsed from the model's completion of `prompt`."""
    return extract_price(model_predict(prompt))


Tester.test(predict_price, test)