In [2]:
import re
import uuid

# TODO: #92 Make orgs classification script into more well-defined pipeline
import numpy as np
import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer

from utils.constants import BASE_FILEPATH, organizations_settings, organizations_blocking
from utils.linkage import standardize_corp_names, splink_dedupe
import splink.duckdb.comparison_library as cl
import splink.duckdb.comparison_template_library as ctl



In [3]:
aggregated_classification_csv = (
    BASE_FILEPATH / "data" / "classification" / "merged_cleaned_company_classification.csv"
)

In [4]:
data = pd.read_csv(aggregated_classification_csv)
data.head()

Unnamed: 0,company_name,stock_symbol,legal_name,address,city,state,zipcode,area_code,ABI,primary_SIC_code,SIC6_description,primary_NAICS_code,NAICS8_description,parent_company_ABI,classification,unique_id,parent_company_unique_id
0,3R PETROLEUM OLEO E GAS,RRRP3,3R Petroleum Oleo E Gas SA,,,,,,,,,,,,f,48438d26-f3e6-4cc0-9f63-97f2acd2d394,
1,88 ENERGY,88E,88 Energy Ltd,,,,,,,,,,,,f,3798f9cb-0f87-4dac-b7d1-f63546fd96d3,
2,A.B.P. NOCIVELLI,ABP,A.b.p. Nocivelli SpA,,,,,,,,,,,,f,7860035e-e584-40df-9f6d-f5c5128fc6c0,
3,A2A,A2A,A2A SpA,,,,,,,,,,,,f,c8f28f1d-3593-4a5d-beeb-5bfea6664517,
4,ABRAJ ENERGY SERVS,ABRJ,Abraj Energy Services,,,,,,,,,,,,f,30853f3a-8960-477d-976f-a59d8ca69dee,


In [5]:
# Initialise the linker, passing in the input dataset(s)
from splink.duckdb.linker import DuckDBLinker
linker = DuckDBLinker(data)

In [6]:
linker.missingness_chart()


In [7]:
linker.profile_columns(top_n=10, bottom_n=5)




* Lots of 770 area code --> use term_frequency_adjustments to weigh this differently
* company_name is a good linking variable along with zipcode

In [8]:

def convert_duplicates_to_dict(df_with_matches: pd.DataFrame) -> None:
    """Map each uuid to all other uuids for which it has been deemed a match

    Given a dataframe where the uuids of all rows deemed similar are stored in a
    list and all but the first row of each paired uuid is dropped, this function
    maps the matched uuids to a single uuid.

    Args:
        df_with_matches: A pandas df containing a column called 'duplicated',
            where each row is a list of all uuids deemed a match. In each list,
            all uuids but the first have their rows already dropped.

    Returns:
        None. However it outputs a file to the output directory, with 2
        columns. The first lists all the uuids in df, and is labeled
        'original_uuids.' The 2nd shows the uuids to which each entry is mapped
        to, and is labeled 'mapped_uuids'.
    """
    deduped_dict = {}
    for i in range(len(df_with_matches)):
        deduped_uudis = df_with_matches.iloc[i]["duplicated"]
        for j in range(len(deduped_uudis)):
            deduped_dict.update({deduped_uudis[j]: df_with_matches.iloc[i]["id"]})

    # now convert dictionary into a csv file
    deduped_df = pd.DataFrame.from_dict(deduped_dict, "index")
    deduped_df = deduped_df.reset_index().rename(
        columns={"index": "original_uuids", 0: "mapped_uuid"}
    )
    deduped_df.to_csv(
        BASE_FILEPATH / "output" / "deduplicated_UUIDs.csv",
        index=False,
        mode="a",
    )


