# Import Required Libraries
Import the necessary libraries, including PySpark, Snowflake connector, and Iceberg.

In [None]:
# Import Required Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import snowflake.connector
import logging

# Initialize Spark session
# NOTE(review): `spark.jars.packages` takes Maven coordinates, which Spark
# documents as groupId:artifactId:version. The entries below carry no version
# and no Iceberg runtime is listed — confirm these before running.
spark = SparkSession.builder \
    .appName("Snowflake to Iceberg Replication") \
    .config("spark.jars.packages", "net.snowflake:snowflake-jdbc,net.snowflake:spark-snowflake") \
    .getOrCreate()

# Configure logging
# basicConfig sets the root logger to INFO; `logger` is the module-level
# logger that the functions in the following cells write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration Setup
Define all configurations such as source & target tables, processing modes, and logging levels in a single cell for easy reusability.

In [None]:
# Configuration Setup

# Snowflake configuration
# Passed as-is to the Spark Snowflake reader via .options(**snowflake_config)
# and read key-by-key by get_snowflake_table_schema.
# NOTE(review): the values are placeholders; the password is stored in plain
# text here — presumably it should come from a secret store, confirm.
snowflake_config = {
    "sfURL": "your_snowflake_account_url",
    "sfUser": "your_snowflake_username",
    "sfPassword": "your_snowflake_password",
    "sfDatabase": "your_snowflake_database",
    "sfSchema": "your_snowflake_schema",
    "sfWarehouse": "your_snowflake_warehouse",
    "sfRole": "your_snowflake_role"
}

# Iceberg configuration
# catalog + namespace + table name are joined with dots to form the fully
# qualified target name in create_iceberg_table_if_not_exists.
iceberg_config = {
    "iceberg_catalog": "your_iceberg_catalog",
    "iceberg_namespace": "your_iceberg_namespace",
    "iceberg_table": "your_iceberg_table"
}

# Control table configuration
# The control table is addressed as "<control_table_schema>.<control_table_name>".
control_table_config = {
    "control_table_name": "control_table",
    "control_table_schema": "control_schema"
}

# Processing modes
# Boolean switches read by the example-usage cells to pick a loading strategy.
processing_modes = {
    "full_load": True,
    "chunk_load": False,
    "partition_load": False,
    "bucketing": False,
    "incremental_load": False
}

# Logging levels
# Maps a short name to the corresponding `logging` level constant.
logging_levels = {
    "info": logging.INFO,
    "debug": logging.DEBUG,
    "error": logging.ERROR
}

# Set logging level
logger.setLevel(logging_levels["info"])

# Example of how to use configurations
# The password is masked before logging so the credential never reaches
# notebook output or log files; every other setting is logged unchanged.
redacted_snowflake_config = {
    key: ("***" if key == "sfPassword" else value)
    for key, value in snowflake_config.items()
}
logger.info("Snowflake configuration: %s", redacted_snowflake_config)
logger.info("Iceberg configuration: %s", iceberg_config)
logger.info("Control table configuration: %s", control_table_config)
logger.info("Processing modes: %s", processing_modes)

# Schema Handling
Support replication for tables of different schemas dynamically.

In [None]:
# Schema Handling

# Function to get schema from Snowflake table
def get_snowflake_table_schema(table_name):
    """Return the column names and data types of a Snowflake table.

    Args:
        table_name: Table name matched against INFORMATION_SCHEMA.COLUMNS.TABLE_NAME.

    Returns:
        The rows from cursor.fetchall(): one (COLUMN_NAME, DATA_TYPE) pair per
        column, ordered by the column's ordinal position in the table.

    NOTE(review): the lookup is not restricted by TABLE_SCHEMA and compares the
    name exactly as given — confirm whether same-named tables in other schemas
    or identifier case can affect the result.
    """
    # The table name is bound as a parameter rather than interpolated, so a
    # quote in the name cannot alter the statement.
    query = (
        "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_NAME = %s ORDER BY ORDINAL_POSITION"
    )

    # sfURL is the host name used by the Spark connector; the Python connector's
    # `account` argument takes the identifier without scheme or domain suffix.
    account = snowflake_config["sfURL"]
    if "://" in account:
        account = account.split("://", 1)[1]
    account = account.split(".snowflakecomputing.com", 1)[0]

    conn = snowflake.connector.connect(
        user=snowflake_config["sfUser"],
        password=snowflake_config["sfPassword"],
        account=account,
        warehouse=snowflake_config["sfWarehouse"],
        database=snowflake_config["sfDatabase"],
        schema=snowflake_config["sfSchema"],
        role=snowflake_config["sfRole"]
    )
    # try/finally guarantees cursor and connection are released even when the
    # query fails.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (table_name,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

# Function to create Iceberg table schema from Snowflake schema
def create_iceberg_table_schema(snowflake_schema):
    """Build a Spark StructType from Snowflake column metadata.

    Args:
        snowflake_schema: Iterable of rows whose first two items are the column
            name and the Snowflake data type name.

    Returns:
        A StructType with one nullable field per input column. TEXT maps to
        StringType and NUMBER to IntegerType; any other type falls back to
        StringType (with a warning) so that no column is silently left out.

    NOTE(review): NUMBER is kept as IntegerType for compatibility — confirm
    that the source values fit a 32-bit integer.
    """
    # Add more data type mappings as needed
    type_factories = {
        "TEXT": StringType,
        "NUMBER": IntegerType,
    }

    iceberg_schema = StructType()
    for column in snowflake_schema:
        column_name = column[0]
        data_type = column[1]
        field_type = type_factories.get(data_type)
        if field_type is None:
            logger.warning(
                f"No Spark type mapping for Snowflake type {data_type} "
                f"(column {column_name}); falling back to StringType"
            )
            field_type = StringType
        iceberg_schema.add(StructField(column_name, field_type(), True))
    return iceberg_schema

# Function to create Iceberg table if it doesn't exist
def create_iceberg_table_if_not_exists(table_name, schema):
    """Create an empty Iceberg table with the given schema unless it already exists.

    The table is addressed as <iceberg_catalog>.<iceberg_namespace>.<table_name>
    using the values in iceberg_config.
    """
    catalog = iceberg_config['iceberg_catalog']
    namespace = iceberg_config['iceberg_namespace']
    qualified_name = f"{catalog}.{namespace}.{table_name}"

    # Guard clause: nothing to do when the table is already there.
    if spark.catalog.tableExists(qualified_name):
        return

    empty_df = spark.createDataFrame([], schema)
    writer = empty_df.write.format("iceberg").mode("overwrite")
    writer.saveAsTable(qualified_name)
    logger.info(f"Iceberg table {table_name} created with schema: {schema}")

