# Using normalization, tokenization, and stop-word removal for entity resolution

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, regexp_replace
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import re

# Start (or reuse) a Spark session and load the customer records, using the
# first CSV row as column names.
spark = (
    SparkSession.builder
    .appName('entity resolution')
    .getOrCreate()
)
df = spark.read.csv(path='customers-100.csv', header=True)

# creating udfs
def normalize_text(text):
    """Lower-case ``text`` and strip every character that is neither a word character nor whitespace.

    This function is wrapped as a Spark UDF, and Spark passes SQL NULL cells to
    Python UDFs as ``None``. Such values are returned as ``None`` (NULL) instead
    of raising ``AttributeError`` and aborting the whole Spark job.

    Args:
        text: The raw cell value, or ``None``.

    Returns:
        The normalized string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    text = text.lower()
    # Keep only word characters (letters, digits, underscore) and whitespace.
    # This also drops separators such as '@', '.', '-', '+', so emails, URLs and
    # phone numbers are collapsed into a single run of characters.
    text = re.sub(r'[^\w\s]', '', text)
    return text
normal_udf = udf(normalize_text, StringType())

# Source columns to normalize. Each one gets a "<name> clean" companion column,
# appended after the original columns in this order.
CLEAN_SOURCE_COLUMNS = [
    'Index', 'Customer Id', 'First Name', 'Last Name', 'Company', 'City',
    'Country', 'Phone 1', 'Phone 2', 'Email', 'Subscription Date', 'Website',
]
# Add all cleaned columns in a single projection. Spark's docs warn that
# chaining many withColumn calls builds large query plans.
df = df.select(
    '*',
    *[normal_udf(col(name)).alias(name + ' clean') for name in CLEAN_SOURCE_COLUMNS],
)

# Tokenization: Tokenizer lower-cases the input and splits it on whitespace.
tokenizer = Tokenizer(inputCol='First Name clean', outputCol='tokens')
df_tokenized = tokenizer.transform(df)

# Remove stop words (StopWordsRemover's default English list) from the tokens.
stop_rem = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
df_filtered = stop_rem.transform(df_tokenized)

df_filtered.show()
spark.stop()

24/10/05 08:07:22 WARN Utils: Your hostname, DESKTOP-2J74AJH resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/05 08:07:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/05 08:07:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/05 08:07:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/10/05 08:07:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/10/05 08:07:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+-----------+-----------------+----------------+---------------+--------------------+-----------------+--------------------+-------------------+------------------+--------------------+-----------------------+--------------------+----------+---------------+
|Index|    Customer Id|First Name|Last Name|             Company|             City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|Index clean|Customer Id clean|First Name clean|Last Name clean|       Company clean|       City clean|       Country clean|      Phone 1 clean|     Phone 2 clean|         Email clean|Subscription Date clean|       Website clean|    tokens|filtered_tokens|
+-----+---------------+----------+---------+--------------------+-----------

# Using UDFs for entity resolution

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import FloatType, ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import re

# Start (or reuse) a Spark session and load the customer CSV with its header row.
spark = (
    SparkSession.builder
    .appName('entity resolution')
    .getOrCreate()
)
df = spark.read.csv(path='customers-100.csv', header=True)

def metric(x, y):
    """Jaccard similarity len(x & y) / len(x | y) of the distinct elements of two iterables.

    In this notebook it is applied to per-character lists of phone numbers.
    Spark passes SQL NULL to Python UDFs as ``None``. The similarity is then
    undefined, so ``None`` (NULL) is returned instead of raising ``TypeError``
    from ``set(None)``.

    Args:
        x: Iterable of hashable elements, or ``None``.
        y: Iterable of hashable elements, or ``None``.

    Returns:
        A float in [0, 1], or ``None`` when either input is ``None``.
        The result is 0.0 when the sets share no element. This includes the
        case where both are empty, so the empty union is never divided by.
    """
    if x is None or y is None:
        return None
    x, y = set(x), set(y)
    shared = x & y
    if not shared:
        return 0.0
    return len(shared) / len(x | y)
# Wrap the Jaccard-similarity function `metric` as a Spark UDF.
# FloatType is Spark's 32-bit float, so the Python float result is rounded to
# single precision (hence outputs such as 0.33333334 instead of 0.3333333333333333).
# NOTE(review): "metrix" looks like a typo for "metric"; the name is kept because later code uses it.
metrix_udf = udf(metric, FloatType())

def list_convert(t):
    """Split a string into a list of its characters, e.g. ``"1-2"`` -> ``['1', '-', '2']``.

    This function is wrapped as a Spark UDF returning ``ArrayType(StringType())``.
    Spark passes SQL NULL to Python UDFs as ``None``, and such values are
    returned as ``None`` instead of raising ``TypeError`` from ``list(None)``.

    Args:
        t: A string (or any iterable), or ``None``.

    Returns:
        A list of the elements of ``t``, or ``None`` when ``t`` is ``None``.
    """
    if t is None:
        return None
    return list(t)
convert = udf(list_convert, ArrayType(StringType()))

# Explode each phone number into a list of characters, then score how much the
# two character sets overlap (Jaccard similarity via metrix_udf).
df = (
    df.withColumn("Phone 1 Set", convert(col("Phone 1")))
      .withColumn("Phone 2 Set", convert(col("Phone 2")))
      .withColumn("metric", metrix_udf(col("Phone 1 Set"), col("Phone 2 Set")))
)

df.select('Phone 1', 'Phone 2', 'metric').show()
spark.stop()

24/10/05 08:07:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/10/05 08:07:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

+--------------------+--------------------+----------+
|             Phone 1|             Phone 2|    metric|
+--------------------+--------------------+----------+
|        229.077.5154|    397.884.0519x718| 0.6363636|
|          5153435776|    686-620-1820x944|      0.25|
|     +1-539-402-0259| (496)978-3969x58947|0.33333334|
|001-808-617-6467x...|     +1-813-324-8756|0.61538464|
|001-234-203-0635x...|001-199-446-3860x...| 0.5833333|
| (283)437-3886x88321|        999-728-1637| 0.5833333|
|  (496)452-6181x3291|+1-247-266-0963x4995|       0.6|
|001-583-352-7197x297|    001-333-145-0369|       0.5|
|   854-138-4911x5772| +1-448-910-2276x729|0.61538464|
|    739.218.2516x459|001-054-401-0347x617|0.53846157|
|    637-854-0256x825|    114.336.0784x788| 0.5833333|
|       (041)737-3846|+1-556-888-3485x4...|       0.5|
|    001-949-844-8787|       (855)713-8773|0.36363637|
|  786-284-3358x62152|+1-315-627-1796x8074| 0.7692308|
|  (781)861-7180x8306|        207-185-3665| 0.5833333|
|    540.0

# Creating a confusion matrix and computing precision, recall, and F1

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

# Start (or reuse) a Spark session for the evaluation example.
spark = SparkSession.builder.appName("EntityResolutionEvaluation").getOrCreate()

# Hand-made labelled pairs: (id1, id2, true_label, predicted_label).
# There is exactly one pair per confusion-matrix cell.
data = [
    (1, 2, 1, 1),  # True Positive
    (1, 3, 1, 0),  # False Negative
    (2, 3, 0, 1),  # False Positive
    (2, 4, 0, 0),  # True Negative
]

df = spark.createDataFrame(data, schema=["id1", "id2", "true_label", "predicted_label"])

# Calculate confusion matrix components over the whole frame (df.agg is
# shorthand for df.groupBy().agg()). when() without otherwise() yields NULL for
# rows that do not satisfy the condition, and count() skips NULLs. Each count
# therefore only sees the rows belonging to that cell.
# Every comparison must be parenthesized: in Python `&` binds more tightly than
# `==`, so `col("a") == 1 & col("b") == 1` would parse as the chained comparison
# `col("a") == (1 & col("b")) == 1`.
confusion_matrix = df.agg(
    count(when((col("true_label") == 1) & (col("predicted_label") == 1), 1)).alias("TP"),  # True Positives
    count(when((col("true_label") == 1) & (col("predicted_label") == 0), 1)).alias("FN"),  # False Negatives
    count(when((col("true_label") == 0) & (col("predicted_label") == 1), 1)).alias("FP"),  # False Positives
    count(when((col("true_label") == 0) & (col("predicted_label") == 0), 1)).alias("TN")   # True Negatives
)


def _ratio_or_null(numerator, denominator):
    """Return the Column ``numerator / denominator``, or NULL where ``denominator`` is 0.

    Without the guard, dividing by zero gives NULL when ANSI mode is off. It
    raises a DIVIDE_BY_ZERO error when ``spark.sql.ansi.enabled`` is true, which
    is the default from Spark 4.0. With the guard, an undefined metric
    (e.g. precision with no predicted positives) is always NULL.
    """
    return when(denominator != 0, numerator / denominator)


# Calculate Precision, Recall, F1-Score
metrics = confusion_matrix.select(
    _ratio_or_null(col("TP"), col("TP") + col("FP")).alias("Precision"),
    _ratio_or_null(col("TP"), col("TP") + col("FN")).alias("Recall"),
    _ratio_or_null(2 * col("TP"), 2 * col("TP") + col("FP") + col("FN")).alias("F1_Score")
)

# Show metrics
metrics.show(truncate=False)

# Stop the Spark session
spark.stop()


24/10/05 08:07:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/10/05 08:07:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

+---------+------+--------+
|Precision|Recall|F1_Score|
+---------+------+--------+
|0.5      |0.5   |0.5     |
+---------+------+--------+

