In [0]:
# Import required libraries
from pyspark.sql.functions import col
from datetime import datetime

# Define JDBC connection parameters.
# The SQL password is fetched at run time from a Databricks secret scope so it
# is never stored in the notebook source or its revision history.
# NOTE(review): the scope and key names below are placeholders — create them
# (or rename them to match the workspace) before running, and rotate the
# password that was previously committed in this cell.
jdbc_secret_scope = "retail-cdc"
jdbc_secret_key = "sql-password"

jdbc_url = "jdbc:sqlserver://sanchi-cdc-server.database.windows.net:1433;database=RetailCDC_Jaipur"
jdbc_properties = {
    "user": "sqladmin",
    "password": dbutils.secrets.get(scope=jdbc_secret_scope, key=jdbc_secret_key),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# ====== Order Table ======
# Reads [dbo].[Order] over JDBC, keeps the rows whose OrderDate is on or after
# 2025-07-16 and writes them as Parquet to a path stamped with the current
# date and minute. Any exception is printed and the cell carries on.
try:
    order_reader = spark.read.format("jdbc").options(
        url=jdbc_url,
        dbtable="[dbo].[Order]",
        user=jdbc_properties["user"],
        password=jdbc_properties["password"],
        driver=jdbc_properties["driver"],
    )
    df_order = order_reader.load()

    df_order_cdc = df_order.where(col("OrderDate") >= "2025-07-16")

    order_stamp = datetime.now().strftime("%Y%m%d_%H%M")
    output_path_order = "dbfs:/CDC_output/order_" + order_stamp + ".parquet"
    # "overwrite" replaces anything already stored at the same path.
    df_order_cdc.write.mode("overwrite").parquet(output_path_order)
    print("Order table CDC extraction complete.")
except Exception as e:
    print(f"Order Error: {e}")

# ====== Customer Table ======
# Reads [dbo].[Customer] over JDBC and writes it as Parquet to a path stamped
# with the current date and minute. No date filter is applied in this section:
# every row that was read is written. Any exception is printed and the cell
# carries on.
try:
    df_customer = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", "[dbo].[Customer]")
        .option("user", jdbc_properties["user"])
        .option("password", jdbc_properties["password"])
        .option("driver", jdbc_properties["driver"])
        .load()
    )

    customer_stamp = datetime.now().strftime("%Y%m%d_%H%M")
    output_path_customer = "dbfs:/CDC_output/customer_" + customer_stamp + ".parquet"
    customer_writer = df_customer.write.mode("overwrite")
    customer_writer.parquet(output_path_customer)
    print("Customer table CDC extraction complete.")
except Exception as e:
    print(f"Customer Error: {e}")

# ====== Product Table ======
# Reads [dbo].[Product] over JDBC and writes all of it (no filter here) as
# Parquet to a path stamped with the current date and minute. Failures are
# printed rather than raised.
try:
    product_reader = spark.read.format("jdbc").options(
        url=jdbc_url,
        dbtable="[dbo].[Product]",
        user=jdbc_properties["user"],
        password=jdbc_properties["password"],
        driver=jdbc_properties["driver"],
    )
    df_product = product_reader.load()

    product_stamp = datetime.now().strftime("%Y%m%d_%H%M")
    output_path_product = "dbfs:/CDC_output/product_" + product_stamp + ".parquet"
    df_product.write.parquet(output_path_product, mode="overwrite")
    print("Product table CDC extraction complete.")
except Exception as e:
    print(f"Product Error: {e}")

# ====== Inventory Table ======
# Reads [dbo].[Inventory] over JDBC and writes the unfiltered result as
# Parquet to a path stamped with the current date and minute. Failures are
# printed rather than raised.
try:
    df_inventory = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", "[dbo].[Inventory]")
        .option("user", jdbc_properties["user"])
        .option("password", jdbc_properties["password"])
        .option("driver", jdbc_properties["driver"])
        .load()
    )

    inventory_stamp = datetime.now().strftime("%Y%m%d_%H%M")
    output_path_inventory = "dbfs:/CDC_output/inventory_" + inventory_stamp + ".parquet"
    inventory_writer = df_inventory.write.mode("overwrite")
    inventory_writer.parquet(output_path_inventory)
    print("Inventory table CDC extraction complete.")
except Exception as e:
    print(f"Inventory Error: {e}")

In [0]:
# Show what currently exists under the CDC output folder.
cdc_output_entries = dbutils.fs.ls("dbfs:/CDC_output/")
display(cdc_output_entries)