In [9]:
def splink_dedupe(df: pd.DataFrame, settings: dict, blocking: list) -> pd.DataFrame:
    """Use splink to deduplicate dataframe based on settings

    Configuration settings and blocking can be found in constants.py as
    individuals_settings, indivduals_blocking, organizations_settings,
    organizations_blocking

    Uses the splink library which employs probabilistic matching for
    record linkage
    https://moj-analytical-services.github.io/splink/index.html


    Args:
        df: dataframe
        settings: configuration settings
            (based on splink documentation and dataframe columns)
        blocking: list of columns to block on for the table
            (cuts dataframe into parts based on columns labeled blocks)

    Returns:
        deduplicated version of initial dataframe with column 'matching_id'
        that holds list of matching unique_ids
    """
    linker = DuckDBLinker(df, settings)
    linker.estimate_probability_two_random_records_match(
        blocking, recall=0.6
    )  # default
    linker.estimate_u_using_random_sampling(max_pairs=5e6)

    for i in blocking:
        linker.estimate_parameters_using_expectation_maximisation(i)

    df_predict = linker.predict()
    clusters = linker.cluster_pairwise_predictions_at_threshold(
        df_predict, threshold_match_probability=0.7
    )  # default
    clusters_df = clusters.as_pandas_dataframe()

    match_list_df = (
        clusters_df.groupby("cluster_id")["unique_id"].agg(list).reset_index()
    )  # dataframe where cluster_id maps unique_id to initial instance of row
    match_list_df = match_list_df.rename(columns={"unique_id": "duplicated"})

    first_instance_df = clusters_df.drop_duplicates(subset="cluster_id")
    col_names = np.append("cluster_id", df.columns)
    first_instance_df = first_instance_df[col_names]

    deduped_df = first_instance_df.merge(
        match_list_df[["cluster_id", "duplicated"]],
        on="cluster_id",
        how="left",
    )
    deduped_df = deduped_df.rename(columns={"cluster_id": "unique_id"})

    deduped_df["duplicated"] = deduped_df["duplicated"].apply(
        lambda x: x if isinstance(x, list) else [x]
    )
    convert_duplicates_to_dict(deduped_df)

    deduped_df = deduped_df.drop(columns=["duplicated"])

    return deduped_df


In [23]:
organizations_settings_test = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        "l.zipcode = r.zipcode"
        #"l.zipcode = r.zipcode"
    ],
    "comparisons": [
        cl.jaro_winkler_at_thresholds(
            "company_name", [0.9, 0.8]
        )
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "max_iterations": 10,
    "em_convergence": 0.01,
}

In [24]:
# organizations_settings = {
#     "link_type": "dedupe_only",
#     "blocking_rules_to_generate_predictions": [
#         "l.company_name = r.company_name",
#     ],
#     "comparisons": [
#         # cl.exact_match("entity_type", term_frequency_adjustments=True),
#         # cl.jaro_winkler_at_thresholds(
#         #     "state", [0.9, 0.8]
#         # ),  # threshold will catch typos and shortenings
#         # # Add more comparisons as needed
#     ],
#     "retain_matching_columns": True,
#     "retain_intermediate_calculation_columns": True,
#     "max_iterations": 10,
#     "em_convergence": 0.01,
# }

In [25]:
organizations_blocking = ["l.company_name = r.company_name"]

In [22]:
# currently throwing an error  
splink_dedupe(data, settings=organizations_settings, blocking = organizations_blocking)

Probability two random records match is estimated to be  5.95e-06.
This means that amongst all possible pairwise record comparisons, one in 168,063.30 are expected to match.  With 560,211 total possible comparisons, we expect a total of around 3.33 matching pairs
----- Estimating u probabilities using random sampling -----

Estimated u probabilities using random sampling

Your model is not yet fully trained. Missing estimates for:
    - company_name (no m values are trained).

----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
l.company_name = r.company_name

Parameter estimates will be made for the following comparison(s):

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - company_name