# Example usage
# Runs when the cell executes: reads the source table's column metadata from
# Snowflake, converts it to a Spark schema and creates the configured Iceberg
# target table if it is missing. `source_table_name` is reused by later cells.
source_table_name = "your_source_table"
snowflake_schema = get_snowflake_table_schema(source_table_name)
iceberg_schema = create_iceberg_table_schema(snowflake_schema)
create_iceberg_table_if_not_exists(iceberg_config["iceberg_table"], iceberg_schema)

# Target Table Creation
If the Iceberg table doesn't exist, create it based on the source table schema.

In [None]:
# Target Table Creation

# Function to get schema from Snowflake table
def get_snowflake_table_schema(table_name):
    """Return the column names and data types of a Snowflake table.

    Args:
        table_name: Table name matched against INFORMATION_SCHEMA.COLUMNS.TABLE_NAME.

    Returns:
        The rows from cursor.fetchall(): one (COLUMN_NAME, DATA_TYPE) pair per
        column, ordered by the column's ordinal position in the table.

    NOTE(review): the lookup is not restricted by TABLE_SCHEMA and compares the
    name exactly as given — confirm whether same-named tables in other schemas
    or identifier case can affect the result.
    """
    # The table name is bound as a parameter rather than interpolated, so a
    # quote in the name cannot alter the statement.
    query = (
        "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_NAME = %s ORDER BY ORDINAL_POSITION"
    )

    # sfURL is the host name used by the Spark connector; the Python connector's
    # `account` argument takes the identifier without scheme or domain suffix.
    account = snowflake_config["sfURL"]
    if "://" in account:
        account = account.split("://", 1)[1]
    account = account.split(".snowflakecomputing.com", 1)[0]

    conn = snowflake.connector.connect(
        user=snowflake_config["sfUser"],
        password=snowflake_config["sfPassword"],
        account=account,
        warehouse=snowflake_config["sfWarehouse"],
        database=snowflake_config["sfDatabase"],
        schema=snowflake_config["sfSchema"],
        role=snowflake_config["sfRole"]
    )
    # try/finally guarantees cursor and connection are released even when the
    # query fails.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (table_name,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

# Function to create Iceberg table schema from Snowflake schema
def create_iceberg_table_schema(snowflake_schema):
    """Build a Spark StructType from Snowflake column metadata.

    Args:
        snowflake_schema: Iterable of rows whose first two items are the column
            name and the Snowflake data type name.

    Returns:
        A StructType with one nullable field per input column. TEXT maps to
        StringType and NUMBER to IntegerType; any other type falls back to
        StringType (with a warning) so that no column is silently left out.

    NOTE(review): NUMBER is kept as IntegerType for compatibility — confirm
    that the source values fit a 32-bit integer.
    """
    # Add more data type mappings as needed
    type_factories = {
        "TEXT": StringType,
        "NUMBER": IntegerType,
    }

    iceberg_schema = StructType()
    for column in snowflake_schema:
        column_name = column[0]
        data_type = column[1]
        field_type = type_factories.get(data_type)
        if field_type is None:
            logger.warning(
                f"No Spark type mapping for Snowflake type {data_type} "
                f"(column {column_name}); falling back to StringType"
            )
            field_type = StringType
        iceberg_schema.add(StructField(column_name, field_type(), True))
    return iceberg_schema

# Function to create Iceberg table if it doesn't exist
def create_iceberg_table_if_not_exists(table_name, schema):
    """Create an empty Iceberg table with the given schema unless it already exists.

    The table is addressed as <iceberg_catalog>.<iceberg_namespace>.<table_name>
    using the values in iceberg_config.
    """
    catalog = iceberg_config['iceberg_catalog']
    namespace = iceberg_config['iceberg_namespace']
    qualified_name = f"{catalog}.{namespace}.{table_name}"

    # Guard clause: nothing to do when the table is already there.
    if spark.catalog.tableExists(qualified_name):
        return

    empty_df = spark.createDataFrame([], schema)
    writer = empty_df.write.format("iceberg").mode("overwrite")
    writer.saveAsTable(qualified_name)
    logger.info(f"Iceberg table {table_name} created with schema: {schema}")

# Example usage
# Runs when the cell executes: reads the source table's column metadata from
# Snowflake, converts it to a Spark schema and creates the configured Iceberg
# target table if it is missing. `source_table_name` is reused by later cells.
source_table_name = "your_source_table"
snowflake_schema = get_snowflake_table_schema(source_table_name)
iceberg_schema = create_iceberg_table_schema(snowflake_schema)
create_iceberg_table_if_not_exists(iceberg_config["iceberg_table"], iceberg_schema)

# Control Table Management
Manage the control table that tracks table names and batch_sk (batch sequence key).

In [None]:
# Control Table Management

# Function to initialize control table if it doesn't exist
def initialize_control_table():
    """Create the empty control table (table_name, batch_sk) if it is missing.

    The table name is "<control_table_schema>.<control_table_name>" taken from
    control_table_config; both columns are declared non-nullable.
    """
    full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    if spark.catalog.tableExists(full_name):
        return

    fields = [
        StructField("table_name", StringType(), False),
        StructField("batch_sk", IntegerType(), False),
    ]
    empty_df = spark.createDataFrame([], StructType(fields))
    empty_df.write.format("iceberg").mode("overwrite").saveAsTable(full_name)
    logger.info(f"Control table {full_name} created.")

# Function to update control table with new batch_sk
def update_control_table(table_name, batch_sk):
    """Upsert the batch_sk recorded for a table in the control table.

    Args:
        table_name: Value of the control table's table_name column to set.
        batch_sk: Batch sequence key to store for that table.

    If a row for table_name exists it is replaced, otherwise a new row is
    added. The control table is then overwritten with the resulting rows.
    """
    control_table_full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    control_df = spark.table(control_table_full_name)

    # Rows are rebuilt on the driver instead of through Column.when():
    # .when() is only valid on a column produced by functions.when(), so
    # calling it on col("batch_sk") fails. Materialising the rows first also
    # means the overwrite below does not lazily read the table it replaces.
    rows = [
        (row["table_name"], row["batch_sk"])
        for row in control_df.collect()
        if row["table_name"] != table_name
    ]
    rows.append((table_name, batch_sk))

    updated_df = spark.createDataFrame(rows, control_df.schema)
    updated_df.write.format("iceberg").mode("overwrite").saveAsTable(control_table_full_name)
    logger.info(f"Control table {control_table_full_name} updated with table {table_name} and batch_sk {batch_sk}.")

# Function to revert control table update in case of failure
def revert_control_table_update(table_name, previous_batch_sk):
    """Write previous_batch_sk back for table_name via update_control_table and log the revert."""
    update_control_table(table_name=table_name, batch_sk=previous_batch_sk)
    logger.info(f"Control table reverted to previous batch_sk {previous_batch_sk} for table {table_name}.")