path,name,size,modificationTime
dbfs:/CDC_output/customer_20250717_0935.parquet/,customer_20250717_0935.parquet/,0,1752744901000
dbfs:/CDC_output/customer_20250717_0956.parquet/,customer_20250717_0956.parquet/,0,1752746163000
dbfs:/CDC_output/customer_20250717_1008.parquet/,customer_20250717_1008.parquet/,0,1752746889000
dbfs:/CDC_output/customer_20250717_1039.parquet/,customer_20250717_1039.parquet/,0,1752748797000
dbfs:/CDC_output/customer_20250717_1055.parquet/,customer_20250717_1055.parquet/,0,1752749717000
dbfs:/CDC_output/customer_20250717_1109.parquet/,customer_20250717_1109.parquet/,0,1752750587000
dbfs:/CDC_output/customer_20250717_1126.parquet/,customer_20250717_1126.parquet/,0,1752751582000
dbfs:/CDC_output/customer_20250717_1132.parquet/,customer_20250717_1132.parquet/,0,1752751930000
dbfs:/CDC_output/customer_20250717_1145.parquet/,customer_20250717_1145.parquet/,0,1752752712000
dbfs:/CDC_output/customer_20250717_1252.parquet/,customer_20250717_1252.parquet/,0,1752756735000


In [0]:
# Sanity check on the Order frame: total rows, then the first five OrderDate values.
order_row_total = df_order.count()
print("Order count:", order_row_total)
order_dates = df_order.select("OrderDate")
order_dates.show(5)

In [0]:
# Row totals for the four source frames, counted and printed one at a time in
# the order Order, Customer, Product, Inventory.
order_total = df_order.count()
print("Order count:", order_total)
customer_total = df_customer.count()
print("Customer count:", customer_total)
product_total = df_product.count()
print("Product count:", product_total)
inventory_total = df_inventory.count()
print("Inventory count:", inventory_total)

In [0]:
# Re-list the CDC output folder to see which exports are present.
cdc_output_listing = dbutils.fs.ls("dbfs:/CDC_output/")
display(cdc_output_listing)

path,name,size,modificationTime
dbfs:/CDC_output/customer_20250717_0935.parquet/,customer_20250717_0935.parquet/,0,1752744901000
dbfs:/CDC_output/customer_20250717_0956.parquet/,customer_20250717_0956.parquet/,0,1752746163000
dbfs:/CDC_output/customer_20250717_1008.parquet/,customer_20250717_1008.parquet/,0,1752746889000
dbfs:/CDC_output/customer_20250717_1039.parquet/,customer_20250717_1039.parquet/,0,1752748797000
dbfs:/CDC_output/customer_20250717_1055.parquet/,customer_20250717_1055.parquet/,0,1752749717000
dbfs:/CDC_output/customer_20250717_1109.parquet/,customer_20250717_1109.parquet/,0,1752750587000
dbfs:/CDC_output/customer_20250717_1126.parquet/,customer_20250717_1126.parquet/,0,1752751582000
dbfs:/CDC_output/customer_20250717_1132.parquet/,customer_20250717_1132.parquet/,0,1752751930000
dbfs:/CDC_output/customer_20250717_1145.parquet/,customer_20250717_1145.parquet/,0,1752752712000
dbfs:/CDC_output/customer_20250717_1252.parquet/,customer_20250717_1252.parquet/,0,1752756735000


In [0]:
# Check the Order frame before writing it: row total plus a three-row preview.
order_rows_before_write = df_order.count()
print("Order count before write:", order_rows_before_write)
df_order.show(n=3)

In [0]:
from pyspark.sql.functions import col

# Step 2: Load and filter Order table
# Re-reads [dbo].[Order] over JDBC into df_order and prints its size and a
# five-row preview.
order_source = spark.read.format("jdbc").options(
    url=jdbc_url,
    dbtable="[dbo].[Order]",
    user=jdbc_properties["user"],
    password=jdbc_properties["password"],
    driver=jdbc_properties["driver"],
)
df_order = order_source.load()

full_order_rows = df_order.count()
print("Full Order table row count:", full_order_rows)
df_order.show(5)

# CDC filter: keep rows with OrderDate on or after 2025-07-16. The filter is
# applied unconditionally — there is no guard on the row count above.
df_cdc = df_order.where(col("OrderDate") >= "2025-07-16")

cdc_order_rows = df_cdc.count()
print("CDC Order count:", cdc_order_rows)
df_cdc.show(5)

