In [0]:
# --- ADLS OAuth via Databricks secrets (scope: adls-oauth) ---
ACCOUNT = "sttaxistorage"  # your storage
ABFS_HOST = f"{ACCOUNT}.dfs.core.windows.net"  # suffix shared by every per-account ABFS key below

# Client-credentials OAuth (ClientCredsTokenProvider) scoped to this storage account.
# The app id, app secret and tenant id are all read from the "adls-oauth" secret scope,
# so no credential is written into the notebook.
spark.conf.set(f"fs.azure.account.auth.type.{ABFS_HOST}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{ABFS_HOST}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{ABFS_HOST}",
    dbutils.secrets.get("adls-oauth", "app-id"),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{ABFS_HOST}",
    dbutils.secrets.get("adls-oauth", "app-secret"),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{ABFS_HOST}",
    f"https://login.microsoftonline.com/{dbutils.secrets.get('adls-oauth', 'tenant-id')}/oauth2/token",
)

# Lake layout: raw CSV drops and Delta outputs share one container.
STORAGE = ACCOUNT
CONTAINER = "taxi"
RAW_DIR = f"abfss://{CONTAINER}@{STORAGE}.dfs.core.windows.net/raw/"
DELTA_BASE = f"abfss://{CONTAINER}@{STORAGE}.dfs.core.windows.net/delta/"


In [0]:
# ✅ Validation cell — run AFTER your secrets-based Spark config cell
# Checks: RAW file exists/readable, writes BRONZE/SILVER Delta, prints a PASS/FAIL summary.

from pyspark.sql import functions as F

# Re-declared so this validation cell is self-contained; the values match the config cell.
ACCOUNT, CONTAINER = "sttaxistorage", "taxi"
RAW_DIR = f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net/raw/"
DELTA_BASE = f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net/delta/"
bronze_path = f"{DELTA_BASE}bronze/taxis"
silver_path = f"{DELTA_BASE}silver/taxis"

def _exists(path: str) -> bool:
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

def _list_names(path: str):
    try:
        return [f.name for f in dbutils.fs.ls(path)]
    except Exception:
        return []

# ---- RAW checks ----
# Find taxis_*.csv files under RAW_DIR, then read them and count rows.
# RAW_DIR is listed directly (not through a swallow-all helper) so that a listing
# failure, e.g. a missing directory or bad credentials, ends up in raw_err instead of
# looking identical to "no files".
raw_rows = None
raw_err = None
try:
    raw_files = [
        f.name
        for f in dbutils.fs.ls(RAW_DIR)
        if f.name.startswith("taxis_") and f.name.endswith(".csv")
    ]
except Exception as e:
    raw_files = []
    raw_err = f"cannot list {RAW_DIR}: {str(e)[:300]}"
raw_ok = len(raw_files) > 0

if raw_ok:
    try:
        raw_df = spark.read.option("header", True).csv(RAW_DIR + "taxis_*.csv")
        raw_rows = raw_df.count()
        raw_ok = raw_rows > 0
        if not raw_ok:
            raw_err = "RAW files contain no data rows"
    except Exception as e:
        raw_err = str(e)[:300]
        raw_ok = False
elif raw_err is None:
    # Listing worked but nothing matched the expected file pattern.
    raw_err = f"no taxis_*.csv files found under {RAW_DIR}"

# ---- BRONZE ----
# Seed the bronze Delta table from RAW only when nothing exists at bronze_path yet,
# then read it back and count rows to confirm it loads as Delta.
# NOTE(review): an existing bronze table is not rewritten, yet the summary still says "written".
bronze_rows = None
bronze_ok = False
bronze_err = None
try:
    if not _exists(bronze_path):
        if raw_rows is None:
            # RAW was not successfully counted above; read it again here so any
            # failure is reported as a BRONZE error.
            raw_df = (
                spark.read
                .option("header", True)
                .csv(RAW_DIR + "taxis_*.csv")
            )
            raw_rows = raw_df.count()
        raw_df.write.format("delta").mode("overwrite").save(bronze_path)
    bronze_rows = spark.read.load(bronze_path, format="delta").count()
    bronze_ok = bronze_rows > 0
except Exception as exc:
    bronze_err = str(exc)[:300]
    bronze_ok = False

