# Entity Resolution with Splink using Spark

In [None]:
from splink.spark.jar_location import similarity_jar_location
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import warnings

# Spark options for this session, applied to the configuration in the order
# listed: parallelism / shuffle partition counts, then executor and driver
# memory (6 gigabytes each).
spark_options = (
    ("spark.default.parallelism", "50"),
    ("spark.sql.shuffle.partitions", "50"),
    ("spark.executor.memory", "6g"),
    ("spark.driver.memory", "6g"),
)

conf = SparkConf()
for option_name, option_value in spark_options:
    conf.set(option_name, option_value)

# Register the jar of custom similarity functions that ships with Splink.
# Documented here: https://github.com/moj-analytical-services/splink_scalaudfs
path = similarity_jar_location()
conf.set("spark.jars", path)

# Reuse a running SparkContext if there is one, otherwise start one with the
# configuration above, and wrap it in a session.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

# Quieten notebook output: Spark logs at ERROR level only, and Python
# UserWarnings are suppressed.
spark.sparkContext.setLogLevel("ERROR")
warnings.simplefilter("ignore", UserWarning)

In [None]:
# Load the exported CSV into a Spark DataFrame, treating the first row as the
# header and letting Spark infer the column types, then preview ten rows.
# NOTE(review): this is an absolute path on one user's machine; adjust it
# before running elsewhere.
csv_path = '/Users/chase.burkhalter/Documents/DBeaver/Exports/TEMP_SPLINK_DATA_202404291432_ALL.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

df.show(10)

In [35]:
from splink.spark.linker import SparkLinker
#from splink.spark.blocking_rule_library import block_on

# SQL conditions comparing the left (l) and right (r) record of a pair, one
# per identifying column.
prediction_blocking_rules = [
    "l.anonymous_id = r.anonymous_id",
    "l.user_id = r.user_id",
    "l.email = r.email",
]

# Linker settings: deduplicate a single dataset using the rules above, and
# leave the matching / intermediate calculation columns out of the output.
settings = dict(
    link_type="dedupe_only",
    blocking_rules_to_generate_predictions=prediction_blocking_rules,
    retain_matching_columns=False,
    retain_intermediate_calculation_columns=False,
)

# Create the Spark-backed linker over the loaded DataFrame.
linker = SparkLinker(df, settings, spark=spark)

In [None]:
# The blocking rules are reused here as the deterministic matching rules.
rules = settings["blocking_rules_to_generate_predictions"]

# Estimate the probability that two random records match.
# estimate_probability_two_random_records_match() does not return the
# estimate: it stores it on the linker's settings and reports it through
# Splink's logger, so capturing and printing its result only ever shows
# "None". It is also meant to receive the full list of deterministic rules,
# with `recall` being the recall of the rules taken together; calling it once
# per rule overwrites the stored estimate each time, keeping only the value
# from the last rule.
linker.estimate_probability_two_random_records_match(rules, recall=0.9)

# Count the number of comparisons generated by each blocking rule on its own.
for rule in rules:
    count = linker.count_num_comparisons_from_blocking_rule(rule)
    print(f"Number of comparisons generated by '{rule}': {count}")

In [None]:
# Link records using the deterministic rules, then show the first ten
# results as a pandas DataFrame.
df_deterministic = linker.deterministic_link()
deterministic_preview = df_deterministic.as_pandas_dataframe(10)
deterministic_preview

In [None]:
# Group the deterministic links into clusters, using a match-probability
# threshold of 0.9, and show the result as a pandas DataFrame.
clusters_deterministic = linker.cluster_pairwise_predictions_at_threshold(
    df_deterministic,
    threshold_match_probability=0.9,
)
clusters_deterministic.as_pandas_dataframe()

In [None]:
from IPython.display import IFrame

# Write the cluster studio dashboard for the deterministic links and clusters
# to an HTML file (overwriting any existing copy), then embed that file in
# the notebook output.
dashboard_path = "cluster_studio.html"
linker.cluster_studio_dashboard(
    df_deterministic,
    clusters_deterministic,
    dashboard_path,
    sampling_method="by_cluster_size",
    overwrite=True,
)
IFrame(src=dashboard_path, width="100%", height=1100)