## The AI Product Pricer

A model that can estimate how much something costs, from its description.

The dataset is here:  
https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023

And the folder with all the product datasets is here:  
https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023/tree/main/raw/meta_categories

In [None]:
"""
Optimized Fine-tuning Code for Price Estimation
Key improvements:
1. Better data preparation and validation
2. Improved system and user prompts
3. Optimized hyperparameters
4. Enhanced training data diversity
5. Better evaluation metrics
"""
import os
import re
import math
import json
import random
from dotenv import load_dotenv
from huggingface_hub import login
import matplotlib.pyplot as plt
import numpy as np
import pickle
from collections import Counter, defaultdict
from openai import OpenAI
from anthropic import Anthropic

# Load variables from a local .env file. override=True is passed so that
# (per python-dotenv) values in .env replace variables already set in the
# process environment.
load_dotenv(override=True)
# Re-export each credential into os.environ. When a variable is missing, the
# literal placeholder string below is stored instead, so a missing key does not
# fail here but at the first authenticated call.
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')
os.environ['ANTHROPIC_API_KEY'] = os.getenv('ANTHROPIC_API_KEY', 'your-key-if-not-using-env')
os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN', 'your-key-if-not-using-env')

# Authenticate against the Hugging Face Hub with the token resolved above.
# NOTE(review): add_to_git_credential=True presumably also stores the token in
# the local git credential helper - confirm this is wanted on shared machines.
hf_token = os.environ['HF_TOKEN']
login(hf_token, add_to_git_credential=True)

# Project-local helpers: Item (the product records stored in the pickles) and
# Tester (the evaluation harness used at the end of the notebook).
from items import Item
from testing import Tester

# Client used for file upload, fine-tuning and inference below.
# NOTE: this name shadows the `openai` package name for the rest of the notebook.
openai = OpenAI()

%matplotlib inline

# Load the pre-built training and test item lists from disk.
# SECURITY: pickle.load can execute arbitrary code embedded in the file - only
# load pickles that you generated yourself or obtained from a trusted source.
# NOTE(review): unpickling presumably relies on the `Item` class imported above
# being importable - confirm against how the pickles were produced.
with open('train.pkl', 'rb') as file:
    train = pickle.load(file)

with open('test.pkl', 'rb') as file:
    test = pickle.load(file)

# OPTIMIZATION 1: Better data selection and stratification
def analyze_price_distribution(items):
    """Bucket item prices into four coarse ranges and print a summary.

    The ranges are ``low`` (< 25), ``medium`` (25 to < 100), ``high``
    (100 to < 500) and ``very_high`` (>= 500).

    Args:
        items: Iterable of objects exposing a numeric ``price`` attribute.

    Returns:
        dict mapping each range name to the list of prices that fall in it,
        in input order. A range with no items maps to an empty list.
    """
    price_ranges = {'low': [], 'medium': [], 'high': [], 'very_high': []}

    # Single pass over the items. The conditions are spelled out explicitly
    # (no catch-all else) so a NaN price lands in no bucket at all.
    for item in items:
        price = item.price
        if price < 25:
            price_ranges['low'].append(price)
        elif 25 <= price < 100:
            price_ranges['medium'].append(price)
        elif 100 <= price < 500:
            price_ranges['high'].append(price)
        elif price >= 500:
            price_ranges['very_high'].append(price)

    print("Price Distribution Analysis:")
    for range_name, price_list in price_ranges.items():
        if price_list:
            print(f"{range_name}: {len(price_list)} items (${min(price_list):.2f} - ${max(price_list):.2f})")
        else:
            # min()/max() raise ValueError on an empty list, so report the
            # empty range without a min/max span.
            print(f"{range_name}: 0 items")

    return price_ranges

