In [86]:
"""
STEP 1  – Ingestion & Baseline Validation
----------------------------------------
Loads an Excel/CSV file into a DataFrame, confirms the schema,
and applies first‑pass dtype + null handling rules.
"""

import pandas as pd                          # pandas gives us DataFrame + Excel utilities
from pathlib import Path                     # convenient cross‑platform path handling


def load_contacts(file_path: str | Path) -> pd.DataFrame:
    """Read a contacts file and return a validated, typed DataFrame."""
    
    file_path = Path(file_path)                                                  # ensure we have a Path object
    if not file_path.exists():                                                   # guard against bad path
        raise FileNotFoundError(f"File not found: {file_path}")
    
    # --- 1.1  Define the required schema -------------------------------------
    required_cols = [                                                            # columns the downstream logic expects
        "Account Name", "Full Name", "Email", "Contact Id",
        "Admin Role", "Primary Contact", "Active Contact",
        "ConnectLink Status", "Connect Link Email",
        "# of cases", "# of opps",
        "Last Activity", "Created Date"
    ]
    
    # (optional) explicit dtype map for critical fields
    dtype_map = {                                                                # enforce string types where ID semantics matter
        "Account Name":  "string",
        "Full Name":     "string",
        "Email":         "string",
        "Contact Id":    "string",
        "Admin Role":    "string",
        "Primary Contact": "boolean",                                            # stored as Pandas BooleanDtype (allows NA)
        "Active Contact": "string",
        "ConnectLink Status": "string",
        "Connect Link Email": "string",
        "# of cases":   "Int64",                                                 # nullable integer
        "# of opps":    "Int64"
    }
    
    date_cols = ["Last Activity", "Created Date"]                                # columns that should parse as dates
    
    # --- 1.2  Load the file ---------------------------------------------------
    df = pd.read_excel(                                                          # read the Excel file into a DataFrame
        file_path,
        dtype=dtype_map,                                                         # apply dtypes where safe
        parse_dates=date_cols,                                                   # let pandas parse these as datetime
        engine="openpyxl"                                                        # use openpyxl backend (already installed)
    )
    
    # --- 1.3  Schema validation ----------------------------------------------
    missing = [c for c in required_cols if c not in df.columns]                 # find any required columns that are absent
    if missing:                                                                 # if list not empty, we cannot proceed
        raise ValueError(f"Missing required columns: {missing}")
    
    # Re‑order columns to a canonical order (optional but nice‑to‑have)
    df = df[required_cols]                                                      # slices DataFrame to the exact order
    
    # --- 1.4  Null / blank handling ------------------------------------------
    df = df.fillna({c: "" for c in df.select_dtypes("string").columns})         # replace NaN in string columns with empty string
    df["Primary Contact"] = df["Primary Contact"].fillna(False)                 # treat missing boolean as False
    # numeric & date columns keep their NA state for now (handled later)
    
    # --- 1.5  Trim whitespace + lower‑case key text fields --------------------
    text_cols = ["Account Name", "Full Name", "Email", "Connect Link Email"]
    for col in text_cols:
        df[col] = (df[col]                                                      # chain string methods safely
                     .str.strip()                                               # remove leading/trailing spaces
                     .str.replace(r"\s+", " ", regex=True)                      # collapse internal multiple spaces
                 )
    
    return df                                                                   # DataFrame is now ready for step 2


In [87]:
"""
STEP 3 – Hierarchy Weighting
----------------------------
Given a DataFrame that already passed STEP 1 (ingestion / validation),
calculate a numeric `rank_score` for every row based on the business
rules you outlined.  Higher `rank_score` ⇒ record is preferred to keep.

Return the same DataFrame with extra helper columns that downstream
logic (duplicate clustering / merge) can use.

Assumes columns are clean, trimmed, and dtype‑safe.
"""

import pandas as pd
from pathlib import Path
from typing import Optional

# -------------------------------------------------------------
# Helper:  map free-text strings to ordinal weights             |
# -------------------------------------------------------------
ADMIN_ROLE_MAP    = {"owner": 3, "admin": 2}                   # anything else -> 0
CONNECT_STATUS_MAP = {"A": 2, "I": 1, "U": 0}                  # blank / other -> -1
OPPS_CAP          = 99                                         # avoid giant scores
MISSING_ACTIVITY_WEIGHT = -9999                                # penalty when Last Activity is NaT


