In [None]:
# General use
import re
import numpy as np
import pandas as pd
import matplotlib.pylab as plt
import seaborn as sns

# Notebook behavior
# With ast_node_interactivity set to "all", IPython displays the value of every
# top-level expression in a cell instead of only the last one. Cells below rely on
# this when they list several bare expressions (e.g. `df.shape` then `df.head()`).
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
seed = 19  # Replicability: fixed random seed

In [None]:
# Get env
# Ask the project helper which environment the notebook is running in; the result
# is compared against "sagemaker" when deciding where to load the data from.
from src.lib.utils.env_checker import running_environment
env = running_environment()

In [None]:
# Load data
# Only the location of the parquet file depends on the environment: S3 on
# SageMaker, the local data directory otherwise.
fname = "cleaned_events.parquet"
if env == "sagemaker":
    data_path = f"s3://news-s3/data/cleaned/{fname}"
else:
    data_path = f"../../data/{fname}"

# Normalise missing values to np.nan in BOTH environments. Previously this was
# applied only to the local file, so null handling downstream could differ
# depending on where the notebook ran.
df = pd.read_parquet(data_path).fillna(np.nan)
df.shape
df.head()

In [None]:
# Limit to 14 days
# Keep only the rows belonging to the `n_days` most recent distinct SQLDATE values.
#
# The cut-off is selected by POSITION in the descending-sorted dates. Indexing the
# sorted Series with `[14]` is a label lookup on an integer index: it returns the
# date stored at original row label 14 (or raises KeyError), not the n-th most
# recent date. Read positionally it would also have kept 15 dates instead of 14.
n_days = 14
recent_dates = (
    df["SQLDATE"]
    .dropna()                      # a missing date must never become the cut-off
    .drop_duplicates()
    .sort_values(ascending=False)
    .head(n_days)                  # fewer than n_days distinct dates -> keep them all
)
if not recent_dates.empty:
    lim = recent_dates.iloc[-1]    # oldest date that is still retained
    df = df[df["SQLDATE"] >= lim]

# SQLDATE is only needed for the filter. Note: this makes the cell non-idempotent
# (re-running it without reloading the data raises KeyError on "SQLDATE").
df = df.drop(columns=["SQLDATE"])
df.shape

In [None]:
# Check dtypes
# Print the per-column dtype / non-null summary of the filtered frame.
df.info()

In [None]:
# Describe
# First the default summary statistics, then the summary restricted to
# object-dtype columns (include="O").
df.describe()
df.describe(include="O")

# 1. Data prep prerequisites

In [None]:
# Unique IDs
# reset_index() moves the current row index into an ordinary column and installs a
# fresh default index (the earlier date filter may have left gaps in it).
# NOTE(review): the Splink settings below use GLOBALEVENTID as the unique id, so
# confirm this extra column is actually needed.
# This cell is not idempotent: re-running it inserts the index as a column again.
df = df.reset_index()

In [None]:
# Cleaning - bring every object-dtype column to one canonical text form:
# trimmed, upper-cased, NFKD-decomposed and reduced to ASCII (characters that
# cannot be encoded as ASCII are dropped).
str_cols = df.select_dtypes(include="O")
for col_name in str_cols.columns:
    text = df[col_name].str.strip()
    text = text.str.upper()
    text = text.str.normalize("NFKD")
    ascii_bytes = text.str.encode("ascii", errors="ignore")
    df[col_name] = ascii_bytes.str.decode("utf-8")

In [None]:
# Null-handling
# Note: Splink treats null values differently from empty strings, so using true nulls guarantees proper matching across datasets.
# Displays the fraction of missing values per column (mean of the boolean isna() mask).
df.isna().mean()

# 2. Exploratory analysis

In [None]:
# Initialise the linker
# Minimal settings used for exploration only: a dedupe of the single table `df`,
# with GLOBALEVENTID as the unique id column. Blocking rules and comparisons are
# added to `settings` in later cells.
from splink.duckdb.linker import DuckDBLinker
settings = {"link_type": "dedupe_only", "unique_id_column_name":"GLOBALEVENTID"}
linker = DuckDBLinker(df, settings)

