Code mostly copied from chapter 2 of _Advanced Analytics with PySpark_ 

To download the dataset:
```shell
$ mkdir -p data/linkage
$ cd data/linkage/
$ curl -L -o donation.zip https://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'
```

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()



### Question 1

Load the data

In [2]:
path = 'data/linkage/block*.csv'
df = spark.read.csv(path, header=True, nullValue='?', inferSchema=True)


Cache the DataFrame

["Explaining the mechanics of Spark caching"](https://luminousmen.com/post/explaining-the-mechanics-of-spark-caching) is a good article on why and how Spark DataFrames are cached.

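`df.cache()` is lazy: it only marks the DataFrame for caching. The first action after it is slow because it reads the CSV files and also fills the cache; later actions are served from the cache and run much faster (about 3.59 s versus 55.5 ms for `count()` here).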
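To make the "lazy until the first action" behavior concrete, here is a toy model in plain Python. It is only an illustration under simplifying assumptions (the class `LazyCachedDataset` and its `loads` counter are hypothetical, not Spark's implementation): `cache()` just sets a flag, and the first action both reads the source and stores the result.

```python
class LazyCachedDataset:
    """Toy model of lazy caching; NOT how Spark implements it."""

    def __init__(self, load):
        self._load = load              # expensive function that produces the rows
        self._cache_requested = False
        self._cached_rows = None
        self.loads = 0                 # how many times the source was actually read

    def cache(self):
        # Like df.cache(): only marks the dataset, nothing is computed yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows   # later actions reuse the cached rows
        self.loads += 1
        rows = self._load()
        if self._cache_requested:
            self._cached_rows = rows   # the first action materializes the cache
        return rows

    def count(self):
        # An "action": forces the rows to be produced.
        return len(self._rows())


events = LazyCachedDataset(lambda: list(range(1_000_000)))
events.cache()
print(events.loads)   # 0: cache() alone reads nothing
events.count()        # first action: reads the source and fills the cache
events.count()        # second action: served from the cache
print(events.loads)   # 1
```

Without the `cache()` call, every `count()` would re-run `load`, which is the behavior an uncached DataFrame shows when each action rescans the CSV files.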

In [3]:
df.cache()

%timeit -n 1 -r 1 df.count() # first action: reads the CSVs and fills the cache
%timeit -n 1 -r 1 df.count() # served from the cache



3.59 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
55.5 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)



Create summary statistics for matches and non-matches

In [4]:
from pyspark.sql.functions import col

def pivot(source_df):
    """Transpose a `describe()` result: one row per field, one double column per statistic."""

    # convert to pandas DataFrame
    pd_df = source_df.toPandas()

    # pivot
    pd_df = pd_df.set_index('summary').transpose().reset_index()
    pd_df = pd_df.rename(columns={'index':'field'})

    # convert to Spark DataFrame
    result_df = spark.createDataFrame(pd_df)

    # convert non-index columns from string to double
    for c in result_df.columns:
        if c == 'field':
            continue
        result_df = result_df.withColumn(c, col(c).cast('double'))

    return result_df
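`describe()` returns one row per statistic (`count`, `mean`, ...) with every value stored as a string; `pivot` turns that into one row per field with numeric columns. The same reshaping in plain Python, on a hypothetical two-field summary (`f1`, `f2` and their values are made up, and `pivot_summary` is an illustrative name), looks like this:

```python
def pivot_summary(summary_rows):
    """Transpose describe()-style rows into one dict per field.

    `summary_rows` is a list of dicts such as {"summary": "count", "f1": "3"},
    with each statistic stored as a string, as describe() returns it.
    """
    fields = [key for key in summary_rows[0] if key != "summary"]
    result = []
    for field in fields:
        row = {"field": field}
        for stat_row in summary_rows:
            value = stat_row[field]
            # cast string -> float; keep missing values as None (Spark shows null)
            row[stat_row["summary"]] = float(value) if value is not None else None
        result.append(row)
    return result


# Hypothetical describe() output for two fields
summary = [
    {"summary": "count", "f1": "3", "f2": "2"},
    {"summary": "mean", "f1": "1.0", "f2": "0.5"},
]
print(pivot_summary(summary))
# [{'field': 'f1', 'count': 3.0, 'mean': 1.0}, {'field': 'f2', 'count': 2.0, 'mean': 0.5}]
```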


In [5]:
match_stats = pivot(df.where(df.is_match == True).describe())
miss_stats = pivot(df.where(df.is_match == False).describe())

match_stats.show()
miss_stats.show()


+------------+-------+------------------+--------------------+---+-------+
|       field|  count|              mean|              stddev|min|    max|
+------------+-------+------------------+--------------------+---+-------+
|        id_1|20931.0| 34575.72117911232|   21950.31285196913|5.0|99946.0|
|        id_2|20931.0| 51259.95939037791|   24345.73345377519|6.0|99996.0|
|cmp_fname_c1|20922.0|0.9973163859635038| 0.03650667584833679|0.0|    1.0|
|cmp_fname_c2| 1333.0|0.9898900320318176| 0.08251973727615237|0.0|    1.0|
|cmp_lname_c1|20931.0|0.9970152595958817|0.043118807533945126|0.0|    1.0|
|cmp_lname_c2|  475.0| 0.969370167843852| 0.15345280740388917|0.0|    1.0|
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216435|0.0|    1.0|
|      cmp_bd|20925.0|0.9970848267622461| 0.05391487659807981|0.0|    1.0|
|      cmp_bm|20925.0|0.9979450418160095|0.045286127452170664|0.0|    1.0|
|      cmp_by|20925.0|0.9961290322580645| 0.06209804856731055|0.0|    1.0|
|     cmp_plz|20902.0|0.9

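Compute how much each field differs between matches and non-matches: `delta` is the match mean minus the non-match mean, and the occurrence count is the sum of the two non-null counts.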

In [6]:
match_stats.createOrReplaceTempView('match_stats')
miss_stats.createOrReplaceTempView('miss_stats')

query = '''
    select A.field, A.count + B.count as occurrences, A.mean - B.mean as delta
    from match_stats A
        inner join miss_stats B on A.field = B.field
    where A.field not in ('id_1', 'id_2')
    order by delta desc;
'''
spark.sql(query).show()

+------------+-----------+--------------------+
|       field|occurrences|               delta|
+------------+-----------+--------------------+
|     cmp_plz|  5736289.0|  0.9563812499852176|
|cmp_lname_c2|     2464.0|  0.8064147192926266|
|      cmp_by|  5748337.0|  0.7762059675300512|
|      cmp_bd|  5748337.0|   0.775442311783404|
|cmp_lname_c1|  5749132.0|  0.6838772482594513|
|      cmp_bm|  5748337.0|  0.5109496938298685|
|cmp_fname_c1|  5748125.0|  0.2854529057459947|
|cmp_fname_c2|   103698.0| 0.09104268062280174|
|     cmp_sex|  5749132.0|0.032408185250332844|
+------------+-----------+--------------------+
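The query is an inner join of the two summaries on `field`, followed by a subtraction and a sort. As a plain-Python sketch of the same logic (the helper `field_deltas` and the fields `f1`, `f2` with their numbers are hypothetical, chosen only to make the arithmetic easy to follow):

```python
def field_deltas(match_stats, miss_stats, skip=("id_1", "id_2")):
    """Join two {field: (count, mean)} summaries on the field name.

    Returns (field, occurrences, delta) tuples sorted by delta, largest first,
    mirroring the inner join, where clause and order by in the SQL query.
    """
    rows = []
    for field, (match_count, match_mean) in match_stats.items():
        if field in skip or field not in miss_stats:  # where clause + inner join
            continue
        miss_count, miss_mean = miss_stats[field]
        rows.append((field, match_count + miss_count, match_mean - miss_mean))
    return sorted(rows, key=lambda row: row[2], reverse=True)


# Hypothetical summaries: {field: (non-null count, mean)}
match = {"id_1": (3, 5.0), "f1": (3, 1.0), "f2": (2, 0.5)}
miss = {"id_1": (4, 7.0), "f1": (4, 0.25), "f2": (4, 0.5)}
print(field_deltas(match, miss))  # [('f1', 7, 0.75), ('f2', 6, 0.0)]
```

A field with a large `delta` takes very different values for matches and non-matches, which is what makes it useful for scoring.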



### Question 2

First, select relevant features.

What makes a good feature?
- Has different values for matches and non-matches
- Available most of the time (not null for most instances)
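Both criteria can be checked mechanically against the delta table above. The sketch below is one way to do it, with hypothetical cutoffs of at least 1,000,000 non-null values and a mean difference of at least 0.5 (the author does not state numeric cutoffs; `select_features` is an illustrative name). With these cutoffs it returns the same five fields as `features` in the next cell, only in a different order.

```python
# (occurrences, delta) per field, copied from the delta table above
field_stats = {
    "cmp_plz": (5736289.0, 0.9563812499852176),
    "cmp_lname_c2": (2464.0, 0.8064147192926266),
    "cmp_by": (5748337.0, 0.7762059675300512),
    "cmp_bd": (5748337.0, 0.775442311783404),
    "cmp_lname_c1": (5749132.0, 0.6838772482594513),
    "cmp_bm": (5748337.0, 0.5109496938298685),
    "cmp_fname_c1": (5748125.0, 0.2854529057459947),
    "cmp_fname_c2": (103698.0, 0.09104268062280174),
    "cmp_sex": (5749132.0, 0.032408185250332844),
}


def select_features(stats, min_occurrences, min_delta):
    """Keep fields that are mostly non-null and separate matches from non-matches."""
    return [
        field
        for field, (occurrences, delta) in stats.items()
        if occurrences >= min_occurrences and delta >= min_delta
    ]


# Hypothetical cutoffs, for illustration only
print(select_features(field_stats, min_occurrences=1_000_000, min_delta=0.5))
# ['cmp_plz', 'cmp_by', 'cmp_bd', 'cmp_lname_c1', 'cmp_bm']
```

`cmp_lname_c2` has a large delta but is almost always null, so the occurrence cutoff drops it; `cmp_fname_c1` and `cmp_sex` are nearly always present but barely differ between the two groups.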

In [7]:
features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]

In [8]:
from pyspark.sql.functions import expr

scored = (
    df.fillna(0)  # treat missing comparison values as 0
    .withColumn('score', expr(" + ".join(features)))  # sum of the selected features
    .select('score', 'is_match')
)

scored.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows
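The `fillna(0)` matters because in Spark SQL `a + b` is null whenever either operand is null, so a single missing comparison would otherwise make the whole score null. A plain-Python sketch of the per-record score (the `score` helper and the sample records are hypothetical):

```python
FEATURES = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]


def score(record, features=FEATURES):
    """Sum the selected comparison fields, counting None (null) as 0 like fillna(0)."""
    return sum(record.get(field) or 0.0 for field in features)


# Hypothetical records: every feature agrees vs. one missing postcode comparison
full = {f: 1.0 for f in FEATURES}
missing_plz = dict(full, cmp_plz=None)
print(score(full), score(missing_plz))  # 5.0 4.0
```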



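### Question 3

For several thresholds, count how many matches and non-matches have a score at or above the threshold (`above = true`) versus below it. A `NULL` cell means no rows fall into that combination.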

In [9]:
def tabulate(threshold):
    """Show match/non-match counts for scores at or above vs. below `threshold`."""
    (
        scored.selectExpr(f'score >= {threshold} as above', 'is_match')
        .groupBy('above')
        .pivot('is_match', ('true', 'false'))
        .count()
        .show()
    )
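`groupBy('above').pivot('is_match').count()` is a two-way count over the pairs `(score >= threshold, is_match)`. A plain-Python sketch with `collections.Counter` (the helper `tabulate_counts` and the sample rows are hypothetical) shows the same grouping; a combination with no rows is simply absent from the `Counter`, which is the case Spark displays as `NULL`.

```python
from collections import Counter


def tabulate_counts(scored_rows, threshold):
    """Count rows per (score >= threshold, is_match) pair."""
    return Counter((score >= threshold, is_match) for score, is_match in scored_rows)


# Hypothetical (score, is_match) pairs
rows = [(5.0, True), (4.0, True), (3.0, False), (1.0, False), (4.0, False)]
counts = tabulate_counts(rows, threshold=4)
print(counts[(True, True)], counts[(True, False)], counts[(False, True)], counts[(False, False)])
# 2 1 0 2
```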

In [10]:
for t in range(2, 5):
    print(f'threshold = {t}')
    tabulate(t)

threshold = 2
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| NULL|5131787|
+-----+-----+-------+

threshold = 3
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20916| 315213|
|false|   15|5412988|
+-----+-----+-------+

threshold = 4
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+
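Each table reads as a confusion matrix: `above = true` rows are predicted matches, and the `true`/`false` columns are actual matches and non-matches. A small sketch (the `precision_recall` helper is illustrative, not from the book) turns the printed counts into precision and recall, treating `NULL` as 0:

```python
# Counts copied from the three tables above, as (TP, FP, FN):
# TP = above & match, FP = above & non-match, FN = below & match (NULL -> 0)
table_counts = {
    2: (20931, 596414, 0),
    3: (20916, 315213, 15),
    4: (20871, 637, 60),
}


def precision_recall(tp, fp, fn):
    """Return (precision, recall) for one threshold."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return precision, recall


for threshold, (tp, fp, fn) in table_counts.items():
    p, r = precision_recall(tp, fp, fn)
    print(f"threshold = {threshold}: precision = {p:.4f}, recall = {r:.4f}")
# threshold = 2: precision = 0.0339, recall = 1.0000
# threshold = 3: precision = 0.0622, recall = 0.9993
# threshold = 4: precision = 0.9704, recall = 0.9971
```

Raising the threshold from 3 to 4 removes almost all false positives (315213 down to 637) while losing only 45 more true matches.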



In [11]:
spark.stop()