In [0]:
dbutils.fs.mv("dbfs:/FileStore/tables/store_sales.csv", "dbfs:/FileStore/data_pipeline/raw_data/store_sales.csv")
dbutils.fs.mv("dbfs:/FileStore/tables/products.csv", "dbfs:/FileStore/data_pipeline/raw_data/products.csv")
dbutils.fs.mv("dbfs:/FileStore/tables/stores.csv", "dbfs:/FileStore/data_pipeline/raw_data/stores.csv")

Out[7]: True

In [0]:
from datetime import datetime

def log_message(message):
    log_path = "dbfs:/FileStore/data_pipeline/logs/log.txt"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    full_msg = f"[{timestamp}] {message}\n"
    dbutils.fs.put(log_path, full_msg, overwrite=False)

In [0]:
from datetime import datetime

def log_message(message):
    log_path = "dbfs:/FileStore/data_pipeline/logs/log.txt"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"
    
    try:
        # Try to read existing content
        existing = dbutils.fs.head(log_path, 1000000)
    except:
        existing = ""
    
    # Append new log entry
    dbutils.fs.put(log_path, existing + log_entry, overwrite=True)

In [0]:
from pyspark.sql.functions import to_date, col, lower, trim, expr

try:
    log_message("Transformation started.")
    
    # Clean
    sales_df = sales_df.withColumn("sale_date", to_date(col("sale_date")))
    sales_df = sales_df.withColumn("quantity", col("quantity").cast("int"))
    sales_df = sales_df.withColumn("unit_price", col("unit_price").cast("double"))
    sales_df = sales_df.withColumn("discount", col("discount").cast("double"))
    products_df = products_df.withColumn("cost_price", col("cost_price").cast("double"))
    products_df = products_df.withColumn("category", lower(trim(col("category"))))
    stores_df = stores_df.withColumn("region", lower(trim(col("region"))))
    
    # Join
    joined_df = sales_df.join(products_df, "product_id", "left") \
                        .join(stores_df, "store_id", "left")
    
    # Derived columns
    final_df = joined_df.withColumn("gross_amount", expr("quantity * unit_price")) \
        .withColumn("net_amount", expr("gross_amount - discount")) \
        .withColumn("profit", expr("net_amount - (quantity * cost_price)"))
    
    log_message("Transformation completed successfully.")
except Exception as e:
    log_message(f"Transformation failed: {e}")

Wrote 489 bytes.
Wrote 550 bytes.


In [0]:
try:
    log_message("Storage started.")
    output_path = "dbfs:/FileStore/data_pipeline/clean_data/cleaned_sales"
    final_df.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path)
    log_message("Storage completed successfully.")
except Exception as e:
    log_message(f"Storage failed: {e}")

Wrote 589 bytes.
Wrote 643 bytes.


In [0]:
display(dbutils.fs.head("dbfs:/FileStore/data_pipeline/logs/log.txt", 500))

[Truncated to first 500 bytes]
'[2025-11-10 13:59:11] Ingestion started.\n[2025-11-10 14:00:37] Transformation started.\n[2025-11-10 14:00:37] Transformation completed successfully.\n[2025-11-10 14:00:41] Storage started.\n[2025-11-10 14:00:44] Storage completed successfully.\n[2025-11-10 14:02:18] Transformation started.\n[2025-11-10 14:02:19] Transformation completed successfully.\n\n[2025-11-10 14:02:22] Storage started.\n[2025-11-10 14:02:25] Storage completed successfully.\n\n[2025-11-10 14:03:13] Transformation started.\n[2025-11-10'