<a href="https://colab.research.google.com/github/chamindu002/chamindu002/blob/main/sanction_textual_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install unidecode -q
!pip install torch pandas openpyxl -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.8/235.8 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import torch
import torch.nn as nn
import re
from unidecode import unidecode
import pandas as pd
from google.colab import drive
import numpy as np

# Mount Google Drive; the checkpoint and data paths used later in this
# notebook all live under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'

print("Mounting Google Drive...")
drive.mount(DRIVE_MOUNT_POINT)
print("Drive mounted.")

Mounting Google Drive...
Mounted at /content/drive
Drive mounted.


Define Model Classes

In [3]:
class NameEncoder(nn.Module):
    """Character-level name encoder: embedding -> bidirectional LSTM -> tanh(linear).

    Args:
        vocab_size: number of rows in the embedding table. Index 0 is the
            padding index (its embedding is kept at zero by ``padding_idx=0``).
        embed_dim: width of each character embedding.
        hidden: LSTM hidden size per direction (the LSTM output is ``2 * hidden`` wide).
        out_dim: width of the returned name vector. Defaults to 192, the value
            that used to be hard-coded, so existing checkpoints load unchanged.
    """

    def __init__(self, vocab_size, embed_dim=96, hidden=192, out_dim=192):
        super().__init__()
        # The attribute names below become state_dict keys; renaming them would
        # break loading of previously saved checkpoints.
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden * 2, out_dim)

    def forward(self, x):
        """Encode a batch of index sequences.

        Args:
            x: integer tensor of shape (batch, seq_len); the LSTM is built with
                ``batch_first=True``.

        Returns:
            Tensor of shape (batch, out_dim) with values in [-1, 1] (tanh output).
        """
        embedded = self.embed(x)
        sequence_out, _ = self.lstm(embedded)
        # Pool by taking the LSTM output at the final position. No packing or
        # masking is applied, so for padded inputs this position is padding.
        # NOTE(review): changing the pooling would change the scores produced by
        # checkpoints trained with this forward pass, so it is left as is.
        last_step = sequence_out[:, -1, :]
        return torch.tanh(self.fc(last_step))

class Siamese(nn.Module):
    """Twin network: both inputs go through the same NameEncoder and are
    compared with cosine similarity."""

    def __init__(self, vocab_size):
        super().__init__()
        # A single encoder instance is shared by both branches.
        self.encoder = NameEncoder(vocab_size)
        self.cosine = nn.CosineSimilarity(dim=1)

    def forward(self, x1, x2):
        """Return one cosine-similarity value per row of the two input batches."""
        left_vec, right_vec = self.encoder(x1), self.encoder(x2)
        similarity = self.cosine(left_vec, right_vec)
        return similarity

**Load the Model**

In [4]:
load_path = "/content/drive/My Drive/Research/Data/siamese_name_matcher_best_intials.pt"
device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Loading model from: {load_path}")
# NOTE(review): torch.load deserializes the file; only load checkpoints from a
# trusted location.
checkpoint = torch.load(load_path, map_location=device)

# Restore the preprocessing configuration stored next to the weights.
char2idx, max_len, vocab_size = (
    checkpoint[field] for field in ('char2idx', 'max_len', 'vocab_size')
)

