In [None]:
# !pip install splink pyspark rapidfuzz

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

from splink.backends.spark import similarity_jar_location
from splink import SparkAPI

# Path to the custom similarity-UDF JAR.
# Set the SPLINK_CUSTOM_JAR_PATH environment variable to point at the JAR on
# your machine; if it is not set, the original hard-coded location is used.
CUSTOM_JAR_PATH = os.environ.get(
    "SPLINK_CUSTOM_JAR_PATH",
    r"C:\Users\AbhayPandey\Desktop\AP_SS\Note\scala-udf-similarity-0.1.1-shaded.jar",
)
if not os.path.isfile(CUSTOM_JAR_PATH):
    # Only a warning: the path is still handed to Spark below, exactly as before.
    print(f"Warning: custom JAR not found at '{CUSTOM_JAR_PATH}'. "
          "Set SPLINK_CUSTOM_JAR_PATH to its location.")

# --- Use the modern SparkSession.builder pattern for configuration ---
# NOTE: driver/executor memory settings only take effect if no Spark JVM is
# already running in this kernel (getOrCreate() reuses an existing session).
spark = (
    SparkSession.builder.appName("SplinkRobustMemory")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "8g")

    # Per the Spark configuration docs, this is the amount of memory each Python
    # worker process may use during aggregation before it spills to disk.
    # It is a spill threshold, not a hard allocation for Python UDF processes.
    .config("spark.python.worker.memory", "4g")

    # --- Other settings for stability ---
    # Upper bound on the total size of serialized results sent back to the driver.
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "16")
    .config("spark.sql.codegen.wholeStage", "true") # Important for performance
    .config("spark.ui.port", "4040") # For monitoring

    # Jars and Packages
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    # Comma-separated list: the JAR shipped with Splink plus the custom one above.
    .config("spark.jars", f"{similarity_jar_location()},{CUSTOM_JAR_PATH}")
    .getOrCreate()
)

# Set checkpoint directory (relative to the notebook's working directory)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

print("✅ Spark Session created with robust memory settings.")
print(f"Access the Spark UI at: {spark.sparkContext.uiWebUrl}")

In [None]:
from pyspark.sql.types import StringType, DoubleType, ArrayType
# from pyspark.sql import callUDF


# Every JVM UDF exposed to Spark SQL, as (SQL function name, Java class, return type).
# The entries are registered in the order listed.
java_udf_specs = [
    # Phonetic / normalization
    ("accent_remove", "uk.gov.moj.dash.linkage.AccentRemover", StringType()),
    ("double_metaphone", "uk.gov.moj.dash.linkage.DoubleMetaphone", StringType()),
    ("double_metaphone_alt", "uk.gov.moj.dash.linkage.DoubleMetaphoneAlt", StringType()),

    # Similarity
    ("cosine_distance", "uk.gov.moj.dash.linkage.CosineDistance", DoubleType()),
    ("jaccard_similarity", "uk.gov.moj.dash.linkage.JaccardSimilarity", DoubleType()),
    ("jaro_similarity", "uk.gov.moj.dash.linkage.JaroSimilarity", DoubleType()),
    ("jaro_winkler_similarity", "uk.gov.moj.dash.linkage.JaroWinklerSimilarity", DoubleType()),
    ("lev_damerau_distance", "uk.gov.moj.dash.linkage.LevDamerauDistance", DoubleType()),

    # Tokenisers
    ("qgram_tokeniser", "uk.gov.moj.dash.linkage.QgramTokeniser", StringType()),
    ("q2gram_tokeniser", "uk.gov.moj.dash.linkage.Q2gramTokeniser", StringType()),
    ("q3gram_tokeniser", "uk.gov.moj.dash.linkage.Q3gramTokeniser", StringType()),
    ("q4gram_tokeniser", "uk.gov.moj.dash.linkage.Q4gramTokeniser", StringType()),
    ("q5gram_tokeniser", "uk.gov.moj.dash.linkage.Q5gramTokeniser", StringType()),
    ("q6gram_tokeniser", "uk.gov.moj.dash.linkage.Q6gramTokeniser", StringType()),

    # Array / explode helpers
    ("dual_array_explode", "uk.gov.moj.dash.linkage.DualArrayExplode", ArrayType(StringType())),
    ("latlong_explode", "uk.gov.moj.dash.linkage.latlongexplode", ArrayType(StringType())),

    # Escaping
    ("sql_escape", "uk.gov.moj.dash.linkage.sqlEscape", StringType()),
]

