In [72]:
pip install splink

StatementMeta(synsp01, 41, 69, Finished, Available)

Note: you may need to restart the kernel to use updated packages.


### Splink Setup

In [73]:
import os
import pprint
import warnings

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import substring
from pyspark.sql.types import DoubleType, StringType

import splink.spark.blocking_rule_library as brl
import splink.spark.comparison_level_library as cll
import splink.spark.comparison_library as cl
import splink.spark.comparison_template_library as ctl
from splink.comparison import Comparison
from splink.spark.jar_location import similarity_jar_location
from splink.spark.linker import SparkLinker

# Spark configuration applied when the SparkContext below is first created.
conf = SparkConf()
# NOTE: per the Spark configuration docs, spark.driver.memory cannot take effect once the
# driver JVM is already running (e.g. a pre-started Synapse session); set it in the
# pool/session configuration in that case.
conf.set("spark.driver.memory", "64g")
conf.set("spark.kryoserializer.buffer.max", "1024m")
conf.set("spark.default.parallelism", "40")

# Adds custom similarity functions, which are bundled with Splink,
# documented here: https://github.com/moj-analytical-services/splink_scalaudfs
# The jar file needs to be downloaded from the above and uploaded to the Synapse workspace.
path = similarity_jar_location()
# Alternative if spark.jars is not honoured: conf.set("spark.driver.extraClassPath", path)
conf.set("spark.jars", path)

# getOrCreate returns an already-running context unchanged; `conf` only applies to a new one.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Must come after `spark` is assigned: on a fresh kernel `spark` does not exist before this line.
spark.conf.set("spark.sql.shuffle.partitions", "40")
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

os.makedirs("/tmp/Temp", exist_ok=True)

# Silence DeprecationWarnings emitted from here on.
warnings.filterwarnings("ignore", category=DeprecationWarning)

StatementMeta(synsp01, 41, 70, Finished, Available)

In [None]:
# Echo the similarity-UDF jar location returned by similarity_jar_location()
# (useful when uploading the jar to the Synapse workspace).
print(f"{path}")


### Register Splink's string-similarity UDFs

In [74]:
# Register the Scala UDFs shipped in the Splink similarity jar under SQL-callable names.
# Each entry: (SQL function name, class in uk.gov.moj.dash.linkage, Spark return type).
for udf_sql_name, udf_class, udf_return_type in (
    ("jaro_winkler", "JaroWinklerSimilarity", DoubleType()),
    ("jaccard_sim", "JaccardSimilarity", DoubleType()),
    ("cosine_distance", "CosineDistance", DoubleType()),
    ("sqlEscape", "sqlEscape", StringType()),
    ("levdamerau_distance", "LevDamerauDistance", DoubleType()),
    ("jaro_sim", "JaroSimilarity", DoubleType()),
):
    spark.udf.registerJavaFunction(
        udf_sql_name, "uk.gov.moj.dash.linkage." + udf_class, udf_return_type
    )

StatementMeta(synsp01, 41, 71, Finished, Available)

### Import Demo Dataset

In [75]:
# Demo data from Splink's bundled datasets:
# https://moj-analytical-services.github.io/splink/datasets.html
from splink.datasets import splink_datasets

# historical_50k columns: unique_id, cluster, full_name, first_and_surname, first_name,
# surname, dob, birth_place, postcode_fake, gender, occupation.
# (A smaller alternative is splink_datasets.fake_1000:
#  unique_id, first_name, surname, dob, city, email, cluster.)
pdf = splink_datasets.historical_50k

# Spark copy of the data; this is what SparkLinker consumes.
df = spark.createDataFrame(pdf)


StatementMeta(synsp01, 41, 72, Finished, Available)

  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


### Linkage settings

