In [0]:
# Install the Faker library, used below to generate synthetic user records.
# Databricks may ask for a Python restart (dbutils.library.restartPython())
# before the newly installed package can be imported.
%pip install faker

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable
from faker import Faker
import pandas as pd

# --- Configuration ---
# Location of the Delta table; on Databricks this path resolves inside DBFS.
delta_table_path = "/tmp/delta/user_data"

# --- 1. Initialize Spark Session with Time Zone ---
# Pin the session time zone so timestamps displayed or parsed later
# (table history, time-travel queries) are interpreted as Asia/Kolkata.
spark = (
    SparkSession.builder
    .appName("DeltaDataPipeline")
    .config("spark.sql.session.timeZone", "Asia/Kolkata")
    .getOrCreate()
)

# --- 2. Function to Generate Fake Data ---
# Shared Faker instance used by generate_fake_data below. It is not seeded,
# so every call produces different names/addresses/emails.
fake = Faker()
def generate_fake_data(num_rows):
    """Build ``num_rows`` synthetic user records.

    Each record is a dict with the keys "Name", "Address" and "Email" (in that
    order). Multi-line addresses are flattened to one line by replacing every
    newline with ", ". Values come from the module-level ``fake`` Faker instance.
    """
    records = []
    for _ in range(num_rows):
        record = {}
        record["Name"] = fake.name()
        record["Address"] = ", ".join(fake.address().split("\n"))
        record["Email"] = fake.email()
        records.append(record)
    return records

# --- 3. Create and Write Initial Data (First Run) ---
NUM_INITIAL_ROWS = 5   # rows written by the initial overwrite
NUM_APPEND_ROWS = 3    # rows appended immediately afterwards

print(f"--- Creating initial table with {NUM_INITIAL_ROWS} rows ---")
initial_data = generate_fake_data(NUM_INITIAL_ROWS)
initial_df = spark.createDataFrame(initial_data)

# Overwrite whatever is already at the path so every run starts from a clean table
initial_df.write.format("delta").mode("overwrite").save(delta_table_path)

print("Initial Delta table created.")
# Read back and display the full table
initial_table = spark.read.format("delta").load(delta_table_path)
initial_table.show(truncate=False)

# --- 4. Append New Data ---
print(f"\n--- Appending {NUM_APPEND_ROWS} new rows ---")
new_data = generate_fake_data(NUM_APPEND_ROWS)
new_df = spark.createDataFrame(new_data)

# A plain DataFrame-writer append is all that is needed to add rows;
# no DeltaTable handle is required for this step.
new_df.write.format("delta").mode("append").save(delta_table_path)

print("Append operation complete.")
# Read the latest version of the table and report its size
latest_table = spark.read.format("delta").load(delta_table_path)
print(f"Total rows after append: {latest_table.count()}")
latest_table.show(truncate=False)

--- Creating initial table with 5 rows ---
Initial Delta table created.
+----------------------------------------------------------+------------------------+--------------+
|Address                                                   |Email                   |Name          |
+----------------------------------------------------------+------------------------+--------------+
|354 Ramirez Shoals Suite 082, Christopherborough, SC 30084|bcooley@example.net     |Darren Serrano|
|22926 Marie Plains, East Scottburgh, NV 12714             |larsonjanice@example.net|Pamela Johnson|
|063 Sarah Union Apt. 089, Port George, CO 92465           |joshua97@example.com    |Sandy Ortega  |
|236 Brian Wells Apt. 554, Jennifermouth, GA 82695         |michael24@example.org   |Dennis Dalton |
|84342 Williams Throughway Suite 673, Millerstad, DE 35766 |kcolon@example.net      |Crystal Baker |
+----------------------------------------------------------+------------------------+--------------+


--- Appending 3 n

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, date_format
import time

# --- Configuration for this run ---
NUM_NEW_ROWS_TO_ADD = 4

# --- 1. Append a specified number of new rows ---
print(f"--- Appending {NUM_NEW_ROWS_TO_ADD} more rows incrementally ---")
incremental_data = generate_fake_data(NUM_NEW_ROWS_TO_ADD)
incremental_df = spark.createDataFrame(incremental_data)

# A plain DataFrame-writer append; the DeltaTable handle is only needed for history below.
incremental_df.write.format('delta').mode('append').save(delta_table_path)

print("Incremental append complete.")
latest_table = spark.read.format("delta").load(delta_table_path)
print(f"Total rows now: {latest_table.count()}")
latest_table.show(truncate=False)

# --- 2. Track and Display Table Versions (History) ---
print("\n--- Displaying Delta Table History ---")
delta_table = DeltaTable.forPath(spark, delta_table_path)
history_df = delta_table.history()

