In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace, udf, explode, collect_set, lit, count, when
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.feature import Tokenizer
import pyspark.sql.functions as F
import re

In [10]:
# Create (or reuse) the Spark session used by the preprocessing cells.
spark = (
    SparkSession.builder
    .appName("EntityResolutionPreprocessing")
    .getOrCreate()
)




# Q1: Text preprocessing for entity resolution

In [11]:
# Column names for the sample business records below.
columns = ["id", "name", "address", "city", "state"]

# Three sample records; rows 1 and 2 are differently-written variants of
# the same name and address.
data = [
    ("1", "Company ABC, Inc.", "1234 Elm St.", "New York", "NY"),
    ("2", "ABC Company, Inc.", "1234 Elm Street", "New York", "NY"),
    ("3", "XYZ Corp.", "5678 Oak Ave.", "Los Angeles", "CA"),
]

df = spark.createDataFrame(data, schema=columns)

# Step 1: Normalize text (convert to lowercase, remove punctuation, trim whitespace)
def normalize_text(text):
    """Return a lowercase, punctuation-free, trimmed copy of ``text``.

    Args:
        text: The raw string to normalize, or ``None``.

    Returns:
        The normalized string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        # This function is wrapped as a UDF and applied before nulls are
        # dropped, so a null cell must pass through instead of raising on
        # ``.lower()``.
        return None
    text = text.lower()  # Convert to lowercase
    text = re.sub(r'[^\w\s]', '', text)  # Remove everything that is not a word char or whitespace
    return text.strip()  # Trim leading/trailing whitespace

# Expose the Python normalizer to Spark as a string-returning UDF.
normalize_text_udf = udf(normalize_text, StringType())

# Add a "<field>_normalized" column for each free-text field.
df = (
    df
    .withColumn("name_normalized", normalize_text_udf(col("name")))
    .withColumn("address_normalized", normalize_text_udf(col("address")))
    .withColumn("city_normalized", normalize_text_udf(col("city")))
)

# Step 2: Tokenize text (split into words)
# Names first ...
tokenizer = Tokenizer(
    inputCol="name_normalized",
    outputCol="name_tokens",
)
df = tokenizer.transform(df)

# ... then addresses; `tokenizer` is left bound to the address tokenizer.
tokenizer = Tokenizer(
    inputCol="address_normalized",
    outputCol="address_tokens",
)
df = tokenizer.transform(df)

# Step 3: Remove stopwords (optional)
# Low-information tokens (legal suffixes and street-type words) that should
# not influence matching.
stopwords = ['inc', 'co', 'corp', 'llc', 'company', 'street', 'st', 'ave', 'avenue']

def remove_stopwords(tokens):
    """Return ``tokens`` with every entry found in ``stopwords`` removed.

    Args:
        tokens: A sequence of token strings, or ``None``.

    Returns:
        A new list holding the remaining tokens in their original order, or
        ``None`` when ``tokens`` is ``None``.
    """
    if tokens is None:
        # Keep a null array null instead of raising when iterating over None.
        return None
    # Build the lookup set at call time so later edits to ``stopwords`` are
    # still honoured, while membership tests avoid a linear list scan.
    blocked = set(stopwords)
    return [token for token in tokens if token not in blocked]

# Expose the stopword filter to Spark as a UDF returning an array of strings.
remove_stopwords_udf = udf(remove_stopwords, ArrayType(StringType()))

# Filter both token columns in place, then
# Step 4: Remove duplicates and null values
df = (
    df
    .withColumn("name_tokens", remove_stopwords_udf(col("name_tokens")))
    .withColumn("address_tokens", remove_stopwords_udf(col("address_tokens")))
    .dropDuplicates()
    .na.drop()
)

# Step 5: View the preprocessed DataFrame
df.select(
    "id",
    "name_normalized",
    "name_tokens",
    "address_normalized",
    "address_tokens",
    "city_normalized",
).show(truncate=False)


+---+---------------+-----------+------------------+--------------+---------------+
|id |name_normalized|name_tokens|address_normalized|address_tokens|city_normalized|
+---+---------------+-----------+------------------+--------------+---------------+
|1  |company abc inc|[abc]      |1234 elm st       |[1234, elm]   |new york       |
|2  |abc company inc|[abc]      |1234 elm street   |[1234, elm]   |new york       |
|3  |xyz corp       |[xyz]      |5678 oak ave      |[5678, oak]   |los angeles    |
+---+---------------+-----------+------------------+--------------+---------------+



# Q2: Pairwise similarity scoring and match evaluation

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import explode, collect_set
import pyspark.sql.functions as F

# Initialize Spark session (getOrCreate returns the running session if one exists)
spark = (
    SparkSession.builder
    .appName("SimilarityScoreComputation")
    .getOrCreate()
)

# Sample data: (id, name) pairs
columns = ["id", "name"]
data = [
    ("1", "Company ABC Inc."),
    ("2", "ABC Company Inc."),
    ("3", "XYZ Corp."),
    ("4", "Company XYZ"),
]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Tokenize the raw name column into a "name_tokens" array column
tokenizer = Tokenizer(inputCol="name", outputCol="name_tokens")
df = tokenizer.transform(df)

# Keep only what the pairwise comparison needs: (id, tokens)
df_tokens = df.select("id", "name_tokens")

# Pair every record with every record (including itself); the two sides are
# aliased "df1" and "df2" so their columns can be told apart later.
df_cross = (
    df_tokens.alias("df1")
    .crossJoin(df_tokens.alias("df2"))
)

# Define UDF to compute Jaccard Similarity
def jaccard_similarity(set1, set2):
    """Compute the Jaccard similarity |A ∩ B| / |A ∪ B| of two token collections.

    Args:
        set1: An iterable of hashable tokens, or ``None``.
        set2: An iterable of hashable tokens, or ``None``.

    Returns:
        A float in [0.0, 1.0]. Returns 0.0 when both inputs are empty or
        ``None``, since the ratio is undefined for an empty union.
    """
    # Treat a null token array as empty so one null row cannot crash the job.
    set1 = set(set1 or ())
    set2 = set(set2 or ())
    union = set1 | set2  # computed once and reused for the guard and the ratio
    if not union:
        return 0.0
    return len(set1 & set2) / len(union)

# Register the similarity function as a float-returning Spark UDF.
jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

# Score every pair, then discard rows where a record is compared with itself.
df_similarity = (
    df_cross
    .withColumn(
        "similarity",
        jaccard_similarity_udf(col("df1.name_tokens"), col("df2.name_tokens")),
    )
    .filter(col("df1.id") != col("df2.id"))
)

# Show results as (id1, id2, similarity)
df_similarity.select(
    col("df1.id").alias("id1"),
    col("df2.id").alias("id2"),
    col("similarity"),
).show(truncate=False)


+---+---+----------+
|id1|id2|similarity|
+---+---+----------+
|1  |2  |1.0       |
|1  |3  |0.0       |
|1  |4  |0.25      |
|2  |1  |1.0       |
|2  |3  |0.0       |
|2  |4  |0.25      |
|3  |1  |0.0       |
|3  |2  |0.0       |
|3  |4  |0.33333334|
|4  |1  |0.25      |
|4  |2  |0.25      |
|4  |3  |0.33333334|
+---+---+----------+



In [13]:
# Ground truth: one row per labelled pair, as (id1, id2, is_match),
# where is_match is 1 for a true match and 0 for a non-match.
columns = ["id1", "id2", "is_match"]
ground_truth_data = [
    ("1", "2", 1),
    ("2", "3", 0),
    ("3", "4", 1),
    ("4", "1", 0),
]

# Create DataFrame for ground truth
df_ground_truth = spark.createDataFrame(ground_truth_data, schema=columns)

# Predictions from the entity resolution model for the same pairs,
# as (id1, id2, predicted_match).
columns_pred = ["id1", "id2", "predicted_match"]
predictions_data = [
    ("1", "2", 1),
    ("2", "3", 0),
    ("3", "4", 0),
    ("4", "1", 0),
]

# Create DataFrame for predictions
df_predictions = spark.createDataFrame(predictions_data, schema=columns_pred)

# Attach each prediction to its ground-truth row by matching (id1, id2).
# A left join keeps every ground-truth pair; pairs with no prediction get a
# null predicted_match.
df_joined = (
    df_ground_truth.alias("gt")
    .join(
        df_predictions.alias("pred"),
        on=(col("gt.id1") == col("pred.id1")) & (col("gt.id2") == col("pred.id2")),
        how="left",
    )
    .select(
        col("gt.id1"),
        col("gt.id2"),
        col("gt.is_match"),
        col("pred.predicted_match"),
    )
)

# Flag each pair with 0/1 indicator columns, one per confusion-matrix cell
# (true/false positive, true/false negative).
df_metrics = (
    df_joined
    .withColumn(
        "true_positive",
        when((col("is_match") == 1) & (col("predicted_match") == 1), 1).otherwise(0),
    )
    .withColumn(
        "false_positive",
        when((col("is_match") == 0) & (col("predicted_match") == 1), 1).otherwise(0),
    )
    .withColumn(
        "true_negative",
        when((col("is_match") == 0) & (col("predicted_match") == 0), 1).otherwise(0),
    )
    .withColumn(
        "false_negative",
        when((col("is_match") == 1) & (col("predicted_match") == 0), 1).otherwise(0),
    )
)

# Aggregate counts: count() skips nulls, so each expression counts only the
# rows whose indicator equals 1. The aggregate yields a single Row.
metrics = df_metrics.agg(
    count(when(col("true_positive") == 1, 1)).alias("true_positives"),
    count(when(col("false_positive") == 1, 1)).alias("false_positives"),
    count(when(col("true_negative") == 1, 1)).alias("true_negatives"),
    count(when(col("false_negative") == 1, 1)).alias("false_negatives"),
).collect()[0]

# Pull the four counts out of the Row by field name
true_positives = metrics["true_positives"]
false_positives = metrics["false_positives"]
true_negatives = metrics["true_negatives"]
false_negatives = metrics["false_negatives"]

# Precision = TP / (TP + FP); defined as 0.0 when nothing was predicted positive.
if (true_positives + false_positives) > 0:
    precision = true_positives / (true_positives + false_positives)
else:
    precision = 0.0

# Recall = TP / (TP + FN); defined as 0.0 when there are no actual positives.
if (true_positives + false_negatives) > 0:
    recall = true_positives / (true_positives + false_negatives)
else:
    recall = 0.0

# F1 = harmonic mean of precision and recall; 0.0 when both are zero.
if (precision + recall) > 0:
    f1_score = 2 * (precision * recall) / (precision + recall)
else:
    f1_score = 0.0

# Report all three scores to four decimal places
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1_score:.4f}")


Precision: 1.0000
Recall: 0.5000
F1-Score: 0.6667


In [14]:
# Stop the Spark session. Run this last: every cell above relies on `spark`.
spark.stop()