# Example usage
# Runs when the cell executes: creates the control table if needed, records
# batch_sk 1 for the sample table, then sets it back to 0.
initialize_control_table()
update_control_table("your_source_table", 1)
revert_control_table_update("your_source_table", 0)

# Initialize Control Table
Insert an entry with batch_sk = 0 if the table is not in control_table.

In [None]:
# Initialize Control Table

# Function to initialize control table if it doesn't exist
def initialize_control_table():
    """Create the empty control table (table_name, batch_sk) if it is missing.

    The table name is "<control_table_schema>.<control_table_name>" taken from
    control_table_config; both columns are declared non-nullable.
    """
    full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    if spark.catalog.tableExists(full_name):
        return

    fields = [
        StructField("table_name", StringType(), False),
        StructField("batch_sk", IntegerType(), False),
    ]
    empty_df = spark.createDataFrame([], StructType(fields))
    empty_df.write.format("iceberg").mode("overwrite").saveAsTable(full_name)
    logger.info(f"Control table {full_name} created.")

# Function to insert an entry with batch_sk = 0 if the table is not in control_table
def insert_initial_entry_if_not_exists(table_name):
    """Register table_name in the control table with batch_sk = 0 if it is absent.

    Args:
        table_name: Value looked up in (and, if missing, inserted into) the
            control table's table_name column.

    Nothing is written when a row for table_name already exists.
    """
    control_table_full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    control_df = spark.table(control_table_full_name)

    # One matching row is enough to know the table is already tracked.
    if control_df.filter(col("table_name") == table_name).limit(1).collect():
        return

    # Append only the new row: this avoids rewriting the whole control table
    # (and overwriting a table that is being read) just to add one entry.
    new_entry = spark.createDataFrame([(table_name, 0)], control_df.schema)
    new_entry.write.format("iceberg").mode("append").saveAsTable(control_table_full_name)
    logger.info(f"Inserted initial entry for table {table_name} with batch_sk = 0 in control table.")

# Example usage
# Runs when the cell executes: ensures the control table exists and that the
# sample table has a starting entry with batch_sk = 0.
initialize_control_table()
insert_initial_entry_if_not_exists("your_source_table")

# Update Control Table
Update control_table before inserting into Iceberg.

In [None]:
# Update Control Table

# Function to update control table before inserting into Iceberg
def update_control_table_before_insertion(table_name, new_batch_sk):
    """Store new_batch_sk for table_name and return the value it replaced.

    Args:
        table_name: Table whose control entry is updated.
        new_batch_sk: Batch sequence key to record.

    Returns:
        The batch_sk found for table_name before the update, or 0 when the
        control table had no row for it. Callers can use it for a rollback.
    """
    full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    matches = spark.table(full_name).filter(col("table_name") == table_name).collect()

    # Remember the old value first; the update below replaces it.
    previous_batch_sk = matches[0]["batch_sk"] if matches else 0

    update_control_table(table_name, new_batch_sk)
    return previous_batch_sk

# Example usage
# Runs when the cell executes: records batch_sk 2 for the sample table and
# keeps the replaced value in `previous_batch_sk` so it can be restored later.
table_name = "your_source_table"
new_batch_sk = 2
previous_batch_sk = update_control_table_before_insertion(table_name, new_batch_sk)
logger.info(f"Control table updated for table {table_name} with new batch_sk {new_batch_sk}. Previous batch_sk was {previous_batch_sk}.")

# Revert Control Table Update
If the Iceberg insertion fails, revert the batch_sk update.

In [None]:
# Revert Control Table Update

# Function to revert control table update in case of failure
def revert_control_table_update(table_name, previous_batch_sk):
    """Restore the batch_sk of table_name in the control table.

    Args:
        table_name: Table whose control entry is reverted.
        previous_batch_sk: Value to write back into batch_sk.

    Only rows whose table_name matches are changed; no row is added when the
    table has no entry. The control table is overwritten with the result.
    """
    control_table_full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    control_df = spark.table(control_table_full_name)

    # Revert the batch_sk to the previous value.
    # Rows are rebuilt on the driver instead of through Column.when():
    # .when() is only valid on a column produced by functions.when(), so
    # calling it on col("batch_sk") fails. Materialising the rows first also
    # means the overwrite below does not lazily read the table it replaces.
    rows = [
        (
            row["table_name"],
            previous_batch_sk if row["table_name"] == table_name else row["batch_sk"],
        )
        for row in control_df.collect()
    ]

    reverted_df = spark.createDataFrame(rows, control_df.schema)
    reverted_df.write.format("iceberg").mode("overwrite").saveAsTable(control_table_full_name)
    logger.info(f"Control table reverted to previous batch_sk {previous_batch_sk} for table {table_name}.")

# Example usage
# Runs when the cell executes: sets the sample table's batch_sk back to 1.
# Note that this rebinds the module-level `table_name` / `previous_batch_sk`.
table_name = "your_source_table"
previous_batch_sk = 1
revert_control_table_update(table_name, previous_batch_sk)

# Loading Strategies
Implement different loading strategies including Full Load, Chunk Load, Partition Load, Bucketing, and Incremental Load.

In [None]:
# Loading Strategies

# Function to perform full load
def full_load(source_table, target_table):
    """Copy an entire Snowflake table into an Iceberg table, replacing its contents.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable for the Iceberg write.
    """
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    source_df = snowflake_reader.option("dbtable", source_table).load()

    iceberg_writer = source_df.write.format("iceberg").mode("overwrite")
    iceberg_writer.saveAsTable(target_table)
    logger.info(f"Full load completed for table {source_table} into {target_table}")

