<a href="https://colab.research.google.com/github/Brent-Morrison/Misc_scripts/blob/master/record_linkage_match_key.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction

This notebook documents a method for deriving a match key for records identified as duplicates. The challenge arises when more than two records are matched, i.e. when record linkage suggests that three or more records belong to one entity.

For example, if record 1 is matched to record 2, and record 2 is matched to record 3, all three records should share a single match key.
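
To make the target concrete, here is a minimal plain-Python sketch of the grouping the match key should express. It uses a union-find structure, which is a different technique from the Spark steps in this notebook; the function name `match_groups` is hypothetical and only illustrates the expected result.

```python
def match_groups(pairs):
    """Group record ids so that ids linked directly or through a chain share one key."""
    parent = {}

    def find(x):
        # Follow parent pointers to the root, shortening the path on the way.
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in pairs:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # Merge the two groups, keeping the smaller id as the root.
            parent[max(root_a, root_b)] = min(root_a, root_b)

    groups = {}
    for record in parent:
        groups.setdefault(find(record), []).append(record)
    # Map every record id to the sorted list of ids in its group.
    return {r: sorted(members) for members in groups.values() for r in members}


keys = match_groups([(1, 2), (2, 3)])
print(keys)  # {1: [1, 2, 3], 2: [1, 2, 3], 3: [1, 2, 3]}
```

Records 1 and 3 are never matched directly, yet both end up with the key `[1, 2, 3]` because each is matched to record 2.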

This is implemented in PySpark on Spark 2.4.7.

### Set up




#### PySpark installations

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark
!pip install pyarrow



Set the environment variables so that Colab can find Java and Spark.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

Add PySpark to sys.path

PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. [findspark](https://github.com/minrk/findspark) does the latter.
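
As a rough illustration of the "adding pyspark to sys.path at runtime" option, the sketch below prepends Spark's bundled Python directory and its py4j zip to `sys.path`. It is a simplified approximation of what findspark does, not its actual code; the helper name `add_pyspark_to_path` is hypothetical, and the directory layout (`python/` and `python/lib/py4j-*.zip` under `SPARK_HOME`) is assumed from the standard Spark binary distribution.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend Spark's bundled Python sources to sys.path and return what was added."""
    python_dir = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip under python/lib; the version in the file name varies.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip")))
    added = [p for p in [python_dir] + py4j_zips if p not in sys.path]
    sys.path[:0] = added
    return added


# Only attempt this when SPARK_HOME has been set, as in the cell above.
if "SPARK_HOME" in os.environ:
    print(add_pyspark_to_path(os.environ["SPARK_HOME"]))
```

In practice `findspark.init()` is the more convenient route, since it can also locate Spark when `SPARK_HOME` is not set.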

In [3]:
import findspark
findspark.init()

Create the Spark session

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Before using PySpark, import the required classes from the PySpark SQL module.

In [5]:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import Column as C
from pyspark.sql import GroupedData as G
from pyspark.sql import DataFrame
from pyspark.sql.types import *

### Solution

#### Create mock data in a PySpark data frame

In [6]:
m1 = spark.createDataFrame(
  [
    (1,2)
    ,(2,3)
    ,(2,3) # There may be dupes here if different tests trigger the same combination
    ,(4,5)
    ,(6,7)
    ,(7,8)
    ,(8,9)
  ],
  ['ecid_1', 'ecid_2', ] 
)

m1.show()
m1.printSchema()

+------+------+
|ecid_1|ecid_2|
+------+------+
|     1|     2|
|     2|     3|
|     2|     3|
|     4|     5|
|     6|     7|
|     7|     8|
|     8|     9|
+------+------+

root
 |-- ecid_1: long (nullable = true)
 |-- ecid_2: long (nullable = true)



### Create the match key  

Start by storing each matched pair in an array type column.

In [7]:
m2 = m1.withColumn('match_key1', F.array(F.col('ecid_1'),F.col('ecid_2')))
m2.show(truncate=False)

+------+------+----------+
|ecid_1|ecid_2|match_key1|
+------+------+----------+
|1     |2     |[1, 2]    |
|2     |3     |[2, 3]    |
|2     |3     |[2, 3]    |
|4     |5     |[4, 5]    |
|6     |7     |[6, 7]    |
|7     |8     |[7, 8]    |
|8     |9     |[8, 9]    |
+------+------+----------+



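#### Union

Stack the rows keyed by `ecid_1` on top of the rows keyed by `ecid_2`, so that every record id appears in the first column next to its pair. `unionAll` matches columns by position, so the combined column keeps the name `ecid_1`.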

In [8]:
m3 = m2.select(F.col('ecid_1'),F.col('match_key1')).unionAll(m2.select(F.col('ecid_2'),F.col('match_key1')))
m3.show(truncate=False)

+------+----------+
|ecid_1|match_key1|
+------+----------+
|1     |[1, 2]    |
|2     |[2, 3]    |
|2     |[2, 3]    |
|4     |[4, 5]    |
|6     |[6, 7]    |
|7     |[7, 8]    |
|8     |[8, 9]    |
|2     |[1, 2]    |
|3     |[2, 3]    |
|3     |[2, 3]    |
|5     |[4, 5]    |
|7     |[6, 7]    |
|8     |[7, 8]    |
|9     |[8, 9]    |
+------+----------+



#### Explode

Expand each array so that every element gets its own row.

In [9]:
m4 = m3.withColumn('match_key1', F.explode('match_key1'))
m4.show(truncate=False)

+------+----------+
|ecid_1|match_key1|
+------+----------+
|1     |1         |
|1     |2         |
|2     |2         |
|2     |3         |
|2     |2         |
|2     |3         |
|4     |4         |
|4     |5         |
|6     |6         |
|6     |7         |
|7     |7         |
|7     |8         |
|8     |8         |
|8     |9         |
|2     |1         |
|2     |2         |
|3     |2         |
|3     |3         |
|3     |2         |
|3     |3         |
+------+----------+
only showing top 20 rows



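#### Collect to array

Group by record id and collect the exploded values back into one de-duplicated array per id. Each record now carries itself plus its direct matches.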

In [10]:
m5 = (m4
  .groupBy('ecid_1').agg(F.collect_list(F.col('match_key1')).alias('match_key1'))
  .withColumn('match_key1', F.array_distinct('match_key1'))
  .sort(F.col('ecid_1'))
  )
m5.show(truncate=False)

+------+----------+
|ecid_1|match_key1|
+------+----------+
|1     |[1, 2]    |
|2     |[2, 3, 1] |
|3     |[2, 3]    |
|4     |[4, 5]    |
|5     |[4, 5]    |
|6     |[6, 7]    |
|7     |[7, 8, 6] |
|8     |[8, 9, 7] |
|9     |[8, 9]    |
+------+----------+
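
The steps up to this point amount to gathering, for each record id, the ids of every pair it appears in. A minimal plain-Python sketch of the same idea follows; `direct_matches` is a hypothetical name, and the arrays are sorted here, whereas the element order produced by `collect_list` in Spark is not guaranteed.

```python
from collections import defaultdict


def direct_matches(pairs):
    """Map each record id to the sorted ids found in any pair that contains it."""
    neighbours = defaultdict(set)
    for a, b in pairs:
        # Each pair contributes both of its ids to the entry of either id,
        # like the array, union, and explode steps combined.
        neighbours[a].update((a, b))
        neighbours[b].update((a, b))
    # Sets drop repeats, playing the role of array_distinct.
    return {record: sorted(ids) for record, ids in sorted(neighbours.items())}


pairs = [(1, 2), (2, 3), (2, 3), (4, 5), (6, 7), (7, 8), (8, 9)]
for record, ids in direct_matches(pairs).items():
    print(record, ids)
```

For the mock data this yields `[1, 2, 3]` for record 2 and `[6, 7, 8]` for record 7, the same members as the Spark output above.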



#### Cross join

Pair every record's array with the array of every record, including its own.

In [11]:
m6 = m5.crossJoin(m5.withColumnRenamed('ecid_1','ecid_2').withColumnRenamed('match_key1','match_key2'))
m6.show(truncate=False)

+------+----------+------+----------+
|ecid_1|match_key1|ecid_2|match_key2|
+------+----------+------+----------+
|1     |[1, 2]    |1     |[1, 2]    |
|1     |[1, 2]    |2     |[2, 3, 1] |
|1     |[1, 2]    |3     |[2, 3]    |
|1     |[1, 2]    |4     |[4, 5]    |
|1     |[1, 2]    |5     |[4, 5]    |
|1     |[1, 2]    |6     |[6, 7]    |
|1     |[1, 2]    |7     |[7, 8, 6] |
|1     |[1, 2]    |8     |[8, 9, 7] |
|1     |[1, 2]    |9     |[8, 9]    |
|2     |[2, 3, 1] |1     |[1, 2]    |
|2     |[2, 3, 1] |2     |[2, 3, 1] |
|2     |[2, 3, 1] |3     |[2, 3]    |
|2     |[2, 3, 1] |4     |[4, 5]    |
|2     |[2, 3, 1] |5     |[4, 5]    |
|2     |[2, 3, 1] |6     |[6, 7]    |
|2     |[2, 3, 1] |7     |[7, 8, 6] |
|2     |[2, 3, 1] |8     |[8, 9, 7] |
|2     |[2, 3, 1] |9     |[8, 9]    |
|3     |[2, 3]    |1     |[1, 2]    |
|3     |[2, 3]    |2     |[2, 3, 1] |
+------+----------+------+----------+
only showing top 20 rows



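#### Filter for overlaps

Keep only the pairs whose arrays share at least one id, then take the sorted union of the two arrays as a candidate match key.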

In [12]:
m7 = (m6
  .withColumn('overlap', F.arrays_overlap('match_key1','match_key2'))
  .filter(F.col('overlap') == True)
  .withColumn('match_key',F.array_union(F.col('match_key1'),F.col('match_key2')))
  .withColumn('match_key', F.array_sort('match_key'))
  .withColumn('match_key_size',F.size(F.col('match_key')))
)
m7.show(truncate=False)

+------+----------+------+----------+-------+------------+--------------+
|ecid_1|match_key1|ecid_2|match_key2|overlap|match_key   |match_key_size|
+------+----------+------+----------+-------+------------+--------------+
|1     |[1, 2]    |1     |[1, 2]    |true   |[1, 2]      |2             |
|1     |[1, 2]    |2     |[2, 3, 1] |true   |[1, 2, 3]   |3             |
|1     |[1, 2]    |3     |[2, 3]    |true   |[1, 2, 3]   |3             |
|2     |[2, 3, 1] |1     |[1, 2]    |true   |[1, 2, 3]   |3             |
|2     |[2, 3, 1] |2     |[2, 3, 1] |true   |[1, 2, 3]   |3             |
|2     |[2, 3, 1] |3     |[2, 3]    |true   |[1, 2, 3]   |3             |
|3     |[2, 3]    |1     |[1, 2]    |true   |[1, 2, 3]   |3             |
|3     |[2, 3]    |2     |[2, 3, 1] |true   |[1, 2, 3]   |3             |
|3     |[2, 3]    |3     |[2, 3]    |true   |[2, 3]      |2             |
|4     |[4, 5]    |4     |[4, 5]    |true   |[4, 5]      |2             |
|4     |[4, 5]    |5     |[4, 5]    |true   |[4, 5] 

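#### Filter to derive final table

For each record, keep the largest candidate match key. The rank picks a single row when several candidates have the same size.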


In [13]:
w1 = W.partitionBy('ecid_1','match_key_size').orderBy('ecid_2')
w2 = W.partitionBy('ecid_1')

m8 = (m7
  .withColumn('rank', F.rank().over(w1))
  .withColumn('max_size', F.max(F.col('match_key_size')).over(w2))
  .filter(
    (F.col('match_key_size') == F.col('max_size')) &
    (F.col('rank') == F.lit(1))
    )
  .select(F.col('ecid_1').alias('ecid'),F.col('match_key'))
  .sort(F.col('ecid'))
) 
m8.show(truncate=False)

+----+------------+
|ecid|match_key   |
+----+------------+
|1   |[1, 2, 3]   |
|2   |[1, 2, 3]   |
|3   |[1, 2, 3]   |
|4   |[4, 5]      |
|5   |[4, 5]      |
|6   |[6, 7, 8, 9]|
|7   |[6, 7, 8, 9]|
|8   |[6, 7, 8, 9]|
|9   |[6, 7, 8, 9]|
+----+------------+
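
The selection logic of the last three steps (cross join, overlap filter, largest union) can be mirrored in plain Python to check the reasoning. This is a sketch under the assumption that it follows the same rules as the window functions above; `widest_overlap_key` is a hypothetical name, and the input dictionary is copied from the "Collect to array" output.

```python
def widest_overlap_key(neighbours):
    """For each id, union its array with every overlapping array and keep the largest."""
    result = {}
    for e1, key1 in neighbours.items():
        candidates = []
        for e2, key2 in neighbours.items():  # the cross join
            if set(key1) & set(key2):  # arrays_overlap
                union = sorted(set(key1) | set(key2))  # array_union + array_sort
                candidates.append((-len(union), e2, union))
        # Largest union first; ties go to the smallest ecid_2, as rank == 1 does.
        result[e1] = min(candidates)[2]
    return result


neighbours = {
    1: [1, 2], 2: [2, 3, 1], 3: [2, 3], 4: [4, 5], 5: [4, 5],
    6: [6, 7], 7: [7, 8, 6], 8: [8, 9, 7], 9: [8, 9],
}
match_keys = widest_overlap_key(neighbours)
print(match_keys[1], match_keys[4], match_keys[6])  # [1, 2, 3] [4, 5] [6, 7, 8, 9]
```

In this sketch a record only sees arrays that overlap its own. For a longer chain of six records (1-2, 2-3, 3-4, 4-5, 5-6), record 1 would receive `[1, 2, 3, 4]` rather than all six ids, so it may be worth checking how long the match chains in real data can get.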



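### In one step

The same steps can be chained into a single expression, shown here on a fresh copy of the mock data.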


In [14]:
m9 = spark.createDataFrame(
  [
    (1,2)
    ,(2,3)
    ,(2,3) # There may be dupes here if different tests trigger the same combination
    ,(4,5)
    ,(6,7)
    ,(7,8)
    ,(8,9)
  ],
  ['ecid_1','ecid_2',] 
)

m9.show(truncate=False)

+------+------+
|ecid_1|ecid_2|
+------+------+
|1     |2     |
|2     |3     |
|2     |3     |
|4     |5     |
|6     |7     |
|7     |8     |
|8     |9     |
+------+------+



In [15]:
m10 = (
  m9.withColumn('match_key1', F.array(F.col('ecid_1'),F.col('ecid_2'))).select(F.col('ecid_1'),F.col('match_key1'))
  .unionAll(
    m9.withColumn('match_key1', F.array(F.col('ecid_1'),F.col('ecid_2'))).select(F.col('ecid_2'),F.col('match_key1'))
  )
  .withColumn('match_key1', F.explode('match_key1'))
  .groupBy('ecid_1').agg(F.collect_list(F.col('match_key1')).alias('match_key1'))
  .withColumn('match_key1', F.array_distinct('match_key1'))
  
  .crossJoin(
    m9.withColumn('match_key1', F.array(F.col('ecid_1'),F.col('ecid_2'))).select(F.col('ecid_1'),F.col('match_key1'))
    .unionAll(
      m9.withColumn('match_key1', F.array(F.col('ecid_1'),F.col('ecid_2'))).select(F.col('ecid_2'),F.col('match_key1'))
      )
    .withColumn('match_key1', F.explode('match_key1'))
    .groupBy('ecid_1').agg(F.collect_list(F.col('match_key1')).alias('match_key1'))
    .withColumn('match_key1', F.array_distinct('match_key1')).withColumnRenamed('ecid_1','ecid_2').withColumnRenamed('match_key1','match_key2')
    )
  
  .withColumn('overlap', F.arrays_overlap('match_key1','match_key2'))
  .filter(F.col('overlap') == True)
  .withColumn('match_key',F.array_union(F.col('match_key1'),F.col('match_key2')))
  .withColumn('match_key', F.array_sort('match_key'))
  .withColumn('match_key_size',F.size(F.col('match_key')))
  .withColumn('rank', F.rank().over(W.partitionBy('ecid_1','match_key_size').orderBy('ecid_2')))
  .withColumn('max_size', F.max(F.col('match_key_size')).over(W.partitionBy('ecid_1')))
  .filter(
    (F.col('match_key_size') == F.col('max_size')) &
    (F.col('rank') == F.lit(1))
    )
  .select(F.col('ecid_1').alias('ecid'),F.col('match_key'))
  .sort(F.col('ecid'))
)

m10.show(truncate=False)

+----+------------+
|ecid|match_key   |
+----+------------+
|1   |[1, 2, 3]   |
|2   |[1, 2, 3]   |
|3   |[1, 2, 3]   |
|4   |[4, 5]      |
|5   |[4, 5]      |
|6   |[6, 7, 8, 9]|
|7   |[6, 7, 8, 9]|
|8   |[6, 7, 8, 9]|
|9   |[6, 7, 8, 9]|
+----+------------+



Note that the array type column can be converted to a string with the `concat_ws` function. This may be required when writing to a CSV file or querying via an engine that does not support the array type, for example Impala (Hive is OK).

In [16]:
m11 = m10.withColumn('match_key_string', F.concat_ws('_',F.col('match_key')))

m11.show()
m11.printSchema()

+----+------------+----------------+
|ecid|   match_key|match_key_string|
+----+------------+----------------+
|   1|   [1, 2, 3]|           1_2_3|
|   2|   [1, 2, 3]|           1_2_3|
|   3|   [1, 2, 3]|           1_2_3|
|   4|      [4, 5]|             4_5|
|   5|      [4, 5]|             4_5|
|   6|[6, 7, 8, 9]|         6_7_8_9|
|   7|[6, 7, 8, 9]|         6_7_8_9|
|   8|[6, 7, 8, 9]|         6_7_8_9|
|   9|[6, 7, 8, 9]|         6_7_8_9|
+----+------------+----------------+

root
 |-- ecid: long (nullable = true)
 |-- match_key: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- match_key_string: string (nullable = false)
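
If the string form of the key later needs to be turned back into a list outside Spark, the conversion is easy to reverse. The sketch below is plain Python with hypothetical helper names, and it assumes integer ids and a separator that never appears inside an id.

```python
def key_to_string(match_key, sep="_"):
    """Join the ids of a match key into one string."""
    return sep.join(str(record_id) for record_id in match_key)


def string_to_key(text, sep="_"):
    """Split a string key back into a list of integer ids."""
    return [int(part) for part in text.split(sep)]


print(key_to_string([6, 7, 8, 9]))  # 6_7_8_9
print(string_to_key("1_2_3"))       # [1, 2, 3]
```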

