#### Setup Demo
1. Create the schema `bdunm_catalog.ldp_demo`
2. Create a volume `source_data` inside that schema
3. Generate sample source files and write them to the volume paths specified by the SDP pipeline code

In [0]:
%sql
-- Schema for the demo; IF NOT EXISTS makes this cell safe to re-run.
CREATE SCHEMA IF NOT EXISTS
  bdunm_catalog.ldp_demo

In [0]:
%sql
-- Volume that the CSV-writing cells below target (/Volumes/bdunm_catalog/ldp_demo/source_data).
CREATE VOLUME IF NOT EXISTS
  bdunm_catalog.ldp_demo.source_data

#### Generate standard sample files (regular incremental upserts)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
import random
from datetime import datetime, timedelta

# Schema for the standard (non-CDC) feed: an integer key, an event timestamp,
# and three string attributes. Every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", IntegerType()),
        ("create_timestamp", TimestampType()),
        ("attribute1", StringType()),
        ("attribute2", StringType()),
        ("attribute3", StringType()),
    )
])

def generate_data(ids, start_time, rng=None):
    """Build one synthetic standard-feed row per entry in ``ids``.

    Each row is ``(id, create_timestamp, attribute1, attribute2, attribute3)``,
    the column order of the standard ``schema``:

    * ``create_timestamp`` is ``start_time`` plus a random offset of 0-1000
      minutes (inclusive). 1000 minutes is less than one day, so rows built
      from a ``start_time`` at least one day later are always strictly later.
    * each attribute is ``"attrN_<k>"`` with a random ``k`` in 1-100.

    Duplicate ids yield duplicate rows, each with independently drawn values.

    Args:
        ids: Iterable of ids, emitted in the order given.
        start_time: ``datetime`` lower bound for ``create_timestamp``.
        rng: Optional ``random.Random`` used for every draw. Pass a seeded
            instance for reproducible rows; when omitted, the module-level
            ``random`` functions are used (the original behavior).

    Returns:
        list[tuple]: One 5-tuple per id.
    """
    # The ``random`` module exposes the same ``randint`` API as a Random instance.
    rand = random if rng is None else rng
    data = []
    for i in ids:
        data.append((i, start_time + timedelta(minutes=rand.randint(0, 1000)),
                     f"attr1_{rand.randint(1, 100)}",
                     f"attr2_{rand.randint(1, 100)}",
                     f"attr3_{rand.randint(1, 100)}"))
    return data

# Seed the module-level RNG so attribute values and timestamp offsets are the
# same on every re-run; only the wall-clock base time below varies.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
# Capture "now" once so all files share one reference point.
base_time = datetime.now()

# File 1: unique IDs 1-5
data1 = generate_data(range(1, 6), base_time)
df1 = spark.createDataFrame(data1, schema)

# File 2: unique IDs 6-10
data2 = generate_data(range(6, 11), base_time)
df2 = spark.createDataFrame(data2, schema)

# File 3: duplicate IDs (1 and 2 each appear twice, 3 once)
data3 = generate_data([1, 1, 2, 2, 3], base_time)
df3 = spark.createDataFrame(data3, schema)

# File 4: re-sends IDs 1-3 from File 3 (plus 4-5 from File 1) one day later.
# generate_data offsets by at most 1000 minutes (< 1 day), so every File 4
# timestamp is later than any timestamp in Files 1-3.
data4 = generate_data([1, 2, 3, 4, 5], base_time + timedelta(days=1))
df4 = spark.createDataFrame(data4, schema)



In [0]:
# Preview the rows generated for standard File 1.
display(df1)

id,create_timestamp,attribute1,attribute2,attribute3
1,2025-11-08T07:27:52.186Z,attr1_88,attr2_33,attr3_86
2,2025-11-08T00:40:52.186Z,attr1_74,attr2_43,attr3_31
3,2025-11-08T00:36:52.186Z,attr1_14,attr2_32,attr3_53
4,2025-11-07T19:27:52.186Z,attr1_27,attr2_95,attr3_16
5,2025-11-07T23:40:52.186Z,attr1_49,attr2_96,attr3_46


In [0]:
# Write each standard DataFrame as CSV with a header row. coalesce(1) yields a
# single part file; Spark still treats each ".csv" path as an output directory.
# mode="overwrite" lets this cell be re-run safely.
standard_outputs = (
    (df1, "/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file1.csv"),
    (df2, "/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file2.csv"),
    (df3, "/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file3.csv"),
    (df4, "/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file4.csv"),
)
for frame, target_path in standard_outputs:
    frame.coalesce(1).write.csv(target_path, header=True, mode="overwrite")

#### Generate CDC sample files

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
import random
from datetime import datetime, timedelta