In [None]:
# Check for sparsity
# NOTE(review): `alt` is not referenced in the visible cells; presumably imported
# because Splink charts are rendered with Altair - confirm it is still needed.
import altair as alt
linker.missingness_chart()

In [None]:
# Profile
# Column profiling charts from the linker, requested with top_n=10 and bottom_n=10.
linker.profile_columns(top_n=10, bottom_n=10)

# 3. Blocking

In [None]:
from splink.duckdb.blocking_rule_library import block_on

# A candidate pair is generated when two records agree on every column of at
# least one of these column groups.
blocking_rules = [
    block_on(cols)
    for cols in (
        ["QuadClass", "Actor1CountryCode", "Actor2CountryCode"],
        ["Actor1Name", "Actor2Name"],
    )
]

# Number of comparisons produced by each rule, keyed by the rule's SQL text.
comparison_counts = {}
for rule in blocking_rules:
    comparison_counts[rule.blocking_rule_sql] = linker.count_num_comparisons_from_blocking_rule(rule)
comparison_counts

In [None]:
# How much of the initial comparison space will now be compared
# Deduplicating n records means comparing unordered pairs, so the unblocked
# baseline is n * (n - 1) / 2. The previous n**2 - n counted every pair twice,
# which understated the ratio by a factor of two relative to the per-rule counts.
n_records = df.shape[0]
n_base_comparisons = n_records * (n_records - 1) // 2  # Without blocking rule

# Each rule is counted on its own, so a pair that satisfies several rules is
# counted once per rule; treat the sum (and the ratio) as an upper bound.
n_comparisons = np.sum([linker.count_num_comparisons_from_blocking_rule(blocking_rule) for blocking_rule in blocking_rules])
np.round(n_comparisons / n_base_comparisons, 4)

In [None]:
# Update settings
# Register the blocking rules defined above under the
# "blocking_rules_to_generate_predictions" key of the settings dictionary.
settings["blocking_rules_to_generate_predictions"] = blocking_rules

# 4. Estimating Model Parameters

In [None]:
# Column comparisons used by the model.
# Author's note: misspellings are not expected in this data, so most text columns
# use Jaro-Winkler / name templates; Levenshtein (threshold 3) is applied to the
# actor codes only.
from splink.duckdb import comparison_library as cl
from splink.duckdb import comparison_template_library as ctl

# Keyword shared by every comparison that uses term-frequency adjustments.
tf_adjusted = {"term_frequency_adjustments": True}

# Event timestamps: single threshold of 7 days.
comparisons = [
    cl.datediff_at_thresholds("EventDateTime", date_thresholds=[7], date_metrics=["day"]),
]
# Actor locations: Jaro-Winkler at 0.8.
comparisons += [
    cl.jaro_winkler_at_thresholds(col, 0.8, **tf_adjusted)
    for col in ("Actor1Geo_FullName", "Actor2Geo_FullName")
]
# Actor names: Splink's name comparison template with its defaults.
comparisons += [ctl.name_comparison(col) for col in ("Actor1Name", "Actor2Name")]
# Actor codes: Levenshtein at threshold 3.
comparisons += [
    cl.levenshtein_at_thresholds(col, 3, **tf_adjusted)
    for col in ("Actor1Code", "Actor2Code")
]
# Event type: exact match only.
comparisons.append(cl.exact_match("CAMEOEvent", **tf_adjusted))

settings["comparisons"] = comparisons

In [None]:
# Model-based settings: retain the matching and intermediate calculation columns
# in the output, and set the iteration cap and convergence tolerance for training.
settings.update(
    {
        "retain_matching_columns": True,
        "retain_intermediate_calculation_columns": True,
        "max_iterations": 20,
        "em_convergence": 0.001,
    }
)

In [None]:
# Instantiate a new linker from the completed settings.
# The DataFrame itself is passed, as in the exploratory linker above. The previous
# string "df" would be treated as a table name rather than as the cleaned frame.
linker = DuckDBLinker(df, settings)

# Prior: probability that two random records match, derived from the blocking
# rules used as deterministic rules with an assumed recall of 0.8.
linker.estimate_probability_two_random_records_match(
    blocking_rules,
    recall=0.8,
)

