In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install sentence-transformers scikit-learn openpyxl



In [None]:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer


In [None]:
import pandas as pd
data = [
    ["Policy Source","policy_source"],
    ["Reporting period","reporting_period"],
    ["Master Policy Reference","master_policy_reference"],
    ["MASTER Policy Inception","master_policy_inception"],
    ["MASTER Policy Expiry","master_policy_expiration"],
    ["Policy Number","policy_number"],
    ["Type of Inward risk Policy","type_of_inward_risk_policy"],
    ["Assured Legal Name","assured_legal_name"],
    ["Country","insured_country"],
    ["Coverage of Risk","coverage_of_risk"],
    ["Issuing Branch","issuing_branch"],
    ["Line of Business","line_of_business"],
    ["Sub Class (Primary Casualty)","sub_class"],
    ["Effective Date","cover_inception_date"],
    ["Expiry Date","cover_expiration_date"],
    ["Policy Limits","policy_limits"],
    ["Original Currency","original_currency"],
    ["Gross Premium (Original Currency)","gross_premium_original"],
    ["Local Retention","local_retention"],
    ["Gross Premium after Localised Retention","gross_premium_after_localised_retention"],
    ["Ceded Share %","ceded_share"],
    ["Ceded Premium (Original)","ceded_premium_original"],
    ["Ceding Commission %","ceding_commission_percentage"],
    ["Ceding Commission","ceding_commission"],
    ["Brokerage %","brokerage_percentage"],
    ["Brokerage","brokerage"],
    ["Tax %","tax_percentage"],
    ["Premium Tax","premium_tax"],
    ["Other Deductions %","other_deductions_percentage"],
    ["Other Deductions","other_deductions"],
    ["Settlement Currency","settlement_currency"],
    ["ROE","roe"],
    ["Net Premium","net_premium"],
]
df = pd.DataFrame(data, columns=["orig_header", "canonical_truth"])
file_path = "gold_mapping.csv"
df.to_csv(file_path, index=False)


In [None]:
import os, json, random
from typing import List, Dict, Tuple
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def read_any(path: str) -> Dict[str, pd.DataFrame]:
    if path.lower().endswith(".csv"):
        return {"Sheet1": pd.read_csv(path)}
    elif path.lower().endswith((".xlsx", ".xls")):
        x = pd.ExcelFile(path)
        out = {}
        for s in x.sheet_names:
            try:
                out[s] = pd.read_excel(path, sheet_name=s)
            except Exception:
                pass
        return out
    else:
        raise ValueError("Unsupported file type: " + path)

def clean_text(x: str) -> str:
    if not isinstance(x, str):
        x = str(x)
    return x.strip().replace("\n"," ").replace("\r"," ")

def sample_values(series: pd.Series, k: int = 30) -> List[str]:
    vals = [str(v) for v in series.dropna().astype(str).values]
    if not vals:
        return []
    if len(vals) > k:
        random.seed(42)
        vals = random.sample(vals, k)
    return vals

def make_embed_text(header: str, values: List[str], max_chars: int = 500) -> str:
    t = clean_text(header)
    if values:
        joined = " | ".join([clean_text(v) for v in values])
        t = f"{t} [SEP] {joined[:max_chars]}"
    return t