In [0]:
# Save whatever df_cdc currently holds to a fixed Parquet path, replacing any
# previous contents at that location.
order_cdc_target = "dbfs:/CDC_output/order_20250717_1610.parquet"
df_cdc.write.parquet(order_cdc_target, mode="overwrite")

In [0]:
# List the CDC output folder again after the write above.
cdc_output_after_write = dbutils.fs.ls("dbfs:/CDC_output/")
display(cdc_output_after_write)

path,name,size,modificationTime
dbfs:/CDC_output/customer_20250717_0935.parquet/,customer_20250717_0935.parquet/,0,1752744901000
dbfs:/CDC_output/customer_20250717_0956.parquet/,customer_20250717_0956.parquet/,0,1752746163000
dbfs:/CDC_output/customer_20250717_1008.parquet/,customer_20250717_1008.parquet/,0,1752746889000
dbfs:/CDC_output/customer_20250717_1039.parquet/,customer_20250717_1039.parquet/,0,1752748797000
dbfs:/CDC_output/customer_20250717_1055.parquet/,customer_20250717_1055.parquet/,0,1752749717000
dbfs:/CDC_output/customer_20250717_1109.parquet/,customer_20250717_1109.parquet/,0,1752750587000
dbfs:/CDC_output/customer_20250717_1126.parquet/,customer_20250717_1126.parquet/,0,1752751582000
dbfs:/CDC_output/customer_20250717_1132.parquet/,customer_20250717_1132.parquet/,0,1752751930000
dbfs:/CDC_output/customer_20250717_1145.parquet/,customer_20250717_1145.parquet/,0,1752752712000
dbfs:/CDC_output/customer_20250717_1252.parquet/,customer_20250717_1252.parquet/,0,1752756735000


In [0]:
from pyspark.sql.functions import col
from datetime import datetime

# CDC-style export of one table, driven by the three settings below.
table_name = "Customer"
output_file_prefix = "customer"
# Column names tried in order; the first one present in the table drives the date filter.
date_column_candidates = ["CreatedAt", "UpdatedAt"]

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", f"[dbo].[{table_name}]")
    .option("user", jdbc_properties["user"])
    .option("password", jdbc_properties["password"])
    .option("driver", jdbc_properties["driver"])
    .load()
)

print(f"{table_name} row count:", df.count())
df.show()

# First candidate that exists in the loaded schema, or None when none match.
valid_column = next(
    (name for name in date_column_candidates if name in df.columns),
    None,
)

if valid_column:
    df_cdc = df.filter(col(valid_column) >= "2025-07-16")
    print(f"✅ CDC {table_name} count:", df_cdc.count())

    # Take a single clock reading and derive both the date and the time part of
    # the file name from it. The date used to be hard-coded as 20250717, which
    # mislabelled every export written on any other day.
    run_started = datetime.now()
    run_date = run_started.strftime("%Y%m%d")
    timestamp = run_started.strftime("%H%M")
    output_path = f"dbfs:/CDC_output/{output_file_prefix}_{run_date}_{timestamp}.parquet"
    df_cdc.write.mode("overwrite").parquet(output_path)
    print(f"✅ CDC {table_name} data written to {output_path}")
else:
    print(f"❌ No valid CDC column found in {table_name} table.")

In [0]:
# CDC-style export of one table, driven by the three settings below.
table_name = "Product"
output_file_prefix = "product"
# Column names tried in order; the first one present in the table drives the date filter.
date_column_candidates = ["CreatedAt", "UpdatedAt"]

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", f"[dbo].[{table_name}]")
    .option("user", jdbc_properties["user"])
    .option("password", jdbc_properties["password"])
    .option("driver", jdbc_properties["driver"])
    .load()
)

print(f"{table_name} row count:", df.count())
df.show()

# First candidate that exists in the loaded schema, or None when none match.
valid_column = next(
    (name for name in date_column_candidates if name in df.columns),
    None,
)

if valid_column:
    df_cdc = df.filter(col(valid_column) >= "2025-07-16")
    print(f"✅ CDC {table_name} count:", df_cdc.count())

    # Take a single clock reading and derive both the date and the time part of
    # the file name from it. The date used to be hard-coded as 20250717, which
    # mislabelled every export written on any other day.
    run_started = datetime.now()
    run_date = run_started.strftime("%Y%m%d")
    timestamp = run_started.strftime("%H%M")
    output_path = f"dbfs:/CDC_output/{output_file_prefix}_{run_date}_{timestamp}.parquet"
    df_cdc.write.mode("overwrite").parquet(output_path)
    print(f"✅ CDC {table_name} data written to {output_path}")
