In [0]:
# List the entries at the DBFS root. As the cell's last expression, the listing is
# shown as the cell output. `dbutils` is not defined anywhere in this notebook --
# presumably it is injected by the Databricks runtime (confirm before running elsewhere).
dbutils.fs.ls("dbfs:/")


In [0]:
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import pytz
from faker import Faker
from apscheduler.schedulers.background import BackgroundScheduler

from pyspark.sql import SparkSession
from delta.tables import DeltaTable


In [0]:
%pip install apscheduler,faker

In [0]:
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import pytz
from faker import Faker
from apscheduler.schedulers.background import BackgroundScheduler

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable

# Pipeline settings. Every value can be overridden through an environment variable;
# the fallbacks below are used when the variable is not set.

# Path of the Delta table that the pipeline writes to and reads from.
volume_path = os.environ.get("VOLUME_PATH", "/Volumes/workspace/default/spark_df")
# Number of fake rows generated per pipeline run.
NUM_ROWS = int(os.environ.get("NUM_ROWS", "10"))
# Presumably the scheduling interval in minutes; not referenced by the cells shown here.
INTERVAL_MINUTES = int(os.environ.get("INTERVAL_MINUTES", "5"))
# Time zone name, e.g. "Asia/Kolkata"; not referenced by the cells shown here.
TIMEZONE = os.environ.get("TIMEZONE", "UTC")

# Outgoing mail settings used by send_email_notification(). The fallbacks are
# example.com placeholders, so real values must come from the environment.
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.example.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "user@example.com")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "password")
EMAIL_FROM = os.environ.get("EMAIL_FROM", "pipeline@example.com")
# Comma-separated list of recipient addresses.
EMAIL_TO = os.environ.get("EMAIL_TO", "recipient@example.com")

# Shared Faker instance used by generate_fake_data() to produce names, addresses
# and emails. No Spark session is created in this cell: the functions below use a
# global `spark` that this notebook never assigns -- presumably the session the
# Databricks runtime injects (confirm before running the notebook elsewhere).
faker = Faker()

def generate_fake_data(n: int):
    """
    Generate n rows of fake data (Name, Address, Email).

    Args:
        n: Number of rows to generate. Zero or a negative value yields an empty
            DataFrame that still has the three string columns.

    Returns:
        A Spark DataFrame with string columns Name, Address and Email.
    """
    rows = [
        # Faker addresses span several lines; flatten them so each record is one line.
        (faker.name(), faker.address().replace("\n", ", "), faker.email())
        for _ in range(n)
    ]
    # An explicit schema avoids schema inference, which raises on an empty list
    # (n <= 0) and would otherwise be repeated on every run.
    return spark.createDataFrame(rows, "Name string, Address string, Email string")


def append_to_delta(df):
    """
    Write a DataFrame to the Delta table located at volume_path.

    When volume_path already holds a Delta table the rows are appended;
    otherwise the DataFrame is written in overwrite mode, which creates the table.
    """
    table_exists = DeltaTable.isDeltaTable(spark, volume_path)
    save_mode = "append" if table_exists else "overwrite"

    writer = df.write.format("delta").mode(save_mode)
    writer.save(volume_path)


def get_latest_version():
    """
    Retrieve the latest version and timestamp of the Delta table.

    Returns:
        tuple: (version, timestamp) of the most recent commit recorded in the
        history of the Delta table at volume_path.
    """
    dt = DeltaTable.forPath(spark, volume_path)
    # history(1) asks Delta for only the newest commit. This avoids loading and
    # sorting the full history on every run, and the result no longer depends on
    # commit timestamps being strictly increasing.
    latest_row = dt.history(1).collect()[0]
    return latest_row["version"], latest_row["timestamp"]


def read_latest_data():
    """
    Load the Delta table stored at volume_path and return it as a DataFrame.

    No version is pinned, so the read reflects the table's current state.
    """
    delta_reader = spark.read.format("delta")
    return delta_reader.load(volume_path)


def send_email_notification(version, timestamp, new_count, timeout=30.0):
    """
    Send an HTML email summarizing the latest ingestion.

    Args:
        version: Delta table version to report (shown in the subject and body).
        timestamp: Commit timestamp to report.
        new_count: Number of rows appended in this run.
        timeout: Seconds to wait on the SMTP connection, so an unreachable mail
            server cannot block the pipeline indefinitely.

    Raises:
        ValueError: If EMAIL_TO contains no usable recipient address.
        smtplib.SMTPException: On SMTP-level failures (TLS, login, delivery).
    """
    # EMAIL_TO is a comma-separated list; strip surrounding whitespace and drop
    # empty entries so "a@x.com, b@x.com," does not yield " b@x.com" or "".
    recipients = [addr.strip() for addr in EMAIL_TO.split(",") if addr.strip()]
    if not recipients:
        raise ValueError("EMAIL_TO does not contain any recipient address")

    msg = MIMEMultipart('alternative')
    msg['Subject'] = f"Delta Ingestion Update - Version {version}"
    msg['From'] = EMAIL_FROM
    msg['To'] = ", ".join(recipients)

    html = f"""
    <html>
      <body>
        <h2>Delta Table Ingestion Summary</h2>
        <p><strong>Version:</strong> {version}</p>
        <p><strong>Timestamp:</strong> {timestamp}</p>
        <p><strong>Rows Appended:</strong> {new_count}</p>
      </body>
    </html>
    """

    part = MIMEText(html, 'html')
    msg.attach(part)

    # The context manager closes the connection even if TLS, login or sending fails.
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=timeout) as server:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASSWORD)
        server.sendmail(EMAIL_FROM, recipients, msg.as_string())


def pipeline_job():
    """
    Run one ingestion cycle.

    Builds NUM_ROWS rows of fake data, writes them to the Delta table, looks up
    the table's latest version, and prints a one-line summary of the run.
    """
    batch = generate_fake_data(NUM_ROWS)
    append_to_delta(batch)

    version, ts = get_latest_version()
    appended_count = batch.count()

    # Email notification is currently switched off; uncomment to re-enable it.
    # send_email_notification(version, ts, appended_count)

    summary = f"[{datetime.now()}] Appended {appended_count} rows to Delta table at version {version}."
    print(summary)


# Run a single ingestion cycle right away. In a notebook kernel __name__ is
# "__main__", so this executes every time the cell runs; the guard only has an
# effect if this code is imported as a module instead.
if __name__ == "__main__":
    pipeline_job()


In [0]:
# Run the pipeline ten times back to back, printing the zero-based pass index
# after each pass. The __name__ check cannot change between passes, so it is
# evaluated once up front.
_run_pipeline = __name__ == "__main__"
for i in range(10):
    if _run_pipeline:
        pipeline_job()
    print("Running Times", i)
