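# Entity Resolution #

Match name records that refer to the same person: normalize and tokenize each name, embed it as an L2-normalized hashed term-frequency vector, score every pair of records by cosine similarity, and evaluate the resulting matches against a small hand-labelled ground truth.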

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace
from pyspark.ml.feature import HashingTF, Normalizer, RegexTokenizer, StopWordsRemover, Tokenizer

# Start (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("EntityResolutionHardcoded")
    .getOrCreate()
)

# Toy input: slightly different spellings and word orders of two people's names.
# Record ids are assigned 1..8 in list order.
raw_names = [
    "John Doe",
    "Jon Doe",
    "Johnny Doe",
    "Jane Smith",
    "J Smith",
    "Janet Smith",
    "Doe John",
    "Smith Jane",
]
data = [(record_id, name) for record_id, name in enumerate(raw_names, start=1)]

# Build the input DataFrame with an "id" column and the raw "name" string.
columns = ["id", "name"]
df = spark.createDataFrame(data, schema=columns)

# Data Cleaning and Preprocessing
# Step 1: Normalize text (convert to lowercase, remove punctuation and whitespace)
df_cleaned = df.withColumn("name_normalized", lower(trim(col("name"))))
df_cleaned = df_cleaned.withColumn("name_normalized", regexp_replace(col("name_normalized"), "[^a-zA-Z0-9\\s]", ""))

# Step 2: Tokenization
tokenizer = Tokenizer(inputCol="name_normalized", outputCol="name_tokens")
df_tokenized = tokenizer.transform(df_cleaned)

# Step 3: Remove stop words
stop_words_remover = StopWordsRemover(inputCol="name_tokens", outputCol="name_filtered_tokens")
df_filtered = stop_words_remover.transform(df_tokenized)

# Step 4: Vectorize using HashingTF
hashing_tf = HashingTF(inputCol="name_filtered_tokens", outputCol="name_tf", numFeatures=1000)
df_vectorized = hashing_tf.transform(df_filtered)

# Step 5: Normalize the vectors
normalizer = Normalizer(inputCol="name_tf", outputCol="name_norm")
df_normalized = normalizer.transform(df_vectorized)

# Select only relevant columns for entity resolution
df_final = df_normalized.select("id", "name", "name_norm")
df_final.show(truncate=False)


24/11/05 18:48:09 WARN Utils: Your hostname, Arnavs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
24/11/05 18:48:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/05 18:48:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                          (0 + 1) / 1]

+---+-----------+--------------------------------------------------------+
|id |name       |name_norm                                               |
+---+-----------+--------------------------------------------------------+
|1  |John Doe   |(1000,[701,818],[0.7071067811865475,0.7071067811865475])|
|2  |Jon Doe    |(1000,[818,828],[0.7071067811865475,0.7071067811865475])|
|3  |Johnny Doe |(1000,[818,998],[0.7071067811865475,0.7071067811865475])|
|4  |Jane Smith |(1000,[685,901],[0.7071067811865475,0.7071067811865475])|
|5  |J Smith    |(1000,[660,685],[0.7071067811865475,0.7071067811865475])|
|6  |Janet Smith|(1000,[204,685],[0.7071067811865475,0.7071067811865475])|
|7  |Doe John   |(1000,[701,818],[0.7071067811865475,0.7071067811865475])|
|8  |Smith Jane |(1000,[685,901],[0.7071067811865475,0.7071067811865475])|
+---+-----------+--------------------------------------------------------+



                                                                                

In [2]:
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F

# Define a UDF to compute cosine similarity between two vectors
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two vectors as a Python float.

    Args:
        v1, v2: vector objects exposing ``dot(other)`` and ``norm(p)`` (e.g. Spark ML
            ``SparseVector`` / ``DenseVector``), or None for a missing value.

    Returns:
        ``dot(v1, v2) / (||v1||_2 * ||v2||_2)``; 0.0 when either vector has zero norm
        (e.g. a name that produced no tokens), instead of raising ZeroDivisionError or
        returning NaN; None when either input is None, so the UDF yields SQL NULL.
    """
    if v1 is None or v2 is None:
        return None
    denominator = float(v1.norm(2)) * float(v2.norm(2))
    if denominator == 0.0:
        # A zero vector has no direction; treat it as dissimilar to everything.
        return 0.0
    return float(v1.dot(v2)) / denominator

# Wrap the similarity function as a Spark UDF. "double" (DoubleType) matches Python's
# float; FloatType would round every score to 32 bits, so a score just below the
# threshold (e.g. 0.79999999) would round up to float32(0.8) ~= 0.80000001 and be
# wrongly accepted by the >= comparison below.
cosine_similarity_udf = F.udf(cosine_similarity, "double")

# Self-join to enumerate each unordered pair of records exactly once (a.id < b.id).
# NOTE: this compares every record with every other record (O(n^2) pairs); fine for a
# toy dataset, but larger inputs would need a blocking key to limit candidate pairs.
df_pairs = df_final.alias("a").join(df_final.alias("b"), F.col("a.id") < F.col("b.id"))

# Score every candidate pair.
df_similarity = df_pairs.withColumn(
    "similarity_score",
    cosine_similarity_udf(F.col("a.name_norm"), F.col("b.name_norm")),
)

# Keep only pairs at or above the similarity threshold.
similarity_threshold = 0.8
df_matches = df_similarity.filter(F.col("similarity_score") >= similarity_threshold)

# Alias both sides so the displayed columns are distinguishable; a plain select would
# show two columns named "id" and two named "name".
df_matches.select(
    F.col("a.id").alias("id_a"),
    F.col("a.name").alias("name_a"),
    F.col("b.id").alias("id_b"),
    F.col("b.name").alias("name_b"),
    "similarity_score",
).show(truncate=False)


+---+----------+---+----------+----------------+
|id |name      |id |name      |similarity_score|
+---+----------+---+----------+----------------+
|1  |John Doe  |7  |Doe John  |1.0             |
|4  |Jane Smith|8  |Smith Jane|1.0             |
+---+----------+---+----------+----------------+



                                                                                

In [3]:
# Hardcoded ground truth: pairs of record ids that refer to the same real-world person.
# NOTE(review): this list is not transitively closed. (1, 2) and (1, 7) imply (2, 7),
# and (4, 5) and (4, 8) imply (5, 8), so recall is measured only against the pairs
# listed here. Confirm whether the omission is intentional.
ground_truth_data = [
    (1, 2),
    (4, 5),
    (1, 7),
    (4, 8)
]
ground_truth_columns = ["id1", "id2"]

# Canonicalize each labelled pair to (smaller id, larger id) and de-duplicate, so labels
# line up with the a.id < b.id convention of the self-join. Without this, a label written
# as (7, 1) would never match and a repeated label would be double-counted in recall.
# Self-pairs are dropped because the self-join never produces them.
ground_truth_pairs = sorted(
    {(min(id1, id2), max(id1, id2)) for id1, id2 in ground_truth_data if id1 != id2}
)

# Use an explicit schema so this also works for an empty label list (Spark cannot infer
# a schema from an empty list).
ground_truth_schema = ", ".join(f"{name} long" for name in ground_truth_columns)
ground_truth = spark.createDataFrame(ground_truth_pairs, ground_truth_schema)

# Predicted pairs that also appear in the ground truth are true positives.
true_positives = df_matches.join(
    ground_truth,
    (df_matches["a.id"] == ground_truth["id1"]) & (df_matches["b.id"] == ground_truth["id2"]),
    "inner",
)

# Calculate counts
tp_count = true_positives.count()
predicted_match_count = df_matches.count()
actual_match_count = ground_truth.count()

# Precision, recall and F1; each is defined as 0 when its denominator is 0.
precision = tp_count / predicted_match_count if predicted_match_count > 0 else 0
recall = tp_count / actual_match_count if actual_match_count > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1_score:.2f}")

# Stop the Spark session; later cells (or re-running this one) need a new session.
spark.stop()


                                                                                

Precision: 1.00
Recall: 0.50
F1 Score: 0.67