for udf_name, java_class_name, udf_return_type in java_udf_specs:
    spark.udf.registerJavaFunction(udf_name, java_class_name, udf_return_type)



In [None]:
from pyspark.sql.functions import monotonically_increasing_id, col
from pyspark.sql.types import StringType
from pyspark.storagelevel import StorageLevel

csv_path = "data.csv"   # replace with your dataset

# Read the CSV (first row is the header, column types inferred), add a string
# `unique_id` column built from monotonically_increasing_id(), and mark the
# result for MEMORY_AND_DISK persistence. persist() returns the same DataFrame.
df = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    .withColumn("unique_id", monotonically_increasing_id().cast(StringType()))
    .persist(StorageLevel.MEMORY_AND_DISK)
)

print("Showing first 5 rows:")
# truncate=False prints full column contents instead of cutting them off
df.show(5, truncate=False)

In [None]:
# import shutil, glob, os

# def move_single_csv(folder_path, final_csv):
#     """Move Spark's part-0000.csv to a single clean CSV file."""
#     part_file = glob.glob(os.path.join(folder_path, "part-*.csv"))[0]
#     shutil.move(part_file, final_csv)
#     shutil.rmtree(folder_path)  # cleanup temp folder


# def generate_predictions(linker, prediction_path: str, cluster_path: str, threshold: float):
#     # Predictions
#     df_predictions = linker.inference.predict()
#     df_predictions_spark = df_predictions.as_spark_dataframe()
#     df_predictions_spark = df_predictions_spark.filter(df_predictions_spark.match_probability > threshold)

#     (df_predictions_spark
#         .coalesce(1)
#         .write.mode("overwrite")
#         .option("header", True)
#         .csv(prediction_path.replace(".csv","")))

#     # Clusters
#     clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
#         df_predictions, threshold_match_probability=threshold
#     )
#     clusters_spark = clusters.as_spark_dataframe()

#     (clusters_spark
#         .coalesce(1)
#         .write.mode("overwrite")
#         .option("header", True)
#         .csv(cluster_path.replace(".csv","")))

#     # Rename temp folders to final CSVs
#     move_single_csv(prediction_path.replace(".csv",""), prediction_path)
#     move_single_csv(cluster_path.replace(".csv",""), cluster_path)

#     return df_predictions_spark, clusters_spark
import shutil
import glob
import os
import time
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import col, coalesce

def move_single_csv(folder_path, final_csv):
    """
    Turn a Spark CSV output folder into one plain CSV file.

    Looks for files matching ``part-*.csv`` inside ``folder_path``, moves the
    first one (in sorted name order) to ``final_csv`` and then deletes
    ``folder_path`` with everything left in it.

    Args:
        folder_path: Directory Spark wrote its output into.
        final_csv: Destination path of the single CSV file. Its parent
            directory is created if it does not exist yet.

    Returns:
        None.

    Notes:
        * If no part file is found, a warning is printed and an empty file is
          created at ``final_csv`` so the destination always exists afterwards.
        * If several part files are found, only the first one is kept; a
          warning is printed because the others are deleted with the folder.
        * The folder is removed only after the destination file was written,
          so a failed move leaves Spark's output in place.
    """
    # Sorted so that the chosen file does not depend on directory listing order.
    part_files = sorted(glob.glob(os.path.join(folder_path, "part-*.csv")))

    # Make sure the destination directory exists before writing into it.
    target_dir = os.path.dirname(final_csv)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    if part_files:
        if len(part_files) > 1:
            print(
                f"Warning: Found {len(part_files)} part files in {folder_path}; "
                f"only '{os.path.basename(part_files[0])}' is kept."
            )
        shutil.move(part_files[0], final_csv)
    else:
        print(f"Warning: No data was written to the output folder {folder_path}.")
        # Create an empty placeholder so downstream readers find the file.
        with open(final_csv, 'w', newline=''):
            pass

    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)


