# Entity Resolution Application using PySpark

### Lab Exercises:

1) Develop a PySpark script to clean and preprocess data before performing entity resolution. Include steps like tokenization and normalization.

2) Implement a PySpark program that computes similarity scores between records using achosen similarity metric.

3) Implement a PySpark program to evaluate the precision, recall, and F1-score of an entity resolution model.

### Formulas:

Extract donations folder  :    spark.read.options("RecursiveFileLookup","True").csv("path to folder")


precision = TP/(TP+FP)

recall = TP/(TP+FN)

F1-score = 2*Precision *recall / (precision+recall)
 

TP | FN

FP | TN

In [1]:
import pyspark
import os
import sys
from pyspark import SparkContext
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_2').getOrCreate()



In [3]:
prev = spark.read.csv("/home/lplab/Desktop/210962069/BDAL/lab3/donation/frequencies.csv")
prev

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

In [4]:
prev.show(2)

+--------------------+-------------------+--------------------+--------------------+-------+-----------------+------------------+-------------------+--------------------+
|                 _c0|                _c1|                 _c2|                 _c3|    _c4|              _c5|               _c6|                _c7|                 _c8|
+--------------------+-------------------+--------------------+--------------------+-------+-----------------+------------------+-------------------+--------------------+
|        cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|        cmp_lname_c2|cmp_sex|           cmp_bd|            cmp_bm|             cmp_by|             cmp_plz|
|0.000235404896421846|0.00147710487444609|2.68694413843136e-05|0.000641025641025641|    0.5|0.032258064516129|0.0833333333333333|0.00943396226415094|0.000422654268808115|
+--------------------+-------------------+--------------------+--------------------+-------+-----------------+------------------+----------------

In [5]:
prev = spark.read.options(recursiveFileLookup="True").csv("donation/block_1.csv")
prev

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]

In [6]:
prev.count()

574914

In [7]:
prev.show()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|  _c0|  _c1|              _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
|37291|53113|0.833333333333333|           ?|           1|           ?|      1|     1|     1|     1|      0|    TRUE|
|39086|47614|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|70031|70237|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|84795|97439|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|36950|42116|                1|           ?|           1|       

In [8]:
prev.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string'),
 ('_c11', 'string')]

In [9]:
parsed = spark.read.option("header", "true").option("nullValue", "?").\
option("inferSchema", "true").csv("donation/block_1.csv")

### Analyzing dataframe with API

In [10]:
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



In [11]:
parsed.show()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|39086|47614|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|70031|70237|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|84795|97439|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|36950|42116|              1.0|        null|         1.0|         1.0|      1|     1|     1|     1|      1|    true|
|42413|48491|              1.0|        null|         1.0|       

In [12]:
parsed.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [13]:
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()

+--------+------+
|is_match| count|
+--------+------+
|   false|572820|
|    true|  2093|
+--------+------+



In [14]:
parsed.createOrReplaceTempView("linkage")

In [15]:
spark.sql("""
SELECT is_match, COUNT(*) cnt
FROM linkage
GROUP BY is_match
ORDER BY cnt DESC
""").show()

+--------+------+
|is_match|   cnt|
+--------+------+
|   false|572820|
|    true|  2093|
+--------+------+



### Fast summary statistics for df

In [16]:
summary = parsed.describe()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

+-------+------------------+------------------+
|summary|      cmp_fname_c1|      cmp_fname_c2|
+-------+------------------+------------------+
|  count|            574811|             10325|
|   mean|0.7127592938253411|0.8977586763518969|
| stddev|0.3889286452463531|0.2742577520430532|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



In [17]:
matches = parsed.where("is_match = true")
match_summary = matches.describe()
misses = parsed.filter(col("is_match") == False)
miss_summary = misses.describe()

### Pivoting and Reshaping df

In [18]:
sp = summary.toPandas()
sp.shape

(5, 12)

In [19]:
sp.head()