# Rebuild the network, restore its weights and switch to inference mode.
model = Siamese(vocab_size).to(device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
print("✅ Model loaded!")

Loading model from: /content/drive/My Drive/Research/Data/siamese_name_matcher_best_intials.pt
✅ Model loaded!


Define Helper Functions

In [5]:
def normalize_and_transliterate(s):
    """Return a lowercase ASCII form of ``s`` containing only a-z, whitespace and dots.

    None/NaN become "". The value is transliterated with ``unidecode``,
    lowercased, every other character is replaced by a space, and runs of
    whitespace are collapsed to a single space.
    """
    if s is None or pd.isna(s):
        return ""
    ascii_lower = unidecode(str(s)).lower().strip()
    # Dots are kept so initials such as "w. m." survive normalization.
    letters_only = re.sub(r"[^a-z\s\.]", " ", ascii_lower)
    return re.sub(r"\s+", " ", letters_only).strip()

def encode_name(name):
    """Turn a name into a (1, max_len) LongTensor of character indices on ``device``.

    The name is normalized, truncated to ``max_len`` characters, mapped through
    ``char2idx`` (unknown characters use the "<UNK>" index) and right-padded
    with the "<PAD>" index.
    """
    cleaned = normalize_and_transliterate(name)[:max_len]
    indices = []
    for ch in cleaned:
        indices.append(char2idx.get(ch, char2idx["<UNK>"]))
    padding_needed = max_len - len(indices)
    indices.extend([char2idx["<PAD>"]] * padding_needed)
    # Wrapping in a list adds the batch dimension of size 1.
    batch = torch.tensor([indices], dtype=torch.long)
    return batch.to(device)

def get_similarity(name1, name2):
    """Return the model's cosine similarity between two names as a float.

    Returns 0.0 without calling the model when either name is missing
    (None, "", NaN) or normalizes to an empty string (for example a value made
    only of digits or punctuation).
    """
    if not name1 or not name2:
        return 0.0
    # A name that normalizes to "" would be encoded as pure padding. Two such
    # names produce identical embeddings and therefore a spuriously high
    # similarity, so treat them as "nothing to compare".
    if not normalize_and_transliterate(name1) or not normalize_and_transliterate(name2):
        return 0.0
    t1 = encode_name(name1)
    t2 = encode_name(name2)
    with torch.no_grad():  # inference only, no gradients needed
        score = model(t1, t2).item()
    return score

# Single-value forms understood by extract_birth_year (each applied to one
# comma/semicolon-separated token).
_BIRTH_YEAR_RANGE = re.compile(r'^(\d{4})\s*-\s*(\d{4})$')          # 1970-1980
_BIRTH_YEAR_ONLY = re.compile(r'^(\d{4})(?:\.0+)?$')                # 1973 or 1973.0
_BIRTH_ISO_DATE = re.compile(
    r'^(\d{4})-(?:0?[1-9]|1[0-2])(?:-\d{1,2})?(?:[T ].*)?$'         # 1978-04, 1978-04-28, 1978-04-28 00:00:00
)
_BIRTH_DMY_DATE = re.compile(r'^\d{1,2}[/.]\d{1,2}[/.](\d{4})$')    # 28/04/1978 or 28.04.1978


def extract_birth_year(birth_date):
    """Extract the set of possible birth years from a date-like value.

    The value is converted to text and split on commas/semicolons; each piece
    may be:
      * a year range ``YYYY-YYYY`` (inclusive; an inverted range adds nothing),
      * a bare year ``YYYY`` (a float-style ``YYYY.0`` is accepted too),
      * an ISO-style date ``YYYY-MM`` or ``YYYY-MM-DD`` with an optional time part,
      * a day/month/year date written with ``/`` or ``.`` and a 4-digit year.
    Pieces in any other form are ignored.

    Args:
        birth_date: string, number, timestamp, None or NaN.

    Returns:
        A set of int years; empty when nothing could be parsed.
    """
    if pd.isna(birth_date) or not str(birth_date).strip():
        return set()

    years = set()
    for token in re.split(r'[,;]', str(birth_date)):
        token = token.strip()
        if not token:
            continue

        # Ranges are checked first: they expand to every year in between.
        range_match = _BIRTH_YEAR_RANGE.match(token)
        if range_match:
            start, end = int(range_match.group(1)), int(range_match.group(2))
            years.update(range(start, end + 1))
            continue

        # Every remaining form carries exactly one year in capture group 1.
        for pattern in (_BIRTH_YEAR_ONLY, _BIRTH_ISO_DATE, _BIRTH_DMY_DATE):
            single_match = pattern.match(token)
            if single_match:
                years.add(int(single_match.group(1)))
                break

    return years

def fields_match(cust_val, src_val, field):
    """Case-insensitive equality of two values after trimming whitespace.

    A missing customer value (None, NaN, or blank) counts as a match so the
    field is effectively skipped. ``field`` is accepted but not used.
    """
    customer_text = "" if pd.isna(cust_val) else str(cust_val).strip()
    if not customer_text:
        return True  # nothing supplied by the customer -> do not penalize
    source_text = str(src_val).strip()
    return customer_text.upper() == source_text.upper()

**Load Master Sanction/PEP List**

In [6]:
master_path = "/content/drive/My Drive/Research/Data/testing_data.csv"

# Read every column as text. Letting pandas infer numeric types can turn IDs
# and year-only birth dates into floats (e.g. "200221302925.0") whenever the
# column has blanks, and drops leading zeros from numeric-looking IDs.
master_df = pd.read_csv(master_path, dtype=str).fillna("")

# Names are upper-cased; the remaining matching fields are only trimmed.
master_df['NAME'] = master_df['NAME'].astype(str).str.strip().str.upper()
for text_col in ('ALIAS', 'BIRTH_DATE', 'ID', 'NATIONALITY'):
    master_df[text_col] = master_df[text_col].astype(str).str.strip()

print(f"Loaded master list: {len(master_df)} records")

Loaded master list: 32 records


**Load Customer List (Update Path After Uploading File)**

In [7]:
# Customer workbook to screen; the path points into the mounted Drive folder.
customer_path = "/content/drive/My Drive/Research/Data/customer/testing_data_cus_list.xlsx"  # Update if different
customer_df = pd.read_excel(customer_path).fillna("")

# NAME is required and upper-cased; the other matching fields are optional and
# are created as empty-string columns when the workbook does not have them.
customer_df['NAME'] = customer_df['NAME'].astype(str).str.strip().str.upper()
for optional_col in ('ALIAS', 'BIRTH_DATE', 'ID', 'NATIONALITY'):
    if optional_col in customer_df:
        customer_df[optional_col] = customer_df[optional_col].astype(str).str.strip()
    else:
        customer_df[optional_col] = ""

print(f"Loaded customer list: {len(customer_df)} records")

Loaded customer list: 5 records


**Perform Matching**

In [14]:
import pandas as pd
import numpy as np

# ==========================================
# 1. CONFIGURATION & SETUP
# ==========================================
# Show every column and use a wide console so printed frames are not truncated.
for option_name, option_value in (('display.max_columns', None), ('display.width', 1000)):
    pd.set_option(option_name, option_value)

SIMILARITY_THRESHOLD = 0.50  # Minimum name similarity to consider
RISK_THRESHOLD = 0.50        # Minimum risk score to save

# Relative contribution of each evidence type to the weighted risk score.
WEIGHTS = dict(
    name=0.6,
    dob=0.2,
    nationality=0.1,
    id=0.1,
)

# ==========================================
# 2. HELPER FUNCTIONS
# ==========================================
def safe_get(row, keys, default=""):
    """Return the first non-blank value found under any of ``keys`` in ``row``.

    Keys are tried in order; a key that is absent, holds None/NaN, or holds
    only whitespace is skipped. The value is returned as a stripped string,
    or ``default`` when no key yields one.
    """
    for key in keys:
        if key not in row:
            continue
        candidate = row[key]
        if pd.isna(candidate):
            continue
        text = str(candidate).strip()
        if text:
            return text
    return default

def safe_field_match(cust_val, src_val):
    """Return 1.0 when two field values are equal, else 0.0.

    Comparison ignores case and surrounding whitespace, and drops a float-style
    trailing ".0" so an ID read as a float ("200221302925.0") still equals
    its text form ("200221302925"). Missing values (None, "", NaN, or
    whitespace only) never match. Values already written in exponent notation
    (e.g. "2E+11") are compared as plain text.
    """
    if not cust_val or not src_val:
        return 0.0

    def _canonical(value):
        """Upper-cased, trimmed text with only a trailing '.0' removed."""
        if pd.isna(value):
            return ""
        text = str(value).strip().upper()
        # Strip ".0" only at the END: replacing it anywhere would make
        # distinct values such as "10.05" and "105" compare equal.
        return re.sub(r"\.0+$", "", text)

    c_str = _canonical(cust_val)
    s_str = _canonical(src_val)
    if not c_str or not s_str:
        return 0.0
    return 1.0 if c_str == s_str else 0.0

def birth_year_score(cust_years, src_years):
    """Return 1.0 if the two year sets share at least one year, else 0.0.

    An empty (or otherwise falsy) set on either side scores 0.0.
    """
    both_present = bool(cust_years) and bool(src_years)
    if both_present and not cust_years.isdisjoint(src_years):
        return 1.0
    return 0.0

# ==========================================
# 3. MAIN MATCHING LOOP
# ==========================================
# Saved matches with risk_score >= HIT_THRESHOLD are labelled "HIT"; the rest "REVIEW".
HIT_THRESHOLD = 0.7


def _best_name_similarity(cust_name, cust_alias, src):
    """Return (label, score) of the best name comparison for one source record.

    Comparisons made: customer name vs source NAME ("NAME"), customer name vs
    source ALIAS ("ALIAS", when the record has one), and customer alias vs
    source NAME ("CUST_ALIAS", when the customer has an alias). On ties the
    earliest of these wins.
    """
    src_name = src.get("NAME", "")
    sims = {"NAME": get_similarity(cust_name, src_name)}
    if "ALIAS" in src and pd.notna(src["ALIAS"]):
        sims["ALIAS"] = get_similarity(cust_name, src["ALIAS"])
    if cust_alias:
        sims["CUST_ALIAS"] = get_similarity(cust_alias, src_name)
    return max(sims.items(), key=lambda item: item[1])


def _confidence_label(name_score, dob_score, nat_score):
    """Confidence tier for a pair that does NOT have an exact ID match."""
    # Only DOB and nationality can count here: an exact ID match is handled
    # separately by the caller and is always labelled "CERTAIN".
    match_count = sum(1 for score in (dob_score, nat_score) if score == 1.0)
    if match_count >= 2 and name_score > 0.70:
        return "VERY_HIGH"
    if match_count >= 1 and name_score > 0.75:
        return "HIGH"
    if name_score > 0.80:
        return "MEDIUM"
    return "LOW"


def _score_candidate(customer, src, src_birth_years):
    """Score one (customer, source record) pair.

    Args:
        customer: dict with keys "name", "alias", "birth_years", "id", "nat".
        src: one row of the master list.
        src_birth_years: pre-parsed birth-year set of ``src``.

    Returns:
        (risk, details): the unrounded risk score used for ranking, and the
        dict of scores/labels stored for the best candidate.
    """
    name_match_type, name_score = _best_name_similarity(customer["name"], customer["alias"], src)
    dob_score = birth_year_score(customer["birth_years"], src_birth_years)
    nat_score = safe_field_match(customer["nat"], src.get("NATIONALITY", ""))
    id_score = safe_field_match(customer["id"], src.get("ID", ""))

    if id_score == 1.0:
        # An exact ID match overrides the weighted score entirely.
        risk = 1.0
        confidence = "CERTAIN"
        match_reason = "ID_MATCH"
    else:
        risk = (
            name_score * WEIGHTS["name"] +
            dob_score * WEIGHTS["dob"] +
            nat_score * WEIGHTS["nationality"] +
            id_score * WEIGHTS["id"]
        )
        confidence = _confidence_label(name_score, dob_score, nat_score)
        # NOTE(review): SIMILARITY_THRESHOLD only influences this label;
        # candidates below it are still scored and can be saved. Confirm
        # whether a hard filter on name similarity was intended.
        if dob_score == 1.0 and name_score >= SIMILARITY_THRESHOLD:
            match_reason = f"{name_match_type}_AND_DOB"
        else:
            match_reason = name_match_type

    details = {
        "risk_score": round(risk, 4),
        "confidence": confidence,
        "match_reason": match_reason,
        "score_name": name_score,
        "score_dob": dob_score,
        "score_nat": nat_score,
        "score_id": id_score
    }
    return risk, details


reports = []
print(f"Starting matching process for {len(customer_df)} customers against {len(master_df)} source records...")

# Parse each source record's birth years once instead of once per customer.
master_records = [
    (src, extract_birth_year(src.get("BIRTH_DATE", "")))
    for _, src in master_df.iterrows()
]

for _, cust in customer_df.iterrows():
    # 3a. Parse Customer Data
    cust_name = cust.get("NAME", "")
    cust_id = cust.get("ID", "")
    customer = {
        "name": cust_name,
        "alias": cust.get("ALIAS", ""),
        "birth_years": extract_birth_year(cust.get("BIRTH_DATE", "")),
        "id": cust_id,
        "nat": cust.get("NATIONALITY", ""),
    }

    # 3b. Keep the single highest-risk source record (first one wins on ties)
    best_match = None
    best_risk = -1.0
    best_details = {}
    for src, src_birth_years in master_records:
        current_risk_score, details = _score_candidate(customer, src, src_birth_years)
        if current_risk_score > best_risk:
            best_risk = current_risk_score
            best_match = src
            best_details = details

    # 3c. Save Record if Threshold Met (an exact ID match is always saved)
    if best_match is None:
        continue
    if not (best_details["risk_score"] >= RISK_THRESHOLD or best_details["score_id"] == 1.0):
        continue

    # Get Source Info
    src_cat = safe_get(best_match, ["CATEGORY", "Category", "TYPE", "Type"], "Unknown").upper()
    src_ds = safe_get(best_match, ["DATASET", "Dataset", "SOURCE", "Source"], "Unknown").upper()

    reports.append({
        "customer_name": cust_name,
        "source_name": best_match.get("NAME", ""),
        "TYPE": src_cat,
        "SOURCE_LIST": src_ds,

        "status": "HIT" if best_details["risk_score"] >= HIT_THRESHOLD else "REVIEW",
        "risk_score": best_details["risk_score"],
        "confidence": best_details["confidence"],
        "match_reason": best_details["match_reason"],

        # RAW SCORES
        "SCORE_NAME": f"{best_details['score_name']*100:.1f}%",
        "SCORE_DOB": f"{best_details['score_dob']*100:.1f}%",
        "SCORE_NAT": f"{best_details['score_nat']*100:.1f}%",
        "SCORE_ID": f"{best_details['score_id']*100:.1f}%",

        # Metadata
        "customer_id": cust_id,
        "source_id": best_match.get("ID", "")
    })



Starting matching process for 5 customers against 32 source records...


**Generate and Save Report**

In [16]:
 #==========================================
# 4. SUMMARY REPORTING
# ==========================================
if not reports:
    print("\nNo matches found above the threshold.")
else:
    report_df = pd.DataFrame(reports)

    # Put the key columns first, in a fixed order; any other columns follow in
    # the order they already have.
    preferred_cols = [
        "customer_name", "source_name", "TYPE", "SOURCE_LIST",
        "status", "risk_score", "confidence", "match_reason",
        "SCORE_NAME", "SCORE_DOB", "SCORE_NAT", "SCORE_ID"
    ]
    leading_cols = [col for col in preferred_cols if col in report_df.columns]
    trailing_cols = [col for col in report_df.columns if col not in preferred_cols]
    report_df = report_df[leading_cols + trailing_cols]

    # --- PRINT SUMMARY ---
    banner = "=" * 50
    print("\n" + banner)
    print("MATCHING SUMMARY STATISTICS")
    print(banner)
    print(f"Total Hits Found: {len(report_df)}")

    # One value-count table per report dimension.
    breakdowns = (
        ("\n[1] Breakdown by Match Reason:", 'match_reason'),
        ("\n[2] Breakdown by Confidence:", 'confidence'),
        ("\n[3] Breakdown by Category (Sanction/PEP):", 'TYPE'),
    )
    for heading, column in breakdowns:
        print(heading)
        print(report_df[column].value_counts())

    # Persist the full report next to the customer data on Drive.
    save_path = '/content/drive/My Drive/Research/Data/customer/sanction_check_report.csv'
    report_df.to_csv(save_path, index=False)
    print("\n" + banner)
    print(f"Full report saved to: {save_path}")
    print(banner)

    # Show the first few rows as a quick sanity check.
    print("\nPreview of Top Matches:")
    print(report_df.head(5))


MATCHING SUMMARY STATISTICS
Total Hits Found: 5

[1] Breakdown by Match Reason:
match_reason
NAME        3
ID_MATCH    2
Name: count, dtype: int64

[2] Breakdown by Confidence:
confidence
HIGH       3
CERTAIN    2
Name: count, dtype: int64

[3] Breakdown by Category (Sanction/PEP):
TYPE
SANCTION    3
PEP         2
Name: count, dtype: int64

Full report saved to: /content/drive/My Drive/Research/Data/customer/sanction_check_report.csv

Preview of Top Matches:
                               customer_name                                source_name      TYPE   SOURCE_LIST  status  risk_score confidence match_reason SCORE_NAME SCORE_DOB SCORE_NAT SCORE_ID   customer_id     source_id
0  DEEGODA GAMAGEI CHAMINDU DENUWAN RASHMIKA  DEEGODA GAMAGEI CHAMINDU DENUWAN RASHMIKA  SANCTION  TESTING_DATA     HIT      1.0000    CERTAIN     ID_MATCH     100.0%      0.0%    100.0%   100.0%  200221302925  200221302925
1                     CHARUKA BANDARA DANAKA                   CHARUKA BANDARA DAHANAKA 

In [9]:
# if reports:
#     report_df = pd.DataFrame(reports)
#     report_df.to_csv('/content/drive/My Drive/Research/Data/customer/sanction_check_report.csv', index=False)
#     print("\n--- SANCTION/PEP MATCH SUMMARY ---")
#     print(report_df)
#     print("\nReport saved to: /content/sanction_check_report.csv")
# else:
#     print("\nNo matches found for any customers.")


--- SANCTION/PEP MATCH SUMMARY ---
                               customer_name  \
0  DEEGODA GAMAGEI CHAMINDU DENUWAN RASHMIKA   
1                     CHARUKA BANDARA DANAKA   
2                               A H B K SAMA   
3                      W. M. CHATHURA DESHAN   
4                     K. G. N. PRIYADARSHNEE   

                                 source_name  status SCORE_NAME SCORE_DOB  \
0  DEEGODA GAMAGEI CHAMINDU DENUWAN RASHMIKA     HIT     100.0%      0.0%   
1                   CHARUKA BANDARA DAHANAKA  REVIEW      99.7%      0.0%   
2                           A H B K SAMANTHA  REVIEW      97.0%      0.0%   
3                      W. M. CHATHURA DESHAN  REVIEW     100.0%      0.0%   
4                     K. G. N. PRIYADARSHANI     HIT      98.1%      0.0%   

  SCORE_NAT SCORE_ID  risk_score confidence   customer_id     source_id  \
0    100.0%   100.0%      1.0000    CERTAIN  200221302925  200221302925   
1    100.0%     0.0%      0.6983       HIGH                   