# Incremental Ingestion Without Timestamp (Hash-Based CDC)

This notebook demonstrates CDC-style incremental ingestion from CSV files **without a timestamp column** using a **row hash comparison** approach.

## How It Works

1. **Load** source CSV files
2. **Generate** a hash for each row based on all columns
3. **Compare** with previously processed hashes stored in metadata
4. **Identify** new and changed records
5. **Write** incremental records to destination
6. **Update** the hash registry for next run
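
The six steps can be sketched in plain Python, without Spark, to make the control flow concrete. This is a minimal illustration rather than the notebook's implementation: `row_hash`, `run_incremental`, the in-memory `registry` dict, and the sample rows are hypothetical names introduced only for this sketch.

```python
import hashlib

def row_hash(row, columns):
    """MD5 over the '|'-joined string values; None becomes the sentinel 'NULL'."""
    joined = "|".join("NULL" if row[c] is None else str(row[c]) for c in columns)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def run_incremental(rows, registry, pk="product_id"):
    """One ingestion run: returns (incremental_rows, new_registry).

    `registry` maps primary key -> hash from the previous run. It is a
    hypothetical in-memory stand-in for the Parquet hash registry.
    """
    columns = sorted(rows[0].keys()) if rows else []
    incremental, new_registry = [], {}
    for row in rows:
        h = row_hash(row, columns)
        new_registry[row[pk]] = h
        if row[pk] not in registry:
            incremental.append({**row, "_change_type": "NEW"})
        elif registry[row[pk]] != h:
            incremental.append({**row, "_change_type": "CHANGED"})
    return incremental, new_registry

# First run: the registry is empty, so every row is NEW.
run1 = [
    {"product_id": 1, "name": "Widget", "price": 9.99},
    {"product_id": 2, "name": "Gadget", "price": 19.99},
]
inc1, registry = run_incremental(run1, {})

# Second run: product 2 changed price, product 3 is new, product 1 is untouched.
run2 = [
    {"product_id": 1, "name": "Widget", "price": 9.99},
    {"product_id": 2, "name": "Gadget", "price": 24.99},
    {"product_id": 3, "name": "Gizmo", "price": 4.50},
]
inc2, registry = run_incremental(run2, registry)
summary = [(r["product_id"], r["_change_type"]) for r in inc2]
print(summary)  # [(2, 'CHANGED'), (3, 'NEW')]
```

The second run emits only the changed and the new product, which is the behavior the Spark cells below reproduce at scale.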

## Prerequisites

- CSV files (without a timestamp column) uploaded to `/data/source/`
- A storage account linked to the Synapse workspace

## Configuration

Update the storage account name below to match your deployment.

In [None]:
# ============================================================================
# CONFIGURATION - Update these values for your environment
# ============================================================================

# Storage account name (from deployment output)
STORAGE_ACCOUNT = "<your-storage-account-name>"

# Container name
CONTAINER = "data"

# Paths
BASE_PATH = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net"
SOURCE_PATH = f"{BASE_PATH}/source/"
DESTINATION_PATH = f"{BASE_PATH}/destination/without_timestamp/"
METADATA_PATH = f"{BASE_PATH}/metadata/"

# Hash registry location (a Parquet folder that stores previously processed record hashes)
HASH_REGISTRY_FILE = f"{METADATA_PATH}hash_registry/"

# Source file pattern (files without timestamp column)
SOURCE_FILE_PATTERN = "products_no_ts*.csv"

# Primary key column for identifying records
PRIMARY_KEY_COLUMN = "product_id"

print(f"Source Path: {SOURCE_PATH}")
print(f"Destination Path: {DESTINATION_PATH}")
print(f"Metadata Path: {METADATA_PATH}")

## Step 1: Load Source Data

Read all CSV files from the source folder.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat_ws, md5, lit, current_timestamp,
    when, coalesce
)
from pyspark.sql.types import StringType
from datetime import datetime

# Initialize Spark session (already available in Synapse)
spark = SparkSession.builder.getOrCreate()

# Load source CSV files
source_file_path = f"{SOURCE_PATH}{SOURCE_FILE_PATTERN}"
print(f"Loading source files from: {source_file_path}")

try:
    # Read CSV files with header
    source_df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(source_file_path)
    
    total_records = source_df.count()
    print(f"\nTotal records in source: {total_records}")
    print(f"\nSource Schema:")
    source_df.printSchema()
    print(f"\nSample source data:")
    source_df.show(5, truncate=False)
    
except Exception as e:
    print(f"Error loading source files: {str(e)}")
    print("Make sure CSV files are uploaded to the source folder.")
    raise

## Step 2: Generate Row Hashes

Create a hash for each row from all of its column values. Rows with different values produce different hashes (barring collisions), so comparing hashes across runs detects changes.
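
One property of this scheme is worth seeing in isolation: joining values with a separator and a null placeholder is not a one-to-one encoding, so some distinct rows hash identically. The plain-Python sketch below demonstrates the two cases and contrasts them with a hypothetical alternative, `hash_json`, that hashes a JSON array instead. Both function names are introduced here for illustration. In Spark, hashing `to_json(struct(...))` might play a similar role, but that is an untested assumption.

```python
import hashlib
import json

def hash_joined(values, sep="|", null_token="NULL"):
    """Hash the separator-joined string form of the values (the approach used in this step)."""
    joined = sep.join(null_token if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def hash_json(values):
    """Hypothetical alternative: hash a JSON array, which keeps value boundaries and nulls unambiguous."""
    encoded = json.dumps(values, separators=(",", ":"), ensure_ascii=False)
    return hashlib.md5(encoded.encode("utf-8")).hexdigest()

# Two different rows that both collapse to the joined string "a|b|c".
row_a = ["a|b", "c"]
row_b = ["a", "b|c"]
# A real null versus the literal string "NULL".
row_c = [None, "x"]
row_d = ["NULL", "x"]

print(hash_joined(row_a) == hash_joined(row_b))  # True: the change would be missed
print(hash_joined(row_c) == hash_joined(row_d))  # True: the change would be missed
print(hash_json(row_a) == hash_json(row_b))      # False
print(hash_json(row_c) == hash_json(row_d))      # False
```

If the data can contain the separator or the literal text `NULL`, choosing a separator and sentinel that cannot occur in the data avoids these cases as well.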

In [None]:
def generate_row_hash(df, exclude_columns=None):
    """
    Generate MD5 hash for each row based on all columns (except excluded ones).
    """
    if exclude_columns is None:
        exclude_columns = []
    
    # Get columns to hash (exclude specified columns)
    hash_columns = [c for c in df.columns if c not in exclude_columns]
    
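    # Convert all columns to string and concatenate them in column order.
    # concat_ws skips nulls, so coalesce replaces each null with a "NULL"
    # sentinel to keep column positions aligned.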
    concat_expr = concat_ws(
        "|",
        *[coalesce(col(c).cast(StringType()), lit("NULL")) for c in hash_columns]
    )
    
    # Generate MD5 hash
    return df.withColumn("_row_hash", md5(concat_expr))

# Generate hashes for source data
source_with_hash = generate_row_hash(source_df)

print("Source data with row hashes:")
source_with_hash.select(PRIMARY_KEY_COLUMN, "_row_hash").show(10, truncate=False)

## Step 3: Load Previous Hash Registry

Read the hash registry from previous runs to compare against.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.utils import AnalysisException

def load_hash_registry():
    """
    Load the hash registry from previous runs.
    Returns empty DataFrame if no registry exists (first run).
    """
    try:
        registry_df = spark.read.parquet(HASH_REGISTRY_FILE)
        count = registry_df.count()
        print(f"Loaded existing hash registry with {count} records.")
        return registry_df
    except AnalysisException:
        # Raised when the path does not exist or holds no Parquet files.
        # Other errors (for example, authentication failures) propagate so
        # they are not mistaken for a first run.
        print("No existing hash registry found. This appears to be the first run.")
        # Return empty DataFrame with the schema written in Step 6
        schema = StructType([
            StructField(PRIMARY_KEY_COLUMN, StringType(), True),
            StructField("_row_hash", StringType(), True),
            StructField("_registered_at", TimestampType(), True)
        ])
        return spark.createDataFrame([], schema)

# Load previous hash registry
previous_registry = load_hash_registry()
previous_count = previous_registry.count()
print(f"\nPrevious registry records: {previous_count}")

if previous_count > 0:
    print("\nPrevious registry sample:")
    previous_registry.show(5, truncate=False)

## Step 4: Identify New and Changed Records

Compare current hashes with previous registry to find:
- **New records**: Primary key not in previous registry
- **Changed records**: Primary key exists but hash is different
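
These two rules can be written as a small classification function. The sketch below is plain Python with hypothetical names (`classify`, `registry_pairs`). It compares on (key, hash) pairs, an assumption made here so that a key appearing more than once, for example an old and a new version of a record in different source files, is not reported as changed on every run.

```python
def classify(current, registry_pairs):
    """Label each (key, hash) pair in `current` as NEW, CHANGED, or UNCHANGED.

    current        -- list of (primary_key, row_hash) tuples from this run
    registry_pairs -- set of (primary_key, row_hash) tuples from the previous run
    """
    known_keys = {key for key, _ in registry_pairs}
    labels = []
    for key, row_hash in current:
        if key not in known_keys:
            labels.append((key, row_hash, "NEW"))
        elif (key, row_hash) not in registry_pairs:
            labels.append((key, row_hash, "CHANGED"))
        else:
            labels.append((key, row_hash, "UNCHANGED"))
    return labels

# Hash values are shortened placeholders, not real MD5 digests.
registry_pairs = {(1, "aaa"), (2, "bbb"), (2, "bb2")}  # key 2 was registered with two versions
current = [(1, "aaa"), (2, "bbb"), (2, "bb2"), (2, "bb3"), (3, "ccc")]

for label in classify(current, registry_pairs):
    print(label)
```

Only `(2, "bb3")` is labeled CHANGED and only key 3 is labeled NEW; the two already-registered versions of key 2 stay UNCHANGED.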

In [None]:
def identify_incremental_records(current_df, previous_registry_df, pk_column):
    """
    Identify new and changed records by comparing hashes.
    Returns DataFrame with incremental records and change type.

    Rows are matched on the (primary key, hash) pair rather than on the key
    alone. A key that appears more than once (for example, an old and a new
    version in different source files) is then not re-flagged as CHANGED on
    every subsequent run.
    """
    prev_pk = f"_prev_{pk_column}"

    # Rename columns in previous registry to avoid conflicts
    prev_pairs = previous_registry_df.select(
        col(pk_column).alias(prev_pk),
        col("_row_hash").alias("_prev_hash")
    ).distinct()
    prev_keys = prev_pairs.select(prev_pk).distinct()

    # Drop UNCHANGED rows: keep only rows whose (key, hash) pair
    # is not already in the registry
    unmatched = current_df.join(
        prev_pairs,
        (col(pk_column) == col(prev_pk)) & (col("_row_hash") == col("_prev_hash")),
        "left_anti"
    )

    # Left join on the key alone to tell NEW from CHANGED
    # - NEW: key not in previous registry (prev_pk is null)
    # - CHANGED: key exists in registry, but not with this hash
    joined = unmatched.join(prev_keys, col(pk_column) == col(prev_pk), "left")
    incremental = joined.withColumn(
        "_change_type",
        when(col(prev_pk).isNull(), lit("NEW")).otherwise(lit("CHANGED"))
    )

    # Drop temporary column
    return incremental.drop(prev_pk)

# Identify incremental records
incremental_df = identify_incremental_records(
    source_with_hash, 
    previous_registry, 
    PRIMARY_KEY_COLUMN
)

# Count by change type
print("Change Summary:")
incremental_df.groupBy("_change_type").count().show()

incremental_count = incremental_df.count()
print(f"\nTotal incremental records: {incremental_count}")

if incremental_count > 0:
    print("\nIncremental records:")
    incremental_df.show(truncate=False)

## Step 5: Write Incremental Data to Destination

Append the new/changed records to the destination folder.

In [None]:
if incremental_count > 0:
    # Add processing metadata
    output_df = incremental_df.withColumn("_processed_at", current_timestamp())
    
    # Select columns for output (include change type for tracking)
    output_columns = source_df.columns + ["_change_type", "_processed_at"]
    output_df = output_df.select(output_columns)
    
    # Write to destination (append mode)
    print(f"Writing {incremental_count} records to: {DESTINATION_PATH}")
    
    output_df.write \
        .mode("append") \
        .option("header", "true") \
        .csv(DESTINATION_PATH)
    
    print(f"\nSuccessfully wrote {incremental_count} records to destination.")
    print("\nOutput sample:")
    output_df.show(5, truncate=False)
else:
    print("No new or changed records to write. Skipping write operation.")

## Step 6: Update Hash Registry

Save the current hashes to the registry for the next run.

In [None]:
def update_hash_registry(current_df, pk_column):
    """
    Update the hash registry with current record hashes.
    This overwrites the previous registry with the complete current state.
    """
    # Create registry with primary key, hash, and timestamp
    registry_df = current_df.select(
        col(pk_column),
        col("_row_hash"),
        current_timestamp().alias("_registered_at")
    )
    
    # Write registry (overwrite mode - full snapshot)
    registry_df.write \
        .mode("overwrite") \
        .parquet(HASH_REGISTRY_FILE)
    
    count = registry_df.count()
    print(f"Hash registry updated with {count} records.")
    return count

# Update hash registry with current state
registry_count = update_hash_registry(source_with_hash, PRIMARY_KEY_COLUMN)
print(f"\nRegistry now contains {registry_count} record hashes.")

## Step 7: Verify Results

Check the destination folder and current hash registry state.

In [None]:
# Read and display destination data
print("=" * 60)
print("VERIFICATION: Current State")
print("=" * 60)

try:
    dest_df = spark.read \
        .option("header", "true") \
        .csv(DESTINATION_PATH)
    
    dest_count = dest_df.count()
    print(f"\nTotal records in destination: {dest_count}")
    
    # Show change type distribution
    print("\nRecords by change type:")
    dest_df.groupBy("_change_type").count().show()
    
    print(f"\nDestination data sample:")
    dest_df.show(10, truncate=False)
except Exception as e:
    print(f"No data in destination yet or error reading: {str(e)}")

# Read current hash registry
print(f"\n{'=' * 60}")
print("Current Hash Registry State:")
print("=" * 60)
try:
    registry_df = spark.read.parquet(HASH_REGISTRY_FILE)
    print(f"\nRegistry contains {registry_df.count()} records.")
    registry_df.show(10, truncate=False)
except Exception as e:
    print(f"No hash registry found: {str(e)}")

## Summary

This notebook demonstrated:

1. **Hash-based CDC**: Using MD5 hashes to detect changes without timestamps
2. **Change Detection**: Identifying NEW and CHANGED records
3. **State Persistence**: Hash registry stored for subsequent runs

### To Test Incremental Behavior:

1. Run this notebook with the initial dataset
2. Upload a new CSV file with:
   - New records (new product_id values)
   - Modified records (existing product_id with changed values)
3. Run the notebook again; only new and changed records are written to the destination
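
The two test files could be generated locally with a short script before uploading. The columns other than `product_id`, the sample values, and the file names `products_no_ts_1.csv` and `products_no_ts_2.csv` are assumptions chosen to match the `products_no_ts*.csv` pattern; adjust them to your data.

```python
import csv
import os
import tempfile

def write_csv(path, rows, fieldnames):
    """Write rows (a list of dicts) to a CSV file with a header line."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

fieldnames = ["product_id", "name", "price"]  # hypothetical columns

initial = [
    {"product_id": 1, "name": "Widget", "price": "9.99"},
    {"product_id": 2, "name": "Gadget", "price": "19.99"},
]
second = [
    {"product_id": 2, "name": "Gadget", "price": "24.99"},  # modified record
    {"product_id": 3, "name": "Gizmo", "price": "4.50"},    # new record
]

out_dir = tempfile.mkdtemp(prefix="cdc_test_")
first_path = os.path.join(out_dir, "products_no_ts_1.csv")
second_path = os.path.join(out_dir, "products_no_ts_2.csv")
write_csv(first_path, initial, fieldnames)
write_csv(second_path, second, fieldnames)
print(f"Upload {first_path} before the first run and {second_path} before the second.")
```

Because the source pattern matches both files, the second run reads the old and the new version of product 2 together.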

### Key Points:

- The hash registry is stored as Parquet in `/data/metadata/hash_registry/`
- Each run appends new data to `/data/destination/without_timestamp/`
- The `_change_type` column indicates whether a record is NEW or CHANGED
- The `_processed_at` column tracks when records were ingested

### Comparison with Timestamp-Based Approach:

| Aspect | Timestamp-Based | Hash-Based |
|--------|-----------------|------------|
| Requires timestamp column | Yes | No |
| Detects any field change | Only if timestamp updated | Yes |
| Storage overhead | Minimal (single value) | Higher (hash per record) |
| Processing complexity | Lower | Higher |
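
The storage overhead row can be made concrete with rough arithmetic. An MD5 digest rendered as hex is 32 characters, so the registry holds at least 32 bytes of hash text per record plus the key, before Parquet encoding and compression. The helper below is a hypothetical back-of-the-envelope estimate, not a measurement; `avg_key_bytes` is an assumed parameter and the `_registered_at` column is ignored.

```python
import hashlib

MD5_HEX_CHARS = len(hashlib.md5(b"").hexdigest())  # 32 hex characters per digest

def estimate_registry_bytes(record_count, avg_key_bytes=8):
    """Rough uncompressed size of the key and hash columns for `record_count` records."""
    return record_count * (MD5_HEX_CHARS + avg_key_bytes)

for n in (10_000, 1_000_000, 100_000_000):
    size_mb = estimate_registry_bytes(n) / 1_000_000
    print(f"{n:>11,} records -> ~{size_mb:,.1f} MB")
```

By contrast, a timestamp-based approach only needs to persist a single high-water-mark value between runs.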