## Splink deduplication demo 

In this demo we de-duplicate a dataset. The dataset is small (1,000 rows) so that the tutorial can be run interactively, but readers should imagine that the true dataset is large (millions of rows).

This is a comprehensive example in which we demonstrate the full range of functionality of Splink, and how it is intended to be used.  A quickstart notebook is provided elsewhere.

Note that various outputs are printed interactively using e.g. `spark_dataframe.show()`.  This is for instructional purposes only - it degrades performance and shouldn't be used in a production setting.

## Step 1:  Imports and setup

The following is just boilerplate code that sets up the Spark session and sets some other non-essential configuration options.

In [53]:
import pandas as pd 
pd.options.display.max_columns = 500
pd.options.display.max_rows = 100


In [54]:
import logging 
logging.basicConfig()  # Means logs will print in Jupyter Lab

# Set to DEBUG if you want splink to log the SQL statements it's executing under the hood
logging.getLogger("splink").setLevel(logging.INFO)

In [55]:
from utility_functions.demo_utils import get_spark
spark = get_spark() # See utility_functions/demo_utils.py for how to set up Spark

## Step 2:  Read in data

Note that the `group` column is the truth: rows which share the same value refer to the same person. In a real-world application we wouldn't have this field, because it is precisely the label that Splink is trying to estimate.

In [56]:
df = spark.read.csv("data/fake_1000.csv", header=True)
df.show(5)

+---------+----------+-------+----------+------+--------------------+-----+
|unique_id|first_name|surname|       dob|  city|               email|group|
+---------+----------+-------+----------+------+--------------------+-----+
|        0|    Robert|   Alan|1971-06-24|  null| robert255@smith.net|    0|
|        1|    Robert|  Allen|1971-05-24|  null| roberta25@smith.net|    0|
|        2|       Rob|  Allen|1971-06-24|London| roberta25@smith.net|    0|
|        3|    Robert|   Alen|1971-06-24| Lonon|                null|    0|
|        4|     Grace|   null|1997-04-26|  Hull|grace.kelly52@jon...|    1|
+---------+----------+-------+----------+------+--------------------+-----+
only showing top 5 rows



## Step 3:  Profile the data

Understanding your data through the use of summary statistics is critical. 

For the purposes of data linking, one of the most important attributes of your data is cardinality.  Fields like date of birth which have a large number of unique values (high cardinality) are more useful than fields with low cardinality like gender.

Another important attribute is skew, which can occur:
- because values (like a first name of John) occur more frequently in the real world
- due to data quality issues (such as a large number of date of birth fields containing the value '1970-01-01' due to errors in date parsing)

In general, Splink will produce less accurate predictions when your data is skewed, especially when the skew is extreme. So, for example, different name frequencies (John vs Robin) shouldn't cause a huge problem, but if you have 30,000 distinct dates of birth and half of the values are '1970-01-01', Splink is unlikely to produce accurate predictions of match probability.

It is imperative, therefore, that data quality issues are addressed prior to using Splink.  Note that Splink will correctly deal with nulls, so suspicious values should usually be nulled out.
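As a rough illustration of these checks outside Splink, the sketch below profiles a single column held in a plain Python list. It reports the cardinality and the share of non-null values taken by the most common value, then nulls out a known-bad placeholder. The helper names `profile_column` and `null_out` and the sample values are hypothetical; on real data you would do the same with Spark over the full dataset.

```python
from collections import Counter

def profile_column(values):
    """Return (cardinality, most common value, its share of non-null values)."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0, None, 0.0
    counts = Counter(non_null)
    top_value, top_count = counts.most_common(1)[0]
    return len(counts), top_value, top_count / len(non_null)

def null_out(values, suspicious):
    """Replace known-bad placeholder values with None so they are treated as missing."""
    return [None if v in suspicious else v for v in values]

# Hypothetical date of birth column where a parsing error produced '1970-01-01'
dob = ["1971-06-24", "1970-01-01", "1997-04-26", "1970-01-01", None, "1970-01-01"]
print(profile_column(dob))      # (3, '1970-01-01', 0.6)
cleaned = null_out(dob, {"1970-01-01"})
print(profile_column(cleaned))  # (2, '1971-06-24', 0.5)
```

After nulling, the skewed placeholder no longer dominates the column, and Splink can treat those records as having a missing date of birth.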

Splink provides a profiling tool for you to visually inspect value frequencies, which we can see as follows:

In the case of our test dataset, there is a moderate level of skew, particularly on the first_name, surname and city columns. We can see that most of our fields have high cardinality.

We also observe that misspellings are common across most fields.

In [57]:
from splink.profile import column_value_frequencies_chart

In [58]:
import altair as alt
alt.renderers.enable('mimetype')
#alt.renderers.enable('default')
column_value_frequencies_chart(['first_name', "surname", "dob", "city", "email"], df, spark)

<VegaLite 4 object>



In this step, with real world data, we would do quite a bit of data cleaning work to do things like standardise names (e.g. making them all upper case, splitting out multiple names), cleaning up dates, etc. Some useful functions are available in [`splink_data_standardisation`](https://github.com/moj-analytical-services/splink_data_standardisation).

This will be different for every dataset, but the aim is to make it as easy as possible for Splink to spot similarities and differences between fields.

Now that we understand the data, we are ready to start using Splink.

## Step 4: Decide on a blocking approach

In most large datasets, it will be computationally intractable to compare every row with every other row. We can use a technique called blocking to dramatically reduce the number of comparisons by comparing only records that adhere to certain rules, such as that the first name and date of birth must be equal. Blocking is described further [here](https://toolkit.data.gov.au/Data_Linking_Information_Series_Sheet_4:_Probabilistic_linking.html).

The above profiling charts are useful to understand which blocking rules may be effective. For example, we can see that the first name 'George' is the most prevalent in the dataset, occurring 22 times. If we block on first name, $\frac{n(n-1)}{2}$ comparisons will be generated amongst Georges, with each instance being compared to each other (231 comparisons for 22 Georges). This is probably viable in terms of the number of comparisons generated. On the other hand, London appears over 250 times, and so blocking on city would generate more than 31,000 comparisons where both records had city = London.
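The arithmetic can be checked with a small pure-Python sketch that sums $\frac{n(n-1)}{2}$ over every block produced by an exact-match rule. The helper `comparisons_from_blocking` is hypothetical, not part of Splink, and it skips nulls on the assumption that the rule is a SQL equality, where `null = null` is never true.

```python
from collections import Counter

def comparisons_from_blocking(values):
    """Count pairwise comparisons generated by blocking on exact equality of one column.

    Each block of n records sharing a value yields n * (n - 1) / 2 comparisons.
    Nulls are excluded because a SQL equality never matches null to null.
    """
    counts = Counter(v for v in values if v is not None)
    return sum(n * (n - 1) // 2 for n in counts.values())

print(comparisons_from_blocking(["George"] * 22))                 # 231
print(comparisons_from_blocking(["London"] * 250 + [None] * 5))   # 31125
# For scale: comparing all 1,000 records with each other, with no blocking
print(1000 * 999 // 2)                                            # 499500
```

Because the count grows quadratically with block size, a single very common value can dominate the total.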

Whilst blocking is useful in reducing record comparisons, the problem is that it's rare to find a blocking rule which both eliminates enough record comparisons and does not eliminate true matches (i.e. which has perfect recall).

The solution to this problem is generally to use several different blocking rules: for example, to generate record comparisons based on either the rule that first name must be equal, or that surname must be equal. This means that the blocking approach will only eliminate a true match when both the first name and the surname differ, which will happen much less often than either one differing individually.
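A toy sketch of combining two rules, using the first five records shown in Step 2 (only `unique_id`, `first_name` and `surname`): each rule produces a set of candidate pairs, and the rules are combined by taking the union. The `block` helper is hypothetical, and it loops over every pair, so it is for illustration only; avoiding that loop is the whole point of blocking, which is why real implementations express each rule as a join.

```python
from itertools import combinations

def block(records, key):
    """Candidate pairs (sorted id tuples) whose `key` values are equal and non-null."""
    pairs = set()
    for a, b in combinations(records, 2):
        if a[key] is not None and a[key] == b[key]:
            pairs.add(tuple(sorted((a["unique_id"], b["unique_id"]))))
    return pairs

# First five records of fake_1000.csv; records 0 to 3 are the same person (group 0)
records = [
    {"unique_id": 0, "first_name": "Robert", "surname": "Alan"},
    {"unique_id": 1, "first_name": "Robert", "surname": "Allen"},
    {"unique_id": 2, "first_name": "Rob", "surname": "Allen"},
    {"unique_id": 3, "first_name": "Robert", "surname": "Alen"},
    {"unique_id": 4, "first_name": "Grace", "surname": None},
]

by_first_name = block(records, "first_name")
by_surname = block(records, "surname")
candidates = by_first_name | by_surname  # a pair survives if ANY rule generates it
print(sorted(candidates))  # [(0, 1), (0, 3), (1, 2), (1, 3)]
# (0, 2) and (2, 3) are still lost: both first name and surname differ
```

Using both rules recovers the true match (1, 2), which blocking on first name alone would miss.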

Unfortunately, it is theoretically invalid to estimate the Fellegi-Sunter model on comparisons generated from multiple blocking rules, because it breaks the assumption of conditional independence.

The recommended approach is to estimate different models, one for each blocking rule, and combine the results. 

We will demonstrate how to combine models in a separate notebook. In this notebook we demonstrate how to run the model for a single rule.

## Step 5:  Configure splink using the `settings` object

`splink` configuration options are stored in a settings dictionary.  This dictionary allows significant customisation, and can therefore get quite complex.  

💥 We provide a tool to help author valid settings dictionaries, with tooltips and autocomplete, which you can find [here](http://robinlinacre.com/splink_settings_editor/).

Customisation overrides default values built into splink.  For the purposes of this demo, we will specify a simple settings dictionary, which means we will be relying on these sensible defaults.

To help with authoring and validation of the settings dictionary, we have written a [json schema](https://json-schema.org/), which can be found [here](https://github.com/moj-analytical-services/splink/blob/master/splink/files/settings_jsonschema.json).  




In [59]:
settings = {
    "link_type": "dedupe_only",
    "blocking_rules": [
        "l.first_name = r.first_name"
    ],
    "comparison_columns": [
        {
            "col_name": "surname",
            "num_levels": 3,
            "term_frequency_adjustments": True
        },
        {
            "col_name": "dob"
        },
        {
            "col_name": "city",
            "term_frequency_adjustments": True
        },
        {
            "col_name": "email"
        }
    ],
    "additional_columns_to_retain": ["group"],
    "em_convergence": 0.01
}

In words, this settings dictionary says:
- We are performing a deduplication task (the other options are `link_only`, or `link_and_dedupe`)
- We are going to generate comparisons subject to the blocking rules contained in the specified array - in this case, only generate comparisons where the first name matches
- When comparing records, we will use information from the `surname`, `dob`, `city` and `email` columns to compute a match score.
- For `surname`, the string comparison will have three levels:
    - Level 2: Strings are (almost) exactly the same
    - Level 1: Strings are similar 
    - Level 0: No match
- We will make adjustments for term frequencies on the `surname` and `city` columns
- We will retain the `group` column in the results even though this is not used as part of comparisons.  This is a labelled dataset and `group` contains the true match - i.e. where group matches, the records pertain to the same person
- Consider the algorithm to have converged when no parameter changes by more than 0.01 between iterations
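A minimal sketch of the convergence rule described in the last bullet, taken as worded there. Splink's own check may differ in detail (for example, strict versus non-strict inequality). Parameters are keyed by `(column, level)`, echoing the EM log messages printed later in this notebook. The helper names and the values are hypothetical.

```python
def max_parameter_change(old_params, new_params):
    """Return (largest absolute change, key of the parameter that changed most)."""
    key = max(old_params, key=lambda k: abs(new_params[k] - old_params[k]))
    return abs(new_params[key] - old_params[key]), key

def has_converged(old_params, new_params, threshold=0.01):
    """True when no parameter moved by more than `threshold` (cf. em_convergence)."""
    change, _ = max_parameter_change(old_params, new_params)
    return change <= threshold

# Hypothetical m probabilities keyed by (column, level) on two successive iterations
previous = {("dob", 0): 0.600, ("email", 0): 0.400}
current = {("dob", 0): 0.603, ("email", 0): 0.430}
change, key = max_parameter_change(previous, current)
print(key, round(change, 3))             # ('email', 0) 0.03
print(has_converged(previous, current))  # False
```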

## Step 6:  Estimate u probabilities directly

The u probabilities are the probabilities that fields will match given the records do not match. So, for example, a 'month of birth' column would be expected to match around 1/12 of the time amongst non-matching records.

For many datasets, the probability that two records selected at random will match is almost zero.  A sample of such pairs of records should therefore almost exclusively contain non-matches.  

We can therefore simply assume this dataset contains non-matches only, and directly estimate u probabilities.
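A sketch of the idea for a simple exact-match column. If every pair of distinct records is assumed to be a non-match, the u probability is just the fraction of record pairs that agree. Rather than sampling, this computes that fraction exactly from value counts. It is not Splink's `estimate_u_values`, which, as described above, works from a sample of pairs and also has to handle multi-level (fuzzy) comparisons. The helper name is hypothetical.

```python
from collections import Counter

def u_probability_exact_match(values):
    """P(two distinct records agree on this column), treating every pair as a non-match.

    Nulls are excluded, since a null comparison carries no agreement information.
    """
    counts = Counter(v for v in values if v is not None)
    n = sum(counts.values())
    agreeing_pairs = sum(c * (c - 1) for c in counts.values())  # ordered pairs
    return agreeing_pairs / (n * (n - 1))                        # over all ordered pairs

# Month of birth spread evenly over 1,200 records: roughly 1/12 of pairs agree
months = [m for m in range(1, 13) for _ in range(100)]
print(round(u_probability_exact_match(months), 4))  # 0.0826
```

The result is slightly below 1/12 because a record is never compared with itself.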


In [60]:
from splink.estimate import estimate_u_values
settings_with_u = estimate_u_values(settings, df, spark, fix_u_probabilities=True)

In [61]:
from pprint import pprint
pprint(settings_with_u)

{'additional_columns_to_retain': ['group'],
 'blocking_rules': ['l.first_name = r.first_name'],
 'comparison_columns': [{'col_name': 'surname',
                         'fix_u_probabilities': True,
                         'num_levels': 3,
                         'term_frequency_adjustments': True,
                         'u_probabilities': [0.9911395311355591,
                                             0.003970493096858263,
                                             0.004889975767582655]},
                        {'col_name': 'dob',
                         'fix_u_probabilities': True,
                         'u_probabilities': [0.998252252268821,
                                             0.001747747731179016]},
                        {'col_name': 'city',
                         'fix_u_probabilities': True,
                         'term_frequency_adjustments': True,
                         'u_probabilities': [0.9448524293462267,
                                          

## Step 7:  Estimate match scores using the Expectation Maximisation algorithm


Columns are assumed to be strings by default.  See the 'comparison vector settings' notebook for details of configuration options.
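For intuition about the iterations logged below, here is a textbook-style EM loop for the Fellegi-Sunter model. It is written from scratch, not taken from Splink. It assumes two-level comparisons (0 = disagree, 1 = agree) and holds u fixed, as the `fix_u_probabilities` flag set above suggests. It then re-estimates λ and the m probabilities until no parameter moves by more than the threshold. Splink additionally handles multi-level comparisons, nulls (which appear as a gamma value of -1 in the output further down) and term frequency adjustments, none of which this sketch attempts.

```python
def em_fixed_u(gammas, u, m, lam, threshold=0.01, max_iter=50):
    """Estimate lambda and m probabilities by EM, holding u fixed.

    gammas: comparison vectors, each a tuple of 0 (disagree) / 1 (agree) per column.
    u:      P(agree | non-match) per column; never updated.
    m:      starting values for P(agree | match) per column.
    lam:    starting value for the prior probability that a pair is a match.
    """
    m = list(m)
    for _ in range(max_iter):
        # E-step: posterior probability that each pair is a match
        weights = []
        for g in gammas:
            p_match, p_non = lam, 1 - lam
            for k, agrees in enumerate(g):
                p_match *= m[k] if agrees else 1 - m[k]
                p_non *= u[k] if agrees else 1 - u[k]
            weights.append(p_match / (p_match + p_non))
        # M-step: re-estimate lambda and each m from the weighted pairs
        total = sum(weights)
        new_lam = total / len(gammas)
        new_m = [sum(w for w, g in zip(weights, gammas) if g[k]) / total
                 for k in range(len(m))]
        # Stop once no parameter moves by more than the threshold
        change = max([abs(new_lam - lam)] + [abs(a - b) for a, b in zip(new_m, m)])
        lam, m = new_lam, new_m
        if change <= threshold:
            break
    return lam, m

# Synthetic data: 100 pairs agree on all three columns, 900 disagree on all three
gammas = [(1, 1, 1)] * 100 + [(0, 0, 0)] * 900
lam, m = em_fixed_u(gammas, u=[0.01, 0.01, 0.05], m=[0.9, 0.9, 0.9], lam=0.5)
print(round(lam, 3), [round(x, 3) for x in m])
```

On this clean synthetic data, λ settles near the true match rate of 0.1 and every m probability approaches 1 within a couple of iterations.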



In [62]:
from splink import Splink

linker = Splink(settings_with_u, df, spark)
df_e = linker.get_scored_comparisons()

INFO:splink.iterate:Iteration 0 complete
INFO:splink.model:The maximum change in parameters was 0.38965016007423403 for key dob, level 0
INFO:splink.iterate:Iteration 1 complete
INFO:splink.model:The maximum change in parameters was 0.08626824617385864 for key email, level 0
INFO:splink.iterate:Iteration 2 complete
INFO:splink.model:The maximum change in parameters was 0.02818053960800171 for key email, level 0
INFO:splink.iterate:Iteration 3 complete
INFO:splink.model:The maximum change in parameters was 0.009459644556045532 for key email, level 0
INFO:splink.iterate:EM algorithm has converged


## Step 8: Inspect results



In [63]:
# Inspect main dataframe that contains the match scores
cols_to_inspect = ["tf_adjusted_match_prob","unique_id_l","unique_id_r","group_l", "group_r", "surname_l","surname_r","dob_l","dob_r","city_l","city_r","email_l","email_r",]

df_e.toPandas()[cols_to_inspect].sort_values(["unique_id_l", "unique_id_r"]).head(10)

|   | tf_adjusted_match_prob | unique_id_l | unique_id_r | group_l | group_r | surname_l | surname_r | dob_l | dob_r | city_l | city_r | email_l | email_r |
|--:|--:|--:|--:|--:|--:|:--|:--|:--|:--|:--|:--|:--|:--|
| 1 | 0.009296 | 0 | 1 | 0 | 0 | Alan | Allen | 1971-06-24 | 1971-05-24 |  |  | robert255@smith.net | roberta25@smith.net |
| 0 | 0.897881 | 0 | 3 | 0 | 0 | Alan | Alen | 1971-06-24 | 1971-06-24 |  | Lonon | robert255@smith.net |  |
| 2 | 0.955836 | 1 | 3 | 0 | 0 | Allen | Alen | 1971-05-24 | 1971-06-24 |  | Lonon | roberta25@smith.net |  |
| 166 | 0.230009 | 100 | 330 | 28 | 88 |  | Richards | 2009-08-25 | 2014-07-10 |  |  | e.e@day.com |  |
| 165 | 0.110099 | 100 | 331 | 28 | 88 |  |  | 2009-08-25 | 2024-07-07 |  | Manchester | e.e@day.com | e.richards16@finley.ifo |
| 164 | 0.110099 | 100 | 333 | 28 | 88 |  | Richards | 2009-08-25 | 2014-07-10 |  | Mancester | e.e@day.com | e.richards16@finley.info |
| 176 | 1.0 | 101 | 102 | 29 | 29 | Griffiths |  | 2008-05-07 | 2008-05-07 | Plymouth | Plymouth | a.griffiths@garner-bridges.com | a.griffiths@garner-bridges.com |
| 175 | 0.909611 | 101 | 103 | 29 | 29 | Griffiths | Griffihs | 2008-05-07 | 2009-04-07 | Plymouth | Plyoutm | a.griffiths@garner-bridges.com |  |
| 174 | 1.0 | 101 | 105 | 29 | 29 | Griffiths | Griffiths | 2008-05-07 | 2008-05-07 | Plymouth | Plymouth | a.griffiths@garner-bridges.com | a.griffiths@garner-bridges.com |
| 173 | 0.010425 | 101 | 149 | 29 | 40 | Griffiths | eKelly | 2008-05-07 | 2010-10-09 | Plymouth | Birmimghan | a.griffiths@garner-bridges.com |  |


We can plot a histogram of the Splink match probability. This chart shows that, for the majority of record comparisons, the match score is estimated to be close to 0 or 1. This indicates that there are few pairs of records where we are unsure whether it's a match or not.

In [64]:
from splink.diagnostics import splink_score_histogram
splink_score_histogram(df_e, spark, 100)

<VegaLite 4 object>



The `model` property of the `linker` is an object that contains a lot of diagnostic information about how the match probability was computed. The following cells demonstrate some of its functionality.

In [65]:
model = linker.model
model.probability_distribution_chart()

<VegaLite 4 object>



An alternative representation of the parameters displays them in terms of the effect different values in the comparison vectors have on the match probability:

In [66]:
model.bayes_factor_chart()

<VegaLite 4 object>



In [67]:
# If charts aren't displaying correctly in your notebook, you can write them to a file (by default splink_charts.html)
model.all_charts_write_html_file(filename="splink_charts.html", overwrite=True)

You can also generate a report which explains how the match probability was computed for an individual comparison row.  

Note that you need to convert the row to a dictionary for this to work.

In [68]:
from splink.intuition import intuition_report

sample = df_e.toPandas().sample(1)
row_dict = sample.to_dict(orient="records")[0]
display(sample)
print(intuition_report(row_dict, model))

|   | tf_adjusted_match_prob | match_probability | unique_id_l | unique_id_r | surname_l | surname_r | gamma_surname | dob_l | dob_r | gamma_dob | city_l | city_r | gamma_city | email_l | email_r | gamma_email | group_l | group_r |
|--:|--:|--:|--:|--:|:--|:--|--:|:--|:--|--:|:--|:--|--:|:--|:--|--:|--:|--:|
| 1149 | 0.010425 | 0.010425 | 444 | 636 | Campbell | Adasm | 0 | 1997-05-04 | 2000-09-03 | 0 | London | Londn | 0 |  | jacob.a83@clark.com | -1 | 113 | 160 |



Initial probability of match (prior) = λ = 0.3337
------
Comparison of surname.  Values are:                
surname_l:                                         Campbell
surname_r:                                         Adasm
Comparison has:                                    3 levels
Level for this comparison:                         gamma_surname = 0
m probability = P(level|match):                    0.07517
u probability = P(level|non-match):                0.9911
Bayes factor = m/u:                                0.07585
New probability of match (updated belief):         0.03659

------
Comparison of dob.  Values are:                    
dob_l:                                             1997-05-04
dob_r:                                             2000-09-03
Comparison has:                                    2 levels
Level for this comparison:                         gamma_dob = 0
m probability = P(level|match):                    0.5954
u probability = P(level|non-match):       

You can also create a visual representation of this intuition report as a waterfall chart. 

This uses the Bayes factor to show the relative influence on the final match probability of the λ parameter, and each comparison column for an individual comparison row.

Columns that have no influence on the match score (due to missing data) are not shown.
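The arithmetic behind each step of the intuition report appears to be a standard Bayesian update on the odds scale: convert the current probability to odds, multiply by the Bayes factor m/u, and convert back. The sketch below uses a hand-written helper rather than a Splink function. It reproduces the surname step of the report above from its printed (rounded) λ, m and u values.

```python
def update_probability(prior, bayes_factor):
    """Apply a Bayes factor to a probability via odds: odds -> odds * K -> probability."""
    prior_odds = prior / (1 - prior)
    posterior_odds = prior_odds * bayes_factor
    return posterior_odds / (1 + posterior_odds)

# Rounded figures printed in the surname section of the intuition report above
prior = 0.3337           # lambda
m, u = 0.07517, 0.9911   # P(gamma_surname = 0 | match), P(gamma_surname = 0 | non-match)
bayes_factor = m / u
print(round(bayes_factor, 5))                             # 0.07585
print(round(update_probability(prior, bayes_factor), 4))  # 0.0366
```

The result agrees with the report's 0.03659 to within the rounding of the printed inputs. A Bayes factor below 1 pushes the probability down; above 1 it pushes it up.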

In [69]:
from splink.intuition import bayes_factor_intuition_chart

bayes_factor_intuition_chart(row_dict, model)

<VegaLite 4 object>



## Step 9: Measure accuracy of results

If you have labelled data, Splink contains routines to compute various accuracy statistics.

The labelled data should look like this:

| unique_id_l | unique_id_r | clerical_match_score |
|:------------|:------------|---------------------:|
| id1         | id2         |                  0.9 |
| id1         | id3         |                  0.1 |

Since we have labelled data, in this notebook we create this table from the `group` field of the original data.



**Note: We will find that the overall accuracy of these results is low because our blocking rule eliminates a large percentage of true matches.**

In [70]:
cols = ['unique_id', 'group']
dfpd_l = df.toPandas().sample(300)[cols]
dfpd_l["join_col"] = 1
dfpd_r = dfpd_l.copy()
labels = dfpd_l.merge(dfpd_r, on = "join_col", suffixes=('_l', '_r'))
labels = labels[labels["unique_id_r"]< labels["unique_id_l"]]
labels["clerical_match_score"] = (labels["group_l"] == labels["group_r"]).astype(float)
labels = labels.drop(["group_l", "group_r", "join_col"], axis=1)
labels.head()

|   | unique_id_l | unique_id_r | clerical_match_score |
|--:|--:|--:|--:|
| 7 | 420 | 261 | 0.0 |
| 13 | 420 | 170 | 0.0 |
| 16 | 420 | 137 | 0.0 |
| 17 | 420 | 304 | 0.0 |
| 20 | 420 | 14 | 0.0 |


In [71]:
from splink.truth import labels_with_splink_scores, roc_chart, precision_recall_chart
labels_sp = spark.createDataFrame(labels)
labels_and_scores = labels_with_splink_scores(labels_sp, df_e, "unique_id", spark)
labels_and_scores.show()

+-----------+-----------+--------------------+----------------------+-----------------+
|unique_id_l|unique_id_r|clerical_match_score|tf_adjusted_match_prob|found_by_blocking|
+-----------+-----------+--------------------+----------------------+-----------------+
|        179|          0|                 0.0|                   0.0|            false|
|        209|          0|                 0.0|                   0.0|            false|
|        269|          0|                 0.0|                   0.0|            false|
|        273|          0|                 0.0|                   0.0|            false|
|        275|          0|                 0.0|                   0.0|            false|
|        289|          0|                 0.0|                   0.0|            false|
|        312|          0|                 0.0|                   0.0|            false|
|         34|          0|                 0.0|                   0.0|            false|
|        381|          0|       

In [72]:
labels_and_scores_pd = labels_and_scores.toPandas()
f1 = labels_and_scores_pd["clerical_match_score"] == 1.0
labels_and_scores_pd[f1].sample(10)

|   | unique_id_l | unique_id_r | clerical_match_score | tf_adjusted_match_prob | found_by_blocking |
|--:|--:|--:|--:|--:|:--|
| 16650 | 804 | 801 | 1.0 | 0.0 | False |
| 11198 | 920 | 919 | 1.0 | 0.0 | False |
| 42908 | 484 | 481 | 1.0 | 0.0 | False |
| 3959 | 519 | 518 | 1.0 | 0.0 | False |
| 16026 | 653 | 655 | 1.0 | 0.571804 | True |
| 33257 | 699 | 697 | 1.0 | 0.0 | False |
| 12900 | 261 | 262 | 1.0 | 0.987637 | True |
| 33842 | 938 | 939 | 1.0 | 1.0 | True |
| 3411 | 449 | 446 | 1.0 | 0.0 | False |
| 39049 | 84 | 80 | 1.0 | 0.0 | False |
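To see why the blocking rule drags down overall accuracy, consider precision and recall at a single threshold. The hand-written helper below, which is not part of `splink.truth`, treats pairs scoring at or above the threshold as predicted matches. Pairs the blocking rule never generated carry a score of 0.0, as in the table above, so a true match missed by blocking can only ever count as a false negative. The example applies it to the ten true matches sampled above with an illustrative threshold of 0.5.

```python
def precision_recall(labels_and_scores, threshold):
    """Precision and recall when pairs scoring >= threshold are declared matches.

    labels_and_scores: iterable of (clerical_match_score, match_prob) tuples;
    pairs not found by blocking are expected to carry match_prob = 0.0.
    """
    tp = fp = fn = 0
    for truth, score in labels_and_scores:
        predicted, actual = score >= threshold, truth == 1.0
        if predicted and actual:
            tp += 1
        elif predicted:
            fp += 1
        elif actual:
            fn += 1
    precision = tp / (tp + fp) if tp + fp else float("nan")
    recall = tp / (tp + fn) if tp + fn else float("nan")
    return precision, recall

# The ten true matches sampled above: only three were found by blocking
sample = [(1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.571804),
          (1.0, 0.0), (1.0, 0.987637), (1.0, 1.0), (1.0, 0.0), (1.0, 0.0)]
print(precision_recall(sample, 0.5))  # (1.0, 0.3)
```

The ROC and precision-recall charts below sweep this threshold across its whole range rather than fixing a single value.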


In [73]:
from splink.truth import labels_with_splink_scores, roc_chart, precision_recall_chart


roc_chart(labels_and_scores, spark)

<VegaLite 4 object>



In [74]:
precision_recall_chart(labels_and_scores, spark)

<VegaLite 4 object>



## Step 10: Use graphframes to resolve links into groups

In [75]:
from graphframes import GraphFrame


df_e.createOrReplaceTempView("df_e")
sql = """
select unique_id_l as id
from df_e
union
select unique_id_r as id
from df_e
"""
nodes = spark.sql(sql)

sql = """
select
unique_id_l as src,
unique_id_r as dst,
tf_adjusted_match_prob
from df_e
where tf_adjusted_match_prob > 0.99
"""
edges = spark.sql(sql)

g = GraphFrame(nodes, edges)

cc = g.connectedComponents()

cc.createOrReplaceTempView("cc")
df.createOrReplaceTempView("df")
sql = """
select cc.component as estimated_group, df.*
from cc
left join
df 
on cc.id = df.unique_id
order by group, estimated_group
"""
results = spark.sql(sql)
results.toPandas().head(30)

|   | estimated_group | unique_id | first_name | surname | dob | city | email | group |
|--:|--:|--:|:--|:--|:--|:--|:--|--:|
| 0 | 0 | 1 | Robert | Allen | 1971-05-24 |  | roberta25@smith.net | 0 |
| 1 | 17179869198 | 3 | Robert | Alen | 1971-06-24 | Lonon |  | 0 |
| 2 | 25769803776 | 0 | Robert | Alan | 1971-06-24 |  | robert255@smith.net | 0 |
| 3 | 8589934626 | 5 | Grace | Kelly | 1991-04-26 |  | grace.kelly52@jones.com | 1 |
| 4 | 51539607578 | 4 | Grace |  | 1997-04-26 | Hull | grace.kelly52@jones.com | 1 |
| 5 | 34359738390 | 385 | Lottie | Davis | 1972-06-12 |  | lottie.d7@morgan-pierce.com | 100 |
| 6 | 34359738390 | 386 | Lottie | Davis | 1972-06-12 | StSockton-on-Tees | lottie.d7@morgan-pierce.com | 100 |
| 7 | 34359738390 | 387 | Lottie | Davis | 1972-06-12 | Stockton-onsTee- |  | 100 |
| 8 | 26 | 391 | Isaac |  | 1991-05-06 | Lodon | isaac.james@smich.tom | 101 |
| 9 | 60129542170 | 393 | Isaac | James | 1991-05-06 | London |  | 101 |
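The estimated groups come from GraphFrames' `connectedComponents()`: any two records joined by a chain of edges with `tf_adjusted_match_prob > 0.99` end up in the same component. On a single machine, the same partition can be found with union-find, which is a different algorithm from the one GraphFrames runs on Spark, and whose component labels will differ. The sketch below is hypothetical helper code, applied to a handful of scored pairs from the comparisons shown earlier. It explains why records 0, 1 and 3 land in separate estimated groups: none of their pairwise scores clears 0.99.

```python
def connected_components(nodes, edges):
    """Map each node to a representative node of its component (union-find)."""
    parent = {n: n for n in nodes}

    def find(x):
        # Path halving: point each visited node at its grandparent
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a
    return {n: find(n) for n in nodes}

# (unique_id_l, unique_id_r, tf_adjusted_match_prob) from the scored comparisons above
scored = [("0", "1", 0.009296), ("0", "3", 0.897881), ("1", "3", 0.955836),
          ("101", "102", 1.0), ("101", "103", 0.909611), ("101", "105", 1.0)]
nodes = {n for a, b, _ in scored for n in (a, b)}
edges = [(a, b) for a, b, p in scored if p > 0.99]  # same threshold as the SQL above
groups = connected_components(nodes, edges)
print(len(set(groups.values())))  # 5: {0}, {1}, {3}, {103} and {101, 102, 105}
```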


In [76]:
## splink_graph functionality

In [77]:
import splink_graph
from splink_graph.splink_graph import subgraph_stats
from splink_graph.splink_graph import _graphharmoniser
from pyspark.sql import functions as f

In [78]:

edgesinfo = (df_e.drop("gamma_dob","gamma_city","gamma_surname","gamma_email","group_l","group_r").
             withColumn( "l_info",f.concat_ws("_","surname_l","dob_l","city_l","email_l")).
             withColumn( "r_info",f.concat_ws("_","surname_r","dob_r","city_r","email_r")))
             
            


In [79]:
edgesinfo.printSchema()
ccresults=results.select("estimated_group","unique_id")
ccresults.printSchema()

root
 |-- tf_adjusted_match_prob: double (nullable = true)
 |-- match_probability: double (nullable = true)
 |-- unique_id_l: string (nullable = true)
 |-- unique_id_r: string (nullable = true)
 |-- surname_l: string (nullable = true)
 |-- surname_r: string (nullable = true)
 |-- dob_l: string (nullable = true)
 |-- dob_r: string (nullable = true)
 |-- city_l: string (nullable = true)
 |-- city_r: string (nullable = true)
 |-- email_l: string (nullable = true)
 |-- email_r: string (nullable = true)
 |-- l_info: string (nullable = false)
 |-- r_info: string (nullable = false)

root
 |-- estimated_group: long (nullable = true)
 |-- unique_id: string (nullable = true)



In [80]:
import pyspark.sql.functions as f
edge_df = edgesinfo.alias('a').join(ccresults.alias('b'),f.col("a.unique_id_l")==f.col("b.unique_id")).drop("unique_id")

In [81]:

edge_df = _graphharmoniser(edge_df,"unique_id_l","unique_id_r")

In [82]:
edge_df.printSchema()

root
 |-- tf_adjusted_match_prob: double (nullable = true)
 |-- match_probability: double (nullable = true)
 |-- surname_l: string (nullable = true)
 |-- surname_r: string (nullable = true)
 |-- dob_l: string (nullable = true)
 |-- dob_r: string (nullable = true)
 |-- city_l: string (nullable = true)
 |-- city_r: string (nullable = true)
 |-- email_l: string (nullable = true)
 |-- email_r: string (nullable = true)
 |-- l_info: string (nullable = false)
 |-- r_info: string (nullable = false)
 |-- estimated_group: long (nullable = true)
 |-- unique_id_l: string (nullable = true)
 |-- unique_id_r: string (nullable = true)



In [83]:
edge_df=edge_df.withColumn("component" ,f.col("estimated_group"))
edge_df = edge_df.withColumn("distance" ,1.01 - f.col("tf_adjusted_match_prob"))


In [84]:

sgs = subgraph_stats(edge_df,"component", "tf_adjusted_match_prob",
               src="unique_id_l", dst="unique_id_r")

In [85]:
sgs.sort(sgs.nodecount.desc()).show(1000)

+-----------+--------------------+---------+---------+-------+
|  component|               nodes|nodecount|edgecount|density|
+-----------+--------------------+---------+---------+-------+
|          3|[110, 111, 114, 1...|       28|      125|  0.331|
|25769803783|[115, 546, 845, 5...|       23|       22|  0.087|
| 8589934595|[14, 546, 845, 52...|       22|       21|  0.091|
|34359738376|[258, 546, 845, 5...|       21|       20|  0.095|
| 8589934605|[259, 546, 845, 5...|       20|       19|    0.1|
|         14|[260, 546, 845, 5...|       19|       18|  0.105|
|42949672972|[261, 546, 845, 5...|       18|       17|  0.111|
|17179869195|[262, 546, 845, 5...|       17|       16|  0.118|
|25769803820|[529, 528, 527, 5...|       16|       42|   0.35|
| 8589934598|[153, 441, 591, 6...|       16|       15|  0.125|
| 8589934600|[184, 182, 181, 1...|       15|       48|  0.457|
|25769803812|[442, 441, 444, 4...|       15|       60|  0.571|
|17179869214|[499, 623, 583, 6...|       14|       13| 
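The `density` column appears to be standard undirected graph density: the fraction of possible edges that are present, 2E / (N(N-1)). This matches every complete row above when checked by hand. A minimal sketch, with a hypothetical helper name:

```python
def density(node_count, edge_count):
    """Fraction of possible undirected edges present: 2E / (N * (N - 1))."""
    if node_count < 2:
        return 0.0
    return 2 * edge_count / (node_count * (node_count - 1))

print(round(density(28, 125), 3))  # 0.331
print(round(density(23, 22), 3))   # 0.087
```

A component with N - 1 edges (a tree, such as a star around one hub record) has the lowest density possible for a connected graph of that size. This is why the large, sparse components stand out in the listing.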

In [86]:
from splink_graph.vectorised import diameter_radius_transitivity

In [87]:
drt = diameter_radius_transitivity(edge_df,"unique_id_l", "unique_id_r")


In [88]:
drt.sort(drt.transitivity.desc()).show(400)

+-----------+--------+------+------------+--------------+-------------+--------------------+
|  component|diameter|radius|transitivity|tri_clustcoeff|sq_clustcoeff|           graphhash|
+-----------+--------+------+------------+--------------+-------------+--------------------+
| 8589934671|       1|     1|         1.0|           1.0|          1.0|7befbc1a93e06d1ba...|
|         45|       1|     1|         1.0|           1.0|          1.0|faa092922b6078a79...|
| 8589934616|       1|     1|         1.0|           1.0|          1.0|7befbc1a93e06d1ba...|
|17179869234|       1|     1|         1.0|           1.0|          1.0|faa092922b6078a79...|
|25769803828|       1|     1|         1.0|           1.0|          1.0|faa092922b6078a79...|
|         47|       1|     1|         1.0|           1.0|          1.0|faa092922b6078a79...|
| 8589934659|       1|     1|         1.0|           1.0|          1.0|faa092922b6078a79...|
|17179869188|       1|     1|         1.0|           1.0|          0.0

In [89]:
graphstats = sgs.join(drt,on="component")

In [90]:
graphstats.sort(graphstats.nodecount.desc(),graphstats.transitivity.asc()).drop("graphhash","nodes").show(1000)

+-----------+---------+---------+-------+--------+------+------------+--------------+-------------+
|  component|nodecount|edgecount|density|diameter|radius|transitivity|tri_clustcoeff|sq_clustcoeff|
+-----------+---------+---------+-------+--------+------+------------+--------------+-------------+
|          3|       28|      125|  0.331|       2|     1|       0.363|         0.871|          1.0|
|25769803783|       23|       22|  0.087|       2|     1|         0.0|           0.0|          0.0|
| 8589934595|       22|       21|  0.091|       2|     1|         0.0|           0.0|          0.0|
|34359738376|       21|       20|  0.095|       2|     1|         0.0|           0.0|          0.0|
| 8589934605|       20|       19|    0.1|       2|     1|         0.0|           0.0|          0.0|
|         14|       19|       18|  0.105|       2|     1|         0.0|           0.0|          0.0|
|42949672972|       18|       17|  0.111|       2|     1|         0.0|           0.0|          0.0|


In [91]:
import pyspark.sql.functions as f

In [92]:
graphstats.select("component","nodecount","edgecount","transitivity","graphhash").show(1000)

+-----------+---------+---------+------------+--------------------+
|  component|nodecount|edgecount|transitivity|           graphhash|
+-----------+---------+---------+------------+--------------------+
|          2|        4|        3|         0.0|5b6a1a47075309676...|
|         12|        3|        3|         1.0|7d2c307dbd866960f...|
|         26|        5|        4|         0.0|97181c87a84cb6c70...|
|         28|        2|        1|         0.0|2148f1da1ac29711e...|
|         29|       10|       20|       0.484|7a3cb2869ccb010f0...|
|         30|        4|        6|         1.0|faa092922b6078a79...|
|         33|        2|        1|         0.0|2148f1da1ac29711e...|
|         42|        3|        2|         0.0|0f43d8cdd43b0b787...|
|         48|        3|        2|         0.0|0f43d8cdd43b0b787...|
|         67|        3|        3|         1.0|7d2c307dbd866960f...|
| 8589934595|       22|       21|         0.0|194079a7950834715...|
| 8589934603|        9|        8|         0.0|39
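Similarly, `transitivity` appears to be the global clustering coefficient, 3 × (number of triangles) / (number of connected triples). This is consistent with the small components above. A triangle (component 12) scores 1.0. A 4-node star (component 2, whose edges appear in the edge betweenness output below) scores 0.0. A complete 4-node graph (component 30: 4 nodes, 6 edges) scores 1.0. A from-scratch sketch, with a hypothetical helper name:

```python
from itertools import combinations

def transitivity(edges):
    """Global clustering coefficient: 3 * triangles / connected triples."""
    adj = {}
    for a, b in edges:
        adj.setdefault(a, set()).add(b)
        adj.setdefault(b, set()).add(a)
    closed = 0   # neighbour pairs that are themselves linked (3 per triangle)
    triples = 0  # neighbour pairs around each node (paths of length two)
    for nbrs in adj.values():
        triples += len(nbrs) * (len(nbrs) - 1) // 2
        closed += sum(1 for x, y in combinations(nbrs, 2) if y in adj[x])
    return closed / triples if triples else 0.0

triangle = [("226", "228"), ("226", "227"), ("228", "227")]  # component 12
star = [("107", "827"), ("107", "826"), ("107", "243")]      # component 2
print(transitivity(triangle), transitivity(star))            # 1.0 0.0
```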

In [None]:
#graphhash

In [93]:
graphstats.groupby("graphhash").count().show(1000)

+--------------------+-----+
|           graphhash|count|
+--------------------+-----+
|2c9d2efbc10596f86...|    2|
|fd7b942801eab9c07...|    1|
|f0d2dee88390f7b79...|    1|
|80a317e826601bdf0...|   12|
|bd8aa380863bb666c...|    1|
|4b8e9152b1670e620...|    1|
|6b04a2a91f3774858...|    1|
|faa092922b6078a79...|   24|
|ae74c38ca2f4f8315...|    4|
|235d3c6d6788b8177...|    3|
|12fa5d82d662c5c4a...|    1|
|5b6a1a47075309676...|   19|
|7d2c307dbd866960f...|   19|
|97181c87a84cb6c70...|   21|
|194079a7950834715...|    1|
|0b2ea0d3ac8492384...|    3|
|b52776320ff5b34f9...|    1|
|563977067cb3597a1...|    1|
|faebc76801aebb1c2...|    1|
|d1e4fb20bbef95a7c...|    2|
|7befbc1a93e06d1ba...|    4|
|0c805e848b7946de1...|    2|
|35e7736b330516c10...|    1|
|21d3663e77b1240fa...|    1|
|7a3cb2869ccb010f0...|    1|
|0f43d8cdd43b0b787...|   32|
|e6216e307d2520d5f...|    1|
|54fcfbc89e9ac5c75...|    1|
|73404bfaec40a29da...|    2|
|a63bc051387f5baf6...|    6|
|142a012e019943e2b...|    1|
|10fadd1e1c9ab

In [94]:
from splink_graph.vectorised import edgebetweeness


In [95]:
ebdf = edgebetweeness(edge_df, src="unique_id_l", dst="unique_id_r")
ebdf.show()

+---+---+-----------+---------+
|src|dst|         eb|component|
+---+---+-----------+---------+
|107|827|        0.5|        2|
|107|826|        0.5|        2|
|107|243|        0.5|        2|
|226|228| 0.33333334|       12|
|226|227| 0.33333334|       12|
|228|227| 0.33333334|       12|
|391|727|        0.4|       26|
|391|726|        0.4|       26|
|391|723|        0.4|       26|
|391|393|        0.4|       26|
|409|411|        1.0|       28|
| 43|860|0.044444446|       29|
| 43|859|0.044444446|       29|
| 43|858|0.044444446|       29|
| 43|855|0.044444446|       29|
| 43| 37| 0.06666667|       29|
| 43| 39|0.044444446|       29|
|860| 37| 0.08888889|       29|
|860| 39| 0.06666667|       29|
|859| 37| 0.08888889|       29|
+---+---+-----------+---------+
only showing top 20 rows
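Edge betweenness measures, for each edge, the share of shortest paths between node pairs in its component that pass through that edge. Edges with high values are the links whose removal would most fragment a cluster. The values in the output above are consistent with a normalised, unweighted edge betweenness computed separately for each component. In component 2 (a four-node star centred on 107) each edge lies on the shortest paths of 3 of the 6 node pairs, giving 0.5. In the triangle of component 12 each edge carries 1 of 3 pairs, giving 1/3. The sketch below reproduces that calculation with Brandes' algorithm in plain Python. `edge_betweenness` is a hypothetical helper, not the splink_graph implementation. Because the score is normalised by the component's size, it must be applied to one component at a time.

```python
from collections import defaultdict, deque


def edge_betweenness(edges):
    """Normalised edge betweenness for one unweighted, undirected component (Brandes' algorithm).

    Returns {frozenset({u, v}): score}, where score is the fraction of all
    node pairs whose shortest paths pass through the edge.
    """
    adjacency = defaultdict(set)
    for u, v in edges:
        adjacency[u].add(v)
        adjacency[v].add(u)
    nodes = list(adjacency)
    scores = {frozenset(e): 0.0 for e in edges}

    for source in nodes:
        # BFS from source: count shortest paths (sigma) and record predecessors
        sigma = {n: 0 for n in nodes}
        dist = {n: -1 for n in nodes}
        preds = {n: [] for n in nodes}
        sigma[source], dist[source] = 1, 0
        order = []
        queue = deque([source])
        while queue:
            v = queue.popleft()
            order.append(v)
            for w in adjacency[v]:
                if dist[w] < 0:
                    dist[w] = dist[v] + 1
                    queue.append(w)
                if dist[w] == dist[v] + 1:
                    sigma[w] += sigma[v]
                    preds[w].append(v)

        # Accumulate dependencies in order of decreasing distance from source
        delta = {n: 0.0 for n in nodes}
        for w in reversed(order):
            for v in preds[w]:
                share = sigma[v] / sigma[w] * (1.0 + delta[w])
                scores[frozenset((v, w))] += share
                delta[v] += share

    n = len(nodes)
    pairs = n * (n - 1) / 2
    if not pairs:
        return scores
    # Each unordered pair was counted once from each endpoint, hence the extra / 2
    return {edge: score / 2 / pairs for edge, score in scores.items()}


star = [("107", "827"), ("107", "826"), ("107", "243")]
triangle = [("226", "228"), ("226", "227"), ("228", "227")]
star_scores = edge_betweenness(star)  # each edge of the four-node star scores 0.5
```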



In [96]:

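# Drop any existing `component` column (a no-op if absent) so it cannot clash with ebdf.component
edge_df = edge_df.drop("component")

# Attach each edge's betweenness score by matching endpoints and component id.
# This is an inner join: edges whose src/dst orientation in ebdf differs from
# unique_id_l/unique_id_r would not match and would be dropped.
joined_edgedf = edge_df.join(
    ebdf,
    (ebdf.src == edge_df.unique_id_l)
    & (ebdf.dst == edge_df.unique_id_r)
    & (edge_df.estimated_group == ebdf.component),
)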

In [97]:
# joined_edgedf now carries each edge's betweenness score (eb) and is ready to plot
joined_edgedf.printSchema()


root
 |-- tf_adjusted_match_prob: double (nullable = true)
 |-- match_probability: double (nullable = true)
 |-- surname_l: string (nullable = true)
 |-- surname_r: string (nullable = true)
 |-- dob_l: string (nullable = true)
 |-- dob_r: string (nullable = true)
 |-- city_l: string (nullable = true)
 |-- city_r: string (nullable = true)
 |-- email_l: string (nullable = true)
 |-- email_r: string (nullable = true)
 |-- l_info: string (nullable = false)
 |-- r_info: string (nullable = false)
 |-- estimated_group: long (nullable = true)
 |-- unique_id_l: string (nullable = true)
 |-- unique_id_r: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- eb: float (nullable = true)
 |-- component: long (nullable = true)



In [None]:
# NOTE: bridge_edges currently fails on this data (see the error below)

In [98]:
from splink_graph.vectorised import bridge_edges

In [99]:
edge_df.printSchema()

root
 |-- tf_adjusted_match_prob: double (nullable = true)
 |-- match_probability: double (nullable = true)
 |-- surname_l: string (nullable = true)
 |-- surname_r: string (nullable = true)
 |-- dob_l: string (nullable = true)
 |-- dob_r: string (nullable = true)
 |-- city_l: string (nullable = true)
 |-- city_r: string (nullable = true)
 |-- email_l: string (nullable = true)
 |-- email_r: string (nullable = true)
 |-- l_info: string (nullable = false)
 |-- r_info: string (nullable = false)
 |-- estimated_group: long (nullable = true)
 |-- unique_id_l: string (nullable = true)
 |-- unique_id_r: string (nullable = true)
 |-- distance: double (nullable = true)



In [100]:
bridge_edges_to_sample_from = bridge_edges(
    edge_df,
    src="unique_id_l",
    dst="unique_id_r",
    weight="tf_adjusted_match_prob",
    component="estimated_group",
)

In [101]:
bridge_edges_to_sample_from.show()

Py4JJavaError: An error occurred while calling o13973.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2573.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2573.0 (TID 8446, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/theodoremanassis/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/theodoremanassis/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/theodoremanassis/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 290, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/theodoremanassis/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 125, in wrapped
    "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 5 Actual: 16


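The `RuntimeError` above is raised by a grouped pandas UDF inside `bridge_edges`. The UDF returned a pandas DataFrame with 16 columns, which is exactly the number of columns in `edge_df` (see its schema above), while the UDF's declared output schema has 5. The cause inside splink_graph is not shown in this notebook. For reference, a bridge is an edge whose removal disconnects its component. The sketch below finds bridges in plain Python using Tarjan's low-link method. `find_bridges` is a hypothetical helper for illustration, not the splink_graph API.

```python
from collections import defaultdict


def find_bridges(edges):
    """Return the bridges of an undirected graph as a set of frozenset({u, v}).

    Tarjan's low-link method. Recursive, so intended for small components;
    duplicate edges are merged into one by the adjacency sets.
    """
    adjacency = defaultdict(set)
    for u, v in edges:
        adjacency[u].add(v)
        adjacency[v].add(u)

    discovery, low = {}, {}
    bridges = set()
    counter = [0]

    def dfs(node, parent):
        discovery[node] = low[node] = counter[0]
        counter[0] += 1
        for neighbour in adjacency[node]:
            if neighbour == parent:
                continue
            if neighbour in discovery:
                # Back edge: node can reach an earlier vertex without the tree edge
                low[node] = min(low[node], discovery[neighbour])
            else:
                dfs(neighbour, node)
                low[node] = min(low[node], low[neighbour])
                # Nothing in neighbour's subtree reaches node or above: this edge is a bridge
                if low[neighbour] > discovery[node]:
                    bridges.add(frozenset((node, neighbour)))

    for node in adjacency:
        if node not in discovery:
            dfs(node, None)
    return bridges


star = [("107", "827"), ("107", "826"), ("107", "243")]
triangle_with_tail = [("a", "b"), ("b", "c"), ("c", "a"), ("c", "d")]
```

In a star every edge is a bridge, whereas in `triangle_with_tail` only the pendant edge `c-d` is, because the triangle's edges each lie on a cycle.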