# Function to perform chunk load
def chunk_load(source_table, target_table, chunk_size, order_by=None):
    """Append a Snowflake table to an Iceberg table in LIMIT/OFFSET pages.

    Args:
        source_table: Snowflake table to page through.
        target_table: Name handed to saveAsTable for each appended chunk.
        chunk_size: Number of rows per page; must be positive.
        order_by: Optional SQL ORDER BY expression (ideally a unique key).
            Without it the page boundaries are not deterministic, so rows may
            be duplicated or skipped between pages.

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    if order_by:
        order_clause = f" ORDER BY {order_by}"
    else:
        order_clause = ""
        logger.warning(
            f"chunk_load for table {source_table} runs without order_by; "
            "LIMIT/OFFSET pages are not guaranteed to be stable"
        )

    offset = 0
    while True:
        query = f"(SELECT * FROM {source_table}{order_clause} LIMIT {chunk_size} OFFSET {offset})"
        df = spark.read \
            .format("snowflake") \
            .options(**snowflake_config) \
            .option("query", query) \
            .load()
        # head(1) needs at most one row to detect the end of the data, whereas
        # count() would scan the whole chunk before it is written.
        if not df.head(1):
            break
        df.write.format("iceberg").mode("append").saveAsTable(target_table)
        # Log the offset of the chunk that was just written, then advance.
        logger.info(f"Chunk load completed for offset {offset} for table {source_table} into {target_table}")
        offset += chunk_size

# Function to perform partition load
def partition_load(source_table, target_table, partition_column):
    """Copy a Snowflake table into an Iceberg table partitioned by one column.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable; existing contents are overwritten.
        partition_column: Column given to partitionBy on the writer.
    """
    source_df = (
        spark.read.format("snowflake")
        .options(**snowflake_config)
        .option("dbtable", source_table)
        .load()
    )

    partitioned_writer = source_df.write.partitionBy(partition_column).format("iceberg").mode("overwrite")
    partitioned_writer.saveAsTable(target_table)
    logger.info(f"Partition load completed for table {source_table} into {target_table} by partition column {partition_column}")

# Function to perform bucketing
def bucketing_load(source_table, target_table, bucket_column, num_buckets):
    """Copy a Snowflake table into an Iceberg table bucketed on one column.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Iceberg table that is created or replaced.
        bucket_column: Column the bucket partition transform is applied to.
        num_buckets: Number of buckets.

    The table is written through the DataFrameWriterV2 API with a
    bucket(num_buckets, bucket_column) partition transform. The Hive-style
    bucketBy()/sortBy() writer options describe sorted bucketing, which is not
    an Iceberg partition transform.

    NOTE(review): no per-bucket sort order is applied — if one is required,
    configure it as the table's write order; confirm with the table owner.
    """
    # Local import: only this loader needs the partition-transform function.
    from pyspark.sql.functions import bucket

    df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("dbtable", source_table) \
        .load()

    # createOrReplace mirrors the previous "overwrite" semantics.
    df.writeTo(target_table) \
        .using("iceberg") \
        .partitionedBy(bucket(num_buckets, bucket_column)) \
        .createOrReplace()
    logger.info(f"Bucketing load completed for table {source_table} into {target_table} by bucket column {bucket_column} with {num_buckets} buckets")

# Function to perform incremental load
def incremental_load(source_table, target_table, last_batch_sk):
    """Append the source rows whose batch_sk is greater than last_batch_sk.

    Args:
        source_table: Snowflake table queried for new rows.
        target_table: Name handed to saveAsTable for the append.
        last_batch_sk: Rows with batch_sk above this value are selected.
    """
    delta_query = f"(SELECT * FROM {source_table} WHERE batch_sk > {last_batch_sk})"

    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    delta_df = snowflake_reader.option("query", delta_query).load()

    delta_df.write.format("iceberg").mode("append").saveAsTable(target_table)
    logger.info(f"Incremental load completed for table {source_table} into {target_table} with batch_sk > {last_batch_sk}")

# Example usage
# Dispatch on processing_modes: the branches are checked in this order and
# only the first enabled mode runs, even if several flags are True.
# The target is passed as the bare iceberg_config["iceberg_table"] name.
# NOTE(review): create_iceberg_table_if_not_exists qualifies the table with
# catalog and namespace, this call does not — confirm both address the same table.
if processing_modes["full_load"]:
    full_load("your_source_table", iceberg_config["iceberg_table"])
elif processing_modes["chunk_load"]:
    chunk_load("your_source_table", iceberg_config["iceberg_table"], chunk_size=1000)
elif processing_modes["partition_load"]:
    partition_load("your_source_table", iceberg_config["iceberg_table"], partition_column="your_partition_column")
elif processing_modes["bucketing"]:
    bucketing_load("your_source_table", iceberg_config["iceberg_table"], bucket_column="your_bucket_column", num_buckets=10)
elif processing_modes["incremental_load"]:
    last_batch_sk = 0  # This should be fetched from the control table
    incremental_load("your_source_table", iceberg_config["iceberg_table"], last_batch_sk)

# Full Load
Load the entire dataset from the source table to the target table.

In [None]:
# Full Load

# Function to perform full load
def full_load(source_table, target_table):
    """Copy an entire Snowflake table into an Iceberg table, replacing its contents.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable for the Iceberg write.
    """
    # Source side: the whole table, read through the Snowflake connector.
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    source_df = snowflake_reader.option("dbtable", source_table).load()

    # Target side: overwrite the Iceberg table with what was read.
    iceberg_writer = source_df.write.format("iceberg").mode("overwrite")
    iceberg_writer.saveAsTable(target_table)

    logger.info(f"Full load completed for table {source_table} into {target_table}")

# Example usage
# Runs the full load only when the "full_load" flag is enabled in processing_modes.
if processing_modes["full_load"]:
    full_load("your_source_table", iceberg_config["iceberg_table"])

# Chunk Load
Load data in chunks from the source table to the target table.

In [None]:
# Chunk Load

# Function to perform chunk load
def chunk_load(source_table, target_table, chunk_size, order_by=None):
    """Append a Snowflake table to an Iceberg table in LIMIT/OFFSET pages.

    Args:
        source_table: Snowflake table to page through.
        target_table: Name handed to saveAsTable for each appended chunk.
        chunk_size: Number of rows per page; must be positive.
        order_by: Optional SQL ORDER BY expression (ideally a unique key).
            Without it the page boundaries are not deterministic, so rows may
            be duplicated or skipped between pages.

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    if order_by:
        order_clause = f" ORDER BY {order_by}"
    else:
        order_clause = ""
        logger.warning(
            f"chunk_load for table {source_table} runs without order_by; "
            "LIMIT/OFFSET pages are not guaranteed to be stable"
        )

    offset = 0
    while True:
        query = f"(SELECT * FROM {source_table}{order_clause} LIMIT {chunk_size} OFFSET {offset})"
        df = spark.read \
            .format("snowflake") \
            .options(**snowflake_config) \
            .option("query", query) \
            .load()
        # head(1) needs at most one row to detect the end of the data, whereas
        # count() would scan the whole chunk before it is written.
        if not df.head(1):
            break
        df.write.format("iceberg").mode("append").saveAsTable(target_table)
        # Log the offset of the chunk that was just written, then advance.
        logger.info(f"Chunk load completed for offset {offset} for table {source_table} into {target_table}")
        offset += chunk_size

# Example usage
# Runs the chunked load (1000 rows per chunk) only when the "chunk_load" flag is enabled.
if processing_modes["chunk_load"]:
    chunk_load("your_source_table", iceberg_config["iceberg_table"], chunk_size=1000)

