In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = (SparkSession.builder 
    .appName("deduplication") 
    .master("local[*]") 
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12") 
    .config("spark.driver.memory", "16g")
    .getOrCreate())


spark.sparkContext.setCheckpointDir("/tmp/")

/usr/local/spark-3.4.1-bin-hadoop3/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/usr/local/spark-3.4.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2944b3cc-c023-4c8e-8504-de6c0118e4ce;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 298ms :: artifacts dl 8ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-----------------------------------------------

In [2]:
n_records = 10000
n_duplicates0 = 100
n_duplicates1 = 100
n_duplicates2 = 100
n_duplicates3 = 100

In [3]:
from faker import Faker
import pandas as pd
import numpy as np
import random

# Instantiate Faker
fake = Faker()

# Create a new DataFrame for storing the original records and their duplicates
df_new = pd.DataFrame({
    'client_id': range(1, n_records+1),
    'client_name': [fake.name() for _ in range(n_records)],
    'address': [fake.address().replace('\n', ', ') for _ in range(n_records)],
    'email': [fake.email() for _ in range(n_records)],
    'phone_number': [fake.phone_number() for _ in range(n_records)],
    'duplicate_id': range(1, n_records+1),  # Add a new column for the duplicate ID
    'num_modifications': [0] * n_records  # Add a new column for the number of modifications
})

# Define modification functions
def modify_name(name):
    mods = ['add_middle_initial'
        ,'add_prefix'
        ,'typo'
        ,'switch_name_order'
        ,'change_case']
    mod = random.choice(mods)
    print(f"Chosen modification: {mod}")

    if mod == 'add_middle_initial':
        if len(name.split()) == 2:
            first, last = name.split(' ')
            middle_initial = fake.random_uppercase_letter()
            modified_name = f"{first} {middle_initial}. {last}"
        else:
            modified_name = f"{name} {fake.random_uppercase_letter}."

    elif mod == 'add_prefix':
        prefix = fake.prefix()
        modified_name = f"{prefix} {name}"

    elif mod == 'typo':
        index = random.randint(0, len(name)-1)
        modified_name = name[:index] + fake.random_lowercase_letter() + name[index+1:]

    elif mod == 'switch_name_order':
        name_parts = name.split()
        modified_name = ', '.join(name_parts[::-1])

    elif mod == 'change_case':
        modified_name = name.upper() if random.choice([True, False]) else name.lower()

    print(f"Modified name: {modified_name}")
    return modified_name


def modify_email(email):
    user, domain = email.split('@')
    new_domain = fake.free_email_domain()
    return f"{user}@{new_domain}"

def modify_address(address):
    new_address = fake.street_address()
    city_state_zip = address.split(', ')[1:]
    return f"{new_address}, {', '.join(city_state_zip)}"

def modify_phone_number(phone_number):
    digits = [char for char in phone_number if char.isdigit()]
    digit_to_replace = random.choice(digits)
    return phone_number.replace(digit_to_replace, str(fake.random_digit()), 1)

def modification(field, value):
    if field == 'client_name':
        return modify_name(value)
    elif field == 'address':
        return modify_address(value)
    elif field == 'email':
        return modify_email(value)
    elif field == 'phone_number':
        return modify_phone_number(value)