def generate_predictions_single_csv(
    linker,
    original_df_spark: SparkDataFrame,
    prediction_path: str,
    cluster_path: str,
    threshold: float
):
    """
    Score record pairs, cluster them, and export both results as single CSV files.

    Args:
        linker: Object providing ``inference.predict()`` and
            ``clustering.cluster_pairwise_predictions_at_threshold()``.
        original_df_spark: Spark DataFrame with a ``unique_id`` column; the
            cluster ids are joined onto it.
        prediction_path: Destination file for the filtered pairwise predictions.
        cluster_path: Destination file for the full records with ``cluster_id``.
        threshold: Predictions are kept when ``match_probability`` is strictly
            greater than this value; the same value is passed to the clustering
            call as ``threshold_match_probability``.

    Returns:
        Tuple ``(df_predictions_spark, full_df_with_clusters)``: the filtered
        predictions and the original records joined with their cluster id,
        sorted by ``cluster_id`` then ``unique_id``.
    """

    def _export_single_csv(frame, final_path):
        # Write into "<final_path>_temp" as one partition with a header row,
        # then let move_single_csv turn that folder into the final file.
        staging_folder = final_path + "_temp"
        single_partition_writer = (
            frame.coalesce(1).write.mode("overwrite").option("header", True)
        )
        single_partition_writer.csv(staging_folder)
        move_single_csv(staging_folder, final_path)

    # ---- Pairwise predictions ----
    prediction_started = time.time()
    df_predictions = linker.inference.predict()
    df_predictions_spark = df_predictions.as_spark_dataframe().filter(
        f"match_probability > {threshold}"
    )
    _export_single_csv(df_predictions_spark, prediction_path)
    print(f"Single prediction CSV written to '{prediction_path}' in {time.time() - prediction_started:.2f} seconds.")

    # ---- Clusters ----
    clustering_started = time.time()
    clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
        df_predictions, threshold_match_probability=threshold
    )
    # Keep only the two key columns so the join below cannot introduce
    # duplicate column names from the cluster output.
    cluster_lookup = clusters.as_spark_dataframe().select("unique_id", "cluster_id")

    full_df_with_clusters = (
        original_df_spark
        # Left join keeps every original record, clustered or not.
        .join(cluster_lookup, on="unique_id", how="left")
        # Records without a cluster id fall back to their own unique_id.
        .withColumn("cluster_id", coalesce(col("cluster_id"), col("unique_id")))
        .sort("cluster_id", "unique_id")
    )
    _export_single_csv(full_df_with_clusters, cluster_path)
    print(f"Single, full cluster CSV written to '{cluster_path}' in {time.time() - clustering_started:.2f} seconds.")

    return df_predictions_spark, full_df_with_clusters

In [None]:
# import auto_blocking2 as ab
# from splink import SparkAPI, Linker

# db_api = SparkAPI(spark_session=spark)
# settings, roles, diagnostics, df_enhanced = ab.auto_generate_settings(df, db_api, spark=spark)  # Add spark=spark here
# print("Roles:", roles)
# print("Diagnostics:", diagnostics)

# # Sample 40% of the data for training
# training_df = df_enhanced.sample(0.4, seed=42)

# # Initialize Linker with training data and train
# linker = Linker(training_df, settings, db_api=db_api)
# deterministic_rules = [diag['rule'] for diag in diagnostics if diag['kept']]
# try:
#     linker.training.estimate_probability_two_random_records_match(
#         deterministic_matching_rules=deterministic_rules,
#         recall=0.95
#     )
# except:
#     linker.training.estimate_probability_two_random_records_match(
#         deterministic_matching_rules=deterministic_rules,
#         recall=1.0
#     )
# linker.training.estimate_u_using_random_sampling(max_pairs=5e5)
# if deterministic_rules:
#     linker.training.estimate_parameters_using_expectation_maximisation(deterministic_rules[0])

# # Save the trained model to JSON
# linker.misc.save_model_to_json("splink_model.json", overwrite=True)