# Partition Load
Load data based on partitions from the source table to the target table.

In [None]:
# Partition Load

# Function to perform partition load
def partition_load(source_table, target_table, partition_column):
    """Copy a Snowflake table into an Iceberg table partitioned by one column.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable; existing contents are overwritten.
        partition_column: Column given to partitionBy on the writer.
    """
    # Source side: the whole table, read through the Snowflake connector.
    source_df = (
        spark.read.format("snowflake")
        .options(**snowflake_config)
        .option("dbtable", source_table)
        .load()
    )

    # Target side: overwrite the Iceberg table, partitioned by the given column.
    partitioned_writer = source_df.write.partitionBy(partition_column).format("iceberg").mode("overwrite")
    partitioned_writer.saveAsTable(target_table)

    logger.info(f"Partition load completed for table {source_table} into {target_table} by partition column {partition_column}")

# Example usage
# Runs the partitioned load only when the "partition_load" flag is enabled;
# "your_partition_column" is a placeholder column name.
if processing_modes["partition_load"]:
    partition_load("your_source_table", iceberg_config["iceberg_table"], partition_column="your_partition_column")

# Bucketing
Load data using bucketing strategy from the source table to the target table.

In [None]:
# Bucketing

# Function to perform bucketing load
def bucketing_load(source_table, target_table, bucket_column, num_buckets):
    """Copy a Snowflake table into an Iceberg table bucketed on one column.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Iceberg table that is created or replaced.
        bucket_column: Column the bucket partition transform is applied to.
        num_buckets: Number of buckets.

    The table is written through the DataFrameWriterV2 API with a
    bucket(num_buckets, bucket_column) partition transform. The Hive-style
    bucketBy()/sortBy() writer options describe sorted bucketing, which is not
    an Iceberg partition transform.

    NOTE(review): no per-bucket sort order is applied — if one is required,
    configure it as the table's write order; confirm with the table owner.
    """
    # Local import: only this loader needs the partition-transform function.
    from pyspark.sql.functions import bucket

    # Read the dataset from the Snowflake source table
    df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("dbtable", source_table) \
        .load()

    # Write the dataset to the Iceberg target table using bucketing;
    # createOrReplace mirrors the previous "overwrite" semantics.
    df.writeTo(target_table) \
        .using("iceberg") \
        .partitionedBy(bucket(num_buckets, bucket_column)) \
        .createOrReplace()

    # Log the completion of the bucketing load
    logger.info(f"Bucketing load completed for table {source_table} into {target_table} by bucket column {bucket_column} with {num_buckets} buckets")

# Example usage
# Runs the bucketed load (10 buckets) only when the "bucketing" flag is enabled;
# "your_bucket_column" is a placeholder column name.
if processing_modes["bucketing"]:
    bucketing_load("your_source_table", iceberg_config["iceberg_table"], bucket_column="your_bucket_column", num_buckets=10)

# Incremental Load
Load only the new or updated data from the source table to the target table.

In [None]:
# Incremental Load

# Function to perform incremental load
def incremental_load(source_table, target_table, last_batch_sk):
    """Append the source rows whose batch_sk is greater than last_batch_sk.

    Args:
        source_table: Snowflake table queried for new rows.
        target_table: Name handed to saveAsTable for the append.
        last_batch_sk: Rows with batch_sk above this value are selected.
    """
    # Only rows past the last processed batch are requested from Snowflake.
    delta_query = f"(SELECT * FROM {source_table} WHERE batch_sk > {last_batch_sk})"

    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    delta_df = snowflake_reader.option("query", delta_query).load()

    # The selected rows are added to the existing Iceberg data.
    delta_df.write.format("iceberg").mode("append").saveAsTable(target_table)

    logger.info(f"Incremental load completed for table {source_table} into {target_table} with batch_sk > {last_batch_sk}")

# Example usage
if processing_modes["incremental_load"]:
    # Fetch the last batch_sk from the control table
    control_table_full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    control_df = spark.table(control_table_full_name)
    matching_rows = control_df.filter(col("table_name") == source_table_name).select("batch_sk").collect()

    # A table with no control entry yet has no row here; start from 0, the
    # same initial batch_sk used when a table is first registered, instead of
    # failing with an IndexError on an empty result.
    last_batch_sk = matching_rows[0]["batch_sk"] if matching_rows else 0

    # Perform incremental load
    incremental_load(source_table_name, iceberg_config["iceberg_table"], last_batch_sk)

# Logging Setup
Implement logging at each step to track progress and potential errors.

In [None]:
# Logging Setup

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set logging level
logger.setLevel(logging_levels["info"])

# Example of how to use logging
# The password is masked before logging so the credential never reaches
# notebook output or log files; every other setting is logged unchanged.
redacted_snowflake_config = {
    key: ("***" if key == "sfPassword" else value)
    for key, value in snowflake_config.items()
}
logger.info("Logging setup complete.")
logger.info("Snowflake configuration: %s", redacted_snowflake_config)
logger.info("Iceberg configuration: %s", iceberg_config)
logger.info("Control table configuration: %s", control_table_config)
logger.info("Processing modes: %s", processing_modes)

# Merge Strategy
Provide configurable options for Append, Overwrite, and Merge (Upsert).

In [None]:
# Merge Strategy

# Function to perform append operation
def append_data(source_table, target_table):
    """Read a whole Snowflake table and append its rows to an Iceberg table.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable for the append.
    """
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    incoming_df = snowflake_reader.option("dbtable", source_table).load()

    incoming_df.write.format("iceberg").mode("append").saveAsTable(target_table)
    logger.info(f"Append operation completed for table {source_table} into {target_table}")

# Function to perform overwrite operation
def overwrite_data(source_table, target_table):
    """Read a whole Snowflake table and overwrite an Iceberg table with it.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable; existing contents are replaced.
    """
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    replacement_df = snowflake_reader.option("dbtable", source_table).load()

    replacement_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
    logger.info(f"Overwrite operation completed for table {source_table} into {target_table}")

