In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, split, trim

spark = SparkSession.builder.appName("EntityResolution").getOrCreate()

# Toy records: the first two rows are near-duplicates that differ only in
# casing, a dropped letter, and an abbreviated street suffix.
data = [("John Doe", "123 Main St"), ("john do", "123 main street"), ("Jane Smith", "456 Oak Ave")]
df = spark.createDataFrame(data, ["Name", "Address"])


def tokenize(column_name):
    """Return a Column holding the lower-cased, whitespace-split tokens of `column_name`.

    The value is trimmed first and split on runs of whitespace (`\\s+`) rather
    than on a single literal space, so inputs such as "John  Doe" or " John Doe "
    do not yield empty-string tokens.
    """
    return split(trim(lower(col(column_name))), r"\s+")


df_cleaned = (
    df.withColumn("NameTokens", tokenize("Name"))
      .withColumn("AddressTokens", tokenize("Address"))
)

df_cleaned.show(truncate=False)

spark.stop()




+----------+---------------+-------------+-------------------+
|Name      |Address        |NameTokens   |AddressTokens      |
+----------+---------------+-------------+-------------------+
|John Doe  |123 Main St    |[john, doe]  |[123, main, st]    |
|john do   |123 main street|[john, do]   |[123, main, street]|
|Jane Smith|456 Oak Ave    |[jane, smith]|[456, oak, ave]    |
+----------+---------------+-------------+-------------------+



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName("SimilarityScores").getOrCreate()

data = [("John Doe", ["apple", "orange", "banana"]),
        ("Jane Smith", ["banana", "grape", "kiwi"]),
        ("Bob Johnson", ["orange", "kiwi", "pear"])]

df = spark.createDataFrame(data, ["Name", "Tokens"])

# Make sure "Tokens" is typed as array<string>. This does NOT turn the arrays
# into sets; the set conversion happens inside similarity_udf below.
df = df.withColumn("Tokens", col("Tokens").cast("array<string>"))


def similarity_udf(set1, set2):
    """Jaccard similarity |A & B| / |A | B| of two token collections.

    Args:
        set1: iterable of tokens for the first record, or None.
        set2: iterable of tokens for the second record, or None.

    Returns:
        A float in [0.0, 1.0]. A None input is treated as an empty collection,
        and two empty collections give 0.0 instead of dividing by zero.
    """
    tokens_a = set(set1 or ())
    tokens_b = set(set2 or ())
    union = tokens_a | tokens_b
    if not union:
        return 0.0
    return len(tokens_a & tokens_b) / len(union)


# Registering also exposes the function to Spark SQL as "jaccard_similarity".
jaccard_similarity_udf = spark.udf.register("jaccard_similarity", similarity_udf, FloatType())

# The strict "<" on Name keeps each unordered pair exactly once and drops self-pairs.
pairwise_similarity = (df.alias("df1").crossJoin(df.alias("df2")).filter(col("df1.Name") < col("df2.Name"))
                      .withColumn("Similarity", jaccard_similarity_udf("df1.Tokens", "df2.Tokens"))
                      .select("df1.Name", "df2.Name", "Similarity"))

pairwise_similarity.show(truncate=False)

spark.stop()




+-----------+----------+----------+
|Name       |Name      |Similarity|
+-----------+----------+----------+
|Jane Smith |John Doe  |0.2       |
|Bob Johnson|John Doe  |0.2       |
|Bob Johnson|Jane Smith|0.2       |
+-----------+----------+----------+



In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName("SimilarityScores").getOrCreate()

data = [("John Doe", ["apple", "orange", "banana"]),
        ("Jane Smith", ["banana", "grape", "kiwi"]),
        ("Bob Johnson", ["orange", "kiwi", "pear"])]

df = spark.createDataFrame(data, ["Name", "Tokens"])

# Make sure "Tokens" is typed as array<string>. This does NOT turn the arrays
# into sets; the set conversion happens inside similarity_udf below.
df = df.withColumn("Tokens", col("Tokens").cast("array<string>"))


def similarity_udf(set1, set2):
    """Jaccard similarity |A & B| / |A | B| of two token collections.

    Args:
        set1: iterable of tokens for the first record, or None.
        set2: iterable of tokens for the second record, or None.

    Returns:
        A float in [0.0, 1.0]. A None input is treated as an empty collection,
        and two empty collections give 0.0 instead of dividing by zero.
    """
    tokens_a = set(set1 or ())
    tokens_b = set(set2 or ())
    union = tokens_a | tokens_b
    if not union:
        return 0.0
    return len(tokens_a & tokens_b) / len(union)


