FULL STRUCTURED ETL PIPELINE (Databricks Free Compatible)

🔹 STEP 1 — Initialize Environment

In [None]:

# ================================
# STEP 1: Initialize Environment
# ================================
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType
)

# Start Spark Session. getOrCreate() reuses an already-active session if one
# exists, otherwise it builds a new one named "RetailSalesPipeline".
spark = SparkSession.builder.appName("RetailSalesPipeline").getOrCreate()   # type: ignore


# Unity Catalog Volume Info
CATALOG = "workspace"
SCHEMA = "default"
VOLUME = "my_etl_data"
# Filesystem-style path to the Volume; later steps read/write files under it.
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"


# Create Volume (if not exists). Each identifier is backtick-quoted so names
# containing special characters (e.g. hyphens) still parse as identifiers.
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`.`{VOLUME}`")

print("✅ Storage Volume Ready at:")
print(VOLUME_PATH)

🔹 STEP 2 — Data Bridge (Extract → Volume)

In [None]:
# ================================
# STEP 2: Data Bridge (Extract)
# ================================
# Copy the bundled CSV into the Unity Catalog Volume so Step 3 can read it
# from `target_volume_file`. A failed copy is deliberately non-fatal: the file
# may already have been uploaded to the Volume by hand.

# Current working directory (inside Databricks job)
current_dir = os.getcwd()


# Expected bundle path: <parent of cwd>/datasets/2015-summary.csv
local_sync_path = os.path.join(
    os.path.dirname(current_dir),
    "datasets",
    "2015-summary.csv",
)


# Target Volume Path (Step 3 reads from exactly this location)
target_volume_file = f"{VOLUME_PATH}/2015-summary.csv"


print("📂 Source File:", local_sync_path)
print("📦 Target File:", target_volume_file)


try:
    # Create folder if missing
    os.makedirs(os.path.dirname(target_volume_file), exist_ok=True)

    # Copy file to Volume
    shutil.copy(local_sync_path, target_volume_file)

except OSError as e:
    # File-system failures only (missing source, permissions, same file, ...);
    # shutil.Error is an OSError subclass. Other exceptions are real bugs and
    # are allowed to propagate.
    print("⚠️ Auto copy failed.")
    print("Error:", e)
    if os.path.exists(target_volume_file):
        print("ℹ️ Using file already present in Volume:", target_volume_file)
    else:
        # Point the user at the exact path Step 3 will load.
        print("👉 Please upload the file manually to:", target_volume_file)

else:
    print("✅ File copied to Volume successfully")


🔹 STEP 3 — Read & Transform (Cleanse)

In [None]:
# ================================
# STEP 3: Read & Transform
# ================================
from datetime import date

# Date stamped on every row of this run, taken from the driver's local clock.
# Kept as an ISO "YYYY-MM-DD" string so the column stays StringType, matching
# the previously hard-coded literal's type.
INGESTION_DATE = date.today().isoformat()


# Read CSV from Volume (first row is the header; Spark infers column types)
raw_sales_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(target_volume_file)
)


print("📋 Raw Data Preview:")
raw_sales_df.show(5)


# Data Cleansing: add the ingestion stamp and drop rows with no TOTAL_COUNT
cleansed_sales_df = (
    raw_sales_df
    .withColumn("ingestion_date", lit(INGESTION_DATE))
    .filter(col("TOTAL_COUNT").isNotNull())
)


print("✨ Cleansing Completed")

🔹 STEP 4 — Load (Silver Layer)

In [None]:
# ================================
# STEP 4: Load (Silver Layer)
# ================================


# Output Path (Parquet directory inside the Volume)
output_path = f"{VOLUME_PATH}/silver_sales_data"


try:
    # Write Parquet, replacing any output from a previous run
    cleansed_sales_df.write.mode("overwrite").parquet(output_path)

except Exception as e:
    print("❌ Load Failed")
    print("Error:", e)
    # Re-raise so the notebook/job is marked as failed instead of finishing
    # "successfully" with no silver data written.
    raise


print("🎉 JOB SUCCESS")
print("Saved at:", output_path)


# Verify: read the written Parquet back and preview a few rows
print("📈 Verification Preview:")

(
    spark.read
    .parquet(output_path)
    .select(
        "COUNTRY_1",
        "TOTAL_COUNT",
        "ingestion_date",
    )
    .show(5)
)