def stratified_sampling(items, n_samples, seed=42):
    """Draw a price-stratified random sample of ``items``.

    Items are grouped into four price ranges (< 25, < 100, < 500, >= 500).
    Each non-empty group contributes a share of the sample proportional to its
    size, with a minimum of one item. Any shortfall caused by rounding the
    shares down is filled with a uniform draw from the items not yet selected.

    Args:
        items: Sized sequence of objects exposing a numeric ``price`` attribute.
        n_samples: Number of items wanted.
        seed: Seed applied to the global ``random`` module so that repeated
            calls with the same inputs return the same sample.

    Returns:
        list of at most ``n_samples`` distinct items. Fewer are returned only
        when ``items`` itself holds fewer than ``n_samples`` items. An empty
        list is returned for empty input or a non-positive ``n_samples``.
    """
    random.seed(seed)

    total_items = len(items)
    if n_samples <= 0 or total_items == 0:
        return []

    # Group items by price ranges
    price_groups = defaultdict(list)
    for item in items:
        if item.price < 25:
            price_groups['low'].append(item)
        elif item.price < 100:
            price_groups['medium'].append(item)
        elif item.price < 500:
            price_groups['high'].append(item)
        else:
            price_groups['very_high'].append(item)

    # Sample proportionally from each group
    sampled_items = []
    for group_items in price_groups.values():
        group_proportion = len(group_items) / total_items
        group_samples = max(1, int(n_samples * group_proportion))  # At least 1 from each group

        if len(group_items) >= group_samples:
            sampled_items.extend(random.sample(group_items, group_samples))
        else:
            sampled_items.extend(group_items)

    # Rounding each share down can leave the sample short. Top it up in one
    # draw from everything not yet chosen. Membership is tracked by object
    # identity in a set, which keeps this step linear in len(items).
    shortfall = n_samples - len(sampled_items)
    if shortfall > 0:
        chosen_ids = {id(item) for item in sampled_items}
        leftovers = [item for item in items if id(item) not in chosen_ids]
        sampled_items.extend(random.sample(leftovers, min(shortfall, len(leftovers))))

    # The "at least 1 per group" rule can overshoot for very small n_samples.
    return sampled_items[:n_samples]

# Analyze distribution
analyze_price_distribution(train)

# OPTIMIZATION 2: Improved data selection
# Use stratified sampling for better representation
fine_tune_train = stratified_sampling(train, 10000)  # Increased from 200

# The training sample is drawn from the whole of `train`, so the validation
# sample must come only from items that were NOT picked for training.
# Otherwise the same item can appear in both sets and validation loss is
# measured on data the model was trained on.
_training_ids = {id(item) for item in fine_tune_train}
validation_pool = [item for item in train if id(item) not in _training_ids]
fine_tune_validation = stratified_sampling(validation_pool, 100)  # Increased from 50

print(f"\nTraining set: {len(fine_tune_train)} items")
print(f"Validation set: {len(fine_tune_validation)} items")

# OPTIMIZATION 3: Enhanced prompts with better instructions
# System prompt shared by training and inference. A fine-tuned model must see
# the same prompt at inference time as during training, so it is defined once
# here instead of being copied into both builders. The trailing space after
# "pricing patterns." is intentional and kept from the original prompt text.
_SYSTEM_MESSAGE = (
    "You are an expert price estimator with deep knowledge of retail markets, "
    "product categories, and pricing patterns. \n"
    "\n"
    "Your task is to estimate the retail price of items based on their "
    "descriptions, features, and specifications. Consider factors like:\n"
    "- Product category and market positioning\n"
    "- Brand reputation and quality indicators\n"
    "- Features, specifications, and complexity\n"
    "- Materials and build quality\n"
    "- Target market and use case\n"
    "\n"
    'Provide only the price in the format "Price is $X.XX" where X.XX is your '
    "estimate rounded to the nearest cent. Be precise and realistic in your pricing."
)


def _user_prompt_for(item):
    """Build the user message text from ``item.test_prompt()``.

    Removes the " to the nearest dollar" wording and the trailing
    "Price is $" cue from the item's test prompt, then prefixes a short
    instruction line.
    """
    description = (
        item.test_prompt()
        .replace(" to the nearest dollar", "")
        .replace("\n\nPrice is $", "")
    )
    return f"Please estimate the retail price for this item:\n\n{description.strip()}"


def messages_for_training(item):
    """Return the chat transcript used as one fine-tuning example.

    Args:
        item: Object exposing ``test_prompt()`` (returns str) and a numeric
            ``price`` attribute.

    Returns:
        list of three message dicts: the system and user messages produced by
        ``messages_for_inference`` plus the assistant answer
        ``"Price is $<price to 2 decimals>"``.
    """
    return messages_for_inference(item) + [
        {"role": "assistant", "content": f"Price is ${item.price:.2f}"}
    ]


