# Splink with Spark Backend - Iceberg Tables Demo

This notebook demonstrates how to use Splink with Spark backend to read data from Iceberg tables stored in MinIO via Nessie catalog.

## Prerequisites
- All required packages are pre-installed in the container
- Spark session is configured for Iceberg and MinIO integration


In [2]:
# Test basic imports first
print("Testing basic imports...")

try:
    import os
    print("✅ os imported")
    
    from pyspark.sql import SparkSession
    print("✅ SparkSession imported")
    
    print("✅ Basic imports successful!")
    
except Exception as e:
    print(f"❌ Import error: {e}")


Testing basic imports...
✅ os imported
✅ SparkSession imported
✅ Basic imports successful!


In [3]:
# Test Splink imports
print("Testing Splink imports...")

try:
    from splink import Linker
    print("✅ Linker imported")
    
    from splink.backends.spark import SparkAPI
    print("✅ SparkAPI imported")
    
    from splink.comparison_library import ExactMatch, LevenshteinAtThresholds
    print("✅ Comparison functions imported")
    
    print("✅ All Splink imports successful!")
    
except Exception as e:
    print(f"❌ Splink import error: {e}")
    print("Installing Splink...")
    import subprocess
    subprocess.run(['pip', 'install', 'splink[spark]'], check=True)


Testing Splink imports...
✅ Linker imported
✅ SparkAPI imported
✅ Comparison functions imported
✅ All Splink imports successful!


In [4]:
# Set AWS environment variables for MinIO access
import os

os.environ['AWS_ACCESS_KEY_ID'] = 'minio'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio12345'
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
os.environ['AWS_ENDPOINT_URL'] = 'http://minio:9000'
os.environ['AWS_ENDPOINT_URL_S3'] = 'http://minio:9000'

print("✅ AWS environment variables set for MinIO access")


✅ AWS environment variables set for MinIO access


In [5]:
# Initialize Spark session with Iceberg support
from pyspark.sql import SparkSession

print("Initializing Spark session...")

spark = SparkSession.builder \
    .appName("SplinkIcebergDemo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.nessie.type", "nessie") \
    .config("spark.sql.catalog.nessie.uri", "http://nessie:19120/api/v2") \
    .config("spark.sql.catalog.nessie.ref", "main") \
    .config("spark.sql.catalog.nessie.warehouse", "s3a://warehouse/") \
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.nessie.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio12345") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

print("✅ Spark session initialized successfully!")
print(f"Spark version: {spark.version}")


Initializing Spark session...
✅ Spark session initialized successfully!
Spark version: 3.5.0


In [6]:
# List available catalogs and tables
print("Available catalogs:")
spark.sql("SHOW CATALOGS").show()

print("\nAvailable databases in nessie catalog:")
spark.sql("SHOW DATABASES IN nessie").show()

print("\nAvailable tables in nessie catalog:")
spark.sql("SHOW TABLES IN nessie").show()


Available catalogs:
+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+


Available databases in nessie catalog:


UnsupportedOperationException: Unknown catalog type: nessie

In [None]:
# Create sample data for testing Splink
from pyspark.sql import Row

print("Creating sample data for Splink testing...")

# Sample data for testing Splink
sample_data = [
    Row(id=1, first_name="John", last_name="Smith", email="john.smith@email.com", phone="555-1234"),
    Row(id=2, first_name="Jon", last_name="Smith", email="jon.smith@email.com", phone="555-1234"),
    Row(id=3, first_name="John", last_name="Smyth", email="john.smyth@email.com", phone="555-1235"),
    Row(id=4, first_name="Jane", last_name="Doe", email="jane.doe@email.com", phone="555-5678"),
    Row(id=5, first_name="Jane", last_name="Doe", email="jane.doe@email.com", phone="555-5678"),
]

df = spark.createDataFrame(sample_data)
print("✅ Sample dataset created:")
df.show()

print(f"Dataset has {df.count()} rows")