def add_hierarchy_weights(df: pd.DataFrame,
                          today: Optional[pd.Timestamp] = None) -> pd.DataFrame:
    """
    Return a copy of *df* with per-rule weight columns and an additive rank_score.

    Higher ``rank_score`` means the record is preferred when choosing which
    duplicate to keep.

    Parameters
    ----------
    df : pd.DataFrame
        Contacts table with at least the columns 'Admin Role',
        'Primary Contact', 'Active Contact', 'ConnectLink Status',
        '# of opps', 'Last Activity' and 'Created Date'.  The text columns
        must support the ``.str`` accessor and the date columns must be
        datetime-typed.  The input frame is not modified.
    today : pd.Timestamp, optional
        Override 'now' for deterministic testing; default = midnight today.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with the extra columns
            w_admin, w_primary, w_active, w_connect,
            w_opps, w_last_activity, w_created, rank_score
    """
    # 0) Establish 'today' for date arithmetic
    if today is None:
        today = pd.Timestamp.today().normalize()

    # Work on a copy so re-running a notebook cell never sees a half-updated
    # caller frame (same convention as add_comparison_tag).
    df = df.copy()

    # 1) Normalise text fields for case / whitespace (kept as locals, so no
    #    helper columns need to be dropped afterwards)
    admin_role     = df['Admin Role'].str.lower().str.strip()
    active_contact = df['Active Contact'].str.lower().str.strip()
    connect_status = df['ConnectLink Status'].str.upper().str.strip()

    # 2) Individual weights
    # 2.1 Admin / Owner: owner -> 3, admin -> 2, everything else -> 0
    df['w_admin'] = admin_role.map(ADMIN_ROLE_MAP).fillna(0).astype(int)

    # 2.2 Primary Contact: True -> 1, False / NA -> 0
    df['w_primary'] = df['Primary Contact'].fillna(False).astype(int)

    # 2.3 Active Contact: a missing value compares as NA on nullable string
    #     columns, so fill it before casting to int.
    df['w_active'] = active_contact.eq('active').fillna(False).astype(int)

    # 2.4 ConnectLink Status: A -> 2, I -> 1, U -> 0, blank / unknown -> -1
    df['w_connect'] = connect_status.map(CONNECT_STATUS_MAP).fillna(-1).astype(int)

    # 2.5 Number of opportunities: NA -> 0, capped at OPPS_CAP
    df['w_opps'] = df['# of opps'].fillna(0).clip(upper=OPPS_CAP).astype(int)

    # 2.6 Last Activity (more recent = better).
    #     Negate FIRST, then fill.  The previous `-x.fillna(-9999)` negated the
    #     filled value, turning the intended penalty into a +9999 bonus.
    days_since_last = (today - df['Last Activity']).dt.days
    recency = -days_since_last
    df['w_last_activity'] = recency.fillna(MISSING_ACTIVITY_WEIGHT).astype(int)

    # 2.7 Created Date (older = better); missing date -> 0 (neutral)
    days_since_created = (today - df['Created Date']).dt.days
    df['w_created'] = days_since_created.fillna(0).astype(int)

    # 3) Aggregate into a single rank_score (simple additive model).
    # NOTE(review): the two date weights are measured in days and therefore
    # dwarf the 0-3 ordinal weights; confirm that is the intended priority.
    weight_cols = [
        'w_admin', 'w_primary', 'w_active', 'w_connect',
        'w_opps', 'w_last_activity', 'w_created'
    ]
    df['rank_score'] = df[weight_cols].sum(axis=1)

    return df


In [88]:
df = load_contacts("../data/duplicate_contacts_small.xlsx")  # STEP 1: ingest + validate the sample workbook
df = add_hierarchy_weights(df)                               # STEP 3: attach the w_* columns and rank_score
df.head()[['Account Name', 'Full Name', 'rank_score']]       # sanity check: first five rows, key columns only


Unnamed: 0,Account Name,Full Name,rank_score
0,Westdale School,Randall Knox,4077
1,Westdale School,Randall Knox,908
2,Westdale School,Randall Knox,3849
3,Westdale School,Randall Knox,5718
4,Westdale School,Diana Reed,6151