def messages_for_inference(item):
    """Return the system + user messages sent to the model at inference time.

    Args:
        item: Object exposing ``test_prompt()`` (returns str).

    Returns:
        list of two message dicts (system, user), identical to the first two
        messages of the corresponding training example.
    """
    return [
        {"role": "system", "content": _SYSTEM_MESSAGE},
        {"role": "user", "content": _user_prompt_for(item)},
    ]

In [None]:
# Preview the three-message training example built for the first training item.
messages_for_training(train[0])

In [None]:
# Preview the inference prompt for the same item (no assistant answer included).
messages_for_inference(train[0])

In [None]:
# OPTIMIZATION 4: Data quality validation
def validate_training_data(items):
    """Filter out items that are unsuitable as fine-tuning examples.

    An item is rejected when any of the following holds:
      * its price is not a finite number greater than zero (this includes
        ``None``, NaN and infinity);
      * its ``test_prompt()`` text, stripped of surrounding whitespace, is
        shorter than 50 characters;
      * it has a ``token_count`` attribute whose value is below 20.

    A summary and the first five issues are printed.

    Args:
        items: Iterable of objects exposing ``price`` and ``test_prompt()``,
            and optionally ``token_count``.

    Returns:
        list of the items that passed every check, in input order.
    """
    valid_items = []
    issues = []

    for i, item in enumerate(items):
        # Check for reasonable price ranges. math.isfinite rejects NaN/inf
        # (a NaN would pass a plain `<= 0` test) and raises TypeError for
        # non-numeric values such as None, which are treated as invalid too.
        try:
            price_ok = math.isfinite(item.price) and item.price > 0
        except TypeError:
            price_ok = False
        if not price_ok:
            issues.append(f"Item {i}: Invalid price ${item.price}")
            continue

        # Check for minimum content length. The reported length is the same
        # stripped length that the check itself uses.
        stripped_prompt = item.test_prompt().strip()
        if len(stripped_prompt) < 50:
            issues.append(f"Item {i}: Prompt too short ({len(stripped_prompt)} chars)")
            continue

        # Check for reasonable token count
        if hasattr(item, 'token_count') and item.token_count < 20:
            issues.append(f"Item {i}: Too few tokens ({item.token_count})")
            continue

        valid_items.append(item)

    print(f"Data validation: {len(valid_items)}/{len(items)} items valid")
    if issues:
        print(f"Issues found: {len(issues)}")
        for issue in issues[:5]:  # Show first 5 issues
            print(f"  {issue}")

    return valid_items

# Validate data
# Both sets are replaced by their filtered versions, so they may end up
# smaller than the sizes requested from the sampler.
fine_tune_train = validate_training_data(fine_tune_train)
fine_tune_validation = validate_training_data(fine_tune_validation)

def make_jsonl(items, for_training=True):
    """Serialise items to JSON Lines text, one ``{"messages": [...]}`` per line.

    Args:
        items: Iterable of items accepted by the message builders.
        for_training: When true, each record is built with
            ``messages_for_training``; otherwise ``messages_for_inference``.

    Returns:
        str with one JSON object per line and no trailing newline. An item
        whose record cannot be built is reported on stdout and left out.
    """
    records = []
    for entry in items:
        try:
            build = messages_for_training if for_training else messages_for_inference
            records.append(json.dumps({"messages": build(entry)}, ensure_ascii=False))
        except Exception as e:
            # Best effort: one bad item must not abort the whole export.
            print(f"Error processing item {entry}: {e}")

    return "\n".join(records).strip()

def write_jsonl(items, filename, for_training=True):
    """Write the JSON Lines form of ``items`` to ``filename`` as UTF-8.

    Args:
        items: Iterable of items forwarded to ``make_jsonl``.
        filename: Path of the file to create or overwrite.
        for_training: Forwarded to ``make_jsonl`` to select the record layout.
    """
    # The file is opened (and therefore truncated) before the content is built.
    with open(filename, mode="w", encoding='utf-8') as out:
        out.write(make_jsonl(items, for_training))