# One row per commit: version, commit time, operation (WRITE, MERGE, etc.) and its parameters
history_df.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)

# --- 3. Retrieve Data from a Previous Version ---
# Let's retrieve the first version (v0) of the table
print("\n--- Retrieving data from Version 0 ---")
try:
    version_zero_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
    version_zero_df.show(truncate=False)
except Exception as e:
    print(f"Could not retrieve version 0. Error: {e}")

# --- 4. Retrieve Data using a Timestamp ---
# Format version 1's commit timestamp inside Spark, at millisecond precision and in
# the session time zone, so it parses back to exactly that commit time.
# Formatting a collected Python datetime with "%H:%M:%S" would drop the
# milliseconds, giving a time *before* the v1 commit (which resolves to v0).
# A collected datetime is also not guaranteed to be expressed in
# spark.sql.session.timeZone.
try:
    v1_row = (
        history_df
        .filter(col("version") == 1)
        .select(date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSS").alias("ts"))
        .first()
    )
    if v1_row is None:
        print("Could not retrieve by timestamp: version 1 does not exist in the table history.")
    else:
        timestamp_str = v1_row["ts"]
        print(f"\n--- Retrieving data from timestamp '{timestamp_str}' (Version 1) ---")
        timestamp_df = spark.read.format("delta").option("timestampAsOf", timestamp_str).load(delta_table_path)
        timestamp_df.show(truncate=False)
except Exception as e:
    print(f"Could not retrieve by timestamp. Maybe version 1 doesn't exist? Error: {e}")

--- Appending 4 more rows incrementally ---
Incremental append complete.
Total rows now: 12
+------------------------------------------------------------+---------------------------+-------------------+
|Address                                                     |Email                      |Name               |
+------------------------------------------------------------+---------------------------+-------------------+
|354 Ramirez Shoals Suite 082, Christopherborough, SC 30084  |bcooley@example.net        |Darren Serrano     |
|22926 Marie Plains, East Scottburgh, NV 12714               |larsonjanice@example.net   |Pamela Johnson     |
|063 Sarah Union Apt. 089, Port George, CO 92465             |joshua97@example.com       |Sandy Ortega       |
|236 Brian Wells Apt. 554, Jennifermouth, GA 82695           |michael24@example.org      |Dennis Dalton      |
|84342 Williams Throughway Suite 673, Millerstad, DE 35766   |kcolon@example.net         |Crystal Baker      |
|95390 Mosley Estate

In [0]:
# Import all necessary libraries
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from faker import Faker
import pandas as pd
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# --- 1. Configuration ---
# Target Delta table plus the per-run knobs for this scheduled job.
delta_table_path = "/tmp/delta/user_data"
NUM_ROWS_TO_APPEND = 5  # rows generated and appended on every run
SENDER_EMAIL = "sahilsrivastava773@gmail.com"  # also the SMTP login user
RECIPIENT_EMAIL = "sahilsriv773@gmail.com"

# --- 2. Initialize Spark Session ---
# Same session time zone as the interactive cells so timestamps line up.
spark = (
    SparkSession.builder
    .appName("ScheduledDeltaPipeline")
    .config("spark.sql.session.timeZone", "Asia/Kolkata")
    .getOrCreate()
)

# --- 3. Define Helper Functions ---
# Faker instance backing generate_fake_data below (unseeded, so each run yields
# new values). This cell re-creates it and re-defines generate_fake_data,
# presumably so the cell can run standalone as a scheduled job.
fake = Faker()
def generate_fake_data(num_rows):
    """Return a list of ``num_rows`` fake user dicts keyed "Name", "Address", "Email".

    Addresses are collapsed onto a single line by turning newlines into ", ".
    """
    def one_user():
        # Draw fields in a fixed order: name, then address, then email.
        name = fake.name()
        address = fake.address().replace("\n", ", ")
        email = fake.email()
        return {"Name": name, "Address": address, "Email": email}

    return [one_user() for _ in range(num_rows)]

def _render_summary_email_html(summary_html, rows_appended):
    """Return the HTML body of the pipeline summary email.

    ``summary_html`` is inserted verbatim into the "Appended Data" section; the
    caller is expected to pass already-safe HTML (e.g. a pandas ``to_html`` table
    using the ``data-table`` CSS class defined below).
    """
    return f"""
        <html>
        <head>
        <style>
            body {{
                font-family: Arial, sans-serif;
                background-color: #f4f4f9;
                margin: 0;
                padding: 0;
            }}
            .container {{
                max-width: 700px;
                margin: 20px auto;
                background-color: #ffffff;
                border-radius: 8px;
                box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                overflow: hidden;
            }}
            .header {{
                background-color: #4a90e2;
                color: #ffffff;
                padding: 20px;
                text-align: center;
            }}
            .header h2 {{
                margin: 0;
            }}
            .content {{
                padding: 30px;
                line-height: 1.6;
                color: #333333;
            }}
            .content p {{
                margin: 0 0 15px 0;
            }}
            .data-table {{
                width: 100%;
                border-collapse: collapse;
                margin-top: 20px;
            }}
            .data-table th, .data-table td {{
                border: 1px solid #dddddd;
                padding: 12px;
                text-align: left;
            }}
            .data-table th {{
                background-color: #f2f2f2;
                font-weight: bold;
            }}
            .footer {{
                background-color: #f4f4f9;
                color: #888888;
                font-size: 12px;
                text-align: center;
                padding: 20px;
            }}
        </style>
        </head>
        <body>
            <div class="container">
                <div class="header">
                    <h2>Data Ingestion Summary</h2>
                </div>
                <div class="content">
                    <p>The pipeline ran successfully and appended <strong>{rows_appended}</strong> new rows to the Delta table.</p>
                    <h3>Appended Data:</h3>
                    {summary_html}
                </div>
                <div class="footer">
                    <p>This is an automated notification from your Azure Databricks pipeline.</p>
                </div>
            </div>
        </body>
        </html>
        """


def send_summary_email(summary_html, rows_appended, timeout=30):
    """Email a summary of a pipeline run to RECIPIENT_EMAIL via Gmail SMTP.

    Best-effort: any failure (missing secret, TLS/auth error, timeout, ...) is
    caught and printed, so a notification problem never fails the pipeline run.

    Args:
        summary_html: HTML fragment (e.g. a pandas ``to_html`` table) embedded in
            the "Appended Data" section of the email.
        rows_appended: Number of rows appended; shown in the subject and body.
        timeout: Seconds to wait on SMTP socket operations before giving up, so a
            scheduled run cannot hang indefinitely on the mail server.
    """
    import ssl  # local import: this cell's top-level imports do not include ssl

    try:
        # Gmail app password stored in the Databricks secret scope "email_creds".
        app_password = dbutils.secrets.get(scope="email_creds", key="app-password")

        message = MIMEMultipart("alternative")
        message["Subject"] = f"Data Pipeline Success: {rows_appended} New Rows Appended"
        message["From"] = SENDER_EMAIL
        message["To"] = RECIPIENT_EMAIL

        # multipart/alternative parts are ordered from least to most preferred:
        # a plain-text fallback first, then the HTML version rich clients render.
        plain_text = (
            f"The pipeline ran successfully and appended {rows_appended} new rows "
            "to the Delta table.\n\n"
            "Open this message in an HTML-capable mail client to see the appended data.\n\n"
            "This is an automated notification from your Azure Databricks pipeline.\n"
        )
        message.attach(MIMEText(plain_text, "plain"))
        message.attach(MIMEText(_render_summary_email_html(summary_html, rows_appended), "html"))

        # Verify the server certificate and hostname during STARTTLS. Without an
        # explicit context smtplib upgrades the connection without these checks,
        # exposing the app password to a man-in-the-middle.
        tls_context = ssl.create_default_context()
        with smtplib.SMTP("smtp.gmail.com", 587, timeout=timeout) as server:
            server.starttls(context=tls_context)
            server.login(SENDER_EMAIL, app_password)
            server.send_message(message)
        print("Email notification sent successfully.")

    except Exception as e:
        print(f"Error sending email: {e}")

# --- 4. Main Pipeline Logic ---
print(f"Starting pipeline run. Appending {NUM_ROWS_TO_APPEND} new rows.")
new_data = generate_fake_data(NUM_ROWS_TO_APPEND)

if not new_data:
    print("No new data to append. Exiting.")
else:
    # Build the pandas frame once: it feeds both the Delta write and the email
    # table, so the notification shows exactly the rows that were written.
    new_pdf = pd.DataFrame(new_data)
    new_df = spark.createDataFrame(new_pdf)

    # Append the new data
    new_df.write.format("delta").mode("append").save(delta_table_path)

    rows_appended = len(new_pdf)
    print(f"Successfully appended {rows_appended} rows.")

    # --- 5. Prepare and Send Notification ---
    # Render the appended rows as an HTML table; the "data-table" class matches
    # the CSS in the email template.
    summary_html = new_pdf.to_html(index=False, classes='data-table')

    # Best-effort notification: send_summary_email prints failures instead of raising
    send_summary_email(summary_html, rows_appended)

print("Pipeline execution finished.")

Starting pipeline run. Appending 5 new rows.
Successfully appended 5 rows.
Email notification sent successfully.
Pipeline execution finished.