In [90]:
"""
STEP 3b – Generate hierarchy‑comparison tag
-------------------------------------------
Creates a compact, lexicographically sortable tag that encodes every gate
in the business ladder.  Any record whose Admin Role is *Owner* or *Admin*
is flagged as `is_privileged=True` and given a sentinel tag; these rows are
removed from rank competitions and always kept.

Assumes: DataFrame columns already cleaned (trimmed / lower‑ or upper‑cased
where relevant) and date columns parsed as datetime64[ns] (see STEP 1).

Usage
-----
df = load_contacts(...)              # STEP 1
df = add_comparison_tag(df)          # this step
"""
from __future__ import annotations
import pandas as pd
from datetime import datetime

# -------------------------------------------------------------
# Helper - map ConnectLink -> tier (string for easy concat)    |
# Larger tier = better, like every other field of hier_tag.    |
# -------------------------------------------------------------
CONNECT_TIER = {
    "A": "3",        # Best
    "I": "2",        # Middle  (ties U initially)
    "U": "2",        # Middle  - may be demoted later
    "":  "1"         # Blank / unknown
}

def add_comparison_tag(df: pd.DataFrame,
                       today: datetime | None = None) -> pd.DataFrame:
    """
    Return a copy of *df* with two new columns:
        - is_privileged  (bool)  - Admin Role is Owner or Admin
        - hier_tag       (str)   - hierarchy comparison key

    ``hier_tag`` has the fixed layout

        primary|active|connect|opps|activity|email|created

    and is built so that a plain string comparison ranks records: in EVERY
    field a larger character means "more worth keeping", so the best record
    of a group is simply the one with the greatest tag.

        primary   1 = primary contact, 0 = not / unknown
        active    1 = 'active', 0 = anything else
        connect   3 = A, 2 = I or U, 1 = blank / unknown / demoted U
        opps      2 = four or more, 1 = one to three, 0 = none
        activity  4 = within 365 days, 3 = within 912 days, 2 = older,
                  1 = no Last Activity date
        email     1 = non-blank Email, 0 = blank / missing
        created   days since Created Date, zero-padded to 5 digits
                  (older = larger; missing or future dates = 00000)

    Privileged rows get the sentinel tag 'PRIV' instead.

    Parameters
    ----------
    df : pd.DataFrame
        Contacts table with 'Admin Role', 'Primary Contact', 'Active Contact',
        'ConnectLink Status', '# of opps', 'Last Activity', 'Email' and
        'Created Date' (date columns datetime-typed).  Not modified.
    today : datetime, optional
        Override 'now' for deterministic tests; default = midnight today.
    """
    df = df.copy()                                   # avoid mutating caller's frame

    if today is None:
        today = pd.Timestamp.today().normalize()

    def _text(col: str) -> pd.Series:
        # Missing values become "" BEFORE the str cast; otherwise they turn
        # into the literal text "<NA>" / "nan" and look like real content.
        return df[col].fillna("").astype(str).str.strip()

    # 1) Privileged flag (Owner OR Admin)
    df["is_privileged"] = _text("Admin Role").str.lower().isin({"owner", "admin"})

    # 2) Primary / Active bits
    primary_bit = df["Primary Contact"].fillna(False).astype(int)
    active_bit = _text("Active Contact").str.lower().eq("active").astype(int)

    # 3) ConnectLink tier (unmapped statuses fall back to the blank tier)
    connect_raw = _text("ConnectLink Status").str.upper()
    connect_tier = connect_raw.map(CONNECT_TIER).fillna("1")

    # 4) Opportunities bucket: (-1,0] -> "0", (0,3] -> "1", (3,inf) -> "2".
    #    Digits are used (rather than Z/L/H letters) so the bucket sorts in
    #    preference order.
    opps = df["# of opps"].fillna(0).astype(int)
    opps_bucket = pd.cut(
        opps,
        bins=[-1, 0, 3, float("inf")],
        labels=["0", "1", "2"]
    ).astype(str)

    # 5) Last-activity tier: recent "4" > mid "3" > stale "2" > missing "1"
    days_since_last = (today - df["Last Activity"]).dt.days
    activity_tier = pd.cut(
        days_since_last,
        bins=[-float("inf"), 365, 912, float("inf")],
        labels=["4", "3", "2"]
    ).astype(str)
    activity_tier = activity_tier.where(days_since_last.notna(), "1")

    # 6) Conditional demotion for Connect = 'U': with zero opportunities AND
    #    stale or missing activity, U is worth no more than a blank status.
    demote_u = (
        connect_raw.eq("U")
        & opps_bucket.eq("0")
        & activity_tier.isin({"2", "1"})
    )
    connect_tier = connect_tier.mask(demote_u, "1")

    # 7) Email-presence bit (1 when Email is a non-blank string)
    email_bit = _text("Email").ne("").astype(int)

    # 8) Created-date rank (older = bigger number), capped to 5 digits and
    #    zero-padded so string comparison agrees with numeric comparison.
    days_since_created = (today - df["Created Date"]).dt.days
    created_rank = (
        days_since_created
        .fillna(0)
        .clip(lower=0, upper=99999)
        .astype(int)
        .astype(str)
        .str.zfill(5)
    )

    # 9) Compose the tag; privileged rows get the sentinel instead.
    df["hier_tag"] = (
        primary_bit.astype(str) + "|" +
        active_bit.astype(str)  + "|" +
        connect_tier            + "|" +
        opps_bucket             + "|" +
        activity_tier           + "|" +
        email_bit.astype(str)   + "|" +
        created_rank
    )
    df.loc[df["is_privileged"], "hier_tag"] = "PRIV"

    return df