# Write training files
# Both files are written with for_training=True, so every record (validation
# included) carries the assistant's "Price is $..." answer.
write_jsonl(fine_tune_train, "fine_tune_train.jsonl", for_training=True)
write_jsonl(fine_tune_validation, "fine_tune_validation.jsonl", for_training=True)

# Upload files
# The returned file objects are kept because their `.id` values are passed to
# the fine-tuning job created in the next cell.
with open("fine_tune_train.jsonl", "rb") as f:
    train_file = openai.files.create(file=f, purpose="fine-tune")

with open("fine_tune_validation.jsonl", "rb") as f:
    validation_file = openai.files.create(file=f, purpose="fine-tune")

# OPTIMIZATION 5: Better hyperparameters
# Integration descriptor handed to the fine-tuning job via `integrations=[...]`.
# NOTE(review): presumably requires a Weights & Biases API key to be configured
# for the OpenAI organisation - confirm before launching the job.
wandb_integration = {"type": "wandb", "wandb": {"project": "gpt-pricer-optimized"}}

In [None]:
# Improved hyperparameters
# Launch the fine-tuning job on the uploaded training/validation files.
# This call starts a billable job; re-running the cell creates another one.
fine_tuning_job = openai.fine_tuning.jobs.create(
    training_file=train_file.id,
    validation_file=validation_file.id,
    model="gpt-4o-mini-2024-07-18",
    seed=42,
    hyperparameters={
        "n_epochs": 3,  # Increased from 1 - more epochs for better learning
        "batch_size": 4,  # Smaller batch size for more stable training
        "learning_rate_multiplier": 0.5  # Lower learning rate for stability
    },
    integrations=[wandb_integration],
    suffix="pricer-v2"
)

# The job id is what monitor_training() polls in a later cell.
print(f"Fine-tuning job created: {fine_tuning_job.id}")

In [None]:
# Monitor training
def monitor_training(job_id, max_checks=50, poll_interval=60):
    """Poll a fine-tuning job until it finishes or the check budget runs out.

    Args:
        job_id: Identifier of the fine-tuning job to poll.
        max_checks: Maximum number of status requests to make.
        poll_interval: Seconds to wait between two consecutive requests.

    Returns:
        The fine-tuned model name when the job succeeded. ``None`` when the
        job failed, was cancelled, or had not finished after ``max_checks``
        requests.
    """
    import time

    # Statuses after which the job will never produce a model.
    terminal_failures = {
        "failed": "Training failed!",
        "cancelled": "Training cancelled!",
    }

    for check in range(max_checks):
        job = openai.fine_tuning.jobs.retrieve(job_id)
        print(f"Status: {job.status}")

        if job.status == "succeeded":
            print(f"Training completed! Model: {job.fine_tuned_model}")
            return job.fine_tuned_model
        if job.status in terminal_failures:
            print(terminal_failures[job.status])
            return None

        # No point sleeping after the final check - nothing follows it.
        if check < max_checks - 1:
            time.sleep(poll_interval)

    print(f"Stopped polling after {max_checks} checks; job {job_id} has not finished.")
    return None

# OPTIMIZATION 6: Improved inference function
# A number is either digits with an optional fractional part ("45", "12.34")
# or a bare fractional part (".99"). Thousands separators are removed before
# matching, so they do not appear in the pattern.
_PRICE_NUMBER = r"(?P<amount>\d+(?:\.\d+)?|\.\d+)"
_DOLLAR_AMOUNT_RE = re.compile(r"\$\s*" + _PRICE_NUMBER)
_ANY_NUMBER_RE = re.compile(_PRICE_NUMBER)


def get_price_improved(s):
    """Extract a price from a model reply.

    The amount that directly follows a ``$`` sign is preferred, so a reply
    such as "A 2-pack costs $45.00" yields 45.0 rather than 2.0. When the
    reply has no dollar amount, the first number in the text is used.
    Commas are treated as thousands separators and removed.

    Args:
        s: Reply text, or ``None``.

    Returns:
        The extracted price as a float, or 0.0 when ``s`` is empty/``None``
        or contains no number.
    """
    if not s:
        return 0.0

    text = s.replace(',', '')
    match = _DOLLAR_AMOUNT_RE.search(text) or _ANY_NUMBER_RE.search(text)
    if match is None:
        return 0.0

    # The pattern only matches text that float() accepts, so no ValueError
    # handling is needed here.
    return float(match.group("amount"))

