In [0]:
# Look up the current notebook's path from the notebook context; as a bare
# expression it is shown as the cell's output.
dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()


In [0]:
%run "./000_setup_storage"

In [0]:
# Source file from GitHub (raw CSV)
orders_csv_url = "https://raw.githubusercontent.com/abhishektripathi27/databricks-etl-pipeline/main/data/orders.csv"

# NOTE(review): `raw_path` and `bronze_path` are not defined in this notebook;
# presumably they are provided by ./000_setup_storage (run above) — confirm.
# Other cells build paths as `raw_path + "orders/..."`, i.e. they expect the base
# paths to end with "/". Strip any trailing slash before joining so the result
# never contains a doubled "//" and matches the paths those cells produce.

# Raw landing path in ADLS
raw_orders_path = f"{raw_path.rstrip('/')}/orders/orders.csv"

# Bronze Delta table path in ADLS
bronze_orders_path = f"{bronze_path.rstrip('/')}/orders"


In [0]:
import requests

# Download the orders CSV from GitHub and land it, unmodified, in the RAW zone.
# An explicit timeout is passed because requests otherwise waits indefinitely
# on an unresponsive server, which would hang the notebook run.
response = requests.get(orders_csv_url, timeout=30)

# Anything other than a plain 200 is treated as a failed download.
if response.status_code != 200:
    raise Exception(f"Failed to download file. Status code: {response.status_code}")

# overwrite=True replaces any file already present at the landing path.
dbutils.fs.put(raw_orders_path, response.text, overwrite=True)

print("orders.csv uploaded to RAW zone:", raw_orders_path)


In [0]:
# Register the Bronze orders Delta location under the table name `bronze_orders`.
# NOTE(review): no column list is given, so this presumably relies on Delta data
# already existing at the location; no earlier cell in this notebook writes
# there — confirm the table exists before this cell runs.
register_bronze_orders_sql = f"""
CREATE TABLE IF NOT EXISTS bronze_orders
USING DELTA
LOCATION '{bronze_orders_path}'
"""
spark.sql(register_bronze_orders_sql)

print("✅ Registered table: bronze_orders")


In [0]:
# Load the Bronze orders Delta table by path, report its row count, then render it.
bronze_df = spark.read.load(bronze_orders_path, format="delta")

bronze_row_count = bronze_df.count()
print("Bronze count:", bronze_row_count)
display(bronze_df)


In [0]:
# Open the Bronze orders Delta table through the slash-terminated table path.
bronze_table_path = f"{bronze_path}orders/"
bronze_orders = spark.read.load(bronze_table_path, format="delta")


In [0]:
# # Databricks notebook source
# # -----------------------------
# # STEP 0: Absolute ABFS paths
# raw_path        = "abfss://datalake@etldatalakeabhi.dfs.core.windows.net/raw/"
# bronze_path     = "abfss://datalake@etldatalakeabhi.dfs.core.windows.net/bronze/"
# checkpoint_path = "abfss://datalake@etldatalakeabhi.dfs.core.windows.net/checkpoint/"

# # Optional: Test access
# display(dbutils.fs.ls(raw_path))


In [0]:
# Read the orders CSV from GitHub into pandas, then convert it to a Spark DataFrame
import pandas as pd
from pyspark.sql.functions import col, current_timestamp  # `col` is needed by valid_condition below

# `orders_csv_url` is the GitHub raw-CSV URL defined near the top of this
# notebook; no `github_url` variable is defined anywhere in it.
pdf = pd.read_csv(orders_csv_url)

# Stamp every row with the time it was ingested.
bronze_df = spark.createDataFrame(pdf).withColumn("ingestion_timestamp", current_timestamp())

# Good/bad row rule: order_id and status must be present, and amount must be
# present unless the order has the cancelled status.
# NOTE(review): `cancelled_status` is not defined in this notebook — presumably
# it is provided by ./000_setup_storage; confirm. The STEP 2 cell rebuilds
# valid_condition with the literal "cancelled" before it is first used.
valid_condition = (
    (col("order_id").isNotNull()) &
    (col("status").isNotNull()) &
    ((col("amount").isNotNull()) | (col("status") == cancelled_status))
)


In [0]:
# Databricks notebook source
# -----------------------------
# STEP 2: Separate good vs bad rows
from pyspark.sql.functions import col, when