# Make slight modifications to create duplicates
for idx in range(n_duplicates0):

    #print("duplicate #: ", idx)
    # Create the first duplicate
    duplicate = df_new.loc[idx].copy()
    #print(duplicate)
    new_client_id = df_new['client_id'].max() + 1  # Assign a new client_id
    duplicate['client_id'] = new_client_id
    duplicate['duplicate_id'] = int(idx + 1)  # Assign the duplicate ID to be the original client_id

    # Randomly select zero or more fields to modify (excluding client_id)
    num_modifications = np.random.choice([0, 0, 1, 1, 2])  # Make 2 modifications rarer
    duplicate['num_modifications'] = num_modifications
    if num_modifications > 0:
        fields_to_modify = np.random.choice(['client_name', 'address', 'email', 'phone_number'], size=num_modifications, replace=False)

        for field in fields_to_modify:
            # Apply the modification
            value = duplicate[field]
            duplicate[field] = modification(field,value)

    # Append the duplicate to the DataFrame
    df_new = pd.concat([df_new, duplicate.to_frame().transpose()], ignore_index=True)
    #print(idx,duplicate)

    # Create additional duplicates for a subset of the original records
    if idx < n_duplicates1:
        # Create one more duplicate for the first 50 original records
        duplicate = df_new.loc[idx].copy()
        new_client_id = df_new['client_id'].max() + 1  # Assign a new client_id
        duplicate['client_id'] = new_client_id
        duplicate['duplicate_id'] = int(idx + 1)  # Assign the duplicate ID to be the original client_id

        num_modifications = np.random.choice([0, 0, 1, 1, 2])  # Make 2 modifications rarer
        duplicate['num_modifications'] = num_modifications
        if num_modifications > 0:
            fields_to_modify = np.random.choice(['client_name', 'address', 'email', 'phone_number'], size=num_modifications, replace=False)

            for field in fields_to_modify:
                # Apply the modification
                value = duplicate[field]
                duplicate[field] = modification(field,value)

        # Append the duplicate to the DataFrame
        df_new = pd.concat([df_new, duplicate.to_frame().transpose()], ignore_index=True)
        #print(idx,duplicate)

    if idx < n_duplicates2:
        # Create more duplicates
        duplicate = df_new.loc[idx].copy()
        new_client_id = df_new['client_id'].max() + 1  # Assign a new client_id
        duplicate['client_id'] = new_client_id
        duplicate['duplicate_id'] = int(idx + 1)  # Assign the duplicate ID to be the original client_id

        num_modifications = np.random.choice([0, 0, 1, 1, 2])  # Make 2 modifications rarer
        duplicate['num_modifications'] = num_modifications
        if num_modifications > 0:
            fields_to_modify = np.random.choice(['client_name', 'address', 'email', 'phone_number'], size=num_modifications, replace=False)

            for field in fields_to_modify:
                # Apply the modification
                value = duplicate[field]
                duplicate[field] = modification(field,value)

        # Append the duplicate to the DataFrame
        df_new = pd.concat([df_new, duplicate.to_frame().transpose()], ignore_index=True)
        #print(idx,duplicate)

    if idx == 0:
        # Create more duplicates for the first original record
        for _ in range(n_duplicates3):
            duplicate = df_new.loc[idx].copy()
            new_client_id = df_new['client_id'].max() + 1  # Assign a new client_id
            duplicate['client_id'] = new_client_id
            duplicate['duplicate_id'] = int(idx + 1)  # Assign the duplicate ID to be the original client_id

            num_modifications = np.random.choice([0, 0, 1, 1, 2])  # Make 2 modifications rarer
            duplicate['num_modifications'] = num_modifications
            if num_modifications > 0:
                fields_to_modify = np.random.choice(['client_name', 'address', 'email', 'phone_number'], size=num_modifications, replace=False)

                for field in fields_to_modify:
                    # Apply the modification
                    value = duplicate[field]
                    duplicate[field] = modification(field,value)

            # Append the duplicate to the DataFrame
            df_new = pd.concat([df_new, duplicate.to_frame().transpose()], ignore_index=True)
            #print(idx,duplicate)

    #print(len(df_new))

# Shuffle the DataFrame
df_new = df_new.sample(frac=1).reset_index(drop=True)
df_new = df_new.astype({
    'client_id': 'int64',  # or 'int32' if your numbers are not too large
    'client_name': 'string',
    'address': 'string',
    'email': 'string',
    'phone_number': 'string',
    'duplicate_id': 'int64',  # or 'int32' if your numbers are not too large
    'num_modifications': 'int64'  # or 'int32' if your numbers are not too large
})


Chosen modification: typo
Modified name: Wendy Berges
Chosen modification: change_case
Modified name: wendy berger
Chosen modification: switch_name_order
Modified name: Berger, Wendy
Chosen modification: switch_name_order
Modified name: Berger, Wendy
Chosen modification: change_case
Modified name: WENDY BERGER
Chosen modification: add_prefix
Modified name: Mx. Wendy Berger
Chosen modification: typo
Modified name: Wendy Bergel
Chosen modification: change_case
Modified name: wendy berger
Chosen modification: add_prefix
Modified name: Dr. Wendy Berger
Chosen modification: add_prefix
Modified name: Ind. Wendy Berger
Chosen modification: add_middle_initial
Modified name: Wendy X. Berger
Chosen modification: switch_name_order
Modified name: Berger, Wendy
Chosen modification: switch_name_order
Modified name: Berger, Wendy
Chosen modification: typo
Modified name: Wendy Bergev
Chosen modification: change_case
Modified name: wendy berger
Chosen modification: add_prefix
Modified name: Ind. Wendy 