# u probabilities from random sampling; seeded with the notebook-level `seed` so
# the estimates are reproducible between runs.
linker.estimate_u_using_random_sampling(max_pairs=1e9, seed=seed)

# Note: IndexError is a bug (https://github.com/moj-analytical-services/splink/issues/2076#issuecomment-2007755672)
# Sol'n: conda install sqlglot=22.5.0
# Alternative training rule kept for reference (blocks on similar Goldstein scores):
feature = "GoldsteinScore"
# linker.estimate_parameters_using_expectation_maximisation(
#     f"abs(l.{feature} - r.{feature}) <= {np.round(df[feature].std()/2, 4)}", 
# )

# m probabilities via expectation maximisation, blocking on actor name and quad class.
linker.estimate_parameters_using_expectation_maximisation(
    block_on(["Actor1Name", "QuadClass"]), 
    estimate_without_term_frequencies=False,
)

# 5. Predicting results

In [None]:
# Predict
# Score the blocked pairs, keeping those at or above the match-probability threshold.
threshold = 0.95
pred = linker.predict(threshold_match_probability=threshold)
pred_df = pred.as_pandas_dataframe()

# Group the predicted pairs into clusters at the same threshold.
clusters_sdf = linker.cluster_pairwise_predictions_at_threshold(
    pred,
    threshold_match_probability=threshold,
)
clusters = clusters_sdf.as_pandas_dataframe()

# Total number of edges reported for the prediction blocking rules.
edge_counts = linker.count_num_comparisons_from_blocking_rules_for_prediction(pred)
edge_counts_df = edge_counts.as_pandas_dataframe(limit=None)
n_pairwise_comparisons = edge_counts_df["count_of_edges"].sum()

# Unblocked baseline vs. per-rule blocked count vs. edges counted at prediction.
n_base_comparisons, n_comparisons, n_pairwise_comparisons

In [None]:
# Check model parameters
# Chart of the m and u parameters held by the trained linker.
linker.m_u_parameters_chart()

Ref: https://www.robinlinacre.com/fast_deduplication/

# Show the members of the largest cluster (value_counts() sorts by size, descending).
largest_cluster_id = clusters["cluster_id"].value_counts().index[0]
clusters[clusters["cluster_id"] == largest_cluster_id]

import splink.duckdb.comparison_library as cl
import splink.duckdb.comparison_template_library as ctl

# Alternative (scratch) model specification adapted from the reference above.
#
# The previous version of this cell could not be parsed: a `"comparisons": [...]`
# key/value pair sat inside the unterminated `comparison_levels` list, a comma was
# missing between two comparisons, and "comparisons" was defined twice. It has
# been reduced to one valid settings dictionary:
#   * Only columns used elsewhere in this notebook are compared. The pasted
#     template comparisons on full_name, dob, postcode_fake, birth_place,
#     occupation, first_name, surname, city and email, and the date_of_birth null
#     level, were removed.
#     NOTE(review): restore them only if the dataset really has those columns.
#   * EventDateTime had two comparisons (cl.datediff_at_thresholds and
#     ctl.date_comparison with cast_strings_to_date=True). One is kept so each
#     column is compared once. It uses the library's default thresholds.
#     NOTE(review): section 4 uses a 7-day threshold - confirm which is intended.
#   * ctl.name_comparison takes `term_frequency_adjustments_name`, not
#     `term_frequency_adjustments`.
#
# Running this cell rebinds `settings` and `linker`, replacing the trained model
# from the sections above with an untrained one.
settings = {
    "link_type": "dedupe_only",
    "unique_id_column_name": "GLOBALEVENTID",
    "blocking_rules_to_generate_predictions": [
        block_on(["QuadClass", "Actor1CountryCode", "Actor2CountryCode"]),
    ],
    "comparisons": [
        ctl.name_comparison("Actor1Name", term_frequency_adjustments_name=True),
        ctl.name_comparison("Actor2Name", term_frequency_adjustments_name=True),
        cl.levenshtein_at_thresholds("Actor1Geo_FullName", [1, 2]),
        cl.datediff_at_thresholds("EventDateTime"),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
}

linker = DuckDBLinker(df, settings)