In [0]:
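# Spark-native fuzzy deduplication using Levenshtein distance (no pandas, no toPandas).
# This solution uses only PySpark built-ins, so all work stays on the cluster.
# Note: the un-partitioned window and the range-based self-join below still limit
# how far it scales on very large inputs.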

import dlt
from pyspark.sql import functions as F
from pyspark.sql import Window

@dlt.table(
    name="dedup_output_PySpark_DLT"
)
def deduplication_spark_native():
    """Build the fuzzy-deduplicated customer table.

    Steps:
      1. Read the input table and lower-case the column names.
      2. Normalize name, address, city and country (lowercase, then drop
         every character that is not a-z, 0-9 or whitespace) and join them
         into a single ``combined_key``.
      3. Sort by the normalized name, number the rows, and compare each row
         only with rows whose number is within ``window_size`` of its own.
      4. Keep pairs whose Levenshtein distance on ``combined_key`` is at
         most ``max_distance``.
      5. Give every id one cluster leader: the smallest id among itself and
         all ids it was paired with. Unmatched rows lead themselves.

    Returns:
        A DataFrame with exactly one row per input row and the columns
        ``Row_Id``, ``fusion_customer_name``, ``address_line_1``,
        ``postal_code``, ``city``, ``country``, ``id``, ``source_system``,
        ``Cluster_Id`` and ``Cluster_Size``.

    NOTE: clustering is a single pass, not a transitive closure. In a chain
    A~B~C where A and C are not directly matched, C may end up with leader B
    while B has leader A. A connected-components step would be needed to
    merge such chains.
    """
    # Replace with your actual input table path
    input_table = "my_database.my_schema.dedup_input"

    # Load data
    df = spark.table(input_table).select(
        F.col("FUSION_CUSTOMER_NAME").alias("fusion_customer_name"),
        F.col("ADDRESS_LINE_1").alias("address_line_1"),
        F.col("POSTAL_CODE").alias("postal_code"),
        F.col("CITY").alias("city"),
        F.col("COUNTRY").alias("country"),
        F.col("ID").alias("id"),
        F.col("SOURCE_SYSTEM").alias("source_system")
    )

    # Normalize string columns. Lowercase FIRST, then strip: the character
    # class only whitelists lowercase letters, so stripping before lowering
    # would delete every uppercase letter instead of folding it.
    for source_col in ("fusion_customer_name", "address_line_1", "city", "country"):
        df = df.withColumn(
            source_col + "_norm",
            F.regexp_replace(F.lower(F.col(source_col)), r'[^a-z0-9\s]', '')
        )

    # Combine relevant columns into one string for fuzzy matching
    # (concat_ws skips NULL inputs).
    df = df.withColumn(
        "combined_key",
        F.concat_ws(
            " ",
            F.col("fusion_customer_name_norm"),
            F.col("address_line_1_norm"),
            F.col("city_norm"),
            F.col("country_norm")
        )
    )

    # Generate a unique row number for each record (for join logic)
    df = df.withColumn("row_num", F.monotonically_increasing_id())

    # Number the rows in name order so that similar names get nearby numbers.
    # Ties on the name are broken by combined_key and row_num so the numbering
    # does not depend on shuffle order (the self-join below evaluates this
    # plan on both sides).
    # NOTE: a window without partitionBy moves all rows to a single partition.
    window = Window.orderBy("fusion_customer_name_norm", "combined_key", "row_num")
    df = df.withColumn("window_id", F.row_number().over(window))

    # For each record, compare only with nearby records (window of 100, adjust as needed)
    window_size = 100
    df1 = df.alias("df1")
    df2 = df.alias("df2")

    # The row_num inequality keeps each unordered pair once and drops self-pairs.
    # NOTE: this is a non-equi join; its cost grows with the input size.
    join_condition = (
        (F.abs(F.col("df1.window_id") - F.col("df2.window_id")) <= window_size) &
        (F.col("df1.row_num") < F.col("df2.row_num"))
    )

    # Compute Levenshtein distance between combined keys
    joined = df1.join(df2, join_condition, how="inner") \
        .withColumn(
            "levenshtein_dist",
            F.levenshtein(F.col("df1.combined_key"), F.col("df2.combined_key"))
        )

    # Set your similarity threshold (lower distance = more similar).
    # NOTE: this is an absolute edit distance, so very short keys match easily.
    max_distance = 5  # Adjust as needed

    similar_pairs = joined.filter(F.col("levenshtein_dist") <= max_distance) \
        .select(
            F.col("df1.id").alias("id_1"),
            F.col("df2.id").alias("id_2"),
            F.col("levenshtein_dist")
        )

    # For clustering: assign minimum id as the cluster leader (simple approach)
    cluster_leader_df = similar_pairs.withColumn(
        "cluster_leader",
        F.least("id_1", "id_2")
    )

    # Create mapping from id to cluster leader. An id that matched several
    # partners can have several candidate leaders; keep only the smallest so
    # the mapping has exactly one row per id. Without this, the left join
    # below would duplicate input rows and inflate Cluster_Size.
    id_to_leader = cluster_leader_df.select("id_1", "cluster_leader") \
        .union(cluster_leader_df.select(F.col("id_2").alias("id_1"), "cluster_leader")) \
        .groupBy("id_1") \
        .agg(F.min("cluster_leader").alias("cluster_leader"))

    # Join cluster info back to the original
    df_with_cluster = df.join(
        id_to_leader,
        df.id == id_to_leader.id_1,
        how="left"
    ).withColumn(
        "Cluster_Id",
        F.coalesce(F.col("cluster_leader"), F.col("id"))  # If not matched, self as cluster leader
    ).drop("id_1", "cluster_leader")

    # Calculate cluster sizes
    cluster_sizes = df_with_cluster.groupBy("Cluster_Id").count().withColumnRenamed("count", "Cluster_Size")
    df_with_cluster = df_with_cluster.join(cluster_sizes, "Cluster_Id", how="left")

    # Add Row_Id for compatibility with previous output
    df_with_cluster = df_with_cluster.withColumn("Row_Id", F.col("id"))

    # Select and order output columns
    result = df_with_cluster.select(
        "Row_Id",
        "fusion_customer_name",
        "address_line_1",
        "postal_code",
        "city",
        "country",
        "id",
        "source_system",
        "Cluster_Id",
        "Cluster_Size"
    )

    return result