PREMIUM_SCHEMA = {
    "policy_source": ["Policy Source","Source","UW System Name"],
    "reporting_period": ["Reporting period","Reporting Period"],
    "master_policy_reference": ["Master Policy Reference","MASTER Policy Reference"],
    "master_policy_inception": ["MASTER Policy Inception","Master Policy Inception"],
    "master_policy_expiration": ["MASTER Policy Expiry","Master Policy Expiry","MASTER Policy Expiration"],
    "policy_number": ["Policy Number","Policy No","Número de póliza","N° DE POL","Local Policy Number"],
    "assured_legal_name": ["Assured Legal Name","Assured Name","Insured Name","Local Insured Name"],
    "issuing_branch": ["Issuing Branch","Producing Branch/entity","Producing Branch","Branch"],
    "line_of_business": ["Line of Business","LOB","Policy LOB","General Casualty","Casualty","Property"],
    "sub_class": ["Sub Class","Sub Class (Primary Casualty)","Subclass","SubClass"],
    "type_of_inward_risk_policy": ["Type of Inward risk Policy","Type of Inward risk","Type of inward risk policy","Facultative"],
    "insured_country": ["Country","Risk Location Country","Insured Location","Risk Country"],
    "coverage_of_risk": ["Coverage of Risk","Coverage","Cobertura"],
    "cover_inception_date": ["Effective Date","Cover Inception Date","Inception Date","Start Date","VIG DESDE"],
    "cover_expiration_date": ["Expiry Date","Cover Expiration Date","Expiration Date","End Date","VIG HASTA"],
    "policy_limits": ["Policy Limits","Sum Insured","Limit"],
    "original_currency": ["Original Currency","Claim Original Currency Code","Currency (Original)","Moneda Original"],
    "gross_premium_original": ["Gross Premium (Original Currency)","Gross Premium Original","Gross Premium (Original)"],
    "local_retention": ["Local Retention","Retention Local","Retención Local"],
    "gross_premium_after_localised_retention": ["Gross Premium after Localised Retention","Gross Premium after Localized Retention"],
    "ceded_share": ["Ceded Share %","Ceded Share","% Cesión","% CEDIDA","Ceded %"],
    "ceded_premium_original": ["Ceded Premium (Original)","Ceded Premium Original"],
    "ceding_commission_percentage": ["Ceding Commission %","Ceding commission %","Comisión Cesión %"],
    "ceding_commission": ["Ceding Commission","Ceding commission","Comisión Cesión"],
    "brokerage_percentage": ["Brokerage %","Brokerage Percentage","Comisión Broker %","Brokerage Commission %"],
    "brokerage": ["Brokerage","Brokerage Commission","Comisión de Corretaje","Broker Commission"],
    "tax_percentage": ["Tax %","Tax Percentage"],
    "premium_tax": ["Premium Tax","IPT","Impuesto sobre Primas","Montant IPT"],
    "other_deductions_percentage": ["Other Deductions %","Deducciones %","Otras Deducciones %"],
    "other_deductions": ["Other Deductions","Deducciones","Otras Deducciones"],
    "settlement_currency": ["Settlement Currency"],
    "roe": ["ROE","Rate of Exchange","Exchange Rate"],
    "net_premium": ["Net Premium","Prima Neta","P NETA","Premium Net"]
}