In [4]:
#show df_new
df_new.head(10)

Unnamed: 0,client_id,client_name,address,email,phone_number,duplicate_id,num_modifications
0,1744,Bryan Burgess,"23386 Duncan Stream Apt. 276, Grahamtown, OR 1...",michael40@example.com,+1-903-570-1837x37019,1744,0
1,286,Sara Sharp,"553 Christopher Greens Apt. 150, Fletcherton, ...",christine70@example.com,(437)980-0094x2575,286,0
2,7774,Amy Hebert,"426 Landry Stream, North Michaelton, SC 75033",avilahunter@example.net,3413408987,7774,0
3,313,Robyn Armstrong,"07533 King Canyon Apt. 976, Carolineport, MD 1...",molly14@example.org,802.430.6142,313,0
4,5257,Kenneth Brown,"734 Byrd Street, Mitchellland, CT 98113",andrewsmith@example.com,476-785-8666x089,5257,0
5,7068,Robert Anderson,"162 Massey Ville Apt. 987, West Casey, WY 15128",wrightpaul@example.com,668.393.4787,7068,0
6,6315,Stephanie Palmer,"4489 Lozano Viaduct Suite 860, South Christine...",andersonamanda@example.net,001-353-640-4379x84204,6315,0
7,8787,Laura Spencer,"463 Cooley Lock, Parkerhaven, NC 14978",teresabrown@example.com,001-985-985-4104x73941,8787,0
8,986,Anna Nichols,"6662 Brooks Parks Suite 398, Alvarezport, VT 7...",sarahcarroll@example.org,+1-223-505-8549x4231,986,0
9,6469,Robert Blankenship,"149 Morales Fork Apt. 599, Allenborough, NJ 99871",friedmansarah@example.com,617.808.2250x31491,6469,0


In [5]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import Tokenizer, SQLTransformer, RegexTokenizer, NGram, HashingTF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.feature import MinHashLSH

from graphframes import GraphFrame

string_list = ['client_name', 'address', 'email', 'phone_number']
distance_threshold = 0.1
df_spark = spark.createDataFrame(df_new)
#p
transform0  =    SQLTransformer(statement=f"""SELECT *,LOWER(REGEXP_REPLACE(CONCAT({','.join(string_list)}), 
            '[\\s\\W]', '')) AS record_strings FROM __THIS__ """)
token0      =    Tokenizer(inputCol="record_strings", outputCol="token" )
transform1  =    SQLTransformer(statement="SELECT *, concat_ws(' ', token) concat FROM __THIS__")
token1      =    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1 )
ngram       =    NGram(n=2, inputCol="char", outputCol="ngram")
hash      =    HashingTF(inputCol="ngram", outputCol="vector")

# feat        =    VectorAssembler(inputCols=["vector"], outputCol="features")
# kmeans      =    KMeans(k = 2, seed = 1, predictionCol="kmeans")

stages = [transform0,token0,transform1,token1,ngram,hash]
#pre-processing
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_spark)
model_df = model.transform(df_spark)


#knn then jaccard distance
lsh_model = MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3).fit(model_df)
similarity_df = lsh_model.approxSimilarityJoin(model_df, model_df, distance_threshold, distCol="text_distance").filter("datasetA.client_id != datasetB.client_id")



from pyspark.sql.window import Window
from pyspark.sql.functions import col, concat_ws, split, array_sort, min

#graphx
edges = (similarity_df.selectExpr("datasetA.client_id as src","datasetB.client_id as dst")
         .withColumn('set_col', concat_ws(',', col('src'), col('dst')))
         .withColumn('sorted_set', array_sort(split(col('set_col'), ',')))
         .dropDuplicates(['sorted_set']).select(col("src"), col("dst")))