# # Load the trained model with the full DataFrame
# import json
# with open("splink_model.json", "r", encoding="utf-8") as fh:
#     trained_settings = json.load(fh)
# full_linker = Linker(df_enhanced, trained_settings, db_api=db_api)

# df_preds_spark, clusters_spark = generate_predictions(
#     full_linker,
#     "splink_predictions.csv",
#     "splink_clusters.csv",
#     threshold=0.99
# )    

import auto_blocking2 as ab
from splink import SparkAPI, Linker
from pyspark.storagelevel import StorageLevel
import json
import time

# --- Setup ---
db_api = SparkAPI(spark_session=spark)

# --- 1. Generate settings and the enhanced DataFrame ---
start_time = time.time()
settings, roles, diagnostics, df_enhanced = ab.auto_generate_settings(df, db_api, spark=spark)
print(f"✅ Settings and derived columns generated in {time.time() - start_time:.2f} seconds.")
print("\nDetected Roles:", roles)
print("\nBlocking Rule Diagnostics:", diagnostics)

# --- 2. Cache the enhanced DataFrame ---
# Persisting avoids recomputing the derived columns for every later step.
df_enhanced.persist(StorageLevel.MEMORY_AND_DISK)
# count() is the action that actually materialises the cache.
print(f"\nCached enhanced DataFrame with {df_enhanced.count()} rows.")


# --- 3. Use a 40% sample for training ---
# .sample() takes a fraction (0.4 = 40%); the fixed seed keeps the sample
# reproducible between runs.
training_df = df_enhanced.sample(0.4, seed=42)

# Cache the training set as it is read repeatedly during the training steps.
training_df.cache()
print(f"✅ Using a 40% sample of {training_df.count()} records for model training.")

# --- 4. Train the model on the sample ---
start_time = time.time()
linker = Linker(training_df, settings, db_api=db_api)
# Only the rules that the diagnostics marked as kept are used for training.
deterministic_rules = [diag['rule'] for diag in diagnostics if diag['kept']]
try:
    linker.training.estimate_probability_two_random_records_match(
        deterministic_matching_rules=deterministic_rules, recall=0.95
    )
except Exception:
    # Fallback: retry the same estimate assuming the rules have perfect recall.
    linker.training.estimate_probability_two_random_records_match(
        deterministic_matching_rules=deterministic_rules, recall=1.0
    )
linker.training.estimate_u_using_random_sampling(max_pairs=2e6)

# Expectation Maximisation takes ONE blocking rule per call, so run a separate
# training session for each kept rule instead of passing the whole list.
print("\nStarting Expectation Maximisation training on all blocking rules combined...")
for blocking_rule in deterministic_rules:
    linker.training.estimate_parameters_using_expectation_maximisation(blocking_rule)
# Save the trained model
linker.misc.save_model_to_json("splink_model.json", overwrite=True)
print(f"✅ Model training completed in {time.time() - start_time:.2f} seconds.")

# Clean up the training cache
training_df.unpersist()

# --- 5. Predict on the FULL dataset ---
# Load the trained model
with open("splink_model.json", "r", encoding="utf-8") as fh:
    trained_settings = json.load(fh)
# Create a new linker with the FULL (cached) enhanced dataframe
full_linker = Linker(df_enhanced, trained_settings, db_api=db_api)

# --- 6. Write predictions and clusters as single CSV files ---
# `df_enhanced` is passed so the cluster ids can be joined onto the full records.
df_preds_spark, clusters_spark_full = generate_predictions_single_csv(
    full_linker,
    df_enhanced,  # <-- Pass the full, cached data here
    "splink_predictions.csv",
    "splink_clusters.csv",
    threshold=0.95
)

# --- 7. Show Final Results ---
# Done BEFORE releasing the cache: `clusters_spark_full` is lazy and is derived
# from `df_enhanced`, so showing it after unpersist() would recompute the
# derived columns from scratch.
print("\n✅ Job Complete! Final files have been generated.")
print("Showing a sample of the final, complete cluster data:")
clusters_spark_full.show(20, truncate=False)

# --- 8. Clean up the main cache ---
df_enhanced.unpersist()

In [1]:
!pip freeze > requirements.txt