# Registering also exposes the function to Spark SQL as "jaccard_similarity".
jaccard_similarity_udf = spark.udf.register("jaccard_similarity", similarity_udf, FloatType())

# The strict "<" on Name keeps each unordered pair exactly once and drops self-pairs.
# Both Tokens columns are selected so each score can be checked against its inputs.
pairwise_similarity = (df.alias("df1").crossJoin(df.alias("df2")).filter(col("df1.Name") < col("df2.Name"))
                      .withColumn("Similarity", jaccard_similarity_udf("df1.Tokens", "df2.Tokens"))
                      .select("df1.Name", "df1.Tokens", "df2.Name", "df2.Tokens", "Similarity"))

pairwise_similarity.show(truncate=False)

spark.stop()




+-----------+---------------------+----------+-----------------------+----------+
|Name       |Tokens               |Name      |Tokens                 |Similarity|
+-----------+---------------------+----------+-----------------------+----------+
|Jane Smith |[banana, grape, kiwi]|John Doe  |[apple, orange, banana]|0.2       |
|Bob Johnson|[orange, kiwi, pear] |John Doe  |[apple, orange, banana]|0.2       |
|Bob Johnson|[orange, kiwi, pear] |Jane Smith|[banana, grape, kiwi]  |0.2       |
+-----------+---------------------+----------+-----------------------+----------+



In [14]:
from pyspark.sql import SparkSession
# Spark's aggregate `sum` is imported under an explicit name: the Python builtin
# `sum` cannot aggregate a Column and fails with "Column is not iterable".
from pyspark.sql.functions import col, sum as spark_sum, when

spark = SparkSession.builder.appName("EntityResolutionEvaluation").getOrCreate()

# Sample data (replace with your actual data)
ground_truth = [("John Doe", "john.doe@gmail.com", "123 Main St"),
                ("Jane Smith", "jane.smith@gmail.com", "456 Oak Ave")]

predicted = [("John Doe", "john.doe@gmail.com", "123 Main Street"),
              ("Jane Smith", "jane.smith@yahoo.com", "456 Oak Avenue"),
              ("Bob Johnson", "bob.johnson@gmail.com", "789 Pine St")]

ground_truth_df = spark.createDataFrame(ground_truth, ["Name", "Email", "Address"])
predicted_df = spark.createDataFrame(predicted, ["Name", "Email", "Address"])

# Join predicted and ground truth data on Name.
# A full outer join keeps ground-truth rows that have no prediction; with a
# left join from the predictions those rows are dropped and FN is always 0.
joined_df = predicted_df.alias("p").join(ground_truth_df.alias("g"), "Name", "full_outer")

# Calculate evaluation metrics per Name.
# NOTE(review): a row counts as TP whenever both sides have an email, even if
# the two emails differ (e.g. "Jane Smith") — confirm this is the intended rule.
metrics = joined_df.groupBy("Name").agg(
    spark_sum((col("p.Email").isNotNull() & col("g.Email").isNotNull()).cast("int")).alias("TP"),
    spark_sum((col("p.Email").isNotNull() & col("g.Email").isNull()).cast("int")).alias("FP"),
    spark_sum((col("p.Email").isNull() & col("g.Email").isNotNull()).cast("int")).alias("FN")
)


def safe_ratio(numerator, denominator):
    """Return numerator / denominator as a Column, or null when the denominator is 0 or null.

    The explicit guard keeps the null result for undefined ratios without
    relying on the session's divide-by-zero behaviour.
    """
    return when(denominator != 0, numerator / denominator)


# Calculate precision, recall, and F1-score
metrics = metrics.withColumn(
    "Precision", safe_ratio(col("TP"), col("TP") + col("FP"))
).withColumn(
    "Recall", safe_ratio(col("TP"), col("TP") + col("FN"))
).withColumn(
    "F1_Score", safe_ratio(2 * (col("Precision") * col("Recall")), col("Precision") + col("Recall"))
)

# Display evaluation metrics
metrics.show(truncate=False)

spark.stop()


+-----------+---+---+---+---------+------+--------+
|Name       |TP |FP |FN |Precision|Recall|F1_Score|
+-----------+---+---+---+---------+------+--------+
|John Doe   |1  |0  |0  |1.0      |1.0   |1.0     |
|Jane Smith |1  |0  |0  |1.0      |1.0   |1.0     |
|Bob Johnson|0  |1  |0  |0.0      |null  |null    |
+-----------+---+---+---+---------+------+--------+