In [91]:
"""
STEP 4 – Duplicate‑Candidate Generation
--------------------------------------
Goal:  within each Account Name group, discover records that likely
       refer to the same person by layering fast “blocking” keys
       (exact matches) and slower fuzzy comparisons (≈ matches).

Outputs
-------
• Adds `dupe_cluster_id`  — ID shared by rows judged to be duplicates
• Keeps privileged Owner/Admin rows in scope (they can still collide)
• No row is dropped; downstream logic decides which one to keep.

Dependencies
------------
rapidfuzz  (pip install rapidfuzz)
pandas
"""
import pandas as pd
from rapidfuzz import fuzz, distance
from itertools import combinations

def _split_email(email) -> tuple[str, str]:
    """Return (local_part, domain) of *email*; both are "" for non-string input
    and the domain is "" when the value contains no '@'."""
    if not isinstance(email, str):
        return "", ""
    parts = email.split("@")
    return parts[0], (parts[1] if len(parts) > 1 else "")


def add_duplicate_cluster_ids(df_in: pd.DataFrame,
                              name_sim_threshold: int = 95,
                              email_edit_distance: int = 1) -> pd.DataFrame:
    """
    Assign a shared `dupe_cluster_id` to rows that look like the same person.

    Matching happens only inside one 'Account Name' group and layers four
    signals, all merged through a union-find structure:
      1. identical non-blank normalised email
      2. identical non-blank surname/first-initial key
      3. same email domain AND local parts within `email_edit_distance` edits
      4. names of similar length (difference <= 2 characters) whose
         token_sort_ratio is >= `name_sim_threshold`

    Blank values never count as a match: two contacts that both lack an email
    (or a name key) are not duplicates just because both fields are empty.

    Parameters
    ----------
    df_in : pd.DataFrame
        Contacts table; must contain 'Account Name' plus whatever
        `_prep_normalised_fields` needs.
    name_sim_threshold : int
        Minimum rapidfuzz token_sort_ratio (0-100) for a name match.
    email_edit_distance : int
        Maximum Levenshtein distance between email local parts.

    Returns
    -------
    pd.DataFrame
        The prepared frame with `dupe_cluster_id` ("C00001", "C00002", ...)
        added and the helper columns removed.  No row is dropped.
    """
    # NOTE(review): `_prep_normalised_fields` and `UnionFind` are not defined
    # in the cells visible here - confirm they exist before "Run All".
    # Expected to add string columns email_norm, name_norm, sfi_key.
    df = _prep_normalised_fields(df_in)
    uf = UnionFind()

    def _union_all(ids):
        # Link every member of an exact-match block to the first member.
        for other in ids[1:]:
            uf.union(ids[0], other)

    for _, grp in df.groupby("Account Name"):
        idxs = grp.index.tolist()

        # 1) exact-email blocking (blank emails are not evidence of identity)
        for email, block in grp.groupby("email_norm"):
            if email:
                _union_all(list(block.index))

        # 2) surname-first-initial blocking (skip rows without a usable key)
        for key, block in grp.groupby("sfi_key"):
            if key:
                _union_all(list(block.index))

        # 3) domain-anchored near-email matching: compare local parts only
        #    between addresses that have BOTH a local part and a domain, and
        #    only within the same domain.
        by_domain = {}
        for i in idxs:
            local, domain = _split_email(df.at[i, "email_norm"])
            if local and domain:
                by_domain.setdefault(domain, []).append((i, local))
        for members in by_domain.values():
            for (i, local_i), (j, local_j) in combinations(members, 2):
                if distance.Levenshtein.distance(local_i, local_j) <= email_edit_distance:
                    uf.union(i, j)

        # 4) stricter name matching for pairs that are still separate.
        #    (Every same-domain email pair was already compared in step 3, so
        #    the email check is not repeated here.)
        for i, j in combinations(idxs, 2):
            if uf.find(i) == uf.find(j):
                continue
            n1, n2 = df.at[i, "name_norm"], df.at[j, "name_norm"]
            if not (isinstance(n1, str) and isinstance(n2, str) and n1 and n2):
                continue                                  # blank names never match
            if (abs(len(n1) - len(n2)) <= 2 and
                    fuzz.token_sort_ratio(n1, n2) >= name_sim_threshold):
                uf.union(i, j)

    # 5) cluster ID assignment: number clusters in order of first appearance
    root_to_cluster = {}
    clusters = []
    for idx in df.index:
        root = uf.find(idx)
        if root not in root_to_cluster:
            root_to_cluster[root] = f"C{len(root_to_cluster) + 1:05d}"
        clusters.append(root_to_cluster[root])
    df["dupe_cluster_id"] = clusters

    # clean up helper columns added by _prep_normalised_fields
    df.drop(columns=["email_norm", "name_norm", "sfi_key"], inplace=True)
    return df


