In [0]:
import os
import zipfile
import smtplib
from datetime import datetime
from email.message import EmailMessage
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

In [0]:
spark = SparkSession.builder.appName("CDCPipeline").getOrCreate()

In [0]:
jdbc_hostname = "nishchay-sql.database.windows.net"
jdbc_port = 1433
jdbc_database = "projectsql"
jdbc_username = "Nishchay@nishchay-sql"
jdbc_password = "Nish@4321"
jdbc_url = (
    f"jdbc:sqlserver://{jdbc_hostname}:{jdbc_port};database={jdbc_database};"
    "encrypt=true;trustServerCertificate=false;loginTimeout=30;"
)
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

cdc_tables = {
    "Customer": "cdc.dbo_Customer_CT",
    "Product": "cdc.dbo_Product_CT",
    "Order": "cdc.dbo_Orders_CT",
    "Inventory": "cdc.dbo_Inventory_CT"
}

In [0]:
sender_email = "mohannishchay110404@gmail.com"
receiver_email = "mohannishchay6@gmail.com"
app_password = "tdzscdxzprpilofy"
smtp_server = "smtp.gmail.com"
smtp_port = 465

In [0]:
output_path = "/mnt/datalake/cdc_output"
csv_temp_path = "/dbfs/tmp/cdc_exports"
zip_path = "/dbfs/tmp/cdc_exports/changed_data.zip"

In [0]:
def download_jdbc_driver():
    driver_path = "/databricks/jars/mssql-jdbc-12.2.0.jre11.jar"
    if not os.path.exists(driver_path):
        os.system(
            "wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/"
            "12.2.0.jre11/mssql-jdbc-12.2.0.jre11.jar -P /databricks/jars/"
        )
    print(f"JDBC driver check: {'Found' if os.path.exists(driver_path) else 'Downloaded'}")

def process_cdc_tables():
    current_time = datetime.now()
    os.makedirs(csv_temp_path, exist_ok=True)

    for table_name, cdc_table in cdc_tables.items():
        try:
            df_cdc = spark.read.jdbc(
                url=jdbc_url,
                table=cdc_table,
                properties=connection_properties
            )

            if df_cdc.count() == 0:
                print(f"No changes detected in {table_name}")
                continue

            df_cdc = df_cdc.withColumn("load_timestamp", lit(current_time))

            delta_path = f"{output_path}/{table_name}"
            df_cdc.write.format("delta").mode("append").save(delta_path)
            print(f"Saved CDC data for {table_name} to {delta_path}")

            csv_path = f"{csv_temp_path}/{table_name}.csv"
            df_cdc.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_path)
            print(f"Saved CSV for {table_name} to {csv_path}")

        except Exception as e:
            print(f"Error processing {table_name}: {str(e)}")

In [0]:
def create_zip_file():
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for table_name in cdc_tables.keys():
                csv_file = f"{csv_temp_path}/{table_name}.csv"
                if os.path.exists(csv_file):
                    zipf.write(csv_file, os.path.basename(csv_file))
                    print(f"Added {csv_file} to zip")
        print(f"Created zip file at {zip_path}")
        return os.path.exists(zip_path)
    except Exception as e:
        print(f"Error creating zip file: {str(e)}")
        return False

In [0]:
def send_email_notification():
    try:
        msg = EmailMessage()
        msg['Subject'] = "CDC Data Export: Pipeline Success"
        msg['From'] = sender_email
        msg['To'] = receiver_email
        msg.set_content(
            "The CDC pipeline completed successfully. "
            "Attached is the zip file containing the latest CDC exports for all tables."
        )

        if os.path.exists(zip_path):
            with open(zip_path, "rb") as file:
                file_data = file.read()
                file_name = os.path.basename(zip_path)
                msg.add_attachment(
                    file_data,
                    maintype="application",
                    subtype="octet-stream",
                    filename=file_name
                )
        else:
            print("Zip file not found, sending email without attachment")
            msg.set_content("The CDC pipeline completed successfully, but no new data was found.")

        with smtplib.SMTP_SSL(smtp_server, smtp_port) as smtp:
            smtp.login(sender_email, app_password)
            smtp.send_message(msg)
        print("✅ Email sent successfully!")

    except Exception as e:
        print(f"Error sending email: {str(e)}")

In [0]:
if __name__ == "__main__":
    try:
        download_jdbc_driver()
        process_cdc_tables()
        if create_zip_file():
            send_email_notification()
        else:
            print("No new data to email")
    except Exception as e:
        print(f"Pipeline failed: {str(e)}")

JDBC driver check: Found
Saved CDC data for Customer to /mnt/datalake/cdc_output/Customer
Saved CSV for Customer to /dbfs/tmp/cdc_exports/Customer.csv
Saved CDC data for Product to /mnt/datalake/cdc_output/Product
Saved CSV for Product to /dbfs/tmp/cdc_exports/Product.csv
Saved CDC data for Order to /mnt/datalake/cdc_output/Order
Saved CSV for Order to /dbfs/tmp/cdc_exports/Order.csv
Saved CDC data for Inventory to /mnt/datalake/cdc_output/Inventory
Saved CSV for Inventory to /dbfs/tmp/cdc_exports/Inventory.csv
Created zip file at /dbfs/tmp/cdc_exports/changed_data.zip
✅ Email sent successfully!