# Function to perform merge (upsert) operation
def merge_data(source_table, target_table, primary_key):
    """Upsert a Snowflake table into an Iceberg table by primary key.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Iceberg table that is read and then overwritten.
        primary_key: Join column name, or a list of column names.

    Result written to target_table:
      * every source row (source values win for shared columns), carrying the
        target's values for columns that exist only in the target;
      * every target row whose key has no match in the source, unchanged.

    Selecting only "source.*" columns from a full outer join would turn
    unmatched target rows into all-NULL rows, so unmatched rows are kept
    through an anti join instead.

    NOTE(review): the target is overwritten while it is also an input of the
    write — confirm this is acceptable for the catalog in use.
    """
    source_df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("dbtable", source_table) \
        .load()

    target_df = spark.table(target_table)

    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    target_only_columns = [name for name in target_df.columns if name not in source_df.columns]

    # Source rows, enriched with the columns that only the target has.
    if target_only_columns:
        upserted_df = source_df.join(
            target_df.select(*key_columns, *target_only_columns), key_columns, "left"
        ).select(*source_df.columns, *target_only_columns)
    else:
        upserted_df = source_df

    # Target rows that the source does not touch are carried over as they are.
    untouched_df = target_df.join(source_df.select(*key_columns), key_columns, "left_anti")

    # allowMissingColumns fills columns absent from the target rows with NULL.
    merged_df = upserted_df.unionByName(untouched_df, allowMissingColumns=True)

    merged_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
    logger.info(f"Merge (upsert) operation completed for table {source_table} into {target_table}")

# Example usage
# Selects one write strategy by name; a value other than the three handled
# below matches no branch, so nothing is executed.
merge_strategy = "append"  # Change this to "overwrite" or "merge" as needed
primary_key = "id"  # Change this to the actual primary key column

if merge_strategy == "append":
    append_data("your_source_table", iceberg_config["iceberg_table"])
elif merge_strategy == "overwrite":
    overwrite_data("your_source_table", iceberg_config["iceberg_table"])
elif merge_strategy == "merge":
    merge_data("your_source_table", iceberg_config["iceberg_table"], primary_key)

# Append
Append new data to the target table.

In [None]:
# Append

# Function to append new data to the target table
def append_data(source_table, target_table):
    """Read a whole Snowflake table and append its rows to an Iceberg table.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable for the append.
    """
    # Source side: the whole table, read through the Snowflake connector.
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    incoming_df = snowflake_reader.option("dbtable", source_table).load()

    # Target side: add the rows to what the Iceberg table already holds.
    incoming_df.write.format("iceberg").mode("append").saveAsTable(target_table)

    logger.info(f"Append operation completed for table {source_table} into {target_table}")

# Example usage
# NOTE(review): not guarded by a flag — executing this cell always appends the
# full source table to the configured target; confirm that is intended.
append_data("your_source_table", iceberg_config["iceberg_table"])

# Overwrite
Overwrite the existing data in the target table.

In [None]:
# Overwrite

# Function to overwrite the existing data in the target table
def overwrite_data(source_table, target_table):
    """Read a whole Snowflake table and overwrite an Iceberg table with it.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Name handed to saveAsTable; existing contents are replaced.
    """
    # Source side: the whole table, read through the Snowflake connector.
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    replacement_df = snowflake_reader.option("dbtable", source_table).load()

    # Target side: replace whatever the Iceberg table currently holds.
    replacement_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)

    logger.info(f"Overwrite operation completed for table {source_table} into {target_table}")

# Example usage
# NOTE(review): not guarded by a flag — executing this cell always replaces the
# configured target table with the source data; confirm that is intended.
overwrite_data("your_source_table", iceberg_config["iceberg_table"])

# Merge (Upsert)
Merge new data with existing data in the target table.

In [None]:
# Merge (Upsert)

# Function to perform merge (upsert) operation
def merge_data(source_table, target_table, primary_key):
    """Upsert a Snowflake table into an Iceberg table by primary key.

    Args:
        source_table: Snowflake table passed to the reader as "dbtable".
        target_table: Iceberg table that is read and then overwritten.
        primary_key: Join column name, or a list of column names.

    Result written to target_table:
      * every source row (source values win for shared columns), carrying the
        target's values for columns that exist only in the target;
      * every target row whose key has no match in the source, unchanged.

    Selecting only "source.*" columns from a full outer join would turn
    unmatched target rows into all-NULL rows, so unmatched rows are kept
    through an anti join instead.

    NOTE(review): the target is overwritten while it is also an input of the
    write — confirm this is acceptable for the catalog in use.
    """
    # Read new data from the Snowflake source table
    source_df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("dbtable", source_table) \
        .load()

    # Read existing data from the Iceberg target table
    target_df = spark.table(target_table)

    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    target_only_columns = [name for name in target_df.columns if name not in source_df.columns]

    # Source rows, enriched with the columns that only the target has.
    if target_only_columns:
        upserted_df = source_df.join(
            target_df.select(*key_columns, *target_only_columns), key_columns, "left"
        ).select(*source_df.columns, *target_only_columns)
    else:
        upserted_df = source_df

    # Target rows that the source does not touch are carried over as they are.
    untouched_df = target_df.join(source_df.select(*key_columns), key_columns, "left_anti")

    # allowMissingColumns fills columns absent from the target rows with NULL.
    merged_df = upserted_df.unionByName(untouched_df, allowMissingColumns=True)

    # Write the merged data back to the Iceberg target table
    merged_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)

    # Log the completion of the merge (upsert) operation
    logger.info(f"Merge (upsert) operation completed for table {source_table} into {target_table}")

# Example usage
# NOTE(review): not guarded by a flag — executing this cell always merges the
# source table into the configured target; confirm that is intended.
primary_key = "id"  # Change this to the actual primary key column
merge_data("your_source_table", iceberg_config["iceberg_table"], primary_key)

# Data Change Handling
Capture INSERT, UPDATE, and DELETE operations from Snowflake and reflect these changes in Iceberg efficiently.

In [None]:
# Data Change Handling

# Function to capture INSERT operations from Snowflake and reflect in Iceberg
def handle_insert_operations(source_table, target_table):
    """Append the source rows flagged op_type = 'INSERT' to the Iceberg table.

    Args:
        source_table: Snowflake table queried for INSERT rows.
        target_table: Name handed to saveAsTable for the append.
    """
    insert_query = f"(SELECT * FROM {source_table} WHERE op_type = 'INSERT')"

    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    inserted_df = snowflake_reader.option("query", insert_query).load()

    inserted_df.write.format("iceberg").mode("append").saveAsTable(target_table)
    logger.info(f"INSERT operations handled for table {source_table} into {target_table}")

# Function to capture UPDATE operations from Snowflake and reflect in Iceberg
def handle_update_operations(source_table, target_table, primary_key):
    """Apply the source rows flagged op_type = 'UPDATE' to the Iceberg table.

    Args:
        source_table: Snowflake table queried for UPDATE rows.
        target_table: Iceberg table that is read and then overwritten.
        primary_key: Join column name, or a list of column names.

    The target is rewritten as: all target rows whose key is not among the
    updates, plus the update rows themselves. Writing back only the update
    rows would delete every row that was not updated.

    NOTE(review): the target is overwritten while it is also an input of the
    write — confirm this is acceptable for the catalog in use.
    """
    query = f"(SELECT * FROM {source_table} WHERE op_type = 'UPDATE')"
    source_df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("query", query) \
        .load()

    target_df = spark.table(target_table)

    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)

    # Rows without a pending update stay exactly as they are.
    unchanged_df = target_df.join(source_df.select(*key_columns), key_columns, "left_anti")

    # allowMissingColumns pads columns that exist on only one side with NULL.
    merged_df = unchanged_df.unionByName(source_df, allowMissingColumns=True)

    merged_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
    logger.info(f"UPDATE operations handled for table {source_table} into {target_table}")

