## Q3

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, col, trim, concat_ws
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("EntityResolutionDuplicateDetection").getOrCreate()

# Sample data simulating entities with slight variations
data = [
    (1, "Alice Johnson", "alice.j@example.com", "123-456-7890", "123 Elm St."),
    (2, "Alyce Jonson", "alyce.j@example.com", "1234567890", "123 Elm Street"),
    (3, "Bob Smith", "bob.smith@example.com", "987-654-3210", "456 Maple Ave"),
    (4, "Robert Smith", "rsmith@example.com", "9876543210", "456 Maple Avenue")
]

# Create a DataFrame
df = spark.createDataFrame(data, ["id", "name", "email", "phone", "address"])

# Display the original DataFrame
print("Original DataFrame:")
df.show(truncate=False)

# Data Cleaning and Normalization

# 1. Normalize text by converting to lowercase and trimming spaces
df = df.withColumn("name", trim(lower(col("name")))) \
       .withColumn("email", trim(lower(col("email")))) \
       .withColumn("address", trim(lower(col("address"))))

# 2. Remove special characters in phone numbers
df = df.withColumn("phone", regexp_replace(col("phone"), r"[^0-9]", ""))

# 3. Standardize common address terms
df = df.withColumn("address", regexp_replace(col("address"), r"\bstreet\b", "st")) \
       .withColumn("address", regexp_replace(col("address"), r"\bavenue\b", "ave"))

# Concatenate fields to create a "fingerprint" for each record
df = df.withColumn("fingerprint", concat_ws("_", col("name"), col("phone"), col("address")))

# Display the DataFrame with fingerprints
print("DataFrame with Fingerprints:")
df.show(truncate=False)

# Find duplicates based on the fingerprint
duplicates_df = df.groupBy("fingerprint") \
                  .agg(F.collect_list("id").alias("duplicate_ids"),
                       F.count("id").alias("count")) \
                  .filter("count > 1")

# Display the duplicate records
print("Potential Duplicates Based on Entity Resolution:")
duplicates_df.show(truncate=False)

# Stop the Spark session
spark.stop()


24/11/06 07:45:27 WARN Utils: Your hostname, Anants-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.86.5.30 instead (on interface en0)
24/11/06 07:45:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/06 07:45:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Original DataFrame:


                                                                                

+---+-------------+---------------------+------------+----------------+
|id |name         |email                |phone       |address         |
+---+-------------+---------------------+------------+----------------+
|1  |Alice Johnson|alice.j@example.com  |123-456-7890|123 Elm St.     |
|2  |Alyce Jonson |alyce.j@example.com  |1234567890  |123 Elm Street  |
|3  |Bob Smith    |bob.smith@example.com|987-654-3210|456 Maple Ave   |
|4  |Robert Smith |rsmith@example.com   |9876543210  |456 Maple Avenue|
+---+-------------+---------------------+------------+----------------+

DataFrame with Fingerprints:
+---+-------------+---------------------+----------+-------------+-------------------------------------+
|id |name         |email                |phone     |address      |fingerprint                          |
+---+-------------+---------------------+----------+-------------+-------------------------------------+
|1  |alice johnson|alice.j@example.com  |1234567890|123 elm st.  |alice johnson