In [63]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, expr

spark = SparkSession.builder.appName("lab3").getOrCreate()

In [64]:
df1 = spark.read.option('recursiveFileLookup', True).csv("donation/*")

In [65]:
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [69]:
df1.show(2)

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|  _c0|  _c1|              _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
|37291|53113|0.833333333333333|           ?|           1|           ?|      1|     1|     1|     1|      0|    TRUE|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
only showing top 2 rows



In [70]:
from pyspark.sql.types import ArrayType, StringType


# Define a UDF for lowercasing each element in the array
lowercase_udf = udf(lambda tokens: [token.lower() for token in tokens], ArrayType(StringType()))

# Apply the UDF to the lname_tokens column
df = df.withColumn("lname_tokens", lowercase_udf(col("lname_tokens")))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `lname_tokens` cannot be resolved. Did you mean one of the following? [`cmp_sex`, `cmp_bd`, `cmp_bm`, `cmp_by`, `cmp_lname_c1`].;
'Project [id_1#13918, id_2#13919, cmp_fname_c1#13920, cmp_fname_c2#13921, cmp_lname_c1#13922, cmp_lname_c2#13923, cmp_sex#13924, cmp_bd#13925, cmp_bm#13926, cmp_by#13927, cmp_plz#13928, is_match#13929, <lambda>('lname_tokens)#14403 AS lname_tokens#14404]
+- Relation [id_1#13918,id_2#13919,cmp_fname_c1#13920,cmp_fname_c2#13921,cmp_lname_c1#13922,cmp_lname_c2#13923,cmp_sex#13924,cmp_bd#13925,cmp_bm#13926,cmp_by#13927,cmp_plz#13928,is_match#13929] csv


In [None]:
from pyspark.sql.functions import col
df.groupBy("is_match").count().orderBy(col("count").desc()).show()

df.createOrReplaceTempView("linkage")

spark.sql("""
    SELECT is_match, COUNT(*) cnt
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC
""").show()

summary = df.describe()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

matches = df.where("is_match = true")
match_summary = matches.describe()
misses = df.filter(col("is_match") == False)
miss_summary = misses.describe()

In [None]:
from pyspark.sql.types import DoubleType

def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
sum_expression = " + ".join(good_features)

# Evaluate precision, recall, and F1-score
scored = df.fillna(0, subset=good_features).\
                withColumn('score', expr(sum_expression)).\
                select('score', 'is_match')

scored.show()

def crossTabs(scored: DataFrame, t: float) -> DataFrame:
    return scored.selectExpr(f"score >= {t} as above", "is_match").\
        groupBy("above").pivot("is_match", ("true", "false")).\
        count()

In [None]:
confused = crossTabs(scored, 4.0)
confused.show()

confused2 = crossTabs(scored, 2.0)
confused2.show()

tp = confused.filter("above = true").select("true").collect()[0].true
fp = confused.filter("above = true").select("false").collect()[0].false
fn = confused.filter("above = false").select("true").fillna(0).collect()[0].true
tn = confused.filter("above = false").select("false").collect()[0].false

In [None]:
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_score = (2 * precision * recall) / (precision + recall)
   
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")

In [None]:
spark.stop()