In [92]:
df = load_contacts("../data/duplicate_contacts_small.xlsx")   # Step 1: ingest + validate
df = add_comparison_tag(df)                                # Step 3b: adds is_privileged + hier_tag
# Step 4: adds dupe_cluster_id.
# NOTE(review): this call needs `_prep_normalised_fields` and `UnionFind`, which no
# visible cell defines - confirm they are defined before running on a fresh kernel.
df = add_duplicate_cluster_ids(df)                         # Step 4


In [93]:
"""
STEP 5 – Canonical selection & merge mapping
-------------------------------------------
Annotates the DataFrame with canonical flags, merge targets, and
resolution status, following the decision ladder described above.
"""

import pandas as pd

def assign_canonical_records(df_in: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df_in* annotated with canonical / merge decisions.

    Requires the columns 'dupe_cluster_id', 'Contact Id', 'is_privileged'
    (bool), 'hier_tag' and 'Created Date', and a unique index.

    Adds three columns:
        is_canonical          True for rows that survive as their own record
        canonical_contact_id  Contact Id the row resolves to (str)
        resolution_status     one of 'single_record', 'keep_privileged',
                              'merge_into_privileged', 'keep', 'merge',
                              'needs_review'

    Decision ladder per cluster:
        0. One row only -> it is canonical ('single_record').
        1. Any privileged rows -> every privileged row is kept; all other rows
           merge into the privileged row with the earliest Created Date.
        2. Otherwise the row with the greatest hier_tag wins ('keep') and the
           rest merge into it.  If several rows share the greatest tag they are
           all kept as 'needs_review' and the rest merge into the first of them.

    All ties are broken by row order in *df_in* (first row wins), so the
    result is deterministic.
    """
    df = df_in.copy()
    df["is_canonical"]          = False
    df["canonical_contact_id"]  = None
    df["resolution_status"]     = None

    def _pick_primary(rows: pd.DataFrame):
        # Earliest Created Date; a STABLE sort keeps frame order among equal
        # dates (the default sort kind gives no such guarantee).
        return rows.sort_values("Created Date", kind="stable").iloc[0]

    def _mark(index, canonical: bool, target, status: str) -> None:
        # `target` is either a scalar Contact Id or an index-aligned Series.
        df.loc[index, "is_canonical"]         = canonical
        df.loc[index, "canonical_contact_id"] = target
        df.loc[index, "resolution_status"]    = status

    for _, sub_idx in df.groupby("dupe_cluster_id").groups.items():
        sub = df.loc[sub_idx]

        # 0) singleton
        if len(sub) == 1:
            _mark(sub.index, True, sub["Contact Id"], "single_record")
            continue

        # 1) privileged siphon
        priv_rows = sub[sub["is_privileged"]]
        if not priv_rows.empty:
            non_priv = sub[~sub["is_privileged"]]
            primary_id = _pick_primary(priv_rows)["Contact Id"]
            _mark(priv_rows.index, True, priv_rows["Contact Id"], "keep_privileged")
            _mark(non_priv.index, False, primary_id, "merge_into_privileged")
            continue

        # 2) no privileged rows: greatest hier_tag wins.  Selecting by equality
        #    with the maximum keeps the winners in frame order, so "first tied
        #    row" is well defined without depending on sort stability.
        top_tag = sub["hier_tag"].max()
        winners = sub[sub["hier_tag"] == top_tag]
        target_id = winners.iloc[0]["Contact Id"]
        winner_status = "keep" if len(winners) == 1 else "needs_review"

        _mark(winners.index, True, winners["Contact Id"], winner_status)
        _mark(sub.index.difference(winners.index), False, target_id, "merge")

    df["canonical_contact_id"] = df["canonical_contact_id"].astype(str)
    return df

In [94]:
"""
STEP 6 – Export workbook + merge log
------------------------------------
Takes the DataFrame returned by STEP 5 and writes a single Excel file
with three sheets:
  • master_contacts
  • change_log
  • needs_review
"""

import pandas as pd
from pathlib import Path

def export_dedupe_results(df_in: pd.DataFrame,
                          out_path: str | Path = "output/deduped_contacts.xlsx"):
    """
    Save the dedupe outcome in *df_in* as a three-sheet Excel workbook.

    Sheets written (all without the index, all ordered by dupe_cluster_id):
      master_contacts  every row, key columns only, canonical rows first
                       inside each cluster
      change_log       rows whose status is 'merge' or 'merge_into_privileged',
                       with 'Contact Id' renamed to 'old_contact_id'
      needs_review     rows whose status is 'needs_review', all columns

    Parameters
    ----------
    df_in : pd.DataFrame
        Contact table after STEP 5 (has canonical_contact_id,
        resolution_status, dupe_cluster_id, is_canonical, hier_tag).
    out_path : str or pathlib.Path, optional
        Destination workbook; missing parent folders are created.
    """
    workbook_path = Path(out_path)
    workbook_path.parent.mkdir(parents=True, exist_ok=True)

    # Sheet 1: master list, canonical record on top of each cluster
    master = df_in[[
        "Account Name", "Full Name", "Email", "Contact Id",
        "canonical_contact_id", "resolution_status",
        "dupe_cluster_id", "is_canonical", "hier_tag"
    ]].sort_values(by=["dupe_cluster_id", "is_canonical"], ascending=[True, False])

    status = df_in["resolution_status"]

    # Sheet 2: one line per record that gets merged away
    was_merged = status.isin(["merge", "merge_into_privileged"])
    log_columns = [
        "dupe_cluster_id", "Contact Id",
        "canonical_contact_id", "resolution_status",
        "hier_tag"
    ]
    changes = df_in.loc[was_merged, log_columns]
    changes = changes.rename(columns={"Contact Id": "old_contact_id"})
    changes = changes.sort_values("dupe_cluster_id")

    # Sheet 3: unresolved ties for a human to decide
    to_review = df_in.loc[status == "needs_review"].sort_values("dupe_cluster_id")

    # Write the sheets in a fixed order with the openpyxl engine
    sheets = {
        "master_contacts": master,
        "change_log": changes,
        "needs_review": to_review,
    }
    with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
        for sheet_name, frame in sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=False)

    print(f"✅  Dedupe workbook written to {workbook_path.resolve()}")


In [95]:
import pandas as pd
from rapidfuzz import distance

def apply_email_merge_or_inactivate(df: pd.DataFrame,
                                    max_email_dist: int = 1) -> pd.DataFrame:
    """
    Re-resolve every non-canonical row by email similarity.

    For each non-canonical row:
      - If a canonical row in the same cluster has the SAME email domain and a
        local part within `max_email_dist` edits, merge into the best such row
        (greatest hier_tag).  The status becomes "merge_into_privileged" when
        the target's status is "keep_privileged", otherwise "merge".
      - Otherwise set canonical_contact_id to None and resolution_status to
        "inactive".

    A blank or malformed email (no local part or no domain) never matches.
    Canonical rows are left untouched.

    Parameters
    ----------
    df : pd.DataFrame
        Needs dupe_cluster_id, is_canonical, hier_tag, 'Contact Id',
        resolution_status (from Step 5), a unique index, and either
        `email_norm` (lower-cased, stripped full email) or the raw 'Email'
        column, which is normalised the same way when email_norm is absent.
    max_email_dist : int
        Maximum Levenshtein distance allowed between local parts.

    Returns
    -------
    pd.DataFrame
        A copy of *df* with canonical_contact_id / resolution_status updated.
    """
    df = df.copy()

    # STEP 4 drops email_norm before returning, so fall back to 'Email'.
    email_source = df["email_norm"] if "email_norm" in df.columns else df["Email"]
    emails = email_source.fillna("").astype(str).str.strip().str.lower()

    def _parts(email: str) -> tuple[str, str]:
        # "local@domain" -> (local, domain).  The original unpacked these the
        # wrong way round and so fuzzy-matched the domain instead.
        pieces = email.split("@")
        return pieces[0], (pieces[1] if len(pieces) > 1 else "")

    is_canonical = df["is_canonical"].astype(bool)

    # 1) cluster id -> canonical candidates with a usable email:
    #    (local, domain, hier_tag, Contact Id, resolution_status)
    candidates: dict = {}
    for idx in df.index[is_canonical]:
        local, domain = _parts(emails.at[idx])
        if not (local and domain):
            continue
        candidates.setdefault(df.at[idx, "dupe_cluster_id"], []).append((
            local, domain,
            df.at[idx, "hier_tag"],
            df.at[idx, "Contact Id"],
            df.at[idx, "resolution_status"],
        ))

    # 2) resolve every non-canonical row
    for idx in df.index[~is_canonical]:
        local, domain = _parts(emails.at[idx])

        best = None                      # (hier_tag, Contact Id, status)
        if local and domain:
            for c_local, c_domain, c_tag, c_id, c_status in candidates.get(
                    df.at[idx, "dupe_cluster_id"], []):
                if c_domain != domain:
                    continue             # domains must match exactly
                if distance.Levenshtein.distance(local, c_local) > max_email_dist:
                    continue
                # first candidate wins ties; a strictly greater tag replaces it
                if best is None or c_tag > best[0]:
                    best = (c_tag, c_id, c_status)

        if best is not None:
            _, target_id, target_status = best
            df.at[idx, "canonical_contact_id"] = target_id
            # preserve privileged semantics
            df.at[idx, "resolution_status"] = (
                "merge_into_privileged"
                if target_status == "keep_privileged"
                else "merge"
            )
        else:
            # no close-enough email -> inactivate
            df.at[idx, "canonical_contact_id"] = None
            df.at[idx, "resolution_status"]    = "inactive"

    return df


In [96]:
# End-to-end run on the small sample workbook.  Each step returns a DataFrame
# that is fed to the next; STEP 3 (add_hierarchy_weights) is not part of this run.
df = load_contacts("../data/duplicate_contacts_small.xlsx")   # Step 1: ingest + validate
df = add_comparison_tag(df)                                # Step 3b: is_privileged + hier_tag
df = add_duplicate_cluster_ids(df)                         # Step 4: dupe_cluster_id
df = assign_canonical_records(df)                          # Step 5: canonical / merge annotations
export_dedupe_results(df, "output/deduped_contacts.xlsx")  # Step 6: write the 3-sheet workbook


✅  Dedupe workbook written to C:\Users\Elioa\OneDrive\Projects\data-dedup-pilot\notebooks\output\deduped_contacts.xlsx