# Function to capture DELETE operations from Snowflake and reflect in Iceberg
def handle_delete_operations(source_table, target_table, primary_key):
    """Remove from the Iceberg table the rows flagged op_type = 'DELETE' in the source.

    Args:
        source_table: Snowflake table queried for DELETE rows.
        target_table: Iceberg table that is read and then overwritten.
        primary_key: Column(s) used to match target rows against deleted rows.
    """
    delete_query = f"(SELECT * FROM {source_table} WHERE op_type = 'DELETE')"

    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    deleted_rows = snowflake_reader.option("query", delete_query).load()

    # Anti join: keep only target rows that have no matching delete record.
    current_rows = spark.table(target_table)
    survivors = current_rows.join(deleted_rows, primary_key, "left_anti")

    survivors.write.format("iceberg").mode("overwrite").saveAsTable(target_table)
    logger.info(f"DELETE operations handled for table {source_table} into {target_table}")

# Example usage
# Applies the three change types in sequence (inserts, then updates, then
# deletes) against the configured target, matching rows on the "id" column.
# NOTE(review): these calls are not guarded by any flag — confirm that is intended.
handle_insert_operations("your_source_table", iceberg_config["iceberg_table"])
handle_update_operations("your_source_table", iceberg_config["iceberg_table"], primary_key="id")
handle_delete_operations("your_source_table", iceberg_config["iceberg_table"], primary_key="id")

# Capture INSERT Operations
Capture and handle INSERT operations from the source table.

In [None]:
# Capture INSERT Operations

# Function to capture and handle INSERT operations from the source table
def capture_insert_operations(source_table, target_table):
    """Append the source rows flagged op_type = 'INSERT' to the Iceberg table.

    Args:
        source_table: Snowflake table queried for INSERT rows.
        target_table: Name handed to saveAsTable for the append.
    """
    # Only rows recorded as inserts are requested from Snowflake.
    insert_query = f"(SELECT * FROM {source_table} WHERE op_type = 'INSERT')"

    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    inserted_df = snowflake_reader.option("query", insert_query).load()

    # The inserted rows are added to the existing Iceberg data.
    inserted_df.write.format("iceberg").mode("append").saveAsTable(target_table)

    logger.info(f"INSERT operations captured and handled for table {source_table} into {target_table}")

# Example usage
# NOTE(review): not guarded by a flag — executing this cell always appends the
# INSERT rows to the configured target; confirm that is intended.
capture_insert_operations("your_source_table", iceberg_config["iceberg_table"])

# Capture UPDATE Operations
Capture and handle UPDATE operations from the source table.

In [None]:
# Capture UPDATE Operations

# Function to capture and handle UPDATE operations from the source table
def capture_update_operations(source_table, target_table, primary_key):
    """Apply the source rows flagged op_type = 'UPDATE' to the Iceberg table.

    Args:
        source_table: Snowflake table queried for UPDATE rows.
        target_table: Iceberg table that is read and then overwritten.
        primary_key: Join column name, or a list of column names.

    The target is rewritten as: all target rows whose key is not among the
    updates, plus the update rows themselves. Writing back only the update
    rows would delete every row that was not updated.

    NOTE(review): the target is overwritten while it is also an input of the
    write — confirm this is acceptable for the catalog in use.
    """
    # Query to fetch UPDATE operations from the Snowflake source table
    query = f"(SELECT * FROM {source_table} WHERE op_type = 'UPDATE')"

    # Read the UPDATE operations from the Snowflake source table
    source_df = spark.read \
        .format("snowflake") \
        .options(**snowflake_config) \
        .option("query", query) \
        .load()

    # Read the existing data from the Iceberg target table
    target_df = spark.table(target_table)

    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)

    # Rows without a pending update stay exactly as they are.
    unchanged_df = target_df.join(source_df.select(*key_columns), key_columns, "left_anti")

    # allowMissingColumns pads columns that exist on only one side with NULL.
    merged_df = unchanged_df.unionByName(source_df, allowMissingColumns=True)

    # Write the merged data back to the Iceberg target table
    merged_df.write.format("iceberg").mode("overwrite").saveAsTable(target_table)

    # Log the completion of handling UPDATE operations
    logger.info(f"UPDATE operations captured and handled for table {source_table} into {target_table}")

# Example usage
# NOTE(review): not guarded by a flag — executing this cell always rewrites the
# configured target table; confirm that is intended.
capture_update_operations("your_source_table", iceberg_config["iceberg_table"], primary_key="id")

# Capture DELETE Operations
Capture and handle DELETE operations from the source table.

In [None]:
# Capture DELETE Operations

# Function to capture and handle DELETE operations from the source table
def capture_delete_operations(source_table, target_table, primary_key):
    """Remove from the Iceberg target every row flagged as DELETE in the source.

    Args:
        source_table: Name of the Snowflake table holding the ``op_type`` rows.
        target_table: Name of the Iceberg table to remove rows from.
        primary_key: Join key (passed as the ``on`` argument of the join)
            that identifies which target rows were deleted.
    """
    delete_query = f"(SELECT * FROM {source_table} WHERE op_type = 'DELETE')"

    # Pull the DELETE rows out of Snowflake
    snowflake_reader = spark.read.format("snowflake").options(**snowflake_config)
    deleted_rows = snowflake_reader.option("query", delete_query).load()

    # Current content of the Iceberg target
    current_rows = spark.table(target_table)

    # A left-anti join keeps only the target rows with no match among the deletes
    surviving_rows = current_rows.join(deleted_rows, on=primary_key, how="left_anti")

    # Replace the target table content with the surviving rows
    iceberg_writer = surviving_rows.write.format("iceberg").mode("overwrite")
    iceberg_writer.saveAsTable(target_table)

    logger.info(f"DELETE operations captured and handled for table {source_table} into {target_table}")

# Example usage: handle DELETE operations for "your_source_table" (a
# placeholder name), matching rows to the Iceberg target on the "id" column.
capture_delete_operations("your_source_table", iceberg_config["iceberg_table"], primary_key="id")

# Workflow Logic
Define the workflow logic for two cases: initial table creation with data insertion, and loading new data into an existing table.

In [None]:
# Workflow Logic