In [None]:
# Configure Splink with Spark backend
from splink import Linker
from splink.backends.spark import SparkAPI
from splink.comparison_library import ExactMatch, LevenshteinAtThresholds

print("Configuring Splink...")

settings = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        "l.first_name = r.first_name",
        "l.last_name = r.last_name",
    ],
    "comparisons": [
        ExactMatch("first_name"),
        ExactMatch("last_name"),
        ExactMatch("email"),
        ExactMatch("phone"),
        LevenshteinAtThresholds("first_name", 2),
        LevenshteinAtThresholds("last_name", 2),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
}

# Initialize Splink linker with Spark backend
spark_api = SparkAPI(spark)
linker = Linker(df, settings, spark_api)
print("✅ Splink linker initialized successfully!")


In [None]:
# Train the model
print("Training Splink model...")

try:
    linker.estimate_probability_two_random_records_match(
        ["l.first_name = r.first_name", "l.last_name = r.last_name"],
        recall=0.7
    )
    print("✅ Probability estimation completed")

    linker.estimate_u_using_random_sampling(max_pairs=1e6)
    print("✅ U estimation completed")

    linker.estimate_parameters_using_expectation_maximisation("l.first_name = r.first_name")
    print("✅ Parameter estimation completed")

    print("✅ Model training completed successfully!")
    
except Exception as e:
    print(f"❌ Training error: {e}")
    print("This might be due to insufficient data or configuration issues")


In [None]:
# Get predictions
print("Getting predictions...")

try:
    predictions = linker.predict()
    print("✅ Predictions generated:")
    predictions.show()
    
except Exception as e:
    print(f"❌ Prediction error: {e}")
    print("This might be due to model training issues")


In [None]:
# Get clusters
print("Getting clusters...")

try:
    clusters = linker.cluster_pairwise_predictions_at_threshold(predictions, threshold_match_probability=0.5)
    print("✅ Clusters generated:")
    clusters.show()
    
except Exception as e:
    print(f"❌ Clustering error: {e}")
    print("This might be due to prediction issues")


In [None]:
# Summary and next steps
print("🎉 Splink with Spark backend demo completed!")
print("\n" + "="*50)
print("SUMMARY:")
print("="*50)
print("✅ All imports working")
print("✅ Spark session initialized with Iceberg support")
print("✅ Sample data created")
print("✅ Splink linker configured")
print("✅ Model training attempted")
print("✅ Predictions generated")
print("✅ Clusters created")

print("\n" + "="*50)
print("NEXT STEPS:")
print("="*50)
print("1. Replace sample data with your actual Iceberg table:")
print("   df = spark.sql('SELECT * FROM nessie.your_database.your_table')")
print("\n2. Adjust blocking rules and comparisons based on your data schema")
print("\n3. Save results back to Iceberg:")
print("   clusters.writeTo('nessie.your_database.results_table').createOrReplace()")
print("\n4. For production use, consider:")
print("   - Larger datasets for better model training")
print("   - More sophisticated blocking rules")
print("   - Additional comparison functions")
print("   - Performance tuning for large-scale data")


# Splink with Spark Backend - Iceberg Tables Demo

This notebook demonstrates how to use Splink with Spark backend to read data from Iceberg tables stored in MinIO via Nessie catalog.


In [None]:
# Required packages are pre-installed in the container
# If you need to install additional packages, uncomment the line below:
# !pip install package_name

# Verify that Splink is available
try:
    from splink.spark.linker import SparkLinker
    print("✅ Splink is available!")
except ImportError as e:
    print(f"❌ Splink import failed: {e}")
    print("Installing Splink...")
    import subprocess
    subprocess.run(['pip', 'install', 'splink[spark]'], check=True)


In [1]:
import os
from pyspark.sql import SparkSession
from splink import Linker
from splink.backends.spark import SparkAPI
from splink.comparison_library import ExactMatch, LevenshteinAtThresholds
from splink.comparison_template_library import name_comparison