In [None]:
class HeaderMapper:
    def __init__(self, model_name: str, canonical_map):
        print(f"Loading model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.alias_texts = []
        self.alias_keys = []
        for canon, aliases in canonical_map.items():
            fake_alias = canon.replace("_"," ")
            for a in [fake_alias] + aliases:
                t = clean_text(a)
                if t:
                    self.alias_texts.append(t)
                    self.alias_keys.append(canon)
        print("Encoding canonical aliases ...")
        self.alias_emb = self.model.encode(self.alias_texts, convert_to_numpy=True, normalize_embeddings=True)

    def map_headers(self, df: pd.DataFrame, use_values=True, threshold=0.75, topk=3):
        out = {}
        for h in df.columns:
            try:
                vals = sample_values(df[h], 30) if use_values else []
                txt = make_embed_text(str(h), vals)
                h_emb = self.model.encode([txt], convert_to_numpy=True, normalize_embeddings=True)
                sims = cosine_similarity(h_emb, self.alias_emb)[0]
                idx = np.argsort(-sims)[:topk]
                cands = [(self.alias_keys[i], float(sims[i])) for i in idx]
                best_canon, best_score = cands[0]
                chosen = best_canon if best_score >= threshold else "UNKNOWN"
                out[str(h)] = (chosen, best_score, cands)
            except Exception:
                out[str(h)] = ("ERROR", 0.0, [("ERR", 0.0)])
        return out

def load_gold(path_csv: str):
    df = pd.read_csv(path_csv)
    return {clean_text(r.orig_header): clean_text(r.canonical_truth) for _, r in df.iterrows()}

def score_predictions(pred_map, gold_map):
    total = len(gold_map)
    correct = 0; unknown = 0
    for k, truth in gold_map.items():
        pred = pred_map.get(k, ("MISSING",0.0,[]))[0]
        if pred == truth:
            correct += 1
        elif pred == "UNKNOWN":
            unknown += 1
    wrong = total - correct - unknown
    acc = round(correct/total, 4) if total else 0.0
    return dict(total=total, correct=correct, unknown=unknown, wrong=wrong, accuracy=acc)


In [None]:
import numpy as np
from datetime import datetime, timedelta

def make_sample_premium_xlsx(path="/content/bdx_premium_sample.xlsx", n=25, seed=7):
    random.seed(seed); np.random.seed(seed)
    countries = ["Portugal","Poland","Czech Republic","Denmark","Sweden","Austria","Hungary","Greece","Finland","Lithuania"]
    branches = ["Madrid","Warsaw","Prague","Copenhagen","Stockholm","Vienna","Budapest","Athens","Helsinki","Vilnius"]
    lobs = ["General Casualty","Property","Public/Products Liability","Employer's Liability"]
    subclasses = ["Primary Casualty","Product Casualty","Cyber","Marine"]
    currencies = ["EUR","GBP","USD","NOK","CHF","PLN","SEK","DKK"]
    coverage_opts = ["Worldwide","EU Only","Worldwide except USA/Canada","EMEA"]
    types_inward = ["Facultative","Open Cover","Binder"]
    assured_names = ["Novaterra Labs","Amberline Systems","Orion Textiles","HelioData Group","SilverPeak Mobility","Northport Analytics","LumaCore Retail","GreenForge Foods","ClearDock Shipping","ZephyrWorks"]
    def rnd_date(start=datetime(2025,1,1), end=datetime(2025,3,31)):
        delta = end - start
        return start + timedelta(days=random.randint(0, delta.days))
    rows = []
    for i in range(1, n+1):
        src = "UW System Name (GenX)" if i%2==0 else "UW System Name (Sigma)"
        period = f"{random.choice(['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec'])}-25"
        master_ref = f"FAC-{2025}-{100000+i:06d}"
        master_incep = rnd_date(); master_exp = master_incep + timedelta(days=random.choice([335,364,366,395]))
        polnum = f"{random.randint(100,999)}-{random.choice(['AB','PO','QW','XZ','LM'])}{random.randint(100000,999999)}{random.randint(2001,2099)}"
        inward_type = random.choice(types_inward); assured = random.choice(assured_names)
        country = random.choice(countries); coverage = random.choice(coverage_opts)
        branch = random.choice(branches); lob = random.choice(lobs); subc = random.choice(subclasses)
        eff = rnd_date(); exp = eff + timedelta(days=random.choice([335,364,366,395]))
        limit = random.choice([1_000_000, 2_000_000, 3_000_000, 5_000_000])
        orig_ccy = random.choice(currencies); gross_orig = round(np.random.uniform(20000,120000),2)
        retention = random.choice([0,10000,20000,30000]); gross_after_local = round(gross_orig - retention, 2)
        ceded_share = random.choice([0.3,0.4,0.5,0.6]); ceded_prem = round(gross_after_local * ceded_share, 2)
        ced_comm_pct = random.choice([0.15,0.2,0.25,0.3]); ced_comm = round(ceded_prem * ced_comm_pct, 2)
        brok_pct = random.choice([0.0,0.05,0.1,0.125,0.15]); brokerage = round(gross_after_local * brok_pct, 2)
        tax_pct = random.choice([0.0,0.01,0.05,0.1]); prem_tax = round(gross_after_local * tax_pct, 2)
        other_ded_pct = random.choice([0.0,0.005,0.01]); other_ded = round(gross_after_local * other_ded_pct, 2)
        sett_ccy = random.choice(currencies); roe = round(np.random.uniform(0.75,1.25),4)
        net_premium = round(gross_after_local - ced_comm - brokerage - prem_tax - other_ded, 2)
        rows.append({
            "Policy Source": src, "Reporting period": period, "Master Policy Reference": master_ref,
            "MASTER Policy Inception": master_incep.strftime("%m/%d/%Y"), "MASTER Policy Expiry": master_exp.strftime("%m/%d/%Y"),
            "Policy Number": polnum, "Type of Inward risk Policy": inward_type, "Assured Legal Name": assured,
            "Country": country, "Coverage of Risk": coverage, "Issuing Branch": branch, "Line of Business": lob,
            "Sub Class (Primary Casualty)": subc, "Effective Date": eff.strftime("%m/%d/%Y"),
            "Expiry Date": exp.strftime("%m/%d/%Y"), "Policy Limits": f"{limit:,.0f}", "Original Currency": orig_ccy,
            "Gross Premium (Original Currency)": gross_orig, "Local Retention": retention,
            "Gross Premium after Localised Retention": gross_after_local, "Ceded Share %": f"{int(ceded_share*100)}%",
            "Ceded Premium (Original)": ceded_prem, "Ceding Commission %": f"{int(ced_comm_pct*100)}%", "Ceding Commission": ced_comm,
            "Brokerage %": f"{int(brok_pct*100)}%", "Brokerage": brokerage, "Tax %": f"{int(tax_pct*100)}%",
            "Premium Tax": prem_tax, "Other Deductions %": f"{int(other_ded_pct*100)}%", "Other Deductions": other_ded,
            "Settlement Currency": sett_ccy, "ROE": roe, "Net Premium": net_premium
        })
    df = pd.DataFrame(rows)
    df_var = df.rename(columns={"Assured Legal Name":"Assured Name","Country":"Risk Location Country",
                                "Gross Premium (Original Currency)":"Gross Premium Original",
                                "Ceding Commission %":"Ceding commission %","Brokerage %":"Brokerage Commission %"})
    with pd.ExcelWriter(path, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="Premium")
        df_var.to_excel(writer, index=False, sheet_name="Premium_Variant")
    print(path)

make_sample_premium_xlsx()

/content/bdx_premium_sample.xlsx


In [None]:
data_path = "/content/bdx_premium_sample.xlsx"
save_dir = "/content/out"
model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
threshold = 0.75
use_values = True
gold_csv = "/content/gold_mapping.csv"  # set path if available

In [None]:
os.makedirs(save_dir, exist_ok=True)
sheets = read_any(data_path)
print(f"Found {len(sheets)} sheet(s)")
mapper = HeaderMapper(model_name, PREMIUM_SCHEMA)

Found 2 sheet(s)
Loading model: sentence-transformers/paraphrase-multilingual-mpnet-base-v2


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/723 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/402 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Encoding canonical aliases ...


In [None]:
cleaned_path = os.path.join(save_dir, os.path.splitext(os.path.basename(data_path))[0] + "_cleaned.xlsx")
report_path = os.path.join(save_dir, os.path.splitext(os.path.basename(data_path))[0] + "_mapping_report.csv")


In [None]:
writer = pd.ExcelWriter(cleaned_path, engine="openpyxl")
all_reports = []

for sname, df in sheets.items():
    print(f"\n--- Sheet: {sname} ---")
    pred_map = mapper.map_headers(df, use_values=use_values, threshold=threshold)
    for h, (c, sc, cand) in pred_map.items():
        print(f"{h:40s} -> {c:35s} (score={sc:.3f})   top={cand}")
    rows = []
    for h, (c, sc, cand) in pred_map.items():
        rows.append({"sheet": sname, "original_header": h, "predicted_canonical": c,
                     "score": round(sc,4), "top_candidates": "; ".join([f"{a}:{round(b,3)}" for a,b in cand])})
    rep_df = pd.DataFrame(rows)
    all_reports.append(rep_df)
    rename_map = {h: c for h, (c,_,_) in pred_map.items() if c not in ("UNKNOWN","ERROR")}
    df.rename(columns=rename_map).to_excel(writer, sheet_name=sname[:31], index=False)

writer.close()
pd.concat(all_reports, ignore_index=True).to_csv(report_path, index=False)
print(f"\nSaved cleaned Excel: {cleaned_path}")
print(f"Saved mapping report: {report_path}")



--- Sheet: Premium ---
Policy Source                            -> UNKNOWN                             (score=0.637)   top=[('policy_source', 0.6373077630996704), ('policy_source', 0.42295047640800476), ('policy_source', 0.4118592143058777)]
Reporting period                         -> UNKNOWN                             (score=0.624)   top=[('reporting_period', 0.6244884729385376), ('reporting_period', 0.6233212947845459), ('reporting_period', 0.6111928224563599)]
Master Policy Reference                  -> UNKNOWN                             (score=0.668)   top=[('master_policy_reference', 0.6678569316864014), ('master_policy_reference', 0.6620012521743774), ('master_policy_reference', 0.659920334815979)]
MASTER Policy Inception                  -> UNKNOWN                             (score=0.687)   top=[('master_policy_inception', 0.6870545744895935), ('master_policy_reference', 0.6532520651817322), ('master_policy_expiration', 0.6489940881729126)]
MASTER Policy Expiry              

In [None]:
if gold_csv and os.path.exists(gold_csv):
    gold_map = load_gold(gold_csv)
    merged = {}
    for rep in all_reports:
        for _, r in rep.iterrows():
            merged[clean_text(r['original_header'])] = (r['predicted_canonical'], r['score'], [])
    metrics = score_predictions(merged, gold_map)
    print("\n=== Accuracy Summary ===")
    for k,v in metrics.items():
        print(f"{k:10s}: {v}")
    print(f"\nMeaning: {metrics['accuracy']*100:.1f}% of tested columns ({metrics['correct']}/{metrics['total']}) mapped correctly.")
else:
    print("\n(no gold CSV provided — skipped accuracy)")


=== Accuracy Summary ===
total     : 33
correct   : 11
unknown   : 22
wrong     : 0
accuracy  : 0.3333

Meaning: 33.3% of tested columns (11/33) mapped correctly.


In [2]:

import os, re, json, random, unicodedata
from typing import List, Dict, Tuple
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from dateutil.parser import parse as dateparse

def normalize_text(s: str) -> str:
    if not isinstance(s, str):
        s = str(s)
    s = unicodedata.normalize('NFKD', s)
    s = ''.join(c for c in s if not unicodedata.combining(c))
    s = s.lower()
    s = re.sub(r"[^a-z0-9%./\-]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def clean_header_for_embed(h: str) -> str:
    return normalize_text(h)

ISO_CURRENCIES = set(["aed", "afn", "all", "amd", "ang", "aoa", "ars", "aud", "awg", "azn", "bam", "bbd", "bdt", "bgn", "bhd", "bif", "bmd", "bnd", "bob", "brl", "bsd", "btn", "bwp", "byr", "bzd", "cad", "cdf", "chf", "clp", "cny", "cop", "crc", "cup", "cve", "czk", "djf", "dkk", "dop", "dzd", "egp", "ern", "etb", "eur", "fjd", "fkp", "gbp", "gel", "ghs", "gip", "gmd", "gnf", "gtq", "gyd", "hkd", "hnl", "hrk", "htg", "huf", "idr", "ils", "inr", "iqd", "irr", "isk", "jmd", "jod", "jpy", "kes", "kgs", "khr", "kmf", "kpw", "krw", "kwd", "kyd", "kzt", "lak", "lbp", "lkr", "lrd", "lsl", "lyd", "mad", "mdl", "mga", "mkd", "mmk", "mnt", "mop", "mur", "mvr", "mwk", "mxn", "myr", "mzn", "nad", "ngn", "nio", "nok", "npr", "nzd", "omr", "pab", "pen", "pgk", "php", "pkr", "pln", "pyg", "qar", "ron", "rsd", "rub", "rwf", "sar", "sbd", "scr", "sdg", "sek", "sgd", "shp", "sll", "sos", "srd", "std", "svc", "syp", "szl", "thb", "tjs", "tmt", "tnd", "top", "try", "ttd", "twd", "tzs", "uah", "ugx", "usd", "uyu", "uzs", "vef", "vnd", "vuv", "wst", "xaf", "xcd", "xdr", "xof", "xpf", "yer", "zar", "zmw"])

def frac_parseable_dates(vals: List[str], sample=50) -> float:
    if not vals: return 0.0
    cnt = 0; tot = 0
    for v in vals[:sample]:
        v2 = v.strip()
        try:
            _ = dateparse(v2, dayfirst=False, fuzzy=True)
            cnt += 1
        except Exception:
            pass
        tot += 1
    return cnt / max(tot,1)

def frac_numeric(vals: List[str], sample=50) -> float:
    if not vals: return 0.0
    cnt = 0; tot = 0
    for v in vals[:sample]:
        v2 = v.replace(",","").replace(" ","")
        try:
            float(re.sub(r"[^0-9.\-]", "", v2))
            if re.search(r"\d", v2):
                cnt += 1
        except Exception:
            pass
        tot += 1
    return cnt / max(tot,1)

def frac_percent(vals: List[str], sample=50) -> float:
    if not vals: return 0.0
    cnt = 0; tot = 0
    for v in vals[:sample]:
        if "%" in v: cnt += 1
        tot += 1
    return cnt / max(tot,1)

def frac_currency_codes(vals: List[str], sample=50) -> float:
    if not vals: return 0.0
    cnt = 0; tot = 0
    for v in vals[:sample]:
        tok = normalize_text(v).split()
        if any(t in ISO_CURRENCIES for t in tok):
            cnt += 1
        tot += 1
    return cnt / max(tot,1)

def detect_column_signature(series: pd.Series) -> Dict[str, float]:
    vals = [str(v) for v in series.dropna().astype(str).values[:200]]
    return {
        "date_ratio": frac_parseable_dates(vals),
        "numeric_ratio": frac_numeric(vals),
        "percent_ratio": frac_percent(vals),
        "currency_ratio": frac_currency_codes(vals),
        "sample_count": len(vals)
    }

def sample_values(series: pd.Series, k: int = 30) -> List[str]:
    vals = [str(v) for v in series.dropna().astype(str).values]
    if not vals: return []
    if len(vals) > k:
        random.seed(42); vals = random.sample(vals, k)
    return vals

def make_embed_text(header: str, values: List[str], max_chars: int = 500) -> str:
    h = clean_header_for_embed(header)
    if values:
        joined = " | ".join([normalize_text(v) for v in values])
        return f"{h} [SEP] {joined[:max_chars]}"
    return h

def read_any(path: str) -> Dict[str, pd.DataFrame]:
    if path.lower().endswith(".csv"):
        return {"Sheet1": pd.read_csv(path)}
    elif path.lower().endswith((".xlsx", ".xls")):
        x = pd.ExcelFile(path)
        out = {}
        for s in x.sheet_names:
            try:
                out[s] = pd.read_excel(path, sheet_name=s)
            except Exception:
                pass
        return out
    else:
        raise ValueError("Unsupported file type: " + path)

class HeaderMapperV2:
    def __init__(self, model_name: str, schema: Dict[str, dict],
                 w_embed_header=0.7, w_embed_values=0.3, w_type=0.2, w_hints=0.1):
        self.model = SentenceTransformer(model_name)
        self.schema = schema
        self.w_embed_header = w_embed_header
        self.w_embed_values = w_embed_values
        self.w_type = w_type
        self.w_hints = w_hints

        self.alias_texts = []
        self.alias_keys = []
        self.field_meta = {}
        for key, obj in schema.items():
            aliases = obj.get("aliases", [])
            exp_type = obj.get("expected_type","text")
            hints = [normalize_text(h) for h in obj.get("hints", [])]
            self.field_meta[key] = {"expected_type": exp_type, "hints": hints}
            canon_as_alias = normalize_text(key.replace("_"," "))
            for a in [canon_as_alias] + aliases:
                self.alias_texts.append(normalize_text(a))
                self.alias_keys.append(key)

        self.alias_emb = self.model.encode(self.alias_texts, convert_to_numpy=True, normalize_embeddings=True)

    def _type_compat_score(self, key: str, sig: Dict[str, float]) -> float:
        exp = self.field_meta[key]["expected_type"]
        d, n, p, c = sig["date_ratio"], sig["numeric_ratio"], sig["percent_ratio"], sig["currency_ratio"]
        if exp == "date":
            return d
        if exp == "amount":
            return max(0.0, n - 0.3*p)
        if exp == "percent":
            return max(p, min(1.0, n*0.6))
        if exp == "currency":
            return c
        if exp == "fx_rate":
            return n
        return max(0.0, 1.0 - n)

    def _hints_bonus(self, key: str, header_norm: str) -> float:
        hints = self.field_meta[key]["hints"]
        if not hints: return 0.0
        score = 0.0
        for h in hints:
            if h in header_norm:
                score += 0.2
        return min(score, 0.6)

    def map_headers(self, df: pd.DataFrame, threshold_hi=0.80, threshold_lo=0.72,
                    use_values=True, topk=5) -> Dict[str, Tuple[str, float, List[Tuple[str, float]]]]:
        out = {}
        for h in df.columns:
            try:
                header_norm = clean_header_for_embed(str(h))
                vals = sample_values(df[h], 30) if use_values else []
                sig = detect_column_signature(df[h])
                txt_header = header_norm
                txt_with_vals = make_embed_text(str(h), vals) if use_values else header_norm
                emb_header = self.model.encode([txt_header], convert_to_numpy=True, normalize_embeddings=True)
                emb_with_vals = self.model.encode([txt_with_vals], convert_to_numpy=True, normalize_embeddings=True)

                sims_h = cosine_similarity(emb_header, self.alias_emb)[0]
                sims_v = cosine_similarity(emb_with_vals, self.alias_emb)[0]

                scores_by_key = {}
                # examine many aliases to collect per-key max
                for i in np.argsort(-sims_h)[:300]:
                    key = self.alias_keys[i]
                    base = self.w_embed_header * float(sims_h[i]) + self.w_embed_values * float(sims_v[i])
                    type_score = self._type_compat_score(key, sig)
                    hint_score = self._hints_bonus(key, header_norm)
                    combined = base + self.w_type * type_score + self.w_hints * hint_score
                    if key not in scores_by_key or combined > scores_by_key[key]:
                        scores_by_key[key] = combined

                ranked = sorted(scores_by_key.items(), key=lambda z: -z[1])[:topk]
                best_key, best_score = ranked[0]
                if best_score >= threshold_hi:
                    chosen = best_key
                elif best_score >= threshold_lo and self._type_compat_score(best_key, sig) >= 0.5:
                    chosen = best_key
                else:
                    chosen = "UNKNOWN"

                out[str(h)] = (chosen, float(best_score), ranked)
            except Exception:
                out[str(h)] = ("ERROR", 0.0, [("ERR", 0.0)])
        return out

def load_gold(path_csv: str) -> Dict[str,str]:
    df = pd.read_csv(path_csv)
    return {str(r.orig_header).strip(): str(r.canonical_truth).strip() for _, r in df.iterrows()}

def score_predictions(pred_map: Dict[str, Tuple[str,float,list]], gold_map: Dict[str,str]):
    total = len(gold_map)
    correct = 0; unknown = 0
    for k, truth in gold_map.items():
        pred = pred_map.get(k, ("MISSING",0.0,[]))[0]
        if pred == truth:
            correct += 1
        elif pred == "UNKNOWN":
            unknown += 1
    wrong = total - correct - unknown
    acc = round(correct/total, 4) if total else 0.0
    return dict(total=total, correct=correct, unknown=unknown, wrong=wrong, accuracy=acc)


In [3]:

import numpy as np, random
from datetime import datetime, timedelta

def make_sample_premium_xlsx(path="/content/bdx_premium_sample.xlsx", n=25, seed=7):
    random.seed(seed); np.random.seed(seed)
    countries = ["Portugal","Poland","Czech Republic","Denmark","Sweden","Austria","Hungary","Greece","Finland","Lithuania"]
    branches = ["Madrid","Warsaw","Prague","Copenhagen","Stockholm","Vienna","Budapest","Athens","Helsinki","Vilnius"]
    lobs = ["General Casualty","Property","Public/Products Liability","Employer's Liability"]
    subclasses = ["Primary Casualty","Product Casualty","Cyber","Marine"]
    currencies = ["EUR","GBP","USD","NOK","CHF","PLN","SEK","DKK"]
    coverage_opts = ["Worldwide","EU Only","Worldwide except USA/Canada","EMEA"]
    types_inward = ["Facultative","Open Cover","Binder"]
    assured_names = [
        "Novaterra Labs","Amberline Systems","Orion Textiles","HelioData Group","SilverPeak Mobility",
        "Northport Analytics","LumaCore Retail","GreenForge Foods","ClearDock Shipping","ZephyrWorks"
    ]

    def rnd_date(start=datetime(2025,1,1), end=datetime(2025,3,31)):
        delta = end - start
        return start + timedelta(days=random.randint(0, delta.days))

    rows = []
    for i in range(1, n+1):
        src = "UW System Name (GenX)" if i%2==0 else "UW System Name (Sigma)"
        period = f"{random.choice(['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec'])}-25"
        master_ref = f"FAC-{2025}-{100000+i:06d}"
        master_incep = rnd_date(); master_exp = master_incep + timedelta(days=random.choice([335,364,366,395]))
        polnum = f"{random.randint(100,999)}-{random.choice(['AB','PO','QW','XZ','LM'])}{random.randint(100000,999999)}{random.randint(2001,2099)}"
        inward_type = random.choice(types_inward)
        assured = random.choice(assured_names)
        country = random.choice(countries)
        coverage = random.choice(coverage_opts)
        branch = random.choice(branches)
        lob = random.choice(lobs)
        subc = random.choice(subclasses)
        eff = rnd_date(); exp = eff + timedelta(days=random.choice([335,364,366,395]))
        limit = random.choice([1_000_000, 2_000_000, 3_000_000, 5_000_000])
        orig_ccy = random.choice(currencies)
        gross_orig = round(np.random.uniform(20000, 120000),2)
        retention = random.choice([0, 10000, 20000, 30000])
        gross_after_local = round(gross_orig - retention, 2)
        ceded_share = random.choice([0.3, 0.4, 0.5, 0.6])
        ceded_prem = round(gross_after_local * ceded_share, 2)
        ced_comm_pct = random.choice([0.15, 0.2, 0.25, 0.3])
        ced_comm = round(ceded_prem * ced_comm_pct, 2)
        brok_pct = random.choice([0.0, 0.05, 0.1, 0.125, 0.15])
        brokerage = round(gross_after_local * brok_pct, 2)
        tax_pct = random.choice([0.0, 0.01, 0.05, 0.1])
        prem_tax = round(gross_after_local * tax_pct, 2)
        other_ded_pct = random.choice([0.0, 0.005, 0.01])
        other_ded = round(gross_after_local * other_ded_pct, 2)
        sett_ccy = random.choice(currencies)
        roe = round(np.random.uniform(0.75, 1.25),4)
        net_premium = round(gross_after_local - ced_comm - brokerage - prem_tax - other_ded, 2)

        rows.append({
            "Policy Source": src, "Reporting period": period, "Master Policy Reference": master_ref,
            "MASTER Policy Inception": master_incep.strftime("%m/%d/%Y"),
            "MASTER Policy Expiry": master_exp.strftime("%m/%d/%Y"),
            "Policy Number": polnum, "Type of Inward risk Policy": inward_type,
            "Assured Legal Name": assured, "Country": country, "Coverage of Risk": coverage,
            "Issuing Branch": branch, "Line of Business": lob, "Sub Class (Primary Casualty)": subc,
            "Effective Date": eff.strftime("%m/%d/%Y"), "Expiry Date": exp.strftime("%m/%d/%Y"),
            "Policy Limits": f"{limit:,.0f}", "Original Currency": orig_ccy,
            "Gross Premium (Original Currency)": gross_orig, "Local Retention": retention,
            "Gross Premium after Localised Retention": gross_after_local, "Ceded Share %": f"{int(ceded_share*100)}%",
            "Ceded Premium (Original)": ceded_prem, "Ceding Commission %": f"{int(ced_comm_pct*100)}%",
            "Ceding Commission": ced_comm, "Brokerage %": f"{int(brok_pct*100)}%",
            "Brokerage": brokerage, "Tax %": f"{int(tax_pct*100)}%", "Premium Tax": prem_tax,
            "Other Deductions %": f"{int(other_ded_pct*100)}%", "Other Deductions": other_ded,
            "Settlement Currency": sett_ccy, "ROE": roe, "Net Premium": net_premium
        })
    df = pd.DataFrame(rows)
    df_var = df.rename(columns={
        "Assured Legal Name": "Assured Name",
        "Country": "Risk Location Country",
        "Gross Premium (Original Currency)": "Gross Premium Original",
        "Ceding Commission %": "Ceding commission %",
        "Brokerage %": "Brokerage Commission %",
    })
    with pd.ExcelWriter(path, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="Premium")
        df_var.to_excel(writer, index=False, sheet_name="Premium_Variant")
    return path

sample_path = make_sample_premium_xlsx()
sample_path


'/content/bdx_premium_sample.xlsx'

In [5]:

data_path = "/content/bdx_premium_sample.xlsx"
schema_json = "/content/premium_schema_large.json"
save_dir = "/content/out"
model_name = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
threshold_hi = 0.80
threshold_lo = 0.72
use_values = True
gold_csv = "/content/gold_mapping.csv"

os.makedirs(save_dir, exist_ok=True)

with open(schema_json, "r", encoding="utf-8") as f:
    schema = json.load(f)

sheets = read_any(data_path)
print(f"Found {len(sheets)} sheet(s)")

mapper = HeaderMapperV2(model_name, schema)

cleaned_path = os.path.join(save_dir, os.path.splitext(os.path.basename(data_path))[0] + "_cleaned.xlsx")
report_path = os.path.join(save_dir, os.path.splitext(os.path.basename(data_path))[0] + "_mapping_report.csv")

writer = pd.ExcelWriter(cleaned_path, engine="openpyxl")
all_reports = []

for sname, df in sheets.items():
    print(f"\n--- Sheet: {sname} ---")
    pred_map = mapper.map_headers(df, threshold_hi=threshold_hi, threshold_lo=threshold_lo, use_values=use_values)
    for h, (c, sc, cand) in pred_map.items():
        print(f"{h:40s} -> {c:35s} (score={sc:.3f})  top={[ (a, round(b,3)) for a,b in cand[:3] ]}")
    rows = []
    for h, (c, sc, cand) in pred_map.items():
        rows.append({
            "sheet": sname,
            "original_header": h,
            "predicted_canonical": c,
            "combined_score": round(sc, 4),
            "top_candidates": "; ".join([f"{a}:{round(b,3)}" for a,b in cand[:5]])
        })
    rep_df = pd.DataFrame(rows)
    all_reports.append(rep_df)
    rename_map = {h: c for h, (c,_,_) in pred_map.items() if c not in ("UNKNOWN","ERROR")}
    df.rename(columns=rename_map).to_excel(writer, sheet_name=sname[:31], index=False)

writer.close()
pd.concat(all_reports, ignore_index=True).to_csv(report_path, index=False)

print(f"\nSaved cleaned Excel: {cleaned_path}")
print(f"Saved mapping report: {report_path}")

if gold_csv and os.path.exists(gold_csv):
    gold_map = load_gold(gold_csv)
    merged = {}
    for rep in all_reports:
        for _, r in rep.iterrows():
            merged[str(r['original_header']).strip()] = (r['predicted_canonical'], r['combined_score'], [])
    metrics = score_predictions(merged, gold_map)
    print("\n=== Accuracy Summary ===")
    for k,v in metrics.items():
        print(f"{k:10s}: {v}")
    print(f"\nMeaning: {metrics['accuracy']*100:.1f}% of tested columns ({metrics['correct']}/{metrics['total']}) mapped correctly.")
else:
    print("\n(no gold CSV provided — skipped accuracy)")


Found 2 sheet(s)

--- Sheet: Premium ---
Policy Source                            -> policy_source                       (score=1.060)  top=[('policy_source', 1.06), ('policy_number', 0.957), ('line_of_business', 0.914)]
Reporting period                         -> reporting_period                    (score=0.936)  top=[('reporting_period', 0.936), ('cover_inception_date', 0.789), ('cover_expiration_date', 0.734)]
Master Policy Reference                  -> master_policy_reference             (score=1.169)  top=[('master_policy_reference', 1.169), ('policy_number', 0.959), ('policy_source', 0.915)]
MASTER Policy Inception                  -> master_policy_inception             (score=1.130)  top=[('master_policy_inception', 1.13), ('master_policy_reference', 0.82), ('master_policy_expiration', 0.818)]
MASTER Policy Expiry                     -> master_policy_expiration            (score=1.169)  top=[('master_policy_expiration', 1.169), ('cover_expiration_date', 0.937), ('cover_inception