In [None]:
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig, AutoTokenizer
import json
import os
import re


import sys
import importlib
# Make the sibling "../src" directory importable (resolved relative to the
# notebook's current working directory) so mistral_testing can be found.
sys.path.append(os.path.abspath("../src"))
import mistral_testing
# Reload so that edits made to mistral_testing since the first import are
# picked up without restarting the kernel; the name imported below is then
# bound to the freshly reloaded module's generate_domains.
importlib.reload(mistral_testing)
from mistral_testing import generate_domains

import os
from dotenv import load_dotenv
# Load variables from a .env file (if one is found) into the process
# environment; a later cell reads MISTRAL_MODEL_PATH via os.getenv.
load_dotenv()


In [None]:
# Report which GPU will host the model and how much memory it has.
print(f'GPU: {torch.cuda.get_device_name(0)}')
print(f'VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB')

# The checkpoint location comes from the environment (see load_dotenv above).
MODEL_PATH = os.getenv("MISTRAL_MODEL_PATH")
if not MODEL_PATH:
    # Fail early with an actionable message instead of handing None to
    # from_pretrained, which produces a much less obvious error.
    raise RuntimeError(
        "MISTRAL_MODEL_PATH is not set; define it in the environment or in a .env file."
    )

# Load the causal LM found at MISTRAL_MODEL_PATH with 4-bit quantization,
# letting device_map='auto' decide where the weights are placed.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    device_map='auto',
    # local_files_only=True
)
print('✅ Your Legion 5 Pro can handle it!')

In [None]:
# Load the tokenizer that matches the model (local files only, no download).
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, local_files_only=True)

# File where results are checkpointed after every generation.
output_file = "mistral.json"
# Checkpoints are written here first and then swapped into place, so an
# interruption mid-write can never leave a truncated output_file behind.
output_tmp_file = output_file + ".tmp"

# Load input businesses (the records live under the top-level "results" key).
with open("../data/all_businesses_descriptions_and_domains.json", "r", encoding="utf-8") as f:
    data = json.load(f)
    data = data["results"]

# Resume from previous progress if a checkpoint exists.
if os.path.exists(output_file):
    with open(output_file, "r", encoding="utf-8") as f:
        results = json.load(f)
else:
    results = []

# IDs that already have a result; used to skip work on resume and to avoid
# generating twice for a business that appears more than once in the input.
processed_ids = {entry["business"]["fsq_place_id"] for entry in results if "business" in entry}

for entry in data:
    business = entry["business"]
    business_id = business["fsq_place_id"]

    # Skip if already processed (in a previous run or earlier in this one).
    if business_id in processed_ids:
        continue

    description = business["normalized_description"]
    name = business["name"]

    print(f"\n=== Generating for {name} ===")
    result = generate_domains(model, tokenizer, description)

    # Record the new result and remember the ID so duplicates are skipped.
    results.append({
        "business": business,
        "generated_domains": result
    })
    processed_ids.add(business_id)

    # Checkpoint after each generation: write the full result list to a
    # temporary file, then replace the real file in a single step.
    with open(output_tmp_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    os.replace(output_tmp_file, output_file)

    print(f"✅ Saved result for {name}")

print("\n🎉 All businesses processed and saved!")


In [None]:
# Smoke-test generate_domains on one hand-written business description
description = "A modern coffee shop with artisanal pastries and coworking space"
result = generate_domains(model, tokenizer, description)

# Suggested domains, pretty-printed as JSON
print("Generated Domains:")
domains_json = json.dumps(result['domains'], indent=2)
print(domains_json)

# Confidence statistics reported for the generation as a whole
overall_stats = result['confidence']
print("\nOverall Confidence Metrics:")
print(f"Mean Confidence: {overall_stats['mean_confidence']:.4f}")
print(f"Min Confidence: {overall_stats['min_confidence']:.4f}")
print(f"Confidence Std: {overall_stats['confidence_std']:.4f}")
print(f"Mean Entropy: {overall_stats['entropy']:.4f}")
print(f"Overall Score: {result['overall_confidence_score']:.4f}")

# The same kind of statistics, broken down per suggested domain
print("\n🎯 Per-Domain Confidence Scores:")
print("=" * 50)
per_domain_stats = result['domain_confidences']
for position, stats in enumerate(per_domain_stats, start=1):
    print(f"Domain {position}: {stats['domain']}")
    print(f"  Mean Confidence: {stats['mean_confidence']:.4f}")
    print(f"  Min Confidence:  {stats['min_confidence']:.4f}")
    print(f"  Max Confidence:  {stats['max_confidence']:.4f}")
    print(f"  Std Dev:         {stats['confidence_std']:.4f}")
    print(f"  Mean Entropy:    {stats['mean_entropy']:.4f}")
    print(f"  Overall Score:   {stats['overall_score']:.4f}")
    print(f"  Tokens Used:     {stats['num_tokens']}")
    print()

In [None]:
# Load the generation results written by the generation cell.
with open("mistral.json", "r", encoding="utf-8") as f:
    data = json.load(f)

all_entries = []

# raw_decode parses exactly one JSON value starting at a given index and
# ignores whatever follows it, so trailing model chatter (even chatter that
# contains braces) no longer breaks parsing the way a greedy "{.*}" match did.
json_decoder = json.JSONDecoder()

for i, entry in enumerate(data, start=1):
    business_name = (entry.get("business") or {}).get("name")
    print(f"\n🔎 Processing entry {i}: {business_name}")

    gen_text = entry.get("generated_domains", "")

    # Every entry ends up with a "parsed_suggestions" list; it stays empty
    # unless a well-formed OUTPUT block is found below.
    suggestions = []

    if not gen_text:
        print("  ⚠️ No 'generated_domains' field found.")
    elif not isinstance(gen_text, str):
        # NOTE(review): another cell indexes generate_domains' result like a
        # dict, so stored values may not be raw text — confirm the schema.
        # Skip instead of crashing on the string operations below.
        print(f"  ⚠️ 'generated_domains' is a {type(gen_text).__name__}, not raw text; skipping.")
    else:
        # Debug: show the beginning of the generated text
        print("  Generated text (first 200 chars):", gen_text[:200].replace("\n", " "), "...")

        # Locate the opening brace of the JSON object that follows "OUTPUT:".
        marker = re.search(r'OUTPUT:\s*\{', gen_text)
        if marker is None:
            print("  ❌ No OUTPUT JSON block found in text.")
        else:
            json_start = marker.end() - 1  # index of the "{" itself
            try:
                json_block, json_end = json_decoder.raw_decode(gen_text, json_start)
            except json.JSONDecodeError as e:
                print("  ❌ JSON parsing failed:", e)
            else:
                print("  ✅ Found JSON block, length:", json_end - json_start)

                # Keep only well-formed items so one malformed suggestion
                # (or a non-list "suggestions" value) cannot abort the run.
                raw_suggestions = json_block.get("suggestions", [])
                if not isinstance(raw_suggestions, list):
                    raw_suggestions = []
                suggestions = [
                    s["domain"]
                    for s in raw_suggestions
                    if isinstance(s, dict) and "domain" in s
                ]
                print("  ✅ Parsed suggestions:", suggestions)

    entry["parsed_suggestions"] = suggestions
    all_entries.append(entry)

# Save updated entries
with open("parsed_mistral.json", "w", encoding="utf-8") as f:
    json.dump(all_entries, f, indent=2, ensure_ascii=False)

print("\n✅ Done! Parsed suggestions added to each entry.")