# ---- SILVER ----
# Build the cleaned, typed silver projection from RAW, overwrite the silver Delta
# table, then read it back and count rows.
silver_rows, silver_ok, silver_err = None, False, None
try:
    if raw_rows is None:
        # RAW was not successfully counted earlier in this cell; read it again so any
        # failure is reported as a SILVER error.
        raw_df = spark.read.option("header", True).csv(RAW_DIR + "taxis_*.csv")
        raw_rows = raw_df.count()
    silver_df = (
        raw_df.select(
            # NOTE(review): pickup/dropoff stay strings despite the _ts names; casting them
            # would change the silver schema (a Delta overwrite would then need overwriteSchema).
            F.col("pickup").alias("pickup_ts"),
            F.col("dropoff").alias("dropoff_ts"),
            F.col("distance").cast("double").alias("trip_distance"),
            F.col("fare").cast("double").alias("fare_amount"),
            F.col("tip").cast("double").alias("tip_amount"),
            F.col("tolls").cast("double").alias("tolls_amount"),
            # Raw payment values include "cash" and "credit card". Match them case- and
            # whitespace-insensitively and title-case them ("Cash", "Credit Card").
            # Anything else, including NULL, becomes "Other".
            F.when(
                F.lower(F.trim(F.col("payment"))).isin("cash", "credit", "credit card"),
                F.initcap(F.trim(F.col("payment"))),
            )
            .otherwise(F.lit("Other"))
            .alias("payment_type"),
        ).na.drop(subset=["trip_distance", "fare_amount"])
    )
    silver_df.write.mode("overwrite").format("delta").save(silver_path)
    silver_rows = spark.read.format("delta").load(silver_path).count()
    silver_ok = silver_rows > 0
except Exception as e:
    silver_err = str(e)[:300]
    silver_ok = False

def mark(ok):
    """Return a pass/fail glyph for one check result."""
    return "✅" if ok else "❌"


print(f"{mark(raw_ok)} RAW exists & readable | files={len(raw_files)} rows={raw_rows if raw_rows is not None else 'n/a'}")
print(f"{mark(bronze_ok)} BRONZE Delta written | rows={bronze_rows if bronze_rows is not None else 'n/a'} @ {bronze_path}")
print(f"{mark(silver_ok)} SILVER Delta written | rows={silver_rows if silver_rows is not None else 'n/a'} @ {silver_path}")

# Overall verdict: every stage must pass. Error details are printed only on failure.
overall = all([raw_ok, bronze_ok, silver_ok])
print("\nRESULT:", "✅ PASS" if overall else "❌ CHECK ABOVE")
if not overall:
    if raw_err:
        print("RAW error:", raw_err)
    if bronze_err:
        print("BRONZE error:", bronze_err)
    if silver_err:
        print("SILVER error:", silver_err)


✅ RAW exists & readable | files=1 rows=6433
✅ BRONZE Delta written | rows=6433 @ abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/bronze/taxis
✅ SILVER Delta written | rows=6433 @ abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/silver/taxis

RESULT: ✅ PASS


In [0]:
# ---- Data Quality (simple) ----
from pyspark.sql import functions as F

# NOTE(review): relies on raw_df left in the session by an earlier cell.
# Each check is a boolean Column that must be TRUE for a row to pass. A row where the
# expression is FALSE *or NULL* (missing or unparseable value) counts as a violation;
# filtering on `~expr` alone would silently skip the NULL rows.
checks = {
    "trip_distance_positive": F.col("distance").cast("double") > 0,
    "fare_nonnegative":       F.col("fare").cast("double") >= 0,
    # Compared case- and whitespace-insensitively; the raw data spells card payments
    # "credit card", not "credit".
    "payment_type_allowed":   F.lower(F.trim(F.col("payment"))).isin("cash", "credit", "credit card"),
}

# Count every check's violations in one aggregation pass over raw_df
# (a single Spark job instead of one count() per check).
violation_counts = raw_df.agg(*[
    F.sum(F.when(F.coalesce(expr, F.lit(False)), 0).otherwise(1)).alias(name)
    for name, expr in checks.items()
]).first()

# sum() over an empty frame is NULL, hence the truthiness test rather than `> 0`.
violations = [(name, violation_counts[name]) for name in checks if violation_counts[name]]

if violations:
    msg = "DQ failed: " + ", ".join(f"{n}={c}" for n, c in violations)
    raise ValueError(msg)

print("Data quality ✅ — all checks passed")


In [0]:
# Read every taxis_*.csv under RAW_DIR. The header row supplies the column names, and
# with no schema inference every column comes back as a string.
raw_df = spark.read.csv(RAW_DIR + "taxis_*.csv", header=True)

raw_df.printSchema()
display(raw_df.limit(5))
print("raw rows:", raw_df.count())


root
 |-- pickup: string (nullable = true)
 |-- dropoff: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- tip: string (nullable = true)
 |-- tolls: string (nullable = true)
 |-- total: string (nullable = true)
 |-- color: string (nullable = true)
 |-- payment: string (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)



pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough
2019-03-23 20:21:09,2019-03-23 20:27:24,1,1.6,7.0,2.15,0.0,12.95,yellow,credit card,Lenox Hill West,UN/Turtle Bay South,Manhattan,Manhattan
2019-03-04 16:11:55,2019-03-04 16:19:00,1,0.79,5.0,0.0,0.0,9.3,yellow,cash,Upper West Side South,Upper West Side South,Manhattan,Manhattan
2019-03-27 17:53:01,2019-03-27 18:00:25,1,1.37,7.5,2.36,0.0,14.16,yellow,credit card,Alphabet City,West Village,Manhattan,Manhattan
2019-03-10 01:23:59,2019-03-10 01:49:51,1,7.7,27.0,6.15,0.0,36.95,yellow,credit card,Hudson Sq,Yorkville West,Manhattan,Manhattan
2019-03-30 13:27:42,2019-03-30 13:37:14,3,2.16,9.0,1.1,0.0,13.4,yellow,credit card,Midtown East,Yorkville West,Manhattan,Manhattan


raw rows: 6433


In [0]:
# Bronze: the raw CSV rows persisted as-is to Delta, fully overwritten on each run.
bronze_path = f"{DELTA_BASE}bronze/taxis"
raw_df.write.format("delta").mode("overwrite").save(bronze_path)

print("bronze rows:", spark.read.load(bronze_path, format="delta").count())
display(dbutils.fs.ls(f"{DELTA_BASE}bronze"))


bronze rows: 6433


path,name,size,modificationTime
abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/bronze/taxis/,taxis/,0,1756264337000


In [0]:
from pyspark.sql import functions as F

# Silver: renamed, typed projection of RAW. Rows missing distance or fare are dropped.
silver_df = (
    raw_df.select(
        # NOTE(review): pickup/dropoff stay strings despite the _ts names; casting them
        # would change the silver schema (a Delta overwrite would then need overwriteSchema).
        F.col("pickup").alias("pickup_ts"),
        F.col("dropoff").alias("dropoff_ts"),
        F.col("distance").cast("double").alias("trip_distance"),
        F.col("fare").cast("double").alias("fare_amount"),
        F.col("tip").cast("double").alias("tip_amount"),
        F.col("tolls").cast("double").alias("tolls_amount"),
        # Raw payment values include "cash" and "credit card". Match them case- and
        # whitespace-insensitively and title-case them ("Cash", "Credit Card").
        # Anything else, including NULL, becomes "Other".
        F.when(
            F.lower(F.trim(F.col("payment"))).isin("cash", "credit", "credit card"),
            F.initcap(F.trim(F.col("payment"))),
        )
        .otherwise(F.lit("Other"))
        .alias("payment_type"),
    )
    .na.drop(subset=["trip_distance", "fare_amount"])
)

silver_path = DELTA_BASE + "silver/taxis"
silver_df.write.mode("overwrite").format("delta").save(silver_path)

print("silver rows:", spark.read.format("delta").load(silver_path).count())
display(dbutils.fs.ls(DELTA_BASE + "silver"))


silver rows: 6433


path,name,size,modificationTime
abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/silver/taxis/,taxis/,0,1756264369000


In [0]:
# Show the bronze table's Delta transaction log. The history has 15 wide columns
# (including JSON operation metrics), which is unreadable as .show() text, so render
# it as an interactive grid instead.
display(spark.sql(f"DESCRIBE HISTORY delta.`{bronze_path}`"))


+-------+-------------------+---------------+-----------------------+---------+------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------+------------+-------------------------------------------------+
|version|timestamp          |userId         |userName               |operation|operationParameters                                         |job |notebook          |clusterId           |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                                                                 |userMetadata|engineInfo                                       |
+-------+-------------------+---------------+-----------------------+---------+------------------------------------------------------------+----+------------------+--------------------+-

In [0]:
%sql
-- Register the bronze/silver Delta folders as tables in hive_metastore.default,
-- pointing each table at its existing LOCATION.
-- The schema is qualified with hive_metastore so it is checked/created in the same
-- catalog the tables are registered in, whatever the session's current catalog is.
-- NOTE(review): the LOCATIONs are hard-coded and must stay in sync with DELTA_BASE.
CREATE SCHEMA IF NOT EXISTS hive_metastore.default;

CREATE TABLE IF NOT EXISTS hive_metastore.default.bronze_taxis
USING DELTA
LOCATION 'abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/bronze/taxis';

CREATE TABLE IF NOT EXISTS hive_metastore.default.silver_taxis
USING DELTA
LOCATION 'abfss://taxi@sttaxistorage.dfs.core.windows.net/delta/silver/taxis';
