# Step 1: Load the Data into the Data Lake(Bronze Layer)

In [19]:
# Import required libraries
from pyspark.sql import SparkSession

# Create a Spark session if not already available
spark = SparkSession.builder.appName("ReadFromOneLake").getOrCreate()

# File path in OneLake (ABFSS protocol)
file_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/BronzeLayer.Lakehouse/Files/order_list-1.csv"

# Read the CSV file into a DataFrame using Spark
df_bronze = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df_bronze.show()


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 21, Finished, Available, Finished)

+-------------+--------+--------------------+---------+----------+--------+------------+----------+
|     order_no|customer|               email|productId|unit price|quantity|total amount|      date|
+-------------+--------+--------------------+---------+----------+--------+------------+----------+
|2020090110031|    Anna|anna_sample@sampl...|   S10001|       9.0|       8|        72.0|01/09/2020|
|2020090211039|Beatrice|beatrice_sample@s...|   S10002|      20.9|       2|        41.8|02/09/2020|
|2020080651022|    Anna|anna_sample@sampl...|   S10003|      15.0|       1|        15.0|06/08/2020|
|2020090691035|    Anna|anna_sample@sampl...|   S10004|      11.0|       2|        22.0|06/09/2020|
|2020070766036|   Cindy|cindy_sample@samp...|   S10007|      12.4|      10|       124.0|07/07/2020|
+-------------+--------+--------------------+---------+----------+--------+------------+----------+



# Step 2: Deduplication and Transformation (Silver Layer)</mark>

In [20]:
from pyspark.sql.functions import col, to_date

# Silver layer paths
silver_layer_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/SilverLayer.Lakehouse/Tables"

orders_table_path = f"{silver_layer_path}/Orders"
customers_table_path = f"{silver_layer_path}/Customers"
products_table_path = f"{silver_layer_path}/Products"
order_details_table_path = f"{silver_layer_path}/OrderDetails"

# Step 1: Convert date column to proper format
df_bronze = df_bronze.withColumn("date", to_date(col("date"), "dd/MM/yyyy"))

# Step 2: Create Orders DataFrame
df_orders = df_bronze.select(
    col("order_no").alias("OrderID"),
    col("date").alias("OrderDate"),
    col("email").alias("Email")
).distinct()

# Step 3: Create Customers DataFrame
df_customers = df_bronze.select(
    col("email").alias("Email"),
    col("customer").alias("CustomerName")
).distinct()

# Step 4: Create Products DataFrame
df_products = df_bronze.select(
    col("productId").alias("ProductID"),
    col("unit price").alias("UnitPrice")
).distinct()

# Step 5: Create OrderDetails (Staging Table) DataFrame
df_order_details = df_bronze.select(
    col("order_no").alias("OrderID"),
    col("productId").alias("ProductID"),
    col("total amount").alias("TotalAmount"),
    col("quantity").alias("Quantity")
).distinct()

# Step 6: Save DataFrames as Delta tables in the Lakehouse
df_orders.write.format("delta").mode("overwrite").save(orders_table_path)
df_customers.write.format("delta").mode("overwrite").save(customers_table_path)
df_products.write.format("delta").mode("overwrite").save(products_table_path)
df_order_details.write.format("delta").mode("overwrite").save(order_details_table_path)


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 22, Finished, Available, Finished)

In [21]:
# Load and verify Orders table
df_orders_read = spark.read.format("delta").load(orders_table_path)
df_orders_read.show(10)

# Load and verify Customers table
df_customers_read = spark.read.format("delta").load(customers_table_path)
df_customers_read.show(10)

# Load and verify Products table
df_products_read = spark.read.format("delta").load(products_table_path)
df_products_read.show(10)

# Load and verify OrderDetails table
df_order_details_read = spark.read.format("delta").load(order_details_table_path)
df_order_details_read.show(10)


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 23, Finished, Available, Finished)

+-------------+----------+--------------------+
|      OrderID| OrderDate|               Email|
+-------------+----------+--------------------+
|2020090691035|2020-09-06|anna_sample@sampl...|
|2020090110031|2020-09-01|anna_sample@sampl...|
|2020070766036|2020-07-07|cindy_sample@samp...|
|2020090211039|2020-09-02|beatrice_sample@s...|
|2020080651022|2020-08-06|anna_sample@sampl...|
+-------------+----------+--------------------+

+--------------------+------------+
|               Email|CustomerName|
+--------------------+------------+
|cindy_sample@samp...|       Cindy|
|beatrice_sample@s...|    Beatrice|
|anna_sample@sampl...|        Anna|
+--------------------+------------+

+---------+---------+
|ProductID|UnitPrice|
+---------+---------+
|   S10003|     15.0|
|   S10001|      9.0|
|   S10007|     12.4|
|   S10002|     20.9|
|   S10004|     11.0|
+---------+---------+