In [None]:
# Splink model settings for deduplicating the demo data: blocking rules choose which
# record pairs predict() scores; comparisons define how each column pair is scored.
settings = {
    "link_type": "dedupe_only",
    # Blocking rules used to generate candidate pairs at prediction time.
    "blocking_rules_to_generate_predictions": [
        brl.block_on(block_cols)
        for block_cols in (
            ["substr(dob,1,4)", "postcode_fake"],
            ["dob", "lower(substr(first_name,1,1))"],
            ["lower(surname)", "postcode_fake"],
            ["lower(surname)", "lower(first_name)"],
        )
    ],
    "comparisons": [
        # first_name and surname share one name-comparison configuration.
        *(
            ctl.name_comparison(
                name_col,
                set_to_lowercase=True,
                include_exact_match_level=True,
                damerau_levenshtein_thresholds=[1, 2],
                jaro_winkler_thresholds=[0.9, 0.8],
                term_frequency_adjustments=False,
            )
            for name_col in ("first_name", "surname")
        ),
        # dob strings are cast to dates; the datediff levels are "within 1 month"
        # and "within 1 year".
        ctl.date_comparison(
            "dob",
            cast_strings_to_date=True,
            levenshtein_thresholds=[2],
            damerau_levenshtein_thresholds=[],
            datediff_thresholds=[1, 1],
            datediff_metrics=["month", "year"],
        ),
        cl.exact_match("birth_place"),
        ctl.postcode_comparison("postcode_fake", set_to_lowercase=True),
        cl.exact_match("gender"),
        cl.exact_match("occupation"),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    # Optional: "additional_columns_to_retain": ["Street", "Locality", "Town", "County"],
    # EM stops once the largest parameter change per iteration drops below this value.
    "em_convergence": 0.001,
}


### Create the linkage model

In [76]:
# Spark-backed Splink linker over the demo data, configured by `settings`.
linker = SparkLinker(df, settings)

# Value-frequency profiles (top_n most / bottom_n least common values) for the key
# matching columns, plus the birth year taken from the first four characters of dob.
linker.profile_columns(
    [
        "first_name",
        "surname",
        "postcode_fake",
        "substr(dob, 1,4)",
    ],
    top_n=10,
    bottom_n=5,
)

StatementMeta(synsp01, 41, 73, Finished, Available)



--WARN-- 
 You are using datediff comparison
                        with str-casting and ANSI is not enabled. Bad dates
                        e.g. 1999-13-54 will not trigger an exception but will
                        classed as comparison level = "ELSE". Ensure date strings
                        are cleaned to remove bad dates 



### Calculate probability two random records match


In [77]:
# Two strict rules: identical first name and surname, plus identical dob or identical postcode.
deterministic_rules = [
    f"l.first_name = r.first_name and l.surname = r.surname and l.{extra_col} = r.{extra_col}"
    for extra_col in ("dob", "postcode_fake")
]

# recall=0.70 is the assumed share of true matches that these rules catch.
linker.estimate_probability_two_random_records_match(deterministic_rules, recall=0.70)

StatementMeta(synsp01, 41, 74, Finished, Available)

Probability two random records match is estimated to be  6.43e-05.
This means that amongst all possible pairwise record comparisons, one in 15,547.42 are expected to match.  With 1,279,041,753 total possible comparisons, we expect a total of around 82,267.14 matching pairs


### Number of comparisons generated by blocking rules

In [78]:
# Chart the (cumulative) number of pairwise comparisons generated by the prediction
# blocking rules in `settings`, to check the workload before calling predict().
linker.cumulative_num_comparisons_from_blocking_rules_chart()

StatementMeta(synsp01, 41, 75, Finished, Available)

### Random sampling to estimate u values (randomly sampled pairs are assumed to be non-matches)

In [79]:
# Estimate u probabilities from up to 1e7 randomly sampled record pairs (treated as
# non-matches). A fixed seed makes the sample, and therefore the u values, reproducible
# across runs; change or remove it to draw a different sample.
linker.estimate_u_using_random_sampling(max_pairs=1e7, seed=42)

StatementMeta(synsp01, 41, 76, Finished, Available)

----- Estimating u probabilities using random sampling -----

Estimated u probabilities using random sampling

Your model is not yet fully trained. Missing estimates for:
    - first_name (no m values are trained).
    - surname (no m values are trained).
    - dob (no m values are trained).
    - birth_place (no m values are trained).
    - postcode_fake (no m values are trained).
    - gender (no m values are trained).
    - occupation (no m values are trained).


### Expectation maximisation (EM) to estimate m values, one training session per blocking rule

In [80]:
# Estimate m probabilities with expectation maximisation, one session per training
# blocking rule. Columns used in a session's blocking rule cannot be estimated in that
# session, so the rules rotate which fields are held fixed.
EM_TRAINING_BLOCK_COLUMNS = [
    ["surname", "first_name", "dob"],
    ["surname", "first_name", "postcode_fake"],
    ["first_name", "postcode_fake", "dob"],
    ["surname", "postcode_fake", "dob"],
]

em_training_sessions = []
for em_block_columns in EM_TRAINING_BLOCK_COLUMNS:
    training_blocking_rule = brl.block_on(em_block_columns)
    em_training_sessions.append(
        linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule)
    )

