# Sample Example: SPLink

We provide two libraries for record linking, RecordLinkage and SPLink. Follow the RecordLinkage notebook [here](https://www.antigranular.com/notebooks/651a938d0f1e51b4fa0b651a).

SPLink is a Python package designed for probabilistic record linkage and, like RecordLinkage, it plays a crucial part in linking records and deduplicating datasets.

Participants can use SPLink to predict which rows link together and then further cluster these connections to generate an Individual ID. This can prove especially useful when unique identifiers are missing or differ significantly across datasets.


## Getting Started: Setting Up the Environment

In [52]:
!pip install antigranular



In [53]:
import antigranular as ag
session = ag.login(<client_id>, <client_secret>, competition = "Harvard OpenDP Hackathon")

Dataset "Flight Company Dataset" loaded to the kernel as flight_company_dataset

Dataset "Health Organisation Dataset" loaded to the kernel as health_organisation_dataset

Connected to Antigranular server session id: a86cdd33-9a59-4f1e-9b22-723673d6dc3c, the session will time out if idle for 25 minutes
Cell magic '%%ag' registered successfully, use `%%ag` in a notebook cell to execute your python code on Antigranular private python server
🚀 Everything's set up and ready to roll!


### Importing the Datasets

***In this competition we are provided with two datasets:***

The airline companies have information about passengers and their travel dates (`flight_company_dataset_for_sandbox`), and the national health organisation has records of patients who took a COVID test and whether their result was positive or negative (`health_organisation_dataset_for_sandbox`).

These are already provided within the AG environment.

In [54]:
%%ag
health = health_organisation_dataset
flight = flight_company_dataset

The **parse_date()** function standardizes the diverse date formats found in the Flight and Health datasets. It transforms various date representations, such as 'DD-MM-YY', 'MM/YYYY/DD', 'YYYY.MM.DD', etc., into a consistent 'YYYY-MM-DD' format. Two-digit years below 21 are read as 20YY, and all other two-digit years as 19YY.

The function uses regular expressions to split each string on its separator and a month dictionary to recognise month names. It is then mapped over the date-of-birth columns of both datasets to ensure consistency and streamline analysis.

In [55]:
%%ag
import re

def parse_date(date_str: str) ->str:
    months = {
        'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04', 'May': '05', 'Jun': '06',
        'Jul': '07', 'Aug': '08', 'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12',
        'January': '01', 'February': '02', 'March': '03', 'April': '04', 'May': '05',
        'June': '06', 'July': '07', 'August': '08', 'September': '09', 'October': '10',
        'November': '11', 'December': '12',
        'Sept': '09',  # four-letter abbreviation; 'Sep' is already defined above
    }

    separators = [' ', '/', '.', '-']
    thirty_day_months = {4, 6, 9, 11}
    # Check for formats like 'Oct-12-02 , Oct.12.02 , Oct 12 02 , Oct/12/02'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[0])
            if month:
                year = parts[2]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[1])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[1].zfill(2)}"

    # Check for formats like 'Oct-1978-12 , Oct.1978.12 , Oct 1978 12 , Oct/1978/12'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[0])
            if month:
                year = parts[1]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[2])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[2].zfill(2)}"

    # Check for formats like '01.Sep.1990 , 01/Sep/1990 , 01 Sep 1990 , 01-Sep-1990'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[1])
            if month:
                year = parts[2]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[0])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[0].zfill(2)}"

    # Check for formats like '1990.Sep.01 , 1990/Sep/01 , 1990 Sep 01 , 1990-Sep-01'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[1])
            if month:
                year = parts[0]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[2])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[2].zfill(2)}"

    # Check for formats like '1990.01.Sept , 1990/01/Sept , 1990 01 Sept , 1990-01-Sept'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[2])
            if month:
                year = parts[0]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[1])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[1].zfill(2)}"
    # Check for formats like '01.1990.Sept , 01/1990/Sept , 01 1990 Sept , 01-1990-Sept'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = months.get(parts[2])
            if month:
                year = parts[1]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[0])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                  return f"{year}-{month}-{parts[0].zfill(2)}"

    # Check for formats like '10-12-02 , 10.12.02 , 10 12 02 , 10/12/02'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[0]
            if 1 <= int(month) <= 12:
                year = parts[2]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[1])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[1].zfill(2)}"

    # Check for formats like '10-1978-12 , 10.1978.12 , 10 1978 12 , 10/1978/12'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[0]
            if 1 <= int(month) <= 12:
                year = parts[1]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[2])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[2].zfill(2)}"

    # Check for formats like '01.09.1990 , 01/09/1990 , 01 09 1990 , 01-09-1990'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[1]
            if 1 <= int(month) <= 12:
                year = parts[2]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[0])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[0].zfill(2)}"

    # Check for formats like '1990.09.01 , 1990/09/01 , 1990 09 01 , 1990-09-01'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[1]
            if 1 <= int(month) <= 12:
                year = parts[0]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[2])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[2].zfill(2)}"

    # Check for formats like '1990.01.09 , 1990/01/09 , 1990 01 09 , 1990-01-09'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[2]
            if 1 <= int(month) <= 12:
                year = parts[0]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[1])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[1].zfill(2)}"
    # Check for formats like '01.1990.09 , 01/1990/09 , 01 1990 09 , 01-1990-09'
    for separator in separators:
        parts = re.split(f'[{separator}]', date_str)
        if len(parts) == 3:
            month = parts[2]
            if 1 <= int(month) <= 12:
                year = parts[1]
                if len(year) == 2:
                    if int(year) < 21:
                        year = '20' + year
                    else:
                        year = '19' + year
                day = int(parts[0])
                if (
                    1 <= day <= 31 and
                    (int(month) != 2 or 1 <= day <= 28) and  # February
                    (int(month) not in thirty_day_months or 1 <= day <= 30)  # Months with 30 days
                ):
                    return f"{year}-{month.zfill(2)}-{parts[0].zfill(2)}"

    return f"Invalid Input Error: Could not parse string '{date_str}' according to format specifier '%Y-%m-%d'"


# Example usage:
date_str = "10-78-02"
parsed_date = parse_date(date_str)
ag_print(parsed_date)

# Standardize date format in flight dataset
flight['passenger_date_of_birth'] = flight['passenger_date_of_birth'].map(parse_date)
# Standardize date format in health dataset
health['patient_date_of_birth'] = health['patient_date_of_birth'].map(parse_date)


1978-10-02
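Every branch of `parse_date` repeats the same two rules: the two-digit-year expansion and the day-of-month check. The sketch below isolates them so they can be checked on their own. It is illustrative only: `expand_year` and `is_valid_day` are hypothetical helper names that do not appear in the notebook, and the sketch keeps the notebook's simplification that February always has 28 days.

```python
THIRTY_DAY_MONTHS = {4, 6, 9, 11}


def expand_year(year: str, pivot: int = 21) -> str:
    """Expand a two-digit year: values below the pivot become 20xx, others 19xx."""
    if len(year) == 2:
        return ("20" if int(year) < pivot else "19") + year
    return year  # four-digit years pass through unchanged


def is_valid_day(day: int, month: int) -> bool:
    """Day-of-month check mirroring parse_date (February capped at 28 days)."""
    if not 1 <= day <= 31:
        return False
    if month == 2 and day > 28:
        return False
    if month in THIRTY_DAY_MONTHS and day > 30:
        return False
    return True


print(expand_year("78"))    # 1978
print(expand_year("02"))    # 2002
print(is_valid_day(31, 4))  # False: April has 30 days
```

This is why the example input `10-78-02` resolves to 1978: the value 78 cannot be a day, so it is read as a two-digit year above the pivot.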



The **normalize_phone_number** function tidies up phone numbers by removing every non-numeric character. It is applied to 'passenger_phone_number' in the Flight dataset and 'patient_phone_number' in the Health dataset to standardize phone number formats for consistency in analysis.

In [56]:
%%ag
import re

def normalize_phone_number(phone:str)->str:
    # Remove non-numeric characters and spaces
    phone = re.sub(r'\D', '', phone)
    return phone

# Standardize phone number format in flight dataset
flight['passenger_phone_number'] = flight['passenger_phone_number'].map(normalize_phone_number)
# Standardize phone number format in health dataset
health['patient_phone_number'] = health['patient_phone_number'].map(normalize_phone_number)
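To see the effect of the substitution outside the AG environment, here is a standalone check on a few made-up phone numbers. The sample values are hypothetical and do not come from the datasets.

```python
import re


def normalize_phone_number(phone: str) -> str:
    # Keep digits only; spaces, '+', '-', '.', and brackets are all dropped
    return re.sub(r"\D", "", phone)


samples = ["+1 (555) 010-9999", "555.010.9999", "0555 010 9999"]
normalized = [normalize_phone_number(s) for s in samples]
print(normalized)  # ['15550109999', '5550109999', '05550109999']
```

Note that the leading zero in the last sample is preserved, since the pattern only strips non-digit characters.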

## Preprocessing the Data

In [57]:
%%ag
import numpy as np
import op_pandas as opd
import pandas as pd

### **Splink requires that you clean your data and assign unique IDs to rows before linking**

* **Unique IDs:** Each input dataset must have a unique ID column, which is unique within the dataset. By default, Splink assumes this column will be called unique_id.
* **Conformant input datasets:** Input datasets must be conformant, meaning they share the same column names and data formats.
* **Cleaning:** Ensure data consistency by cleaning your data. This process includes standardising date formats, matching text case, and handling invalid data.


### Creating Unique IDs

Since the number of records in both datasets is public information, we can use these counts to create the `unique_id` columns.

For this, we use `numpy.arange`, which generates evenly spaced values within a given interval.

In [58]:
%%ag
num_health = 71707  # taken from the competition page
num_flight = 85242  # taken from the competition page

unique_id_health = opd.PrivateSeries(pd.Series(np.arange(num_health)))
unique_id_flight = opd.PrivateSeries(pd.Series(np.arange(num_flight)))

In [59]:
%%ag
health['unique_id'] = unique_id_health
flight['unique_id'] = unique_id_flight

### Conforming Input Datasets

In order to conform the various column names, let us examine what they are.

In [60]:
%%ag
ag_print("Health Columns:")
ag_print(health.columns)
ag_print("Flight Columns:")
ag_print(flight.columns)

Health Columns:
['patient_firstname', 'patient_lastname', 'patient_date_of_birth', 'patient_phone_number', 'patient_email_address', 'covidtest_date', 'covidtest_result', 'patient_address', 'unique_id']
Flight Columns:
['flight_number', 'flight_date', 'flight_from', 'flight_to', 'passenger_firstname', 'passenger_lastname', 'passenger_date_of_birth', 'passenger_phone_number', 'passenger_email_address', 'unique_id']



As we can see, the column names are not the same in both of the datasets. For example, first name in the health dataset is `patient_firstname` and in the flight dataset it is `passenger_firstname`.

We also want to link on the basis of dates (`covidtest_date` in the Health dataset and `flight_date` in the Flight dataset).

Hence, we will write a function that gives corresponding columns the same name in both datasets.

In [61]:
%%ag
def conform_columns(df: opd.PrivateDataFrame) -> opd.PrivateDataFrame:
    final_columns = []
    for col in df.columns:
        if "firstname" in col:  # converting patient_firstname and passenger_firstname -> firstname
            final_columns.append("firstname")
            df["firstname"] = df[col]
        elif "lastname" in col:  # converting patient_lastname and passenger_lastname -> lastname
            final_columns.append("lastname")
            df["lastname"] = df[col]
        elif "date_of_birth" in col: # converting patient_date_of_birth and passenger_date_of_birth -> date_of_birth
            final_columns.append("date_of_birth")
            df["date_of_birth"] = df[col]
        elif "covidtest_date" in col: # converting covidtest_date and flight_date -> date
            final_columns.append("date")
            df["date"] = df[col]
        elif "flight_date" in col:
            final_columns.append("date")
            df["date"] = df[col]
        elif "phone_number" in col:
            final_columns.append("phone_number")
            df["phone_number"] = df[col]
        elif "email_address" in col:
            final_columns.append("email_address")
            df["email_address"] = df[col]
        else:
            final_columns.append(col)

    df = df[final_columns]
    return df

health = conform_columns(health)
flight = conform_columns(flight)

We only need the records where covidtest_result is positive, so we will extract them.

In [62]:
%%ag
# Remove the patient records with a negative test result.
health['covidtest_result'] = health['covidtest_result'].where(health['covidtest_result'] == 'positive')
health = health.dropna()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.df[key] = value._series



Checking out the columns again, we can see that they are conformant.

In [63]:
%%ag
ag_print("Health Columns:")
ag_print(health.columns)
ag_print("Flight Columns:")
ag_print(flight.columns)

Health Columns:
['firstname', 'lastname', 'date_of_birth', 'phone_number', 'email_address', 'date', 'covidtest_result', 'patient_address', 'unique_id']
Flight Columns:
['flight_number', 'date', 'flight_from', 'flight_to', 'firstname', 'lastname', 'date_of_birth', 'phone_number', 'email_address', 'unique_id']



The column names are now conformant. However, some columns are not needed for linking, so let us remove them.

In [64]:
%%ag
health_link = health[['firstname', 'lastname', 'date_of_birth', 'date','phone_number','email_address','unique_id']]
flight_link = flight[['firstname', 'lastname', 'date_of_birth', 'date','phone_number','email_address','unique_id']]

## Comparisons

A key feature of Splink is the ability to customise how record comparisons are made - that is, how similarity is defined for different data types.

By tailoring the definitions of similarity, linking models can distinguish more effectively between different gradations of similarity, leading to more accurate data linking models.

For more information on comparisons, follow [this link](https://moj-analytical-services.github.io/splink/topic_guides/comparisons/customising_comparisons.html).

Here, we will create 6 comparisons:

* Fuzzy matching of `firstname`
* Fuzzy matching of `lastname`
* Fuzzy matching of `phone_number`
* Difference of `date` column to be within 14 days
* Fuzzy matching of `date_of_birth` column
* Fuzzy matching of `email_address`



In [67]:
%%ag
import op_splink.duckdb.comparison_template_library as ctl
from op_splink.duckdb.blocking_rule_library import block_on

bespoke_email_comparison = ctl.email_comparison(
    "email_address",
    jaro_winkler_thresholds=[],
    damerau_levenshtein_thresholds=[3],
    include_username_match_level=False,
    include_domain_match_level=True,
    invalid_emails_as_null=True,
)

first_name_comparison = ctl.name_comparison("firstname")
last_name_comparison = ctl.name_comparison("lastname", jaro_winkler_thresholds=[0.8])
# Phone numbers reuse the name template with the Damerau-Levenshtein level disabled
phone_number_comparison = ctl.name_comparison("phone_number", damerau_levenshtein_thresholds=[])
# Test date vs. flight date: the only non-null levels are "within 14 days" and "else"
date_difference = ctl.date_comparison(
    "date",
    cast_strings_to_date=True,
    include_exact_match_level=False,
    damerau_levenshtein_thresholds=[],
    datediff_thresholds=[14],
    datediff_metrics=["day"],
)
date_of_birth_comparison = ctl.date_comparison("date_of_birth", cast_strings_to_date=True)

SPLink expresses each comparison as SQL. We can use the `human_readable_description` property to inspect the comparison levels and the SQL rule behind each one.

In [40]:
%%ag
ag_print(first_name_comparison.human_readable_description)

Comparison 'Exact match vs. Firstname within levenshtein threshold 1 vs. Firstname within damerau-levenshtein threshold 1 vs. Firstname within jaro_winkler thresholds 0.9, 0.8 vs. anything else' of "firstname".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: "firstname_l" IS NULL OR "firstname_r" IS NULL
    - 'Exact match firstname' with SQL rule: "firstname_l" = "firstname_r"
    - 'Damerau_levenshtein <= 1' with SQL rule: damerau_levenshtein("firstname_l", "firstname_r") <= 1
    - 'Jaro_winkler_similarity >= 0.9' with SQL rule: jaro_winkler_similarity("firstname_l", "firstname_r") >= 0.9
    - 'Jaro_winkler_similarity >= 0.8' with SQL rule: jaro_winkler_similarity("firstname_l", "firstname_r") >= 0.8
    - 'All other comparisons' with SQL rule: ELSE
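The levels are evaluated from top to bottom, and a record pair is assigned to the first level whose rule holds. The sketch below imitates that cascade in plain Python for the null, exact-match, and edit-distance levels only (the Jaro-Winkler levels are left out for brevity). It is a rough illustration under stated assumptions: `osa_distance` implements the optimal string alignment variant of Damerau-Levenshtein, which may differ from DuckDB's `damerau_levenshtein` in edge cases, and both function names are hypothetical.

```python
def osa_distance(a: str, b: str) -> int:
    """Edit distance counting insertions, deletions, substitutions and
    transpositions of adjacent characters (optimal string alignment)."""
    rows, cols = len(a) + 1, len(b) + 1
    d = [[0] * cols for _ in range(rows)]
    for i in range(rows):
        d[i][0] = i
    for j in range(cols):
        d[0][j] = j
    for i in range(1, rows):
        for j in range(1, cols):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,         # deletion
                          d[i][j - 1] + 1,         # insertion
                          d[i - 1][j - 1] + cost)  # substitution
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)  # transposition
    return d[rows - 1][cols - 1]


def comparison_level(left, right) -> str:
    """Return the first level whose rule holds, checked in order."""
    if left is None or right is None:
        return "Null"
    if left == right:
        return "Exact match"
    if osa_distance(left, right) <= 1:
        return "Damerau_levenshtein <= 1"
    return "All other comparisons"


for pair in [("Maria", "Maria"), ("Maria", "Mraia"), ("Maria", "John"), ("Maria", None)]:
    print(pair, "->", comparison_level(*pair))
```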




In [44]:
%%ag
ag_print(last_name_comparison.human_readable_description)

Comparison 'Exact match vs. Lastname within levenshtein threshold 1 vs. Lastname within damerau-levenshtein threshold 1 vs. Lastname within jaro_winkler threshold 0.8 vs. anything else' of "lastname".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: "lastname_l" IS NULL OR "lastname_r" IS NULL
    - 'Exact match lastname' with SQL rule: "lastname_l" = "lastname_r"
    - 'Damerau_levenshtein <= 1' with SQL rule: damerau_levenshtein("lastname_l", "lastname_r") <= 1
    - 'Jaro_winkler_similarity >= 0.8' with SQL rule: jaro_winkler_similarity("lastname_l", "lastname_r") >= 0.8
    - 'All other comparisons' with SQL rule: ELSE




In [45]:
%%ag
ag_print(phone_number_comparison.human_readable_description)

Comparison 'Exact match vs. Phone_Number within jaro_winkler thresholds 0.9, 0.8 vs. anything else' of "phone_number".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: "phone_number_l" IS NULL OR "phone_number_r" IS NULL
    - 'Exact match phone_number' with SQL rule: "phone_number_l" = "phone_number_r"
    - 'Jaro_winkler_similarity >= 0.9' with SQL rule: jaro_winkler_similarity("phone_number_l", "phone_number_r") >= 0.9
    - 'Jaro_winkler_similarity >= 0.8' with SQL rule: jaro_winkler_similarity("phone_number_l", "phone_number_r") >= 0.8
    - 'All other comparisons' with SQL rule: ELSE




In [47]:
%%ag
ag_print(date_difference.human_readable_description)

Comparison 'Dates within the following threshold Day(s): 14 vs. anything else' of "date".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: "date_l" IS NULL OR "date_r" IS NULL
    - 'Within 14 days' with SQL rule: 
            abs(date_diff('day',
                strptime("date_l", '%Y-%m-%d'),
                strptime("date_r", '%Y-%m-%d'))
                ) <= 14
        
    - 'All other comparisons' with SQL rule: ELSE
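The "Within 14 days" rule can be reproduced with the standard library to check individual date pairs by hand. This is a minimal sketch, assuming both dates are already in 'YYYY-MM-DD' form; `within_days` is a hypothetical name.

```python
from datetime import date


def within_days(left: str, right: str, threshold: int = 14) -> bool:
    """True when two ISO-formatted dates are at most `threshold` days apart."""
    gap = date.fromisoformat(left) - date.fromisoformat(right)
    return abs(gap.days) <= threshold


print(within_days("2021-03-01", "2021-03-15"))  # True: exactly 14 days apart
print(within_days("2021-03-01", "2021-03-16"))  # False: 15 days apart
print(within_days("2021-03-15", "2021-03-01"))  # True: order does not matter
```

Because the rule takes the absolute value of the difference, it does not distinguish a test taken after the flight from one taken before it.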




In [48]:
%%ag
ag_print(date_of_birth_comparison.human_readable_description)

Comparison 'Exact match vs. Date_Of_Birth within damerau-levenshtein threshold 1 vs. Dates within the following thresholds Month(s): 1, Year(s): 1, Year(s): 10 vs. anything else' of "date_of_birth".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: "date_of_birth_l" IS NULL OR "date_of_birth_r" IS NULL
    - 'Exact match' with SQL rule: "date_of_birth_l" = "date_of_birth_r"
    - 'Damerau_levenshtein <= 1' with SQL rule: damerau_levenshtein("date_of_birth_l", "date_of_birth_r") <= 1
    - 'Within 1 month' with SQL rule: 
            abs(date_diff('month',
                strptime("date_of_birth_l", '%Y-%m-%d'),
                strptime("date_of_birth_r", '%Y-%m-%d'))
                ) <= 1
        
    - 'Within 1 year' with SQL rule: 
            abs(date_diff('year',
                strptime("date_of_birth_l", '%Y-%m-%d'),
                strptime("date_of_birth_r", '%Y-%m-%d'))
                ) <= 1
        
    - 'Within 10 years' with SQL ru

In [68]:
%%ag
ag_print(bespoke_email_comparison.human_readable_description)

Comparison 'Exact match vs. Fuzzy Email within damerau_levenshtein thresholds  vs. Fuzzy Username within levenshtein thresholds  vs. Domain-only match vs.anything else' of "email_address".
Similarity is assessed using the following ComparisonLevels:
    - 'Null' with SQL rule: 
        regexp_extract("email_address_l", '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}$')
     IS NULL OR 
        regexp_extract("email_address_r", '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}$')
     IS NULL OR
                      
        regexp_extract("email_address_l", '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}$')
    =='' OR 
        regexp_extract("email_address_r", '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}$')
     ==''
    - 'Exact match email_address' with SQL rule: "email_address_l" = "email_address_r"
    - 'Damerau_levenshtein email_address <= 3' with SQL rule: damerau_levenshtein("email_address_l", "email_address_r") <= 3
    - 'Damerau_levenshtein Username <= 3' with SQL

## Creating a Linker

We now create the linker with `DuckDBLinker`, passing it the two prepared datasets and a settings dictionary.

In [69]:
%%ag
from op_splink.duckdb.linker import DuckDBLinker

settings = {
                "link_type": "link_only",
                "comparisons":[
                    first_name_comparison,
                    last_name_comparison,
                    date_difference,
                    date_of_birth_comparison,
                    bespoke_email_comparison,
                ],
                "blocking_rules_to_generate_predictions": [
                    block_on("firstname"),
                    block_on("lastname"),
                ]
}
linker = DuckDBLinker([health_link, flight_link], settings)





In other words, this settings dictionary says:

* We are performing a `link_only` job, which links records across the input datasets without deduplicating within them (the other options are `dedupe_only` and `link_and_dedupe`, the latter of which may be used if there are multiple input datasets).
* When comparing records, we will use information from the `firstname`, `lastname`, `date`, `date_of_birth`, and `email_address` columns to compute a match score (the `phone_number` comparison defined earlier is not part of this model).
* The `blocking_rules_to_generate_predictions` setting states that we will only compare pairs of records where either the `firstname` or the `lastname` is identical.
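To make the effect of the two blocking rules concrete, the sketch below lists the candidate pairs they would produce for a handful of invented records. It only illustrates which pairs survive the filter; the brute-force loop over all pairs is not how a SQL engine would evaluate the rules, and `candidate_pairs` is a hypothetical name.

```python
from itertools import product

# Invented toy records, not taken from the competition data
health_rows = [
    {"unique_id": 0, "firstname": "ana", "lastname": "silva"},
    {"unique_id": 1, "firstname": "ben", "lastname": "jones"},
]
flight_rows = [
    {"unique_id": 0, "firstname": "ana", "lastname": "sylva"},
    {"unique_id": 1, "firstname": "carl", "lastname": "jones"},
    {"unique_id": 2, "firstname": "dora", "lastname": "smith"},
]


def candidate_pairs(left, right, keys=("firstname", "lastname")):
    """Keep a pair when at least one blocking key is exactly equal."""
    pairs = []
    for l_row, r_row in product(left, right):
        if any(l_row[k] == r_row[k] for k in keys):
            pairs.append((l_row["unique_id"], r_row["unique_id"]))
    return pairs


pairs = candidate_pairs(health_rows, flight_rows)
print(pairs)  # [(0, 0), (1, 1)]
```

Only 2 of the 6 possible pairs are scored here. The trade-off is that a true match with typos in both the first name and the last name would never be compared.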

## Training the Model

Now that we have specified our linkage model, we need to estimate the u and m parameters of the Fellegi-Sunter model.

The u values are the proportion of records falling into each ComparisonLevel amongst truly non-matching records.
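The m values are the counterpart of u: the proportion of records falling into each ComparisonLevel amongst truly matching records. In the Fellegi-Sunter model the evidence contributed by a level is the ratio m/u, and Splink reports its base-2 logarithm as a match weight. The following sketch computes that quantity for made-up probabilities; the numbers are illustrative and are not estimates from this model.

```python
import math


def match_weight(m: float, u: float) -> float:
    """Base-2 log of the Bayes factor m/u for one comparison level."""
    return math.log2(m / u)


# A level seen in 50% of matches but only 12.5% of non-matches supports a match
print(match_weight(0.5, 0.125))  # 2.0
# A level seen more often among non-matches counts against a match
print(match_weight(0.125, 0.5))  # -2.0
```

Positive weights push a pair towards being a match, negative weights push it away, and a weight of zero means the level carries no information.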

We estimate u using the estimate_u_using_random_sampling method.

In [70]:
%%ag
linker.estimate_u_using_random_sampling(max_pairs=1e6)

----- Estimating u probabilities using random sampling -----


Estimated u probabilities using random sampling


Your model is not yet fully trained. Missing estimates for:
    - firstname (no m values are trained).
    - lastname (no m values are trained).
    - date (no m values are trained).
    - date_of_birth (no m values are trained).
    - email_address (no m values are trained).



m is the trickiest of the parameters to estimate, because we have to have some idea of what the true matches are.

If we have labels, we can estimate it directly. Without labelled data, the m parameters can be estimated using an iterative maximum likelihood approach called **Expectation Maximisation**.
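The idea behind Expectation Maximisation can be sketched on a toy problem. The code below is a simplified illustration, not Splink's implementation: it assumes each comparison has only two levels (agree or disagree), it re-estimates u alongside m rather than keeping u fixed, and the agreement counts are invented. The name `em_fellegi_sunter` is hypothetical.

```python
def em_fellegi_sunter(patterns, n_fields, iterations=50):
    """patterns: list of (agreement_vector, count); vectors hold 0/1 per field."""
    lam = 0.1             # initial guess for the share of pairs that are matches
    m = [0.9] * n_fields  # P(field agrees | pair is a match)
    u = [0.1] * n_fields  # P(field agrees | pair is not a match)
    total = sum(count for _, count in patterns)
    for _ in range(iterations):
        # E-step: posterior match probability of each agreement pattern
        posteriors = []
        for gamma, _count in patterns:
            p_match, p_non = lam, 1.0 - lam
            for k, agrees in enumerate(gamma):
                p_match *= m[k] if agrees else 1.0 - m[k]
                p_non *= u[k] if agrees else 1.0 - u[k]
            posteriors.append(p_match / (p_match + p_non))
        # M-step: re-estimate the parameters from the weighted counts
        w_match = sum(p * c for p, (_, c) in zip(posteriors, patterns))
        w_non = total - w_match
        lam = w_match / total
        for k in range(n_fields):
            m[k] = sum(p * c for p, (g, c) in zip(posteriors, patterns) if g[k]) / w_match
            u[k] = sum((1 - p) * c for p, (g, c) in zip(posteriors, patterns) if g[k]) / w_non
    return lam, m, u


# Invented counts of agreement patterns over three fields
patterns = [((1, 1, 1), 20), ((1, 1, 0), 5), ((1, 0, 1), 5), ((0, 1, 1), 2),
            ((1, 0, 0), 10), ((0, 1, 0), 20), ((0, 0, 1), 20), ((0, 0, 0), 100)]
lam, m, u = em_fellegi_sunter(patterns, n_fields=3)
print("match share:", round(lam, 3))
print("m:", [round(x, 3) for x in m])
print("u:", [round(x, 3) for x in u])
```

No labels are used at any point: the algorithm alternates between guessing which pairs are matches given the current parameters and updating the parameters given those guesses.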

Each estimation pass requires the user to configure an estimation blocking rule to reduce the number of record comparisons generated to a manageable level.

In our first estimation pass, we block on `firstname`, meaning we will generate all record comparisons that have `firstname` exactly equal.

In [71]:
%%ag
linker.estimate_parameters_using_expectation_maximisation(block_on("firstname"))


----- Starting EM training session -----


Estimating the m probabilities of the model by blocking on:
l."firstname" = r."firstname"

Parameter estimates will be made for the following comparison(s):
    - lastname
    - date
    - date_of_birth
    - email_address

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - firstname



EM Converged successfully


Your model is not yet fully trained. Missing estimates for:
    - firstname (no m values are trained).



In the second estimation pass, we block on `date_of_birth`. This allows us to estimate parameters for the `firstname` comparison, which could not be estimated in the first pass because it was used for blocking.

In [72]:
%%ag
linker.estimate_parameters_using_expectation_maximisation(block_on("date_of_birth"))


----- Starting EM training session -----


Estimating the m probabilities of the model by blocking on:
l."date_of_birth" = r."date_of_birth"

Parameter estimates will be made for the following comparison(s):
    - firstname
    - lastname
    - date
    - email_address

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - date_of_birth



EM Converged successfully


Your model is fully trained. All comparisons have at least one estimate for their m and u values



## Predicting Results

Now, we need to find the linked dataset. This can be done using `predict_and_create_linked_df`, which takes a threshold parameter and extracts all linked records with a match probability above that threshold. The cell below calls `linker.predict(0.9)`, which currently fails with an **AG execution timeout** error.

In [73]:
%%ag
linked_df = linker.predict(0.9)
# ag_print(linked_df.columns)

Error : AG execution timeout.


Counting the number of records within this linked PrivateDataFrame can be done as follows:

### Finding Out Which Flights Should Be Notified

To find out which flights should be notified, we can use the following algorithm:

* Find all the unique IDs of flight records in the linked dataset. This will include all flights where a COVID-positive passenger was identified in the subsequent 14 days. Let us call this set of unique ids `unique_ids`.
* Create a column within the flight dataset whose value is True if the `unique_id` of the record belongs to `unique_ids`, and False otherwise. This can be done with the `isin` method in `op_pandas`. Let us call this column `notify`.
* Extract the `flight_number` of all the flights where `notify` is true.
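The three steps above can be traced on a toy example in plain Python. The flight numbers and IDs below are invented, and the membership test stands in for the `isin` call; collapsing repeated flight numbers at the end is an addition made here for readability and is not part of the notebook's algorithm.

```python
# Invented flight records: one row per passenger booking
flights = [
    {"unique_id": 0, "flight_number": "AG101"},
    {"unique_id": 1, "flight_number": "AG202"},
    {"unique_id": 2, "flight_number": "AG101"},
    {"unique_id": 3, "flight_number": "AG303"},
]

# Step 1: unique IDs of the flight records that appear in the linked dataset
unique_ids = {0, 3}

# Step 2: flag each flight record whose unique_id is in that set
for record in flights:
    record["notify"] = record["unique_id"] in unique_ids

# Step 3: keep the flight numbers of the flagged records
flights_to_notify = sorted({r["flight_number"] for r in flights if r["notify"]})
print(flights_to_notify)  # ['AG101', 'AG303']
```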

In [None]:
%%ag
unique_ids = linked_df['unique_id_r']

flights_to_notify_id = flight['unique_id'].isin(unique_ids)
flight = flight[['unique_id', 'flight_number']]
flight['notify'] = flights_to_notify_id

flight = flight.where(flight['notify'] == True)
flights_to_notify = flight[['flight_number']]

Now, we will submit the predictions using the `submit_predictions` method preloaded within the AG environment.

In [None]:
%%ag
submit_predictions(flights_to_notify)

Now that we're all done, we use this line to close our work session neatly. It's like turning off the lights when you leave a room – it’s a good habit to wrap things up properly!

In [None]:
session.terminate_session()