# AML Entity Resolution System — End-to-End Demo

This notebook can run the full project end-to-end.

Colab mode: you can upload **only this notebook** and it will download the full repo ZIP from Google Drive.

Pipeline:
1) Download repo (Colab only)
2) Install dependencies
3) Generate demo data + SQLite DB
4) Preprocess + build keys/LSH
5) Train model
6) Offline matching + evaluation
7) Start FastAPI + call `/resolve`


## 0) Setup

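If the repo (identified by `src/api.py`) cannot be found under the current working directory or under `/content`, the next cell (on Colab only) downloads and unpacks it from Google Drive. When run locally from the `notebooks/` folder, the cell instead switches to the parent directory.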


In [None]:
from __future__ import annotations

import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List, Optional


def running_in_colab() -> bool:
    """Tell whether this notebook is executing inside Google Colab.

    Detection works by attempting to import ``google.colab``; any failure
    of that import is treated as "not on Colab".
    """
    try:
        import google.colab  # type: ignore
    except Exception:
        on_colab = False
    else:
        on_colab = True
    return on_colab


def run_and_show(cmd: List[str], title: Optional[str] = None) -> None:
    """Run *cmd* to completion and echo what it wrote.

    Args:
        cmd: Command and arguments, passed to ``subprocess.run`` as a list.
        title: Optional heading. When given, a banner with the title and
            the space-joined command line is printed before the command runs.

    Raises:
        subprocess.CalledProcessError: If the command exits with a
            non-zero status (raised after its output has been printed).
    """
    if title:
        rule = "=" * 80
        print("\n" + rule)
        print(title)
        print("$", " ".join(cmd))
        print(rule)

    # Output is captured and only printed once the command has finished.
    completed = subprocess.run(cmd, capture_output=True, text=True)
    captured_out, captured_err = completed.stdout, completed.stderr
    if captured_out:
        print(captured_out)
    if captured_err:
        print("[stderr]\n" + captured_err)
    completed.check_returncode()


def find_repo_root(base: Path) -> Optional[Path]:
    """Locate the project root somewhere below *base*.

    The root is recognised as the directory that contains ``src/api.py``.
    The first match produced by the recursive glob wins.

    Args:
        base: Directory to search recursively.

    Returns:
        The directory two levels above the matched ``api.py`` file, or
        ``None`` when no ``src/api.py`` exists under *base*.
    """
    first_hit = next(iter(base.glob("**/src/api.py")), None)
    if first_hit is None:
        return None
    # <root>/src/api.py: parents[0] is src/, parents[1] is the root itself.
    return first_hit.parents[1]


def download_drive_zip(file_id: str, dest: Path) -> None:
    """Download a Google Drive file to *dest* using ``gdown``.

    ``gdown`` is pip-installed on demand before it is imported.

    Args:
        file_id: Google Drive file id, inserted into the download URL.
        dest: Target path; its parent directory is created if missing.

    Raises:
        RuntimeError: If *dest* is missing or empty after the download call.
        subprocess.CalledProcessError: If installing ``gdown`` fails.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    install_cmd = [sys.executable, "-m", "pip", "install", "-q", "gdown"]
    run_and_show(install_cmd, title="Install gdown")
    import gdown  # type: ignore

    url = f"https://drive.google.com/uc?id={file_id}"
    print("Downloading:", url)
    gdown.download(url, str(dest), quiet=False, fuzzy=True)

    # Verify the result on disk: the file must exist and be non-empty.
    got_data = dest.exists() and dest.stat().st_size != 0
    if not got_data:
        raise RuntimeError(
            "Download failed or produced an empty file. "
            "Check Drive sharing: 'Anyone with the link' and downloads allowed."
        )


# Google Drive id of the repo ZIP, and where the ZIP is stored on the Colab VM.
DRIVE_FILE_ID = "12ZzXx72imAvG5wTrzwbJwDYji3Eyvzf-"
ZIP_PATH = Path("/content/silent-eight-assignment.zip")


if not running_in_colab():
    # Local run: when launched from the notebooks/ folder, step up one level.
    cwd = Path.cwd()
    if cwd.name == "notebooks":
        os.chdir(cwd.parent)
else:
    content_root = Path("/content")
    # Prefer an existing checkout: search under the cwd first, then /content.
    repo_root = find_repo_root(Path.cwd())
    if repo_root is None:
        repo_root = find_repo_root(content_root)
    if repo_root is None:
        # No checkout found: fetch the ZIP, unpack it and search again.
        download_drive_zip(DRIVE_FILE_ID, ZIP_PATH)
        print("Unpacking ZIP to /content ...")
        shutil.unpack_archive(str(ZIP_PATH), str(content_root))
        repo_root = find_repo_root(content_root)
        if repo_root is None:
            raise RuntimeError("Could not locate repo root under /content after unpack.")
    os.chdir(repo_root)


# Whatever directory we ended up in is treated as the project root.
project_root = Path.cwd()
print("Project root:", project_root)
print("Python:", sys.executable)
# Put the project root first on sys.path unless it is already listed.
root_entry = str(project_root)
if root_entry not in sys.path:
    sys.path.insert(0, root_entry)


## 1) Install dependencies

On Colab this installs `requirements.txt` from the downloaded repo.


In [None]:
# Dependencies are only installed automatically on Colab; a local run is
# left untouched and just reports that the step was skipped.
if not running_in_colab():
    print("Skipping install (not running on Colab).")
else:
    pip_cmd = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
    run_and_show(pip_cmd, title="Install dependencies from requirements.txt")


## 2) Generate demo data + SQLite database

This writes outputs under `data/`.


In [None]:
# Run the data generator module, then preview the CSV it is expected to write.
run_and_show([sys.executable, "-m", "src.generation"], title="Generate demo data + SQLite DB")

import pandas as pd

messy_csv = project_root / "data" / "messy_data.csv"
if messy_csv.exists():
    # Read every column as text. With dtype inference, numeric-looking
    # identifiers (national IDs, phone numbers) would lose leading zeros and
    # turn into numpy numbers, which the later cells cannot send as JSON
    # string fields to the API.
    messy_df = pd.read_csv(messy_csv, dtype=str)
    print("\nGenerated data preview (data/messy_data.csv):")
    display(messy_df.head(10))
else:
    print("No data/messy_data.csv found")


## 3) Preprocess + build LSH artifacts

Creates normalized columns, blocking keys, DB indexes, and (optionally) LSH artifacts under `models/`.


In [None]:
# Run the preprocessing module, then show a few rows of its output table.
run_and_show([sys.executable, "-m", "src.preprocessing"], title="Preprocess + build blocking keys/LSH")

import sqlite3
import pandas as pd

db_path = project_root / "data" / "clients.db"
if db_path.exists():
    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it never closes the connection. Close it explicitly so the
    # notebook kernel does not keep the database file open.
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query("SELECT * FROM clients_processed LIMIT 5", conn)
        print("\nProcessed table preview (clients_processed):")
        display(df)
    except Exception as e:
        # The preview is best-effort: report the problem and carry on.
        print("Could not read clients_processed preview:", e)
    finally:
        conn.close()
else:
    print("No data/clients.db found after preprocessing.")


## 4) Train the model

Trains the classifier and writes the model artifact under `models/`.


In [None]:
# Launch the training module as a subprocess and echo its output.
train_cmd = [sys.executable, "-m", "src.train_model"]
run_and_show(train_cmd, title="Train model")

# Report whether the expected model artifact is present after training.
models_dir = project_root / "models"
model_path = models_dir / "entity_resolution_model.pkl"
print("\nModel exists:", model_path.exists(), "->", model_path)


## 5) Offline matching + evaluation

Runs candidate generation + scoring + clustering and outputs evaluation CSVs under `data/`.


In [None]:
# Run the offline matching module as a subprocess and echo its output.
matching_cmd = [sys.executable, "-m", "src.matching"]
run_and_show(matching_cmd, title="Offline matching + evaluation")

import pandas as pd

# List every CSV currently in data/ (nothing is listed if data/ is missing).
data_dir = project_root / "data"
if data_dir.exists():
    csvs = sorted(data_dir.glob("*.csv"))
else:
    csvs = []

print("\nData CSV outputs:")
for p in csvs:
    print("-", p.name)

# Show the first rows of the manual-review file when it exists.
preview = data_dir / "manual_review_cases.csv"
if preview.exists():
    df = pd.read_csv(preview)
    print("\nPreview of manual_review_cases.csv:")
    display(df.head(5))


## 6) Start the FastAPI service

In [None]:
# Start the FastAPI app with uvicorn in a background process and wait until
# it accepts connections before reporting it as started.
import socket
import time

db_path = project_root / "data" / "clients.db"
model_path = project_root / "models" / "entity_resolution_model.pkl"
lsh_index_path = project_root / "models" / "lsh_index.pkl"

# Pass the artifact locations to the server process via environment variables.
env = os.environ.copy()
env["ER_DB_PATH"] = str(db_path)
env["ER_MODEL_PATH"] = str(model_path)
env["ER_LSH_INDEX_PATH"] = str(lsh_index_path)

host = "0.0.0.0" if running_in_colab() else "127.0.0.1"
port = 8000

# Send server output to a log file rather than an unread PIPE: a pipe that
# nobody drains eventually fills up and blocks the server on its next write.
# The child keeps its own handle, so ours can be closed right after spawning.
server_log_path = project_root / "uvicorn.log"
with open(server_log_path, "w", encoding="utf-8") as server_log:
    proc = subprocess.Popen(
        [sys.executable, "-m", "uvicorn", "src.api:app", "--host", host, "--port", str(port)],
        env=env,
        stdout=server_log,
        stderr=subprocess.STDOUT,
    )

# Poll the port instead of sleeping for a fixed time: startup duration varies,
# and a fixed sleep lets the following cells hit a server that is not up yet.
startup_timeout_s = 60.0
deadline = time.monotonic() + startup_timeout_s
server_ready = False
while proc.poll() is None and time.monotonic() < deadline:
    try:
        socket.create_connection(("127.0.0.1", port), timeout=0.5).close()
    except OSError:
        time.sleep(0.25)
    else:
        server_ready = True
        break

if not server_ready:
    # Either the process exited early or it never opened the port in time.
    if proc.poll() is None:
        proc.terminate()
    log_text = server_log_path.read_text(encoding="utf-8", errors="replace")
    raise RuntimeError(
        f"API server did not become ready within {startup_timeout_s:.0f}s "
        f"(exit code: {proc.returncode}).\n--- {server_log_path} ---\n{log_text}"
    )

print("Server started (pid):", proc.pid)
print("Server log:", server_log_path)

if running_in_colab():
    from google.colab import output  # type: ignore
    output.serve_kernel_port_as_iframe(port, path="docs")
else:
    print("Docs:    http://127.0.0.1:8000/docs")
    print("Health:  http://127.0.0.1:8000/health")
    print("Metrics: http://127.0.0.1:8000/metrics")


## 7) Call the API (4 illustrative examples)

This section takes field values from rows of the generated demo data (`data/messy_data.csv`, loaded into `messy_df` in step 2) and then calls `/resolve` with:
- **Likely match**: consistent name + national ID from the same entity.
- **Harder case**: some fields omitted, different ID, different spelling.
- **Conflict (review)**: **national ID from one entity** combined with a **strong name match for a different entity**.
- **No match**: a new entity that merely happens to share its name with an existing one.

In [None]:
import requests

# Base address of the locally started API; later cells reuse this name.
base_url = "http://127.0.0.1:8000"

print("GET /health")
health_response = requests.get(f"{base_url}/health", timeout=10)
print(health_response.json())

In [None]:
import pandas as pd

print("POST /resolve (easy case)")

# Every field is taken unchanged from the first generated record.
record = messy_df.iloc[0]
payload_fields = [
    "first_name",
    "last_name",
    "dob",
    "national_id",
    "email",
    "address",
    "phone_number",
]
# pandas represents empty CSV cells as NaN, which is not valid JSON;
# send null for those fields instead.
payload = {
    field: (None if pd.isna(record[field]) else record[field])
    for field in payload_fields
}

resp = requests.post(f"{base_url}/resolve", json=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
print("Status:", result.get("status"))
print("Candidates checked:", result.get("candidates_checked"))
print("Processing time (ms):", result.get("processing_time_ms"))
print("Best match:", result.get("best_match"))

In [None]:
import pandas as pd

print("\nPOST /resolve (harder case, some fields omitted, different ID, different spelling)")

# Start from the first generated record, then change the first name, drop the
# date of birth and address, and swap in an unrelated national ID.
record = messy_df.iloc[0]
payload = {
    "first_name": "Alex",  # different spelling
    "last_name": record["last_name"],
    "dob": None,
    "national_id": "032415893581",  # different random ID
    "email": record["email"],
    "phone_number": record["phone_number"],
}
# pandas represents empty CSV cells as NaN, which is not valid JSON;
# send null for those fields instead.
payload = {field: (None if pd.isna(value) else value) for field, value in payload.items()}

print("POST /resolve")
resp = requests.post(f"{base_url}/resolve", json=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
print("Status:", result.get("status"))
print("Candidates checked:", result.get("candidates_checked"))
print("Processing time (ms):", result.get("processing_time_ms"))
print("Best match:", result.get("best_match"))

In [None]:
import pandas as pd

print("\nPOST /resolve (harder case, Name match to one entity, ID match to a different entity)")

# Everything comes from the first record except the national ID, which is
# borrowed from the record at position 100 to create conflicting evidence.
record = messy_df.iloc[0]
other_record = messy_df.iloc[100]
payload = {
    "first_name": record["first_name"],
    "last_name": record["last_name"],
    "dob": record["dob"],
    "national_id": other_record["national_id"],
    "email": record["email"],
    "phone_number": record["phone_number"],
}
# pandas represents empty CSV cells as NaN, which is not valid JSON;
# send null for those fields instead.
payload = {field: (None if pd.isna(value) else value) for field, value in payload.items()}

print("POST /resolve")
resp = requests.post(f"{base_url}/resolve", json=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
print("Status:", result.get("status"))
print("Candidates checked:", result.get("candidates_checked"))
print("Processing time (ms):", result.get("processing_time_ms"))
print("Best match:", result.get("best_match"))

In [None]:
import pandas as pd

print("\nPOST /resolve (harder case, new person that happens to have the same name)")

# Only the first and last name are copied from the first record; the date of
# birth, national ID, email and phone number are made-up values.
record = messy_df.iloc[0]
payload = {
    "first_name": record["first_name"],
    "last_name": record["last_name"],
    "dob": "11-12-1999",
    "national_id": "02349872645",
    "email": "email.email@gmail.com",
    "phone_number": "111 111 111",
}
# pandas represents empty CSV cells as NaN, which is not valid JSON;
# send null for those fields instead.
payload = {field: (None if pd.isna(value) else value) for field, value in payload.items()}

print("POST /resolve")
resp = requests.post(f"{base_url}/resolve", json=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
print("Status:", result.get("status"))
print("Candidates checked:", result.get("candidates_checked"))
print("Processing time (ms):", result.get("processing_time_ms"))
print("Best match:", result.get("best_match"))

## 8) Stop the server

Run when you're done.


In [None]:
# Stop the background API server if this session started one and it is still
# running: ask it to terminate first, and force-kill only if it does not exit.
if "proc" in globals() and proc and proc.poll() is None:
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown took too long: kill the process, then wait once
        # more so it is reaped and its return code is recorded.
        proc.kill()
        proc.wait()
    print("Server stopped.")
else:
    print("Server is not running.")