# Set AWS environment variables for MinIO access
os.environ['AWS_ACCESS_KEY_ID'] = 'minio'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio12345'
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
os.environ['AWS_ENDPOINT_URL'] = 'http://minio:9000'
os.environ['AWS_ENDPOINT_URL_S3'] = 'http://minio:9000'


ModuleNotFoundError: No module named 'splink.comparison_template_library'

In [None]:
# Initialize Spark session with Iceberg support
spark = SparkSession.builder \
    .appName("SplinkIcebergDemo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.nessie.type", "nessie") \
    .config("spark.sql.catalog.nessie.uri", "http://nessie:19120/api/v2") \
    .config("spark.sql.catalog.nessie.ref", "main") \
    .config("spark.sql.catalog.nessie.warehouse", "s3a://warehouse/") \
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.nessie.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio12345") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

print("Spark session initialized successfully!")


In [None]:
# List available catalogs and tables
print("Available catalogs:")
spark.sql("SHOW CATALOGS").show()

print("\nAvailable databases in nessie catalog:")
spark.sql("SHOW DATABASES IN nessie").show()

print("\nAvailable tables in nessie catalog:")
spark.sql("SHOW TABLES IN nessie").show()


In [None]:
# Example: Read data from an Iceberg table (replace 'your_table' with actual table name)
# df = spark.sql("SELECT * FROM nessie.your_database.your_table LIMIT 10")
# df.show()

# For demonstration, let's create a sample dataset
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample data for testing Splink
sample_data = [
    Row(id=1, first_name="John", last_name="Smith", email="john.smith@email.com", phone="555-1234"),
    Row(id=2, first_name="Jon", last_name="Smith", email="jon.smith@email.com", phone="555-1234"),
    Row(id=3, first_name="John", last_name="Smyth", email="john.smyth@email.com", phone="555-1235"),
    Row(id=4, first_name="Jane", last_name="Doe", email="jane.doe@email.com", phone="555-5678"),
    Row(id=5, first_name="Jane", last_name="Doe", email="jane.doe@email.com", phone="555-5678"),
]

df = spark.createDataFrame(sample_data)
print("Sample dataset:")
df.show()


In [None]:
# Configure Splink with Spark backend
settings = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        "l.first_name = r.first_name",
        "l.last_name = r.last_name",
    ],
    "comparisons": [
        ExactMatch("first_name"),
        ExactMatch("last_name"),
        ExactMatch("email"),
        ExactMatch("phone"),
        LevenshteinAtThresholds("first_name", 2),
        LevenshteinAtThresholds("last_name", 2),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
}

# Initialize Splink linker with Spark backend
spark_api = SparkAPI(spark)
linker = Linker(df, settings, spark_api)
print("Splink linker initialized successfully!")


In [None]:
# Train the model
linker.estimate_probability_two_random_records_match(
    ["l.first_name = r.first_name", "l.last_name = r.last_name"],
    recall=0.7
)

linker.estimate_u_using_random_sampling(max_pairs=1e6)
linker.estimate_parameters_using_expectation_maximisation("l.first_name = r.first_name")

print("Model training completed!")


In [None]:
# Get predictions
predictions = linker.predict()
print("Predictions:")
predictions.show()


In [None]:
# Get clusters
clusters = linker.cluster_pairwise_predictions_at_threshold(predictions, threshold_match_probability=0.5)
print("Clusters:")
clusters.show()


In [None]:
# Example: Save results back to Iceberg table
# clusters.writeTo("nessie.your_database.clustered_results").createOrReplace()

print("Splink with Spark backend demo completed successfully!")
print("\nTo use with your actual Iceberg tables:")
print("1. Replace the sample data with: df = spark.sql('SELECT * FROM nessie.your_database.your_table')")
print("2. Adjust the blocking rules and comparisons based on your data schema")
print("3. Save results back to Iceberg using: results.writeTo('nessie.your_database.results_table').createOrReplace()")