SplinkException: Error executing the following sql for table `__splink__m_u_counts`(__splink__m_u_counts_d756869e7):

        CREATE TABLE __splink__m_u_counts_d756869e7
        AS
        (WITH __splink__df_comparison_vectors as (select * from __splink__df_comparison_vectors_095b42c08), 
__splink__df_match_weight_parts as (
    select "unique_id_l","unique_id_r","zipcode_l","zipcode_r" 
    from __splink__df_comparison_vectors
    ), 
__splink__df_predict as (
    select
    log2(cast(1.5833427544428462 as float8) * ) as match_weight,
    CASE WHEN  THEN 1.0 ELSE (cast(1.5833427544428462 as float8) * )/(1+(cast(1.5833427544428462 as float8) * )) END as match_probability,
    "unique_id_l","unique_id_r","zipcode_l","zipcode_r" 
    from __splink__df_match_weight_parts
    
    order by 1
    ) 
    select 0 as comparison_vector_value,
           sum(match_probability * 1) /
               sum(1) as m_count,
           sum((1-match_probability) * 1) /
               sum(1) as u_count,
           '_probability_two_random_records_match' as output_column_name
    from __splink__df_predict
    )
        

Error was: Parser Error: syntax error at or near ")"

In [26]:
transformed_data = BASE_FILEPATH / "data" / "transformed"
organizations_table = pd.read_csv(transformed_data / "orgs_mini.csv")

In [38]:
organizations_table = organizations_table.rename(columns = {"id": "unique_id"})

In [39]:
organizations_table

Unnamed: 0.1,Unnamed: 0,unique_id,name,state,entity_type
0,4119,f6a32c9b-3cbc-43c2-bbaf-1a197ac21a81,enterprise rent-a-car,,vendor
1,65272,22d1abc3-d19a-411d-9394-f398f9f5e8f5,wahmhoff robert,MI,corporation
2,79759,a07e79a8-8d31-45d4-af6c-5bbe79c88b7c,todd thomas,MI,corporation
3,77356,59a0f3e4-8bbb-46b6-86b9-c925db00cfce,great lakes stainless,MI,corporation
4,14039,2da151b9-a307-4098-8fd0-9c65d5578b3b,fry's marketplace,,vendor
...,...,...,...,...,...
1042,71289,4e662cfa-24ca-449e-99aa-ab73f99ccbd3,wheaton robert,MI,corporation
1043,16797,4493a816-fd09-40a0-9c5d-8f248a554116,winco,,vendor
1044,64907,af9b5871-765f-40ee-b697-d8b2a55e62a9,burgess lyla,MI,corporation
1045,55395,6eb15115-c706-4c43-84ba-ef07319aad35,dembeck deborah,MI,corporation


In [42]:
# original linkage is also throwing an error

organizations_settings = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        "l.name = r.name",
    ],
    "comparisons": [
        cl.exact_match("entity_type", term_frequency_adjustments=True),
        cl.jaro_winkler_at_thresholds(
            "state", [0.9, 0.8]
        ),  # threshold will catch typos and shortenings
        # Add more comparisons as needed
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "max_iterations": 10,
    "em_convergence": 0.01,
}

organizations_blocking = ["l.name = r.name"]

In [43]:
organizations = splink_dedupe(organizations_table, organizations_settings, organizations_blocking)

Probability two random records match is estimated to be  0.000645.
This means that amongst all possible pairwise record comparisons, one in 1,549.76 are expected to match.  With 547,581 total possible comparisons, we expect a total of around 353.33 matching pairs
----- Estimating u probabilities using random sampling -----
u probability not trained for state - Jaro_winkler_similarity >= 0.9 (comparison vector value: 2). This usually means the comparison level was never observed in the training data.
u probability not trained for state - Jaro_winkler_similarity >= 0.8 (comparison vector value: 1). This usually means the comparison level was never observed in the training data.

Estimated u probabilities using random sampling

Your model is not yet fully trained. Missing estimates for:
    - entity_type (no m values are trained).
    - state (some u values are not trained, no m values are trained).

----- Starting EM training session -----

Estimating the m probabilities of the model by 

KeyError: 'id'