# Function to handle initial table creation and data insertion
def initial_table_creation_and_insertion(source_table, target_table):
    """Create the Iceberg target for a source table and run its first load.

    Calls, in order: ``get_snowflake_table_schema``,
    ``create_iceberg_table_schema``, ``create_iceberg_table_if_not_exists``,
    ``insert_initial_entry_if_not_exists``, ``full_load`` and
    ``update_control_table``.

    Args:
        source_table: Name of the Snowflake source table.
        target_table: Name of the Iceberg table to create and load.
    """
    # Derive the Iceberg schema from the Snowflake table's schema
    target_schema = create_iceberg_table_schema(
        get_snowflake_table_schema(source_table)
    )

    # Make sure the Iceberg table and the control-table entry both exist
    create_iceberg_table_if_not_exists(target_table, target_schema)
    insert_initial_entry_if_not_exists(source_table)

    # Copy the full content of the source table into the Iceberg table
    full_load(source_table, target_table)

    # The first batch is recorded as batch_sk 1; this value is hard-coded and
    # should eventually be fetched from the source table.
    first_batch_sk = 1
    update_control_table(source_table, first_batch_sk)

    logger.info(f"Initial table creation and data insertion completed for table {source_table} into {target_table}")

# Function to handle existing table with new data
def existing_table_with_new_data(source_table, target_table):
    """Load new data into an existing Iceberg table and advance its batch_sk.

    The last processed ``batch_sk`` is read from the control table, the
    control table is advanced to ``last_batch_sk + 1``, and one incremental
    load is run. If that load fails, the control-table update is reverted and
    the error is re-raised.

    Args:
        source_table: Name of the Snowflake source table; also the value
            matched against the control table's ``table_name`` column.
        target_table: Name of the Iceberg table to load into.

    Raises:
        IndexError: If the control table has no row for ``source_table``.
        Exception: Any error raised by ``incremental_load``, re-raised after
            the control table has been reverted.
    """
    # Fetch the last batch_sk from the control table
    control_table_full_name = f"{control_table_config['control_table_schema']}.{control_table_config['control_table_name']}"
    control_df = spark.table(control_table_full_name)
    last_batch_sk = control_df.filter(col("table_name") == source_table).select("batch_sk").collect()[0]["batch_sk"]

    # Update control table with new batch_sk
    new_batch_sk = last_batch_sk + 1  # This should be fetched from the source table
    previous_batch_sk = update_control_table_before_insertion(source_table, new_batch_sk)

    # Run the incremental load exactly once, inside the guarded section, so a
    # failure can always be rolled back in the control table. An extra
    # unguarded call before the control-table update would process the same
    # batch twice.
    try:
        incremental_load(source_table, target_table, last_batch_sk)
    except Exception as e:
        revert_control_table_update(source_table, previous_batch_sk)
        logger.error(f"Error during incremental load: {e}")
        raise

    logger.info(f"Existing table with new data handled for table {source_table} into {target_table}")

# Example usage: run the initial-load workflow, then the new-data workflow,
# for "your_source_table" (a placeholder name) and the Iceberg table named in
# iceberg_config.
initial_table_creation_and_insertion("your_source_table", iceberg_config["iceberg_table"])
existing_table_with_new_data("your_source_table", iceberg_config["iceberg_table"])

# Initial Table Creation & Data Insertion
Handle the case where the table is newly created and new data is inserted with an increasing batch_sk.

In [None]:
# Initial Table Creation & Data Insertion

# Function to handle initial table creation and data insertion
def initial_table_creation_and_insertion(source_table, target_table):
    """Set up a new Iceberg table for ``source_table`` and perform a full load.

    Helpers are invoked in this order: ``get_snowflake_table_schema``,
    ``create_iceberg_table_schema``, ``create_iceberg_table_if_not_exists``,
    ``insert_initial_entry_if_not_exists``, ``full_load``,
    ``update_control_table``.

    Args:
        source_table: Name of the Snowflake source table.
        target_table: Name of the Iceberg table to create and populate.
    """
    # Snowflake schema -> Iceberg schema
    source_schema = get_snowflake_table_schema(source_table)
    converted_schema = create_iceberg_table_schema(source_schema)

    # Create the Iceberg table only when it is missing
    create_iceberg_table_if_not_exists(target_table, converted_schema)

    # Register the source table in the control table when it is missing
    insert_initial_entry_if_not_exists(source_table)

    # Full load from Snowflake into Iceberg
    full_load(source_table, target_table)

    # Record the first batch; the value 1 is hard-coded and should eventually
    # be fetched from the source table.
    starting_batch_sk = 1
    update_control_table(source_table, starting_batch_sk)

    logger.info(f"Initial table creation and data insertion completed for table {source_table} into {target_table}")

# Example usage: run the initial-load workflow for "your_source_table" (a
# placeholder name) into the Iceberg table named in iceberg_config.
initial_table_creation_and_insertion("your_source_table", iceberg_config["iceberg_table"])

# Existing Table with New Data
Handle the case where new data is inserted into an existing table, updating batch_sk.

In [None]:
# Existing Table with New Data

# Function to handle existing table with new data
def existing_table_with_new_data(source_table, target_table):
    """Run an incremental load into an existing table and bump its batch_sk.

    The control table is advanced to ``last_batch_sk + 1`` before the load;
    if ``incremental_load`` raises, the control table is reverted to the
    value returned by ``update_control_table_before_insertion`` and the error
    is re-raised.

    Args:
        source_table: Name of the Snowflake source table; also the value
            matched against the control table's ``table_name`` column.
        target_table: Name of the Iceberg table to load into.
    """
    # Look up the last processed batch_sk for this source table
    qualified_control_table = "{}.{}".format(
        control_table_config["control_table_schema"],
        control_table_config["control_table_name"],
    )
    matching_rows = (
        spark.table(qualified_control_table)
        .filter(col("table_name") == source_table)
        .select("batch_sk")
        .collect()
    )
    last_batch_sk = matching_rows[0]["batch_sk"]

    # Advance the control table first, remembering the value to restore on failure
    next_batch_sk = last_batch_sk + 1  # This should be fetched from the source table
    prior_batch_sk = update_control_table_before_insertion(source_table, next_batch_sk)

    # Incremental load; roll the control table back if it fails
    try:
        incremental_load(source_table, target_table, last_batch_sk)
    except Exception as exc:
        revert_control_table_update(source_table, prior_batch_sk)
        logger.error(f"Error during incremental load: {exc}")
        raise exc

    logger.info(f"Existing table with new data handled for table {source_table} into {target_table}")

# Example usage: run the new-data workflow for "your_source_table" (a
# placeholder name) into the Iceberg table named in iceberg_config.
existing_table_with_new_data("your_source_table", iceberg_config["iceberg_table"])