else:
    print(f"❌ No valid CDC column found in {table_name} table.")



In [0]:
from pyspark.sql.functions import col
from datetime import datetime

# Read Product table over JDBC.
product_source = spark.read.format("jdbc").options(
    url=jdbc_url,
    dbtable="[dbo].[Product]",
    user=jdbc_properties["user"],
    password=jdbc_properties["password"],
    driver=jdbc_properties["driver"],
)
df_product = product_source.load()

# CDC filter: keep rows whose CreatedAt is on or after 2025-07-16.
# The column is used directly; this cell does not check that it exists.
df_product_cdc = df_product.where(col("CreatedAt") >= "2025-07-16")

# Preview the filtered rows (optional).
df_product_cdc.show()

# Save the filtered rows to a path stamped with the current date and minute,
# replacing anything already at that path.
product_cdc_stamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = "dbfs:/CDC_output/product_" + product_cdc_stamp + ".parquet"
df_product_cdc.write.parquet(output_path, mode="overwrite")

# The count is evaluated here, after the write has finished.
print(f"✅ CDC Product count: {df_product_cdc.count()}")

In [0]:
# CDC-style export of one table, driven by the three settings below.
table_name = "Inventory"
output_file_prefix = "inventory"
# Column names tried in order; the first one present in the table drives the date filter.
date_column_candidates = ["UpdatedAt"]

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", f"[dbo].[{table_name}]")
    .option("user", jdbc_properties["user"])
    .option("password", jdbc_properties["password"])
    .option("driver", jdbc_properties["driver"])
    .load()
)

print(f"{table_name} row count:", df.count())
df.show()

# First candidate that exists in the loaded schema, or None when none match.
valid_column = next(
    (name for name in date_column_candidates if name in df.columns),
    None,
)

if valid_column:
    df_cdc = df.filter(col(valid_column) >= "2025-07-16")
    print(f"✅ CDC {table_name} count:", df_cdc.count())

    # Take a single clock reading and derive both the date and the time part of
    # the file name from it. The date used to be hard-coded as 20250717, which
    # mislabelled every export written on any other day.
    run_started = datetime.now()
    run_date = run_started.strftime("%Y%m%d")
    timestamp = run_started.strftime("%H%M")
    output_path = f"dbfs:/CDC_output/{output_file_prefix}_{run_date}_{timestamp}.parquet"
    df_cdc.write.mode("overwrite").parquet(output_path)
    print(f"✅ CDC {table_name} data written to {output_path}")
else:
    print(f"❌ No valid CDC column found in {table_name} table.")

In [0]:
# CDC-style export of one table, driven by the three settings below.
table_name = "Order"
output_file_prefix = "order"
# Column names tried in order; the first one present in the table drives the date filter.
date_column_candidates = ["OrderDate"]

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", f"[dbo].[{table_name}]")
    .option("user", jdbc_properties["user"])
    .option("password", jdbc_properties["password"])
    .option("driver", jdbc_properties["driver"])
    .load()
)

print(f"{table_name} row count:", df.count())
df.show()

# First candidate that exists in the loaded schema, or None when none match.
valid_column = next(
    (name for name in date_column_candidates if name in df.columns),
    None,
)

if valid_column:
    df_cdc = df.filter(col(valid_column) >= "2025-07-16")
    print(f"✅ CDC {table_name} count:", df_cdc.count())

    # Take a single clock reading and derive both the date and the time part of
    # the file name from it. The date used to be hard-coded as 20250717, which
    # mislabelled every export written on any other day.
    run_started = datetime.now()
    run_date = run_started.strftime("%Y%m%d")
    timestamp = run_started.strftime("%H%M")
    output_path = f"dbfs:/CDC_output/{output_file_prefix}_{run_date}_{timestamp}.parquet"
    df_cdc.write.mode("overwrite").parquet(output_path)
    print(f"✅ CDC {table_name} data written to {output_path}")
else:
    print(f"❌ No valid CDC column found in {table_name} table.")

In [0]:
# Write final_output_df as CSV with a header row, replacing whatever is
# already at the target folder.
# NOTE(review): final_output_df is not assigned in any cell visible here —
# confirm where it is built before relying on this cell.
final_cdc_writer = final_output_df.write.mode("overwrite").option("header", "true")
final_cdc_writer.csv("dbfs:/mnt/output/final_cdc/")