In [None]:
#  Global Imports and Environment Setup

# Load API Key from .env file
from dotenv import load_dotenv
load_dotenv()

# Future annotations for type hints
from __future__ import annotations

# Standard Libraries
import os, re, json, math, shelve, atexit, hashlib, random, itertools, asyncio, time, textwrap
from pathlib import Path
from datetime import datetime
from collections import Counter
import concurrent.futures as cf

# Third-Party Libraries
import pandas as pd
import numpy as np
import tqdm

from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_neo4j import Neo4jGraph
from langgraph.graph import StateGraph, END
import tenacity
import openai

# Jupyter notebook display utilities
from IPython.display import display

print("Imports & environment ready")

➜ Before cleaning — Abt sample:


Unnamed: 0,id,name,description,price
0,552,Sony Turntable - PSLX350H,Sony Turntable - PSLX350H/ Belt Drive System/ ...,
1,580,Bose Acoustimass 5 Series III Speaker System -...,Bose Acoustimass 5 Series III Speaker System -...,$399.00
2,4696,Sony Switcher - SBV40S,Sony Switcher - SBV40S/ Eliminates Disconnecti...,$49.00


➜ Before cleaning — Buy sample:


Unnamed: 0,id,name,description,manufacturer,price
0,10011646,Linksys EtherFast EZXS88W Ethernet Switch - EZ...,Linksys EtherFast 8-Port 10/100 Switch (New/Wo...,LINKSYS,
1,10140760,Linksys EtherFast EZXS55W Ethernet Switch,5 x 10/100Base-TX LAN,LINKSYS,
2,10221960,Netgear ProSafe FS105 Ethernet Switch - FS105NA,NETGEAR FS105 Prosafe 5 Port 10/100 Desktop Sw...,Netgear,



➜ After cleaning — Abt sample:


Unnamed: 0,prod_id,source,name,title_clean,brand_norm,price,price_bucket
0,552,abt,Sony Turntable - PSLX350H,sony turntable pslx350h,sony,,
1,580,abt,Bose Acoustimass 5 Series III Speaker System -...,bose acoustimass 5 series iii speaker system a...,bose,399.0,200-499
2,4696,abt,Sony Switcher - SBV40S,sony switcher sbv40s,sony,49.0,0-49


➜ After cleaning — Buy sample:


Unnamed: 0,prod_id,source,name,title_clean,brand_norm,price,price_bucket
0,10011646,buy,Linksys EtherFast EZXS88W Ethernet Switch - EZ...,linksys etherfast ezxs88w ethernet switch ezxs88w,linksys,,
1,10140760,buy,Linksys EtherFast EZXS55W Ethernet Switch,linksys etherfast ezxs55w ethernet switch,linksys,,
2,10221960,buy,Netgear ProSafe FS105 Ethernet Switch - FS105NA,netgear prosafe fs105 ethernet switch fs105na,netgear,,


In [None]:
# Direct pipeline to the correct files 
DATA_DIR = Path("/Users/arodriguez/Python_/Python_/Environment1/Research/entity_resolution_agent")
ABT_CSV  = DATA_DIR / "Abt.csv"
BUY_CSV  = DATA_DIR / "Buy.csv"

# Load our data files 
ENC = "latin1"  # Encoding used in the original CSV files
abt = pd.read_csv(ABT_CSV, encoding=ENC, engine="python") 
buy = pd.read_csv(BUY_CSV, encoding=ENC, engine="python")

# Show raw tables before any cleaning
print("➜ Before cleaning — Abt sample:")
display(abt.head(3))
print("➜ Before cleaning — Buy sample:")
display(buy.head(3))

# Build a stable unique identifier for each product
abt["prod_id"] = abt["id"].astype(str)
buy["prod_id"] = buy["id"].astype(str)

# Create a unique identifier to know which table a product comes from
abt["source"] = "abt"
buy["source"] = "buy"

# Very Light Cleaning to make the data more consistent (lower-case, strip punctuation and whitespace)
def clean_text(s: str) -> str:
    s = str(s).lower()
    s = re.sub(r"[^a-z0-9 ]", " ", s)
    return re.sub(r"\s+", " ", s).strip()

def brand_from_title(title: str) -> str:
    m = re.match(r"([a-zA-Z0-9]+)", str(title))
    return m.group(1).lower() if m else ""

# Define price buckets based on the price ranges
def price_bucket(price) -> str:
    if price is None or (isinstance(price, float) and math.isnan(price)):
        return ""
    p = float(price)
    if   p <  50:  return "0-49"
    elif p < 100:  return "50-99"
    elif p < 200:  return "100-199"
    elif p < 500:  return "200-499"
    else:          return "500+"

# Apply cleaning functions to both dataframes
for df in (abt, buy):
    # normalize prices
    df["price"] = (
        df["price"]
          .astype(str)
          .str.replace(r"[^\d.]", "", regex=True)
          .replace("", float("nan"))
          .astype(float)
    )
    df["price_bucket"] = df["price"].map(price_bucket)

    # clean titles & extract brand token
    df["title_clean"] = df["name"].map(clean_text)
    df["brand_norm"]  = df["name"].map(brand_from_title)

# Show small sample after cleaning
print("\n➜ After cleaning — Abt sample:")
display(abt[["prod_id","source","name","title_clean","brand_norm","price","price_bucket"]].head(3))
print("➜ After cleaning — Buy sample:")
display(buy[["prod_id","source","name","title_clean","brand_norm","price","price_bucket"]].head(3))

# Concatenate the two dataframes into a single one
df = pd.concat([abt, buy], ignore_index=True)

In this step, we are loading our data and performing very light cleaning on our datasets. We want to create unique indentifiers for the products to ensure that when we are looking for matches we are comparing correctly. This step is important because we want our product name, brands and prices to be uniform. 

In [None]:
# Load the perfectMapping csv file which contains our ground truth pairs
GOLD_CSV = DATA_DIR / "abt_buy_perfectMapping.csv"

# Reading the perfectMapping CSV file and showing the number of rows
gold_raw = pd.read_csv(GOLD_CSV, encoding="latin1", engine="python")
print("Rows in perfectMapping :", len(gold_raw))

# Build a lookup table from our master dataframe which was created in the previous step
orig_to_pid = (
    df[["id", "source", "prod_id"]]          
      .set_index(["source", "id"])["prod_id"]
      .to_dict()
)
# Define a function to convert original IDs to product IDs
def to_pid(src:str, orig_id)->str|None:
    return orig_to_pid.get((src, orig_id))

# Add converted product IDs to the gold_raw DataFrame
gold_raw["pid_abt"] = gold_raw["idAbt"].map(lambda x: to_pid("abt", x))
gold_raw["pid_buy"] = gold_raw["idBuy"].map(lambda x: to_pid("buy", x))

# Drop rows that could not be converted to product IDs 
gold = gold_raw.dropna(subset=["pid_abt", "pid_buy"]).copy()
# Coverting each row to take two product IDs as tuples do prod_1 and prod_2 are treated as the same pair as prod_2 and prod_1
gold_pairs = {tuple(sorted(t)) for t in gold[["pid_abt", "pid_buy"]].values}
print(f" Gold pairs ready : {len(gold_pairs):,}")

Rows in perfectMapping : 1097
✅ Gold pairs ready : 1,097


In this step we are loading our ground truth to be able to test the accuracy and recall of our pipeline. This is important because in our ground truth we only have comparisons between abt -> buy, we don't have buy -> buy pairs.

In [None]:
# Display a few random pairs from the gold standard (ground truth)
for i,(a,b) in enumerate(random.sample(list(gold_pairs), 5),1):
    ta = df.loc[df.prod_id==a, "title_clean"].values[0]
    tb = df.loc[df.prod_id==b, "title_clean"].values[0]
    print(f"{i}.  {ta[:60]}  ↔  {tb[:60]}")

1.  garmin deluxe carrying case black finish 0101023101  ↔  garmin canvas deluxe carry case 010 10231 01
2.  samsung ht bd2t blu ray 7 1 home theater system blu ray dvd   ↔  samsung 7 1 channel blu ray home theater system htbd2txaa
3.  danby ddw497w countertop dishwasher  ↔  danby white countertop dishwasher ddw497wh
4.  panasonic hd 3mos 60gb hard disk drive sd hybrid camcorder w  ↔  panasonic black high defintion 60gb hard disk drive sd hybri
5.  garmin suction cup mount 010 10936 00  ↔  garmin vehicle suction cup mount 0101093600


In [None]:
# Pushing our cleaned data into Neo4j                                
# Connect to Neo4j database through the use of Docker who is hosting our instance
kb = Neo4jGraph(
    url      = "bolt://localhost:7688", # This is the port where Neo4j is running
    username = "neo4j",     
    password = "testpass",
    refresh_schema = False, # Disable automatic schema refresh
    sanitize       = False, # Disable sanitization of input data
    driver_config  = {
        "notifications_min_severity": "NONE"
    }
)

# Create a one-time constraint to ensure prod_id is unique for Product nodes 
kb.query("""
CREATE CONSTRAINT product_pk IF NOT EXISTS
FOR (p:Product) REQUIRE p.prod_id IS UNIQUE
""")

# Prepare the data for ingestion into Neo4j
keep_cols = [
    "prod_id",      # primary key
    "source",       # 'abt' / 'buy'
    "name",         # raw product title 
    "title_clean",  # cleaned-up title
    "brand_norm",   # cleaned-up brand
    "price"         # may be NaN / empty
]

# Concatenate the two dataframes into a single one for ingestion
ingest_df = pd.concat([abt[keep_cols], buy[keep_cols]], ignore_index=True)

# Ensure all columns are of the correct type
for col in ["prod_id", "source", "name", "title_clean", "brand_norm"]:
    ingest_df[col] = ingest_df[col].fillna("").astype(str)

# Ensure price is a float, filling NaNs with 0.0
ingest_df["price"] = ingest_df["price"].fillna(0.0).astype(float)

# Convert the DataFrame to a list of dictionaries for bulk ingestion
rows = ingest_df.to_dict("records")
print("Rows ready →", len(rows))

# Cypher query for bulk ingestion of Product nodes 
cypher = """
UNWIND $batch AS row
MERGE (p:Product {prod_id: row.prod_id})
SET   p.source      = row.source,
      p.name        = row.name,
      p.title_clean = row.title_clean,
      p.brand_norm  = row.brand_norm,
      p.price       = toFloat(row.price)
"""

# Ingest the data in batches to avoid overwhelming the database
BATCH = 5_000
for start in tqdm.tqdm(range(0, len(rows), BATCH), desc="Neo4j ingest"):
    kb.query(cypher, params={"batch": rows[start : start + BATCH]})

print(" Product nodes now:",
      kb.query("MATCH (:Product) RETURN count(*) AS n")[0]["n"])

Rows ready → 2173


Neo4j ingest: 100%|██████████| 1/1 [00:00<00:00,  2.91it/s]

✅  Product nodes now: 2173





Here we are establishing a connection with Docker for it to host our Neo4j instance locally. We then begin preparing our data for ingestion. Firstly we want to create a one-time-uniqueness constraint, what this means is that if we were to add rows to our data our previous ingestion would not be carried over and only the new changes will be ingested. With a cypher is how we communicate to Neo4j how we want our data to be stored. Lastly we want to do this in batches to avoid crashes and ensure that its a smooth process. 

In [None]:
# Creation of helper nodes and relationships for brands and name tokens

# Ensure the DataFrame has the necessary columns for helper nodes
for col in ["brand_norm", "title_clean"]:
    df[col] = df[col].fillna("").astype(str)

# Define a set of stop words to ignore in name tokens (we want to focus on meaningful tokens)
STOP = {"the", "and", "for", "with", "inch", "cm", "mm"}
def name_tokens(title: str):
    for tok in re.split(r"\W+", title.lower()):
        if len(tok) > 2 and tok not in STOP:
            yield tok

# Create constraints for Brand and NameTok nodes to ensure uniqueness
kb.query("CREATE CONSTRAINT brand_pk IF NOT EXISTS FOR (b:Brand)  REQUIRE b.name IS UNIQUE")
kb.query("CREATE CONSTRAINT tok_pk   IF NOT EXISTS FOR (t:NameTok) REQUIRE t.tok  IS UNIQUE")

# Cypher query to create helper nodes and relationships
cypher = """
UNWIND $rows AS row
MATCH (p:Product {prod_id: row.prod_id})

/* Brand helper */
WITH p, row
WHERE row.brand <> ""
MERGE (b:Brand {name: row.brand})
MERGE (p)-[:HAS_BRAND]->(b)

/* Name-token helpers */
WITH p, row
UNWIND row.name_toks AS tok
MERGE (t:NameTok {tok: tok})
MERGE (p)-[:HAS_TOK]->(t)
"""

# Once again, we will ingest the data in batches to avoid overwhelming the database
BATCH = 2_000
sent = 0
payload = []

for rec in tqdm.tqdm(df.itertuples(index=False), total=len(df), desc="helper rows"):
    d = rec._asdict()
    payload.append({
        "prod_id": d["prod_id"],
        "brand":   d["brand_norm"],
        "name_toks": list(name_tokens(d["title_clean"])),
    })
    if len(payload) == BATCH:
        kb.query(cypher, params={"rows": payload})
        sent += len(payload)
        payload.clear()

# Send any remaining payload that didn't fill a complete batch
if payload:
    kb.query(cypher, params={"rows": payload})
    sent += len(payload)

print(f" Helper relationships added for {sent:,} products")
print(" Distinct Brands   :", kb.query('MATCH (:Brand)   RETURN count(*) AS n')[0]['n'])
print(" Distinct Name-Toks:", kb.query('MATCH (:NameTok) RETURN count(*) AS n')[0]['n'])

helper rows: 100%|██████████| 2173/2173 [00:01<00:00, 1936.07it/s]


✅ Helper rels added for 2,173 products
• distinct brands   : 155
• distinct name-toks: 3369


1.	Extract brand (we populated that column during ingestion) ⇒ connects each product to a (:Brand) node.
2.	Tokenise the normalised title into simple word tokens ⇒ connects to (:NameTok) nodes.
3.	Adds uniqueness constraints so duplicates collapse to a single helper node.
4.	Processes the dataframe in chunks so memory usage stays small.

In [None]:
# Establish our Retrieval Cypher Query to fetch the product subgraphs
# How many neighbours per helper, and a hard cap
K_BRAND, K_TOK, HARD_CAP = 15, 8, 120

# Function to retrieve a product subgraph based on two product IDs
def get_product_subgraph(id1, id2) -> str | None:
    id1, id2 = str(id1), str(id2)
    params = {"id1": id1, "id2": id2}
    
    cypher = f"""
    MATCH (a:Product {{prod_id: $id1}})
    MATCH (b:Product {{prod_id: $id2}})

    OPTIONAL MATCH (a)-[:HAS_BRAND]->(brA:Brand)
    OPTIONAL MATCH (b)-[:HAS_BRAND]->(brB:Brand)
    OPTIONAL MATCH (a)-[:HAS_TOK]->(tA:NameTok)
    OPTIONAL MATCH (b)-[:HAS_TOK]->(tB:NameTok)

    WITH a,b,
         coalesce(
           apoc.coll.toSet(collect(brA)+collect(brB)),
           collect(distinct brA)+collect(distinct brB)
         ) AS brands,
         coalesce(
           apoc.coll.toSet(collect(tA)+collect(tB)),
           collect(distinct tA)+collect(distinct tB)
         ) AS toks

    // ── rank brand neighbours by price distance ──────────────────────────────
    UNWIND brands AS br
    MATCH (br)<-[:HAS_BRAND]-(p1:Product)
    WHERE NOT p1.prod_id IN [$id1,$id2]
    WITH a,b,brands,toks,p1
    ORDER BY abs(p1.price - a.price)
    WITH a,b,brands,toks,
         collect(distinct p1)[0..{K_BRAND}] AS via_brand

    // ── rank token neighbours by title Levenshtein distance ──────────────────
    UNWIND toks AS t
    MATCH (t)<-[:HAS_TOK]-(p2:Product)
    WHERE NOT p2.prod_id IN [$id1,$id2]
    WITH a,b,brands,toks,via_brand,p2
    ORDER BY apoc.text.levenshteinDistance(a.title_clean, p2.title_clean)
    WITH a,b,brands,toks,via_brand,
         collect(distinct p2)[0..{K_TOK}] AS via_tok

    WITH a,b,
         brands      AS brand_nodes,
         toks        AS tok_nodes,
         apoc.coll.toSet(via_brand + via_tok)[0..{HARD_CAP}] AS others
    RETURN
         [a,b]       AS products,
         brand_nodes AS brands,
         tok_nodes   AS tokens,
         others      AS neighbours
    """

    result = kb.query(cypher, params)
    if not result:
        return None
    rec = result[0]

    def serialise(node):
        return {"labels": list(node.labels), **node._properties} if hasattr(node, "labels") else node

    payload = {
        "products"  : [serialise(n) for n in rec["products"]],
        "brands"    : [serialise(n) for n in rec["brands"]],
        "tokens"    : [serialise(n) for n in rec["tokens"]],
        "neighbours": [serialise(n) for n in rec["neighbours"]],
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)

# Sanity check to ensure that we are fetching one product from each file
idA = random.choice([i for i in abt["prod_id"]])
idB = random.choice([i for i in buy["prod_id"]])
print(get_product_subgraph(idA, idB)[:500], "…")

{
  "products": [
    {
      "price": 16.0,
      "name": "Canon Cyan Photo Ink Cartridge - Cyan - CLI8PC",
      "source": "abt",
      "title_clean": "canon cyan photo ink cartridge cyan cli8pc",
      "brand_norm": "canon",
      "prod_id": "20458"
    },
    {
      "price": 0.0,
      "name": "LG 25.0 Cu.Ft. Total Capacity",
      "source": "buy",
      "title_clean": "lg 25 0 cu ft total capacity",
      "brand_norm": "lg",
      "prod_id": "208114672"
    }
  ],
  "brands": [
    {
      …


In [None]:
# LangGraph — Multi-Agent Entity Resolution System

# Setup LangGraph multi-agent system for entity resolution
# This creates a workflow with two AI agents (A & B) plus a manager to decide if products match

from __future__ import annotations
import json, re, random, textwrap, hashlib
from datetime import datetime
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# Configure decision thresholds for the Manager agent
PB_HIGH = 0.60   # If Agent-B confidence ≥ 0.60 → MATCH
PB_LOW  = 0.40   # If Agent-B confidence ≤ 0.40 → NO_MATCH

# Initialize our main language model for the agents
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Dynamic Few-Shot Learning: Pull human-labeled examples from Neo4j
# This helps the AI agents learn from previous human decisions
HUMAN_LIMIT = 5
human_exs = kb.query(f"""
MATCH (h:HumanLabel)-[:RESOLVES]->(a:Product),
      (h)-[:RESOLVES]->(b:Product)
WHERE h.label IN ['MATCH','NO_MATCH']
RETURN h.label AS label,
       a.title_clean AS titleA, a.brand_norm AS brandA, a.price AS priceA,
       b.title_clean AS titleB, b.brand_norm AS brandB, b.price AS priceB
LIMIT {HUMAN_LIMIT}
""")

# Build dynamic examples for Agent B from human labels
# Convert human decisions into training examples with probability scores
dynamic_B = []
for ex in human_exs:
    # Calculate price difference as percentage
    gap = abs(ex["priceA"] - ex["priceB"]) / max(ex["priceA"] or 1, ex["priceB"] or 1) * 100
    blob = json.dumps({
        "titleA": ex["titleA"], "brandA": ex["brandA"],
        "titleB": ex["titleB"], "brandB": ex["brandB"],
        "price_gap_pct": round(gap,1)
    })
    # Convert human label to probability: MATCH=1.00, NO_MATCH=0.00
    p = 1.00 if ex["label"] == "MATCH" else 0.00
    dynamic_B.append(f"{p:.2f}\n{blob}")

# Static training examples for Agent A (categorical decisions)
# These are hardcoded examples to guide Agent A's YES/NO/UNSURE decisions
FEWSHOT_A = """\
Example-1 (same)       → YES – identical title+brand
{"title_clean":"sony cyber-shot dsc-h3 digital camera 8mp","brand_norm":"sony","price":249.99}
{"title_clean":"sony cyber-shot dsc-h3 8 megapixel camera","brand_norm":"sony","price":249.99}

Example-2 (different)  → NO – brands differ
{"title_clean":"apple iphone 12 64gb black","brand_norm":"apple","price":799}
{"title_clean":"samsung galaxy s21 128gb phantom gray","brand_norm":"samsung","price":799}

Example-3 (uncertain)  → UNSURE – title overlap / price far
{"title_clean":"hp pavilion 15-eg laptop i7","brand_norm":"hp","price":699}
{"title_clean":"hp pavilion laptop 15.6 inch","brand_norm":"hp","price":999}
"""

# Combined training examples for Agent B (probability scoring)
# Static examples + dynamic examples from human labels
FEWSHOT_B = """\
0.95
{"titleA":"sony cyber-shot dsc-h3 digital camera 8mp","brandA":"sony",
 "titleB":"sony cyber-shot dsc-h3 8-megapixel camera","brandB":"sony",
 "price_gap_pct":0.0}
…
0.05
{"titleA":"canon eos r10 mirrorless camera","brandA":"canon",
 "titleB":"dell inspiron 15 laptop","brandB":"dell",
 "price_gap_pct":250.0}
""" + "\n\n" + "\n\n".join(dynamic_B)

# Define the state structure that flows through our LangGraph workflow
# This tracks all information as it moves between agents
class ERState(BaseModel):
    id1: str                    # First product ID
    id2: str                    # Second product ID
    graph_ctx    : dict | None = None    # Neo4j subgraph context
    human_label  : str  | None = None    # Human override if exists
    agentA_lbl   : str  | None = None    # Agent A's YES/NO/UNSURE decision
    agentA_reason: str  | None = None    # Agent A's reasoning
    agentB_prob  : float| None = None    # Agent B's probability score (0-1)
    decision     : str  | None = None    # Final manager decision

# Helper function: Check if humans have already labeled this pair
# Looks in both directions (a,b) and (b,a) since pairs are bidirectional
def get_human_label(a: str, b: str) -> str | None:
    rec = kb.query("""
        MATCH (h:HumanLabel {id1:$a,id2:$b}) RETURN h.label AS lbl
        UNION
        MATCH (h:HumanLabel {id1:$b,id2:$a}) RETURN h.label AS lbl
    """, {"a": a, "b": b})
    return rec[0]["lbl"] if rec else None

# Step 1: Retrieve context and check for human labels
# Only processes Abt↔Buy pairs (skips same-source comparisons)
def retrieve_fn(s: ERState):
    # Extract source from product IDs (format: "source_id")
    src1, src2 = s.id1.split("_",1)[0], s.id2.split("_",1)[0]
    
    # Skip if both products from same source (abt-abt or buy-buy)
    if src1 == src2:
        return {"graph_ctx": None, "human_label": None}
    
    # Check if humans have already labeled this pair
    hl  = get_human_label(s.id1, s.id2)
    
    # Get Neo4j subgraph context only if no human label exists
    ctx = None if hl else get_product_subgraph(s.id1, s.id2)
    return {"graph_ctx": json.loads(ctx) if ctx else None, "human_label": hl}

# Step 2: Agent A - Categorical matcher (YES/UNSURE/NO decisions)
# Uses few-shot examples to make quick categorical decisions
def agentA_fewshot(s: ERState):
    # Return UNSURE if no context available
    if s.graph_ctx is None:
        return {"agentA_lbl":"UNSURE","agentA_reason":"missing context"}
    
    # Build prompt with guidelines and examples
    prompt = f"""You are **Agent A** (categorical matcher).

Return ONE line:
YES – …reason…    /    UNSURE – …reason…    /    NO – …reason…
(≤10 words after the dash)

Guidelines
• Exact same brand + very similar titles → YES
• Completely different brands            → NO
• Same brand but price gap > 30% OR fuzzy title → UNSURE

{FEWSHOT_A}

Pair ↓
{s.graph_ctx}
Answer:"""
    
    # Get LLM response and parse the decision
    raw = llm.invoke(prompt).content.strip()
    lbl,rsn = "UNSURE",""
    
    # Parse the response format "DECISION – reason"
    for sep in ("–","-","—",":"):
        if sep in raw:
            lbl,rsn = [x.strip() for x in raw.split(sep,1)]
            break
    
    lbl=lbl.upper()
    # Validate the decision label
    if lbl not in {"YES","UNSURE","NO"}: 
        lbl,rsn="UNSURE","unrecognised"
    
    return {"agentA_lbl":lbl,"agentA_reason":" ".join(rsn.split()[:10])}

# Step 3: Agent B - Probability scorer (0.0 to 1.0 confidence)
# Uses dynamic examples to output a precise match probability
def agentB_prob(s: ERState):
    # Return neutral probability if no context
    if s.graph_ctx is None:
        return {"agentB_prob":0.50}
    
    # Extract product information and calculate price gap
    a,b = s.graph_ctx["products"]
    gap = abs((a.get("price") or 0)-(b.get("price") or 0)) / max(a.get("price") or 1, b.get("price") or 1)*100
    
    # Create comparison blob for the LLM
    blob = json.dumps({
        "titleA":a.get("title_clean",""), "brandA":a.get("brand_norm",""),
        "titleB":b.get("title_clean",""), "brandB":b.get("brand_norm",""),
        "price_gap_pct":round(gap,1)
    })
    
    # Build prompt with dynamic examples
    prompt = f"""You are Agent B (0-1 scorer).

Guideline examples:
{FEWSHOT_B}

Pair:
{blob}
Answer:"""
    
    # Parse probability from LLM response
    try:
        p = float(re.findall(r"\d*\.?\d+", llm.invoke(prompt).content)[0])
    except:
        p = 0.5  # Default to neutral if parsing fails
    
    # Ensure probability is between 0 and 1
    return {"agentB_prob":max(0,min(1,p))}

# Step 4: Manager - Makes final decision using hierarchical logic
# Combines Agent A and Agent B outputs with configurable thresholds
def manager_fn(s: ERState):
    # Priority 1: Human override always wins
    if s.human_label:
        return {"decision": s.human_label}

    lbl, p = s.agentA_lbl, s.agentB_prob

    # Priority 2: Strong Agent-B confidence signals
    if p >= PB_HIGH:    # High confidence (≥0.60) → MATCH
        return {"decision": "MATCH"}
    if p <= PB_LOW:     # Low confidence (≤0.40) → NO_MATCH
        return {"decision": "NO_MATCH"}

    # Priority 3: Mid-range confidence → use Agent-A decision
    if lbl == "YES":
        return {"decision": "MATCH"}
    if lbl == "NO":
        return {"decision": "NO_MATCH"}

    # Priority 4: Everything else → need more data
    return {"decision": "NEED_MORE_DATA"}

# Step 5: Persist - Save the resolution decision to Neo4j
# Creates Resolution nodes with relationships to both products
def persist_fn(s: ERState):
    # Create deterministic pair key for consistent storage
    id_a, id_b = sorted((s.id1, s.id2))
    pair_key = hashlib.sha1(f"{id_a}|{id_b}".encode()).hexdigest()

    # Store resolution in Neo4j with all agent outputs
    kb.query("""
    MERGE (r:Resolution {key:$k})
      SET r.id1      = $a,
          r.id2      = $b,
          r.decision = $d,
          r.agentA   = $lbl,
          r.reason   = $rsn,
          r.prob     = $p,
          r.ts       = $ts
    WITH r
    MATCH (a:Product {prod_id:$a}), (b:Product {prod_id:$b})
    // Create bidirectional relationships to both products
    MERGE (r)-[e1:RESOLVES]->(a)
      SET e1.prob = $p, e1.decision = $d
    MERGE (r)-[e2:RESOLVES]->(b)
      SET e2.prob = $p, e2.decision = $d
    """, {
        "k": pair_key,
        "a": id_a, "b": id_b, "d": s.decision,
        "lbl": s.agentA_lbl, "rsn": s.agentA_reason,
        "p": s.agentB_prob,
        "ts": datetime.utcnow().isoformat()
    })
    return {}

# Step 6: Assemble the LangGraph workflow
# Creates a directed graph with nodes (functions) and edges (data flow)
sg = StateGraph(ERState)

# Add all our workflow nodes
sg.add_node("retrieve", retrieve_fn)    # Get context & human labels
sg.add_node("agentA",   agentA_fewshot) # Categorical decisions
sg.add_node("agentB",   agentB_prob)    # Probability scoring
sg.add_node("mgr",      manager_fn)     # Final decision logic
sg.add_node("store",    persist_fn)     # Save to Neo4j

# Define the workflow sequence
sg.set_entry_point("retrieve")         # Start here
sg.add_edge("retrieve", "agentA")       # Retrieve → Agent A
sg.add_edge("retrieve", "agentB")       # Retrieve → Agent B (parallel)
sg.add_edge("agentA",   "mgr")          # Agent A → Manager
sg.add_edge("agentB",   "mgr")          # Agent B → Manager
sg.add_edge("mgr",      "store")        # Manager → Store
sg.add_edge("store",    END)            # Store → End

# Compile the graph into an executable workflow
product_graph = sg.compile()
print("✅  LangGraph re-compiled  (static prompts + human override)")

# Step 7: Test the workflow with a pretty output function
# This function runs a single pair through the entire workflow and displays results
def pretty_smoke(idA, idB):
    # Run the complete LangGraph workflow
    out = product_graph.invoke({"id1": idA, "id2": idB})
    
    # Display formatted results
    print(f"\n🆚  {idA} ↔ {idB}")
    print(f"Agent A  : {out['agentA_lbl']:<6}│ {out['agentA_reason']}")
    print(f"Agent B  : {out['agentB_prob']:.2f}")
    if out.get("human_label"):
        print(f"⚠️  Human override → {out['human_label']}")
    print(f"Manager  : {out['decision']}")
    print("-" * 60)

# Demo: Test with one random product from each catalog
from random import choice
idA = choice([i for i in abt["prod_id"]])  # Random Abt product
idB = choice([i for i in buy["prod_id"]])  # Random Buy product
pretty_smoke(idA, idB)

✅  LangGraph re-compiled  (static prompts + human override)

🆚  36502 ↔ 203136102
Agent A  : NO    │ brands differ
Agent B  : 0.05
Manager  : NO_MATCH
------------------------------------------------------------


What this does
1. Matches the two focal Product nodes (a, b).
2. Collects their Brand and NameTok helper nodes.
3. Pulls in a limited number of other products that share those helpers.
4. Trims everything to a hard cap so the prompt stays small.
5. Serialises to JSON so the LangGraph agents can consume it.

In [None]:
# STEP 6 · Stress Test Evaluation: Testing our pipeline at scale               
###############################################################################

# Run our LangGraph pipeline on a large dataset to measure performance
# This combines candidate generation + LLM evaluation + precision/recall metrics

import asyncio, random, time, tenacity, tqdm, openai, re, json
from collections import Counter
import pandas as pd

# Configuration parameters for the evaluation
CAND_LIMIT      = 5_000   # Maximum candidates (for memory management)
NEG_SAMPLE_SIZE = 500     # How many negative examples to test
RANDOM_SEED     = 42      # For reproducible results
MAX_WORKERS     = 4       # Parallel processing limit
TPM_PADDING     = 1.2     # Rate limiting: pause between API calls  
MAX_RETRIES     = 6       # Retry failed API calls

# Part A: Generate candidate pairs using multiple blocking strategies
# Combine pandas-based blocking with Neo4j graph-based candidate generation

# Strategy 1: Pandas blocking on brand + price bucket
abt_block = abt[["prod_id","brand_norm","price_bucket"]].copy()
buy_block = buy[["prod_id","brand_norm","price_bucket"]].copy()

# Find products with same brand and price range
blk1 = pd.merge(abt_block, buy_block,
                on=["brand_norm","price_bucket"],
                suffixes=("_abt","_buy"))[["prod_id_abt","prod_id_buy"]]

# Strategy 2: Pandas blocking on first token of title
abt_tok = abt[["prod_id","title_clean"]].copy().assign(
    first_tok=lambda df: df["title_clean"].str.split().str[0]
)
buy_tok = buy[["prod_id","title_clean"]].copy().assign(
    first_tok=lambda df: df["title_clean"].str.split().str[0]
)

# Find products with same starting word
blk2 = pd.merge(abt_tok, buy_tok,
                on="first_tok",
                suffixes=("_abt","_buy"))[["prod_id_abt","prod_id_buy"]]

# Combine both pandas strategies
cand_pb = {(a,b) for a,b in zip(blk1["prod_id_abt"], blk1["prod_id_buy"])}
cand_pb |= {(a,b) for a,b in zip(blk2["prod_id_abt"], blk2["prod_id_buy"])}

# Strategy 3: Neo4j graph-based candidate generation
# Find products connected through shared brands or name tokens
neo = kb.query("""
CALL {
  MATCH (a:Product)-[:HAS_BRAND]->(br:Brand)<-[:HAS_BRAND]-(b:Product)
    WHERE a.source='abt' AND b.source='buy'
    RETURN a.prod_id AS id1, b.prod_id AS id2
  UNION
  MATCH (a:Product)-[:HAS_TOK]->(t:NameTok)<-[:HAS_TOK]-(b:Product)
    WHERE a.source='abt' AND b.source='buy'
    RETURN a.prod_id AS id1, b.prod_id AS id2
}
WITH DISTINCT id1, id2
RETURN id1, id2
""")
cand_neo = {(r["id1"], r["id2"]) for r in neo}

# Union all candidate generation strategies
all_cand = list(cand_pb | cand_neo)
print(f"Total candidates after union: {len(all_cand):,}")

# Part B: Create balanced evaluation set
# Include ALL gold standard pairs + random sample of negatives for fair testing

# Separate candidates into positive (gold) and negative examples
gold_cand = [p for p in all_cand if tuple(sorted(p)) in gold_pairs]  # True matches
others    = [p for p in all_cand if tuple(sorted(p)) not in gold_pairs]  # Potential negatives

# Sample negative examples to balance the dataset
needed_negs = min(NEG_SAMPLE_SIZE, len(others))
random.seed(RANDOM_SEED)  # Ensure reproducible results
neg_sample  = random.sample(others, needed_negs)

# Create final evaluation set: all gold pairs + sampled negatives
eval_pairs = gold_cand + neg_sample
print(f"⚡ Evaluating {len(gold_cand)} gold + {len(neg_sample)} negs → total {len(eval_pairs)}")

# Part C: Parallel execution with rate limiting and error handling
# Process many pairs efficiently while respecting OpenAI API limits

# Handle different OpenAI library versions for rate limit errors
try:
    RateLimitErr = openai.RateLimitError
except AttributeError:
    import openai.error as _oe
    RateLimitErr = _oe.RateLimitError

# Wrapper function with automatic retry logic for API failures
@tenacity.retry(
    reraise=True,
    retry=tenacity.retry_if_exception_type(RateLimitErr),
    stop=tenacity.stop_after_attempt(MAX_RETRIES),
    wait=tenacity.wait_exponential(multiplier=2, min=2, max=30),
)
def safe_invoke(a: str, b: str):
    # Run our LangGraph pipeline on one pair
    out = product_graph.invoke({"id1": a, "id2": b})
    time.sleep(TPM_PADDING)  # Rate limiting pause
    return out

# Set up async processing with concurrency control
sem  = asyncio.Semaphore(MAX_WORKERS)  # Limit concurrent API calls
loop = asyncio.get_running_loop()

async def run_all(pairs):
    """Process all pairs in parallel with progress tracking"""
    async def one(pair):
        async with sem:  # Respect concurrency limit
            return await loop.run_in_executor(None, safe_invoke, *pair)
    
    # Create tasks for all pairs
    tasks = [asyncio.create_task(one(p)) for p in pairs]
    results = []
    
    # Process with progress bar
    for fut in tqdm.tqdm(asyncio.as_completed(tasks),
                         total=len(tasks), desc="Evaluating"):
        results.append(await fut)
    return results

# Execute the evaluation
results = await run_all(eval_pairs)

# Part D: Analyze results and calculate performance metrics
# Show decision distribution and compute precision/recall against ground truth

# Show the distribution of final decisions
print("\nDecision counts →", Counter(r["decision"] for r in results))

# Calculate precision and recall metrics
# pred_set = all pairs our system classified as MATCH
pred_set = {tuple(sorted((r["id1"],r["id2"]))) 
            for r in results if r["decision"]=="MATCH"}

# Calculate confusion matrix components
tp = len(pred_set & gold_pairs)    # True Positives: correct MATCH predictions
fp = len(pred_set - gold_pairs)    # False Positives: incorrect MATCH predictions  
fn = len(gold_pairs - pred_set)    # False Negatives: missed MATCH pairs

# Compute standard metrics
precision = tp / (tp + fp) if tp + fp else 0.0  # Of predicted matches, how many were correct?
recall    = tp / (tp + fn) if tp + fn else 0.0  # Of actual matches, how many did we find?
print(f"\nPrecision {precision:0.2f}   |   Recall {recall:0.2f}")

# Part E: Inspect high-confidence predictions for quality analysis
def fetch_pairs(dec: str, limit: int = 10, asc: bool = False):
    """Extract pairs with specific decision for manual inspection"""
    rows = []
    for r in results:
        if r["decision"] != dec:
            continue
        ctx = r.get("graph_ctx")
        # Skip pairs without proper context (shouldn't happen in normal cases)
        if not ctx or "products" not in ctx or len(ctx["products"]) < 2:
            continue
        p1, p2 = ctx["products"]
        rows.append({
            "id1":   r["id1"],
            "id2":   r["id2"],
            "title1": p1.get("title_clean", ""),
            "title2": p2.get("title_clean", ""),
            "prob":  r["agentB_prob"]  # Agent B's confidence score
        })
    # Sort by Agent B probability (high confidence first, or low if asc=True)
    rows = sorted(rows, key=lambda d: d["prob"], reverse=not asc)[:limit]
    return pd.DataFrame(rows)

# Show examples of high-confidence decisions for manual verification
print("\nTop-10 MATCHes by Agent-B probability")
display(fetch_pairs("MATCH"))

print("\nTop-10 NO_MATCHes by Agent-B probability")
display(fetch_pairs("NO_MATCH", asc=True))

Total candidates after union: 156,188
⚡ Evaluating 1091 gold + 500 negs → total 1591


Evaluating: 100%|██████████| 1591/1591 [16:12<00:00,  1.64it/s]



Decision counts → Counter({'MATCH': 1046, 'NO_MATCH': 524, 'NEED_MORE_DATA': 21})

Precision 0.98   |   Recall 0.94

Top-10 MATCHes by Agent-B probability


Unnamed: 0,id1,id2,title1,title2,prob
0,33921,207925431,panasonic black dvd home theater sound system ...,panasonic sc pt660 home theater system,0.95
1,34948,208114675,lg 25 0 cu ft titanium french door bottom free...,lg 25 0 cu ft total capacity,0.95
2,35276,208084021,pioneer kuro 50 black plasma hdtv pdp5020fd,pioneer pdp 5020fd 50 plasma tv,0.95
3,32625,201935116,logitech dinovo media desktop laser keyboard a...,logitech dinovo media desktop laser 967562 0403,0.95
4,36168,208171630,flip video f360 black mino series camcorder f360b,pure digital flip mino digital camcorder f360b,0.95
5,25413,90049795,sony minidv cleaning cassette dvm12cld,sony minidv head cleaner dvm12cld,0.95
6,37183,209656949,apple 32gb black 2nd generation ipod touch mb5...,apple ipod touch 32gb flash portable media pla...,0.95
7,37310,209901966,sony vaio cs series red notebook computer vgnc...,sony vaio cs series vgn cs180j r 14 1 inch not...,0.95
8,33453,207876535,yamaha 7 2 channel black digital home theater ...,yamaha rx v863 home theater receiver rxv863bl,0.95
9,24493,202900030,panasonic 2 line integrated phone system white...,panasonic kx ts3282w corded telephone,0.95



Top-10 NO_MATCHes by Agent-B probability


Unnamed: 0,id1,id2,title1,title2,prob
0,34349,202827684,samsung 37 series 5 lcd black flat panel hdtv ...,sanus visionmount flat panel tv wall mount mf1...,0.0
1,33021,203154818,electrolux harmony series canister vacuum el6985b,harmony el6985a vacuum canister hepa,0.05
2,36931,208370418,speck black toughskin case for iphone 3g iph3g...,3g iphone black toughskin iph3g blk ts,0.05
3,6284,202812567,bose 27028 161 bookshelf pair speakers in whit...,boss 161 speaker,0.05
4,24153,208715855,whirlpool 10 whp1000sq duet washer and dryer w...,pedistal for duet sport electric washer dryer ...,0.05
5,32906,203324965,sirius dock and play universal vehicle kit supv1,directed electronics supv1 car kit,0.05
6,36452,209208655,samsung 52 series 8 lcd black flat panel hdtv ...,ln52a860 52 lcd tv widescreen 1920x1080 hdtv,0.05
7,28340,208456215,whirlpool white front load washer wfw9200swh,whirlpool 27 duet washer horiz axis wp,0.05
8,16329,210521578,whirlpool 24 built in dishwasher du1100ss,whirlpool du1100xtps 24 undercounter dishwashe...,0.05
9,29892,208715736,maytag med5900tw white electric dryer med5900twh,7 0 cu ft super capacity electric dryer med5900tw,0.05


In [12]:
# in your notebook or script, before re‐ingesting:
kb.query("MATCH (n) DETACH DELETE n")
print("Nodes now:", kb.query("MATCH (n) RETURN count(n) AS c")[0]["c"])

Nodes now: 0


In [None]:
# Human-in-the-Loop: Manual labeling of uncertain cases             

# When our AI agents are uncertain (NEED_MORE_DATA), get human expert input
# This creates training data to improve future AI performance

import json, textwrap, random, getpass
from datetime import datetime
from IPython.display import Markdown, display

# Configuration for the human review process
N_REVIEW    = 20    # How many uncertain pairs to review this session
MAX_CHARS   = 450   # Truncate long JSON displays for readability
RANDOM_SEED = 1     # Make the review order reproducible

# Safety checks: ensure we have the required data and functions
assert 'results' in globals(),          "Run batch evaluation first!"
assert callable(get_product_subgraph),  "Get_product_subgraph() missing!"
assert 'kb' in globals(),               "Neo4j connection `kb` missing!"
assert 'df' in globals(),               "Master DataFrame `df` missing!"

# Part A: Collect and prepare uncertain pairs for human review
# Find all pairs where our AI system was uncertain and needs human guidance

# Extract all NEED_MORE_DATA cases from our evaluation results
# Get full context for each pair to help human reviewers make decisions
need_pairs = [
    (r["id1"], r["id2"], get_product_subgraph(r["id1"], r["id2"]))
    for r in results if r["decision"] == "NEED_MORE_DATA"
]

# Check if we have any uncertain cases to review
if not need_pairs:
    print("No NEED_MORE_DATA pairs to review.")
    raise SystemExit

# Randomize order and select subset for this review session
random.Random(RANDOM_SEED).shuffle(need_pairs)
review_set = need_pairs[:N_REVIEW]
print(f"Reviewing {len(review_set)} NEED_MORE_DATA pairs.\n")

labels = []  # Will collect human decisions for storage

# Part B: Interactive human review process
# Present each uncertain pair to human expert for manual classification

for id1, id2, raw in review_set:
    # Display product information for human review
    if raw is not None:
        # Show full Neo4j context (products, brands, neighbors, etc.)
        ctx  = json.loads(raw)
        blob = textwrap.shorten(
            json.dumps(ctx, ensure_ascii=False, indent=2),
            MAX_CHARS, placeholder=" …"
        )
        display(Markdown(f"### {id1} vs {id2}\n```json\n{blob}\n```"))
    else:
        # Fallback: show just the cleaned product titles if no context available
        titleA = df.loc[df.prod_id == id1, "title_clean"].iat[0]
        titleB = df.loc[df.prod_id == id2, "title_clean"].iat[0]
        display(Markdown(f"### {id1} vs {id2}\n```\n{titleA}\nvs\n{titleB}\n```"))

    # Get human decision with input validation
    while True:
        ans = input("[y] MATCH   [n] NO_MATCH   [s] skip → ").strip().lower()
        if ans in {"y", "n", "s"}:
            break
        print("Type y / n / s …")

    # Skip this pair if human chooses to
    if ans == "s":
        continue

    # Collect optional reasoning from human expert
    reason = input("Optional short reason (press Enter to skip): ").strip() or "(n/a)"
    
    # Store the human decision with metadata
    labels.append({
        "id1":    id1,
        "id2":    id2,
        "label":  "MATCH"     if ans == "y" else "NO_MATCH",
        "reason": reason,
        "user":   getpass.getuser(),  # Track who made the decision
        "ts":     datetime.utcnow().isoformat(),  # When it was made
    })
    print("— recorded —\n")

print(f"Collected {len(labels)} human labels.")

# Part C: Store human labels in Neo4j for future use
# Save expert decisions as training data for improving AI performance

if labels:
    # Store human labels as HumanLabel nodes in Neo4j
    kb.query("""
    UNWIND $rows AS row
    MERGE (h:HumanLabel {id1:row.id1, id2:row.id2})
      SET h.label  = row.label,
          h.reason = row.reason,
          h.user   = row.user,
          h.ts     = row.ts
    """, params={"rows": labels})
    print("Labels saved to Neo4j as (:HumanLabel).")
else:
    print("Nothing saved.")

🧐 Reviewing 20 NEED_MORE_DATA pairs.



### 30841 vs 205554287
```
skagen premium steel slimline mesh womens watch 233xsgg
vs
233xsgg skagen
```

### 32059 vs 205131849
```
unreal tournament iii video game for the sony ps3 unrealps3
vs
unreal tournament iii 26991
```

— recorded —



### 35052 vs 210227260
```json
{ "products": [ { "price": 0.0, "name": "Sharp 26' Black LCD HDTV With Built In DVD Player - LC26DV24U", "source": "abt", "title_clean": "sharp 26 black lcd hdtv with built in dvd player lc26dv24u", "brand_norm": "sharp", "prod_id": "35052" }, { "price": 218.63, "name": "Sharp AQUOS BD-HP21U Blu-ray Disc Player", "title_clean": "sharp aquos bd hp21u blu ray disc player", "source": "buy", "brand_norm": "sharp", "prod_id": "210227260" } ], …
```

— recorded —



### 38957 vs 207535265
```
lasonic atsc digital to analog tv converter box lta260
vs
lasonic lta 260 atsc converter box
```

Type y / n / s …
— recorded —



### 34282 vs 204793356
```json
{ "products": [ { "price": 0.0, "name": "Sony Black Soft Carrying Case - LCSX30", "source": "abt", "title_clean": "sony black soft carrying case lcsx30", "brand_norm": "sony", "prod_id": "34282" }, { "price": 0.0, "name": "Sony SRS-BTM30 Wireless Stereo Bluetooth Speaker - SRSBTM30", "title_clean": "sony srs btm30 wireless stereo bluetooth speaker srsbtm30", "source": "buy", "brand_norm": "sony", "prod_id": "204793356" } ], "brands": [ { …
```

— recorded —



### 36084 vs 205664938
```json
{ "products": [ { "price": 18.0, "name": "Canon Deluxe Burgundy Leather Case - 2350B001", "source": "abt", "title_clean": "canon deluxe burgundy leather case 2350b001", "brand_norm": "canon", "prod_id": "36084" }, { "price": 16.97, "name": "Canon PSC-1000 Semi-Hard Leather Case - 2350B001", "title_clean": "canon psc 1000 semi hard leather case 2350b001", "source": "buy", "brand_norm": "canon", "prod_id": "205664938" } ], "brands": [ { "name": …
```

— recorded —



### 30867 vs 205554724
```
seiko quartz le grand sport womens watch sxda04
vs
seiko sxda04
```

### 25036 vs 202502745
```
lego star wars ii the original trilogy video game for the sony psp 023272329396
vs
lego star wars 2 the original trilogy
```

— recorded —



### 38834 vs 209310268
```
nokia t mobile unlocked cellular phone n96
vs
nokia n96 unlocked phone 16gb 5mp camera with carl zeiss optics and dual led flash and auto focus built in gps wifi 002g6q3
```

— recorded —



### 38480 vs 204647332
```
irobot robotic floor washer 74249
vs
irobot roomba scooba robot floor washer
```

— recorded —



### 18441 vs 209002546
```
mosquito magnet defender replacement net mm4000net1
vs
mosquito magnet defender net replacement
```

— recorded —



### 23296 vs 201647442
```
kingdom hearts ii video game for the sony ps2 662248904115
vs
kingdom hearts ii
```

— recorded —



### 36722 vs 209656935
```
applecare protection plan for ipod touch or ipod classic mb591lla
vs
applecare for ipod touch or ipod classic mb591ll a
```

— recorded —



### 38704 vs 209114575
```
vmware fusion 2 for mac vmfm20bx2
vs
vmware fusion 2 vmfm20bx2
```

— recorded —



### 34016 vs 208042911
```
z line portland black tv stand zl2344mu
vs
z line designs zl23 44mu portland flat panel tv stand with integrated mount
```

— recorded —



### 36083 vs 206362623
```json
{ "products": [ { "price": 0.0, "name": "Canon Deluxe Grey Leather Case - 2349B001", "source": "abt", "title_clean": "canon deluxe grey leather case 2349b001", "brand_norm": "canon", "prod_id": "36083" }, { "price": 19.99, "name": "Canon PSC-1000 Semi-Hard Leather Case - 2349B001", "title_clean": "canon psc 1000 semi hard leather case 2349b001", "source": "buy", "brand_norm": "canon", "prod_id": "206362623" } ], "brands": [ { "name": "canon" } …
```

— recorded —



### 24825 vs 207388757
```json
{ "products": [ { "price": 119.0, "name": "Panasonic Plain Paper Fax/Copier With Cordless Phone Answering System - Grey Finish - KXFG2451", "source": "abt", "title_clean": "panasonic plain paper fax copier with cordless phone answering system grey finish kxfg2451", "brand_norm": "panasonic", "prod_id": "24825" }, { "price": 845.48, "name": "Panasonic TH-42PZ80U - 42' Widescreen 1080p Plasma HDTV - 1,000,000:1 Dynamic Contrast Ratio", "source": …
```

— recorded —



### 24648 vs 203491687
```
waring professional cool touch deep fryer black stainless steel finish df100
vs
waring pro deep fryer 3qt black
```

— recorded —



### 37421 vs 209896693
```
at t aliph jawbone ii silver bluetooth headset jawbone2s
vs
aliph jawbone 2 silver bluetooth headset retail 84442vrp
```

— recorded —



### 31032 vs 210251970
```
chestnut hill sound george ipod music system in white chs4001
vs
chestnut hill chs40001 chestnut hill george ipod dock compact stereo system
```

— recorded —

✅ Collected 18 human labels.
📌 Labels saved to Neo4j as (:HumanLabel).


In [None]:
# Part D: Update Resolution decisions with human labels
# Override AI decisions with human expert judgments for better accuracy

# Retrieve all human labels from Neo4j
hl_rows = kb.query("MATCH (h:HumanLabel) RETURN h.id1 AS id1, h.id2 AS id2, h.label AS label")

# Update existing Resolution nodes to use human decisions instead of AI decisions
kb.query("""
UNWIND $rows AS row
MATCH (r:Resolution {id1:row.id1, id2:row.id2})
SET r.decision = row.label
""", params={ "rows": hl_rows })

[]

In [None]:
# Part E: Show updated decision distribution after human input
# Display how human labels changed the overall decision statistics

res_counts = kb.query("""
MATCH (r:Resolution)
OPTIONAL MATCH (h:HumanLabel)
  WHERE (h.id1 = r.id1 AND h.id2 = r.id2)
     OR (h.id1 = r.id2 AND h.id2 = r.id1)
WITH coalesce(h.label, r.decision) AS finalDecision, count(*) AS cnt
RETURN finalDecision AS decision, cnt
""")

# Convert to dictionary and display the updated distribution
counts = {rec["decision"]: rec["cnt"] for rec in res_counts}
print("Updated decision counts →", counts)

Updated decision counts → {'NO_MATCH': 521, 'MATCH': 1065, 'NEED_MORE_DATA': 6}


In [None]:
# Export Results: Create comprehensive reports for analysis          
# Export resolution data with product names for easy review and presentation
import pandas as pd

# Part A: Basic resolution export with product names
# Pull all resolution data with readable product names for manual inspection

rows = kb.query("""
MATCH (r:Resolution)
MATCH (p1:Product {prod_id: r.id1})
MATCH (p2:Product {prod_id: r.id2})
RETURN
  r.id1        AS id1,
  p1.name      AS name1,
  r.id2        AS id2,
  p2.name      AS name2,
  r.decision   AS decision,
  r.agentA     AS agentA,
  r.reason     AS reason,
  r.prob       AS prob,
  r.ts         AS ts
""")

# Convert to DataFrame for easy manipulation and viewing
df_res = pd.DataFrame(rows)
display(df_res.head(10))

# Export to CSV for external analysis (Excel, presentations, etc.)
csv_path = "stress_resolutions_with_names.csv"
df_res.to_csv(csv_path, index=False)
print(f"✅  Saved full table with names to {csv_path}")

Unnamed: 0,id1,name1,id2,name2,decision,agentA,reason,prob,ts
0,10333368,Kensington Orbit Optical Trackball - USB w/PS2...,34309,Sony Bravia Wireless Home Theater System In Bl...,NO_MATCH,NO,brands differ,0.05,2025-06-11T15:17:58.528076
1,203341506,Olympus Slim Leather Case - 202087,25732,Olympus Premium Slim Leather Case In Black - 2...,MATCH,YES,identical brand and very similar titles,0.95,2025-06-11T15:18:48.265649
2,16329,Whirlpool 24' Built-In Dishwasher - DU1100SS,210521578,Whirlpool DU1100XTPS 24' Undercounter Dishwash...,MATCH,UNSURE,similar titles but price gap > 30%,0.85,2025-06-11T15:18:48.504967
3,209114552,Canon PowerShot A1000 IS Digital Camera - Gray...,37123,Canon PowerShot A1000 IS Gray Digital Camera -...,MATCH,YES,identical brand and very similar titles,0.95,2025-06-11T15:18:48.534120
4,202973389,Techcraft Veneto Series ABS32 TV Stand,25813,Tech Craft Avalon Series TV Stand - Black Fini...,MATCH,UNSURE,"different brands, similar titles, price gap ex...",0.85,2025-06-11T15:18:48.713573
5,205753566,Speck Products ToughSkin for iPod classic - IC...,31392,Speck Black ToughSkin iPod Classic Case - ICBLKTS,MATCH,YES,identical brand and very similar titles,0.95,2025-06-11T15:18:50.349899
6,205664913,GE 24775 Amplified Quantum HDTV Antenna,32416,GE Amplified Quantum Antenna - TV24775,MATCH,YES,identical brand and very similar titles,0.95,2025-06-11T15:18:50.442213
7,206808434,Canon VIXIA HF10 High Definition Digital Camco...,33280,Canon Vixia High Definition Camcorder - HF10,MATCH,YES,identical title+brand,0.95,2025-06-11T15:18:50.460754
8,23350,Sony PlayStation 2 8MB Memory Card (2 Pack) - ...,50012447,SONY 7-11719-70670-0 PS2 8 MB Memory Card 2-pk...,MATCH,YES,identical title+brand,0.95,2025-06-11T15:18:50.479788
9,202486964,GE Millennium TV Antenna - 24734,21182,GE Platinum HDTV Millennium TV Antenna - TV24734,MATCH,YES,identical brand and very similar titles,0.95,2025-06-11T15:18:52.122651


✅  Saved full table with names to stress_resolutions_with_names.csv


In [None]:
# Part B: Comprehensive export with human overrides and reasoning
# Create a detailed report that prioritizes human decisions over AI decisions

rows = kb.query("""
MATCH (r:Resolution)
OPTIONAL MATCH (h:HumanLabel)
  WHERE (h.id1 = r.id1 AND h.id2 = r.id2)
     OR (h.id1 = r.id2 AND h.id2 = r.id1)
WITH r, h,
     coalesce(h.label, r.decision) AS finalDecision,
     coalesce(h.reason, r.reason) AS finalReason

// Get readable product names for both products in each pair
MATCH (a:Product {prod_id: r.id1})
MATCH (b:Product {prod_id: r.id2})

RETURN
  r.id1           AS ProductA_id,
  a.name          AS ProductA_name,
  r.id2           AS ProductB_id,
  b.name          AS ProductB_name,
  r.agentA        AS AgentA_lbl,
  r.prob          AS AgentB_prob,
  finalDecision   AS FinalDecision,
  finalReason     AS Reason
""")

import pandas as pd

# Create final results DataFrame with human-prioritized decisions
df = pd.DataFrame(rows)
df.to_csv("stress_resolutions_with_reasons.csv", index=False)
print(f"✅ Wrote {len(df):,} rows to resolutions_with_reasons.csv")

✅ Wrote 1,592 rows to resolutions_with_reasons.csv