+-------------+---------+-----------+--------+
|      OrderID|ProductID|TotalAmount|Quantity|
+-------------+------

# Step 3: Data Modeling for Analytics (Gold Layer)

In [22]:
from pyspark.sql.functions import col, year, month, dayofmonth, weekofyear, date_format, lpad
from pyspark.sql.types import DateType
import datetime

# Lakehouse paths
gold_layer_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables"
dim_date_path = f"{gold_layer_path}/DimDate"

# Load existing DimDate table if it exists
try:
    existing_dim_date = spark.read.format("delta").load(dim_date_path)
    max_existing_date = existing_dim_date.agg({"Date": "max"}).collect()[0][0]
    if max_existing_date:
        start_date = max_existing_date + datetime.timedelta(days=1)
    else:
        raise Exception("DimDate table exists but has no rows.")
except:
    # If table does not exist or is empty, calculate from Orders table
    earliest_date = df_orders.agg({"OrderDate": "min"}).collect()[0][0]
    if not isinstance(earliest_date, datetime.date):
        earliest_date = earliest_date.date()
    start_date = earliest_date

# Define end date (3 years forward from start date)
end_date = start_date + datetime.timedelta(days=3 * 366)  # Leap year-safe

# Generate new dates for the range
date_range = [(start_date + datetime.timedelta(days=i)) for i in range((end_date - start_date).days + 1)]

# Create a new DataFrame
new_dim_date = spark.createDataFrame([(d,) for d in date_range], ["Date"]).withColumn("Date", col("Date").cast(DateType()))

# Add additional date attributes, including DateKey
new_dim_date = new_dim_date.withColumn("DateKey", date_format(col("Date"), "yyyyMMdd")) \
                           .withColumn("Year", year(col("Date"))) \
                           .withColumn("Month", month(col("Date"))) \
                           .withColumn("Day", dayofmonth(col("Date"))) \
                           .withColumn("WeekOfYear", weekofyear(col("Date"))) \
                           .withColumn("DayName", date_format(col("Date"), "EEEE")) \
                           .withColumn("MonthName", date_format(col("Date"), "MMMM"))

# Merge new dates with existing DimDate (if applicable)
if 'existing_dim_date' in locals():
    dim_date = existing_dim_date.union(new_dim_date).distinct()
else:
    dim_date = new_dim_date

# Save the updated DimDate table
dim_date.write.format("delta").mode("overwrite").save(dim_date_path)

print("DimDate table successfully created/updated and registered!")


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 24, Finished, Available, Finished)

DimDate table successfully created/updated and registered!


In [23]:
# Load and verify OrderDetails table
dim_date_read = spark.read.format("delta").load(dim_date_path)
dim_date_read.show(10)

StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 25, Finished, Available, Finished)

+----------+--------+----+-----+---+----------+---------+---------+
|      Date| DateKey|Year|Month|Day|WeekOfYear|  DayName|MonthName|
+----------+--------+----+-----+---+----------+---------+---------+
|2020-08-16|20200816|2020|    8| 16|        33|   Sunday|   August|
|2020-12-20|20201220|2020|   12| 20|        51|   Sunday| December|
|2020-12-21|20201221|2020|   12| 21|        52|   Monday| December|
|2020-08-05|20200805|2020|    8|  5|        32|Wednesday|   August|
|2020-08-21|20200821|2020|    8| 21|        34|   Friday|   August|
|2020-08-18|20200818|2020|    8| 18|        34|  Tuesday|   August|
|2020-11-27|20201127|2020|   11| 27|        48|   Friday| November|
|2020-12-04|20201204|2020|   12|  4|        49|   Friday| December|
|2020-12-25|20201225|2020|   12| 25|        52|   Friday| December|
|2020-11-17|20201117|2020|   11| 17|        47|  Tuesday| November|
+----------+--------+----+-----+---+----------+---------+---------+
only showing top 10 rows



In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("StarSchema").getOrCreate()

# Define paths to the Silver and Gold Layer
silver_layer_base_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/SilverLayer.Lakehouse/Tables/"
gold_layer_base_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/"

# Load the Silver Layer tables (Customers, Products, Orders, OrderDetails)
customers_df = spark.read.format("delta").load(f"{silver_layer_base_path}Customers")
products_df = spark.read.format("delta").load(f"{silver_layer_base_path}Products")
orders_df = spark.read.format("delta").load(f"{silver_layer_base_path}Orders")
orders_details_df = spark.read.format("delta").load(f"{silver_layer_base_path}OrderDetails")

# Load the DimDate table from the Gold Layer
dim_date_df = spark.read.format("delta").load(f"{gold_layer_base_path}DimDate")