# Keep the per-session names used elsewhere in the notebook.
(
    training_session_1,
    training_session_2,
    training_session_3,
    training_session_4,
) = em_training_sessions



StatementMeta(synsp01, 41, 77, Finished, Available)


----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
(l.`surname` = r.`surname`) AND (l.`first_name` = r.`first_name`) AND (l.`dob` = r.`dob`)

Parameter estimates will be made for the following comparison(s):
    - birth_place
    - postcode_fake
    - gender
    - occupation

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - first_name
    - surname
    - dob

Iteration 1: Largest change in params was 0.807 in probability_two_random_records_match
Iteration 2: Largest change in params was 0.126 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.0228 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.00687 in probability_two_random_records_match
Iteration 5: Largest change in params was 0.00281 in probability_two_random_records_match
Iteration 6: Largest change in params was 0.00138 in probability_two_rand

In [81]:

# EM session 5: block on surname, birth_place and occupation so that first_name, dob,
# postcode_fake and gender receive m-value estimates.
training_blocking_rule = brl.block_on(
    ["surname", "birth_place", "occupation"]
)
training_session_5 = linker.estimate_parameters_using_expectation_maximisation(
    training_blocking_rule
)


StatementMeta(synsp01, 41, 78, Finished, Available)


----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
(l.`surname` = r.`surname`) AND (l.`birth_place` = r.`birth_place`) AND (l.`occupation` = r.`occupation`)

Parameter estimates will be made for the following comparison(s):
    - first_name
    - dob
    - postcode_fake
    - gender

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - surname
    - birth_place
    - occupation

Level Within 1 month on comparison dob not observed in dataset, unable to train m value

Iteration 1: Largest change in params was 0.713 in probability_two_random_records_match
Iteration 2: Largest change in params was 0.0574 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.0126 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.00403 in probability_two_random_records_match
Iteration 5: Largest change in params was 0.0016 in proba

In [82]:

# EM session 6: block on first_name, birth_place and occupation so that surname, dob,
# postcode_fake and gender receive m-value estimates.
training_blocking_rule = brl.block_on(
    ["first_name", "birth_place", "occupation"]
)
training_session_6 = linker.estimate_parameters_using_expectation_maximisation(
    training_blocking_rule
)


StatementMeta(synsp01, 41, 79, Finished, Available)


----- Starting EM training session -----

Estimating the m probabilities of the model by blocking on:
(l.`first_name` = r.`first_name`) AND (l.`birth_place` = r.`birth_place`) AND (l.`occupation` = r.`occupation`)

Parameter estimates will be made for the following comparison(s):
    - surname
    - dob
    - postcode_fake
    - gender

Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: 
    - first_name
    - birth_place
    - occupation

Iteration 1: Largest change in params was 0.724 in probability_two_random_records_match
Iteration 2: Largest change in params was 0.0242 in probability_two_random_records_match
Iteration 3: Largest change in params was 0.00453 in probability_two_random_records_match
Iteration 4: Largest change in params was 0.00106 in probability_two_random_records_match
Iteration 5: Largest change in params was 0.000255 in probability_two_random_records_match

EM converged after 5 iterations

Your model is 

In [83]:
# Chart the trained match weights of the model's comparison levels.
linker.match_weights_chart()
# Alternative view of the underlying m and u probabilities:
#linker.m_u_parameters_chart()

StatementMeta(synsp01, 41, 80, Finished, Available)

In [84]:
# Splink's "unlinkables" diagnostic chart. Per the Splink docs, it presumably shows the share of
# records whose own fields carry too little evidence to match even an identical copy (verify).
linker.unlinkables_chart()


StatementMeta(synsp01, 41, 81, Finished, Available)

In [85]:
# Score the candidate pairs produced by the prediction blocking rules, keeping only
# pairs that pass the 0.9 match-probability threshold.
results = linker.predict(threshold_match_probability=0.9)


StatementMeta(synsp01, 41, 82, Finished, Available)