def gpt_fine_tuned_improved(item, model_name):
    """Ask ``model_name`` for a price estimate of ``item``.

    Args:
        item: Item accepted by ``messages_for_inference``; its ``title`` is
            used in diagnostic output.
        model_name: Name of the chat model to query.

    Returns:
        The price parsed from the model's reply by ``get_price_improved``.
        0.0 is returned when anything in the request or parsing raises.
    """
    try:
        request = {
            "model": model_name,
            "messages": messages_for_inference(item),
            "seed": 42,
            "max_tokens": 20,  # Increased from 7
            "temperature": 0.1,  # Low temperature for consistency
            "top_p": 0.9,
        }
        completion = openai.chat.completions.create(**request)

        estimate = get_price_improved(completion.choices[0].message.content)

        # Sanity check: an out-of-range estimate is only reported here; the
        # value is still returned to the caller unchanged.
        if estimate < 0.01 or estimate > 50000:
            print(f"Warning: Unusual price prediction: ${estimate:.2f} for {item.title[:50]}...")

        return estimate

    except Exception as e:
        # Best effort: a failed call yields a 0.0 prediction instead of
        # aborting a whole evaluation run.
        print(f"Error predicting price for {item.title[:50]}: {e}")
        return 0.0

# OPTIMIZATION 7: Enhanced evaluation
def evaluate_model_comprehensive(predictor, test_data, model_name="Fine-tuned Model"):
    """Run the standard ``Tester`` evaluation, then print a per-price-range breakdown.

    After ``Tester.run()`` completes, the tester's ``truths``, ``guesses`` and
    ``errors`` are grouped by the true price (< 25, 25 to < 100, 100 to < 500,
    >= 500). For every non-empty group the item count, mean error and mean
    error-to-truth ratio (as a percentage) are printed.

    Args:
        predictor: Callable taking an item and returning a predicted price.
        test_data: Items handed to ``Tester``.
        model_name: Title used for the tester and in the printed headers.

    Returns:
        None. All results are printed.
    """
    print(f"\n=== Evaluating {model_name} ===")

    # Run standard test
    tester = Tester(predictor, test_data, title=model_name)
    tester.run()

    # Bucket every (truth, error) pair by true price in a single pass. The
    # conditions are explicit (no catch-all else) so a NaN truth is dropped.
    # NOTE(review): tester.errors are presumably absolute errors in dollars -
    # confirm against the Tester implementation.
    buckets = {'low': [], 'medium': [], 'high': [], 'very_high': []}
    for truth, _guess, error in zip(tester.truths, tester.guesses, tester.errors):
        if truth < 25:
            buckets['low'].append((truth, error))
        elif 25 <= truth < 100:
            buckets['medium'].append((truth, error))
        elif 100 <= truth < 500:
            buckets['high'].append((truth, error))
        elif truth >= 500:
            buckets['very_high'].append((truth, error))

    print("\n=== Performance by Price Range ===")
    for range_name, rows in buckets.items():
        if not rows:
            continue

        avg_error = np.mean([error for _, error in rows])

        # The percentage error is only defined for positive true prices.
        # Without any, np.mean([]) would produce NaN plus a RuntimeWarning.
        ratios = [error / truth for truth, error in rows if truth > 0]
        mape_text = f"{np.mean(ratios) * 100:.1f}%" if ratios else "n/a"

        print(f"{range_name.upper()}: {len(rows)} items, Avg Error: ${avg_error:.2f}, MAPE: {mape_text}")

In [None]:
# Wait for training to complete and then test
# monitor_training blocks while polling and returns the fine-tuned model name
# on success, or None otherwise. If the job finishes after polling gave up,
# assign the model name to fine_tuned_model_name by hand.
fine_tuned_model_name = monitor_training(fine_tuning_job.id)

In [None]:
# When ready to test:
# Skipped entirely when no model name is available (monitor_training returned None).
if fine_tuned_model_name:
    # Tester is given a one-argument predictor, so the model name is bound here.
    test_predictor = lambda item: gpt_fine_tuned_improved(item, fine_tuned_model_name)
    # Evaluate on the first 250 items of the test set.
    evaluate_model_comprehensive(test_predictor, test[:250], "Optimized Fine-tuned Model")