# Create DimCustomer table using 'Email' as the natural key
dim_customer = customers_df.select(
    col("Email").alias("CustomerEmail"),
    col("CustomerName")
)

# Create DimProduct table
dim_product = products_df.select(
    col("ProductID").alias("ProductID"),
    col("UnitPrice").alias("UnitPrice")
)

# Create FactSales table by joining Orders with Customers, Products, Orders, Orders_details
fact_sales = orders_df.join(customers_df, orders_df.Email == customers_df.Email, "inner") \
    .join(orders_details_df, orders_df.OrderID == orders_details_df.OrderID, "inner") \
    .join(products_df, products_df.ProductID == orders_details_df.ProductID, "inner") \
    .select(
        orders_df.OrderID,
        customers_df.Email.alias("CustomerEmail"),
        products_df.ProductID,
        orders_df.OrderDate,
        orders_details_df.TotalAmount,
        orders_details_df.Quantity
    )

# Define paths to save the tables
fact_sales_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/FactSales"
dim_customer_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/DimCustomer"
dim_product_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/DimProduct"

# Write FactSales and Dim tables to Delta format
fact_sales.write.format("delta").mode("overwrite").save(fact_sales_path)
dim_customer.write.format("delta").mode("overwrite").save(dim_customer_path)
dim_product.write.format("delta").mode("overwrite").save(dim_product_path)

# Output success message
print("Star Schema tables (FactSales, DimCustomer, DimProduct) successfully created!")


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 26, Finished, Available, Finished)

Star Schema tables (FactSales, DimCustomer, DimProduct) successfully created!


In [25]:
# Load and verify Orders table
df_orders_read = spark.read.format("delta").load(fact_sales_path)
df_orders_read.show(10)

# Load and verify Customers table
df_customers_read = spark.read.format("delta").load(dim_customer_path)
df_customers_read.show(10)

# Load and verify Products table
df_products_read = spark.read.format("delta").load(dim_product_path)
df_products_read.show(10)

# Load and verify OrderDetails table
df_order_details_read = spark.read.format("delta").load(dim_date_path)
df_order_details_read.show(10)


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 27, Finished, Available, Finished)

+-------------+--------------------+---------+----------+-----------+--------+
|      OrderID|       CustomerEmail|ProductID| OrderDate|TotalAmount|Quantity|
+-------------+--------------------+---------+----------+-----------+--------+
|2020090691035|anna_sample@sampl...|   S10004|2020-09-06|       22.0|       2|
|2020090110031|anna_sample@sampl...|   S10001|2020-09-01|       72.0|       8|
|2020070766036|cindy_sample@samp...|   S10007|2020-07-07|      124.0|      10|
|2020090211039|beatrice_sample@s...|   S10002|2020-09-02|       41.8|       2|
|2020080651022|anna_sample@sampl...|   S10003|2020-08-06|       15.0|       1|
+-------------+--------------------+---------+----------+-----------+--------+

+--------------------+------------+
|       CustomerEmail|CustomerName|
+--------------------+------------+
|cindy_sample@samp...|       Cindy|
|beatrice_sample@s...|    Beatrice|
|anna_sample@sampl...|        Anna|
+--------------------+------------+

+---------+---------+
|ProductID|Un

# Step 4: Send mail to customers

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_trunc, last_day, lit, month, year
from datetime import datetime, timedelta

# Initialize Spark session
spark = SparkSession.builder.appName("CustomerEmailSummary").getOrCreate()

# Define GoldLayer paths
fact_sales_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/FactSales"
dim_customer_path = "abfss://Sample_Medallion@onelake.dfs.fabric.microsoft.com/GoldLayer.Lakehouse/Tables/DimCustomer"

# Load FactSales and DimCustomer tables
fact_sales_df = spark.read.format("delta").load(fact_sales_path)
dim_customer_df = spark.read.format("delta").load(dim_customer_path)


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 28, Finished, Available, Finished)

In [27]:
# Get the first and last day of the previous month
today = datetime.today()
first_day_last_month = (today.replace(day=1) - timedelta(days=1)).replace(day=1)
last_day_last_month = first_day_last_month.replace(day=1) + timedelta(days=31)
last_day_last_month = last_day_last_month.replace(day=1) - timedelta(days=1)

# Filter FactSales for the previous month
last_month_sales = fact_sales_df.filter(
    (col("OrderDate") >= lit(first_day_last_month)) &
    (col("OrderDate") <= lit(last_day_last_month))
)

StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 29, Finished, Available, Finished)

In [28]:
# Order details: select relevant fields and sort by purchase time
order_details = last_month_sales.select(
    "CustomerEmail", "OrderID", "ProductID", "OrderDate", "TotalAmount", "Quantity"
).orderBy("OrderDate")

StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 30, Finished, Available, Finished)

In [29]:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Email configuration
sender_email = "testhuman930507@gmail.com"
sender_password = "kqkm najy oajv vblq"
smtp_server = "smtp.gmail.com"
smtp_port = 465

# Determine last month's date range
today = datetime.today()
first_day_this_month = datetime(today.year, today.month, 1)
last_day_last_month = first_day_this_month - timedelta(days=1)
first_day_last_month = datetime(last_day_last_month.year, last_day_last_month.month, 1)

# Set specific testing dates for last month
first_day_last_month = datetime(2020, 9, 1)
last_day_last_month = datetime(2020, 9, 30)

logging.debug(f"Last month: {first_day_last_month} to {last_day_last_month}")

# Filter data for last month
logging.debug("Filtering data for monthly summary and order details...")

monthly_summary = fact_sales_df.filter(
    (F.col("OrderDate") >= first_day_last_month) & 
    (F.col("OrderDate") <= last_day_last_month)
).groupBy("CustomerEmail").agg(
    F.countDistinct("OrderID").alias("NumberOfOrders"),
    F.sum("TotalAmount").alias("TotalAmount")
)

order_details = fact_sales_df.filter(
    (F.col("OrderDate") >= first_day_last_month) & 
    (F.col("OrderDate") <= last_day_last_month)
).select(
    "CustomerEmail", "OrderID", "ProductID", "OrderDate", "Quantity", "TotalAmount"
).orderBy("OrderDate")

logging.info("Preview of monthly summary:")
monthly_summary.show()

logging.info("Preview of order details:")
order_details.show()

logging.info("Preview of customers:")
dim_customer_df.show()

# Iterate through each customer and send email
logging.debug("Starting email loop...")
for customer in dim_customer_df.collect():
    customer_email = customer["CustomerEmail"]
    customer_name = customer["CustomerName"]
    logging.info(f"Processing email for customer: {customer_email} ({customer_name})")

    # Get the customer's summary and orders
    customer_summary = monthly_summary.filter(F.col("CustomerEmail") == customer_email).collect()
    customer_orders = order_details.filter(F.col("CustomerEmail") == customer_email).collect()

    # Skip if no orders in the last month
    if not customer_summary:
        logging.info(f"No orders found for {customer_email} in the last month. Skipping.")
        continue

    # Prepare email content
    summary_text = f"""
    Hi {customer_name},
    
    Here is your monthly summary for {first_day_last_month.strftime('%B %Y')}:
    - Number of Orders: {customer_summary[0]['NumberOfOrders']}
    - Total Amount Spent: ${customer_summary[0]['TotalAmount']:.2f}
    """

    orders_text = "\n".join([
        f"OrderID: {order['OrderID']}, ProductID: {order['ProductID']}, "
        f"Date: {order['OrderDate']}, Quantity: {order['Quantity']}, Amount: ${order['TotalAmount']:.2f}"
        for order in customer_orders
    ])

    email_body = summary_text + "\n\nDetails of your orders:\n" + orders_text

    logging.debug(f"Email body for {customer_email}: {email_body}")

    # Construct and send email
    try:
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = customer_email
        message["Subject"] = "Your Monthly Order Summary"
        message.attach(MIMEText(email_body, "plain"))

        with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
            server.login(sender_email, sender_password)
            server.sendmail(sender_email, customer_email, message.as_string())
            logging.info(f"Email sent to {customer_email}!")
    except Exception as e:
        logging.error(f"Failed to send email to {customer_email}: {e}")


StatementMeta(, 7f759a30-dfad-46e1-860b-ff828fc1706a, 31, Finished, Available, Finished)

+--------------------+--------------+-----------+
|       CustomerEmail|NumberOfOrders|TotalAmount|
+--------------------+--------------+-----------+
|beatrice_sample@s...|             1|       41.8|
|anna_sample@sampl...|             2|       94.0|
+--------------------+--------------+-----------+

+--------------------+-------------+---------+----------+--------+-----------+
|       CustomerEmail|      OrderID|ProductID| OrderDate|Quantity|TotalAmount|
+--------------------+-------------+---------+----------+--------+-----------+
|anna_sample@sampl...|2020090110031|   S10001|2020-09-01|       8|       72.0|
|beatrice_sample@s...|2020090211039|   S10002|2020-09-02|       2|       41.8|
|anna_sample@sampl...|2020090691035|   S10004|2020-09-06|       2|       22.0|
+--------------------+-------------+---------+----------+--------+-----------+

+--------------------+------------+
|       CustomerEmail|CustomerName|
+--------------------+------------+
|cindy_sample@samp...|       Cindy|
|