# A row is valid when order_id and status are both present and it either
# carries an amount or is a cancelled order.
has_order_id = col("order_id").isNotNull()
has_status = col("status").isNotNull()
amount_ok = col("amount").isNotNull() | (col("status") == "cancelled")
valid_condition = has_order_id & has_status & amount_ok

# Good rows → Bronze
good_bronze = bronze_df.where(valid_condition)
good_bronze.show()

# Bad rows → RAW quarantine.
# Each rejected row is tagged with the first rule it breaks, checked in the
# order: order_id, status, amount.
error_reason_col = (
    when(col("order_id").isNull(), "Missing order_id")
    .when(col("status").isNull(), "Missing status")
    .when(
        col("amount").isNull() & (col("status") != "cancelled"),
        "Amount missing for non-cancelled order",
    )
    .otherwise("Unknown error")
)

bad_rows = (
    bronze_df.where(~valid_condition)
    .withColumn("bad_data_timestamp", current_timestamp())
    .withColumn("error_reason", error_reason_col)
)
bad_rows.show()

In [0]:
# Databricks notebook source
# -----------------------------
# STEP 3: Write bad rows to RAW quarantine folder

# Build the quarantine path once from `raw_path` so the Delta write and the
# table registration always point at the same location, instead of pairing a
# derived path with a separately hard-coded storage URI.
bad_orders_path = raw_path + "bad_orders/"

# Append mode: rows rejected by earlier runs are kept alongside the new ones.
bad_rows.write.format("delta").mode("append").save(bad_orders_path)

# Expose the quarantine data as a named table over that same location.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ws_databricks_etl.bronze.bad_orders
USING DELTA
LOCATION '{bad_orders_path}'
""")


Optional: Register in Unity Catalog

In [0]:
# Databricks notebook source
# -----------------------------
# STEP 4: Upsert good rows into Bronze Delta
from delta.tables import DeltaTable

bronze_table_path = bronze_path + "orders/"

# A Delta MERGE fails when more than one source row matches the same target
# row, so keep a single row per order_id before writing or merging.
# NOTE(review): dropDuplicates keeps an arbitrary row per order_id — confirm
# that is acceptable, or replace it with an explicit tie-break.
merge_source = good_bronze.dropDuplicates(["order_id"])

# Decide once whether this is the first load.
bronze_exists = DeltaTable.isDeltaTable(spark, bronze_table_path)

# Create table if doesn't exist
if not bronze_exists:
    merge_source.write.format("delta").mode("overwrite").save(bronze_table_path)

bronze_table = DeltaTable.forPath(spark, bronze_table_path)

# Merge / upsert: update rows whose order_id already exists, insert the rest.
# Skipped on the first load, where the table was just written from the same rows.
if bronze_exists:
    (
        bronze_table.alias("bronze")
        .merge(merge_source.alias("raw"), "bronze.order_id = raw.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Register table in Unity Catalog
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ws_databricks_etl.bronze.orders
USING DELTA
LOCATION '{bronze_table_path}'
""")


In [0]:
from pyspark.sql.functions import current_timestamp

# Read CSV from raw folder.
# The landing file is a plain CSV, so it must be read with the "csv" source;
# `header` and `inferSchema` are CSV reader options and the "delta" source
# cannot open a .csv file.
orders_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(raw_path + "orders/orders.csv")
    .withColumn("ingestion_timestamp", current_timestamp())
)

# Show first few rows
orders_df.display()

# Write to Bronze Delta
# NOTE(review): this overwrites the same location the STEP 4 merge writes to
# (bronze_path + "orders/"), replacing the validated rows with the unfiltered
# CSV contents — confirm this cell is still meant to run after STEP 4.
orders_df.write.format("delta").mode("overwrite").save(bronze_path + "orders")

# Register in Unity Catalog
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ws_databricks_etl.default.bronze_orders
USING DELTA
LOCATION '{bronze_path}orders'
""")

print("Bronze Orders table created successfully!")


In [0]:
%sql
-- Inspect the Bronze orders table; swap in the commented query to view the quarantined rows instead.
-- select * from ws_databricks_etl.bronze.bad_orders
SELECT *
FROM ws_databricks_etl.bronze.orders