Unnamed: 0,summary,id_1,id_2,cmp_fname_c1,cmp_fname_c2,cmp_lname_c1,cmp_lname_c2,cmp_sex,cmp_bd,cmp_bm,cmp_by,cmp_plz
0,count,574913.0,574913.0,574811.0,10325.0,574913.0,239.0,574913.0,574851.0,574851.0,574851.0,573618.0
1,mean,33271.962171667714,66564.6636865056,0.7127592938253411,0.8977586763518969,0.3155724578100624,0.3269155414552904,0.9550923357099248,0.224755632329073,0.4886361857246487,0.2226663952919974,0.0054949461139643
2,stddev,23622.66942593376,23642.00230967228,0.3889286452463531,0.2742577520430532,0.3342494687554245,0.3783092020540671,0.2071015224050444,0.4174216587235557,0.4998712818281637,0.4160365041645591,0.0739240232130197
3,min,1.0,6.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,max,99894.0,100000.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


In [20]:
sp = sp.set_index('summary').transpose().reset_index()

sp = sp.rename(columns={'index':'field'})

sp = sp.rename_axis(None, axis=1)

sp.shape

(11, 6)

In [21]:
sp2 =  spark.createDataFrame(sp)
sp2

DataFrame[field: string, count: string, mean: string, stddev: string, min: string, max: string]

In [22]:
sp2.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)



In [24]:
from pyspark.sql.types import DoubleType

for c in sp2.columns:
    if c == 'field':
        continue
    sp2 = sp2.withColumn(c, sp2[c].cast(DoubleType()))

sp2.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)



In [25]:
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType

def pivot_summary(desc):
    # convert to pandas dataframe
    dp = desc.toPandas()
    # transpose
    dp = dp.set_index('summary').transpose().reset_index()
    dp = dp.rename(columns={'index':'field'})
    dp = dp.rename_axis(None, axis=1)
    # convert to Spark dataframe
    descT = spark.createDataFrame(dp)
    # convert metric columns to double from string
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
        return descT

In [26]:
match_sp2 = pivot_summary(match_summary)
miss_sp2 = pivot_summary(miss_summary)

### Joining DataFrames and Selecting Features

In [27]:
match_sp2.createOrReplaceTempView("match_desc")
miss_sp2.createOrReplaceTempView("miss_desc")
spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""")

DataFrame[field: string, total: double, delta: double]

### Scoring and Model Evaluation

In [28]:
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
...
sum_expression = " + ".join(good_features)
...
sum_expression

'cmp_lname_c1 + cmp_plz + cmp_by + cmp_bd + cmp_bm'

In [29]:
from pyspark.sql.functions import expr
scored = parsed.fillna(0, subset=good_features).\
withColumn('score', expr(sum_expression)).\
select('score', 'is_match')
...
scored.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows



In [36]:
def crossTabs(scored: DataFrame, t: DoubleType) -> DataFrame:
    return scored.selectExpr(f"score >= {t} as above", "is_match").groupBy("above").pivot("is_match", ("true", "false")).count()

crossTabs(scored, 4.0).show()

+-----+----+------+
|above|true| false|
+-----+----+------+
| true|2087|    66|
|false|   6|572754|
+-----+----+------+



In [37]:
crossTabs(scored, 2.0).show()

+-----+----+------+
|above|true| false|
+-----+----+------+
| true|2093| 59729|
|false|null|513091|
+-----+----+------+



precision = TP/(TP+FP)

recall = TP/(TP+FN)

F1-score = 2*Precision *recall / (precision+recall)
 

TP | FN

FP | TN

In [42]:
report = crossTabs(scored, 4.0).collect()
print(report)

[Row(above=True, true=2087, false=66), Row(above=False, true=6, false=572754)]


In [47]:
TP = report[0][1]
FN = report[0][2]
FP = report[1][1]
TN = report[1][2]

In [50]:
precision = TP/(TP+FP)
recall = TP/(TP+FN)
F1 = 2*precision*recall/(precision+recall)

print("Precision: {} \nRecall: {} \nF1-score: {}".format(precision, recall, F1))

Precision: 0.9971333014811276 
Recall: 0.9693450998606595 
F1-score: 0.9830428638718794