In [86]:
# Pull the first 100 predicted pairs into pandas for inspection; `rdf` is also
# used below as the input to the waterfall charts.
rdf = results.as_pandas_dataframe(limit=100)
rdf

StatementMeta(synsp01, 41, 83, Finished, Available)

Unnamed: 0,match_weight,match_probability,unique_id_l,unique_id_r,first_name_l,first_name_r,gamma_first_name,bf_first_name,surname_l,surname_r,...,bf_postcode_fake,gender_l,gender_r,gamma_gender,bf_gender,occupation_l,occupation_r,gamma_occupation,bf_occupation,match_key
0,27.458204,1.000000,Q15909904-13,Q15909904-17,jean,john,3,6.490748,reeves,reeves,...,5110.408275,male,male,1,1.301964,,,-1,1.000000,0
1,11.553777,0.999667,Q14946876-13,Q14946876-8,bert,leaf,0,0.136476,leaf,,...,5110.408275,male,male,1,1.301964,,,-1,1.000000,0
2,15.235733,0.999974,Q7528873-3,Q7528873-6,sir,sir,5,49.261980,wilmot,baronet,...,5110.408275,male,male,1,1.301964,,,-1,1.000000,0
3,34.564123,1.000000,Q594112-2,Q594112-9,william,william,5,49.261980,denning,denning,...,5110.408275,male,,-1,1.000000,astronomer,astronomer,1,23.630688,0
4,30.382222,1.000000,Q472470-4,Q472470-8,william,william,5,49.261980,leach,leach,...,5110.408275,male,male,1,1.301964,naturalist,,-1,1.000000,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,26.936637,1.000000,Q6244750-1,Q6244750-5,john,john,5,49.261980,lewis,louis,...,5110.408275,male,male,1,1.301964,businessperson,businessperson,1,23.630688,0
96,30.382222,1.000000,Q7965789-1,Q7965789-2,walter,walter,5,49.261980,parsons,parsons,...,5110.408275,male,male,1,1.301964,,,-1,1.000000,0
97,28.853642,1.000000,Q4348292-1,Q4348292-7,ada,adriana,1,17.075109,roe,roe,...,5110.408275,female,female,1,1.301964,,,-1,1.000000,0
98,10.965922,0.999500,Q5623556-3,Q5623556-8,gwerful,gwerfyl,4,20.359765,,fychan,...,5110.408275,female,,-1,1.000000,poet,,-1,1.000000,0


### Cluster pairwise predictions into distinct entities

In [87]:
# Group the scored pairs into entity clusters, using the same 0.9 threshold as predict().
clusters = linker.cluster_pairwise_predictions_at_threshold(results, threshold_match_probability=0.9)

# Expose the clusters to %%sql cells as the `clusters` temp view. Using the Spark
# DataFrame directly avoids collecting every row to the driver as pandas and
# re-uploading it (which also triggered pandas' iteritems FutureWarning).
clusters.as_spark_dataframe().createOrReplaceTempView("clusters")

StatementMeta(synsp01, 41, 84, Finished, Available)

Completed iteration 1, root rows count 926
Completed iteration 2, root rows count 567
Completed iteration 3, root rows count 484
Completed iteration 4, root rows count 262
Completed iteration 5, root rows count 147
Completed iteration 6, root rows count 47
Completed iteration 7, root rows count 16
Completed iteration 8, root rows count 9
Completed iteration 9, root rows count 4
Completed iteration 10, root rows count 2
Completed iteration 11, root rows count 0


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


In [88]:
%%sql
-- Number of distinct clusters (entities) in the clusters temp view
select count(1)
from (
    select cluster_id
    from clusters
    group by cluster_id
)

StatementMeta(synsp01, 41, 85, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

### Waterfall charts: how each comparison contributes to a predicted pair's match weight

In [89]:
from splink.charts import waterfall_chart

# One dict per predicted pair in `rdf`; each becomes a waterfall chart showing how
# the individual comparisons add up to the pair's match weight.
# NOTE(review): the imported waterfall_chart is not used here (linker.waterfall_chart is).
records_to_plot = [pair_record for pair_record in rdf.to_dict(orient="records")]
linker.waterfall_chart(records_to_plot, filter_nulls=False)


StatementMeta(synsp01, 41, 86, Finished, Available)