vertices = (similarity_df.selectExpr("datasetA.client_id as id").union(similarity_df.selectExpr("datasetB.client_id as id"))).distinct()


#connections graph
graph_frame = GraphFrame(vertices, edges)

#slow
components_df = graph_frame.connectedComponents().withColumn("min_id", min(col("id")).over(Window.partitionBy("component")))



In [6]:
components_df.show(20,truncate=False)

23/08/06 01:34:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-----+---------+------+
|id   |component|min_id|
+-----+---------+------+
|10014|1        |1     |
|10028|1        |1     |
|10019|1        |1     |
|10091|1        |1     |
|10037|1        |1     |
|10072|1        |1     |
|10059|1        |1     |
|10103|1        |1     |
|10055|1        |1     |
|10061|1        |1     |
|10030|1        |1     |
|10081|1        |1     |
|10020|1        |1     |
|10095|1        |1     |
|10060|1        |1     |
|10023|1        |1     |
|10051|1        |1     |
|10033|1        |1     |
|10011|1        |1     |
|10049|1        |1     |
+-----+---------+------+


                                                                                

In [7]:
similarity_df.show(20,truncate=False)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

Now try with kmeans

In [8]:
feat        =    VectorAssembler(inputCols=["vector"], outputCol="features")
kmeans      =    KMeans(k = 4, seed = 1, predictionCol="kmeans")

stages = [transform0,token0,transform1,token1,ngram,hash,feat,kmeans]
#pre-processing
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_spark)
model_df = model.transform(df_spark)


#knn then jaccard distance
lsh_model = MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3).fit(model_df)
similarity_df = lsh_model.approxSimilarityJoin(model_df, model_df, distance_threshold, distCol="text_distance").filter("datasetA.client_id != datasetB.client_id")



from pyspark.sql.window import Window
from pyspark.sql.functions import col, concat_ws, split, array_sort, min

#graphx
edges = (similarity_df.selectExpr("datasetA.client_id as src","datasetB.client_id as dst")
         .withColumn('set_col', concat_ws(',', col('src'), col('dst')))
         .withColumn('sorted_set', array_sort(split(col('set_col'), ',')))
         .dropDuplicates(['sorted_set']).select(col("src"), col("dst")))

vertices = (similarity_df.selectExpr("datasetA.client_id as id").union(similarity_df.selectExpr("datasetB.client_id as id"))).distinct()


#connections graph
graph_frame = GraphFrame(vertices, edges)

#slow
components_df = graph_frame.connectedComponents().withColumn("min_id", min(col("id")).over(Window.partitionBy("component")))

23/08/06 01:36:06 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:09 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:11 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:15 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:18 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:21 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:23 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:26 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:29 WARN DAGScheduler: Broadcasting large task binary with size 14.2 MiB
23/08/06 01:36:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/06 01:36:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [9]:
components_df.show(20,truncate=False)

#number of duplicate clusters
components_df.select("min_id").distinct().count()


23/08/06 01:38:10 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:38:12 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:38:17 WARN DAGScheduler: Broadcasting large task binary with size 22.4 MiB
23/08/06 01:39:03 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:39:06 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:39:08 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
                                                                                

+-----+---------+------+
|id   |component|min_id|
+-----+---------+------+
|10014|1        |1     |
|10028|1        |1     |
|10019|1        |1     |
|10091|1        |1     |
|10037|1        |1     |
|10072|1        |1     |
|10059|1        |1     |
|10103|1        |1     |
|10055|1        |1     |
|10061|1        |1     |
|10030|1        |1     |
|10081|1        |1     |
|10020|1        |1     |
|10095|1        |1     |
|10060|1        |1     |
|10023|1        |1     |
|10051|1        |1     |
|10033|1        |1     |
|10011|1        |1     |
|10049|1        |1     |
+-----+---------+------+


23/08/06 01:39:09 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:39:11 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:39:16 WARN DAGScheduler: Broadcasting large task binary with size 22.4 MiB
23/08/06 01:40:01 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:40:03 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:40:05 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
23/08/06 01:40:07 WARN DAGScheduler: Broadcasting large task binary with size 22.3 MiB
                                                                                

99