# Schema for the CDC feed: the standard columns plus a cdc_operation label
# (INSERT / UPDATE / DELETE in the data generated below). All columns nullable.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("create_timestamp", TimestampType(), True)
    .add("attribute1", StringType(), True)
    .add("attribute2", StringType(), True)
    .add("attribute3", StringType(), True)
    .add("cdc_operation", StringType(), True)  # Add CDC column
)

def generate_data(ids, start_time, operation):
    """Return one synthetic CDC row per id in ``ids``, in the order given.

    Row layout: ``(id, create_timestamp, attribute1, attribute2, attribute3,
    cdc_operation)``. The timestamp is ``start_time`` plus a random 0-1000
    minute offset, each attribute is ``"attrN_<k>"`` with ``k`` drawn from
    1-100, and every row carries the same ``operation`` label.
    """
    def _make_row(row_id):
        # Draw order (offset, then attr1..attr3) is kept fixed so a seeded
        # global RNG reproduces the same rows.
        offset = timedelta(minutes=random.randint(0, 1000))
        first = random.randint(1, 100)
        second = random.randint(1, 100)
        third = random.randint(1, 100)
        return (row_id, start_time + offset,
                f"attr1_{first}", f"attr2_{second}", f"attr3_{third}",
                operation)

    return [_make_row(row_id) for row_id in ids]

# Seed the module-level RNG so attribute values and timestamp offsets are the
# same on every re-run; only the wall-clock base time below varies.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
# Capture "now" once so all files (and the DELETE row) share one reference point.
base_time = datetime.now()
next_day = base_time + timedelta(days=1)

# File 1: INSERTs for unique IDs 1-5
data1 = generate_data(range(1, 6), base_time, "INSERT")
df1 = spark.createDataFrame(data1, schema)

# File 2: INSERTs for unique IDs 6-10
data2 = generate_data(range(6, 11), base_time, "INSERT")
df2 = spark.createDataFrame(data2, schema)

# File 3: UPDATEs with duplicate IDs (1 and 2 each appear twice, 3 once)
data3 = generate_data([1, 1, 2, 2, 3], base_time, "UPDATE")
df3 = spark.createDataFrame(data3, schema)

# File 4: UPDATEs for IDs 1-5 one day later. generate_data offsets by at most
# 1000 minutes (< 1 day), so these are later than every earlier row.
data4 = generate_data([1, 2, 3, 4, 5], next_day, "UPDATE")
# Add a DELETE for ID 6 (INSERTed in File 2). It carries no attribute values,
# and its timestamp is later than ID 6's INSERT for the same reason.
data4.append((6, next_day, None, None, None, "DELETE"))
df4 = spark.createDataFrame(data4, schema)

In [0]:
# Preview the rows generated for CDC File 1 (all INSERTs).
display(df1)

id,create_timestamp,attribute1,attribute2,attribute3,cdc_operation
1,2025-11-07T20:13:21.809Z,attr1_62,attr2_44,attr3_12,INSERT
2,2025-11-07T21:06:21.809Z,attr1_23,attr2_58,attr3_90,INSERT
3,2025-11-07T21:58:21.809Z,attr1_21,attr2_65,attr3_25,INSERT
4,2025-11-08T03:26:21.809Z,attr1_80,attr2_95,attr3_36,INSERT
5,2025-11-08T04:30:21.809Z,attr1_84,attr2_14,attr3_48,INSERT


In [0]:
# Write each CDC DataFrame as a single-part CSV with a header row.
# mode="overwrite" matches the standard-file cell. Without it, Spark's default
# save mode ("errorifexists") makes this cell fail on any re-run once the
# target paths exist.
# NOTE(review): these CDC files land in the same raw/ folder as the standard
# files (file1.csv ... file4.csv). Confirm that the pipeline's source
# paths/globs keep the two feeds apart.
df1.coalesce(1).write.csv("/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file1_CDC.csv", header=True, mode="overwrite")
df2.coalesce(1).write.csv("/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file2_CDC.csv", header=True, mode="overwrite")
df3.coalesce(1).write.csv("/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file3_CDC.csv", header=True, mode="overwrite")
df4.coalesce(1).write.csv("/Volumes/bdunm_catalog/ldp_demo/source_data/raw/file4_CDC.csv", header=True, mode="overwrite")

#### Validating Results

In [0]:
%sql
-- Space to explore resultant tables and validate if loading behavior is as expected
-- What the generated inputs imply (target table names depend on the pipeline code; fill them in):
--   Standard feed: file1-file4 together contain ids 1-10. file3 repeats ids 1 and 2, and file4
--   re-sends ids 1-5 with timestamps one day later. Assuming the pipeline upserts on id and keeps
--   the latest create_timestamp, the target should hold 10 rows, with file4's values for ids 1-5.
--   CDC feed: id 6 is INSERTed in file2_CDC and DELETEd in file4_CDC. Assuming the pipeline
--   applies cdc_operation = 'DELETE', id 6 should be absent from the CDC target.

