In [0]:
from datetime import datetime, timedelta
import random
import uuid
import pandas as pd

In [0]:
# Target catalog and schema for the landing data used throughout this notebook.
catalog, schema = 'lakeflow_demo', 'landing'

Generate customer CDC data (written to a Databricks Volume further below)

In [0]:
def generate_cdc_data_customers(num_rows=20, seed=None):
    """Generate synthetic customer CDC records as a pandas DataFrame.

    Produces one change event per customer, with ``customer_id`` running
    from 1 to ``num_rows``. Each record gets a randomly chosen operation
    (INSERT / UPDATE / DELETE), a random phone-number suffix, and a timestamp
    between 0 and 60 minutes before the moment of the call, formatted as
    ``"%Y-%m-%d %H:%M:%S"``.

    Args:
        num_rows: Number of records (and distinct customers) to generate.
            Values <= 0 produce an empty frame.
        seed: Optional seed for a private ``random.Random`` instance, making
            the random draws reproducible (absolute timestamps still depend on
            the current clock). When ``None``, the module-level ``random``
            generator is used, so an earlier ``random.seed(...)`` still applies.

    Returns:
        pandas.DataFrame with columns ``customer_id, name, email, phone, op,
        ts``. The columns are present even when no rows are generated.
    """
    rng = random if seed is None else random.Random(seed)
    operations = ["INSERT", "UPDATE", "DELETE"]
    columns = ["customer_id", "name", "email", "phone", "op", "ts"]

    # Read the clock once so every record's offset is measured from the same instant.
    now = datetime.now()

    records = []
    for cust_id in range(1, num_rows + 1):
        # Draw order (op, minute offset, phone suffix) is kept stable so a
        # given seed always maps to the same values.
        op = rng.choice(operations)
        ts = now - timedelta(minutes=rng.randint(0, 60))
        records.append({
            "customer_id": cust_id,
            "name": f"Customer_{cust_id}",
            "email": f"customer_{cust_id}@example.com",
            # Random US-style phone number suffix.
            "phone": f"+1-202-555-{rng.randint(1000, 9999)}",
            "op": op,
            "ts": ts.strftime("%Y-%m-%d %H:%M:%S"),
        })

    # Explicit columns keep the schema even when `records` is empty.
    return pd.DataFrame(records, columns=columns)

Generate orders CDC data (written to a Databricks Volume further below)

In [0]:
def generate_cdc_data_orders(num_rows=20, num_customers=20, seed=None):
    """Generate synthetic order CDC records as a pandas DataFrame.

    Each record has an 8-character hexadecimal ``order_id``, a ``customer_id``
    drawn uniformly from 1..``num_customers``, an ``order_amount`` between 20
    and 500 rounded to 2 decimal places, a random ``order_status`` and CDC
    ``op``, and a timestamp between 0 and 60 minutes before the call,
    formatted as ``"%Y-%m-%d %H:%M:%S"``.

    Args:
        num_rows: Number of order records to generate; values <= 0 produce an
            empty frame.
        num_customers: Upper bound of the ``customer_id`` range. The default of
            20 matches the default ``num_rows`` of
            ``generate_cdc_data_customers``, so orders reference generated
            customers.
        seed: Optional seed for a private ``random.Random`` instance, making
            the random draws (including ``order_id``) reproducible; absolute
            timestamps still depend on the current clock. When ``None``, the
            module-level ``random`` generator and ``uuid.uuid4`` are used.

    Returns:
        pandas.DataFrame with columns ``order_id, customer_id, order_amount,
        order_status, op, ts``. The columns are present even when no rows are
        generated.

    Raises:
        ValueError: if rows are requested but ``num_customers`` is below 1
            (there would be no valid ``customer_id`` to draw).
    """
    if num_rows > 0 and num_customers < 1:
        raise ValueError(f"num_customers must be at least 1, got {num_customers}")

    rng = random if seed is None else random.Random(seed)
    operations = ["INSERT", "UPDATE", "DELETE"]
    statuses = ["Pending", "Shipped", "Delivered", "Cancelled"]
    columns = ["order_id", "customer_id", "order_amount", "order_status", "op", "ts"]

    # Read the clock once so every record's offset is measured from the same instant.
    now = datetime.now()

    records = []
    for _ in range(num_rows):
        # IDs are truncated to 8 hex chars (32 bits), so uniqueness is
        # probabilistic; collisions are unlikely at demo sizes.
        if seed is None:
            order_id = uuid.uuid4().hex[:8]
        else:
            # Same shape as uuid4().hex[:8], but reproducible from the seed.
            order_id = f"{rng.getrandbits(32):08x}"
        op = rng.choice(operations)
        ts = now - timedelta(minutes=rng.randint(0, 60))
        records.append({
            "order_id": order_id,
            "customer_id": rng.randint(1, num_customers),
            "order_amount": round(rng.uniform(20, 500), 2),
            "order_status": rng.choice(statuses),
            "op": op,
            "ts": ts.strftime("%Y-%m-%d %H:%M:%S"),
        })

    # Explicit columns keep the schema even when `records` is empty.
    return pd.DataFrame(records, columns=columns)

In [0]:
# Build the synthetic CDC batches as pandas DataFrames (20 rows each).
# generate_cdc_data_orders draws customer_id from 1-20 by default, matching the
# 20 customers generated here. Customers are generated first, then orders.
customers_df = generate_cdc_data_customers(num_rows=20)
orders_df = generate_cdc_data_orders(num_rows=20)

In [0]:
# Convert the pandas frames to Spark DataFrames for the Spark CSV writer.
# NOTE(review): this rebinds the pandas names to Spark DataFrames, so re-running
# this cell on its own would feed Spark DataFrames back into createDataFrame;
# re-run the generation cell first.
customers_df = spark.createDataFrame(data=customers_df)
orders_df = spark.createDataFrame(data=orders_df)

In [0]:
# Paths to the volume
customers_volume_path = "/Volumes/lakeflow_demo/landing/cdc_data/customers"
orders_volume_path = "/Volumes/lakeflow_demo/landing/cdc_data/orders"

In [0]:
def _write_single_csv(sdf, path):
    """Overwrite `path` with Spark DataFrame `sdf` as header-prefixed CSV.

    `coalesce(1)` collapses the data to a single partition, so Spark writes one
    part file. Spark's CSV writer treats `path` as an output directory, so the
    CSV lands inside that directory rather than at `path` itself.
    """
    (
        sdf.coalesce(1)
        .write.mode("overwrite")
        .option("header", "true")
        .csv(path)
    )


# Write each table's CDC data to its own Volume directory (customers, then orders).
for cdc_sdf, volume_path in [
    (customers_df, customers_volume_path),
    (orders_df, orders_volume_path),
]:
    _write_single_csv(cdc_sdf, volume_path)

print("Single CSV file per table written to Databricks Volume.")

Single CSV file per table written to Databricks Volume.
