In [0]:
# =============================================
# COMPLETE CLEANUP + SETUP FOR MEDALLION PIPELINE
# Run this cell FIRST (and only once, or whenever you want a fresh start)
# =============================================
#
# Drops the whole catalog (CASCADE removes every schema, volume and file under
# it) and re-creates one schema per medallion layer plus the volumes the later
# cells write to.
#
# NOTE: the Bronze/Silver/Gold cells hard-code these same names in their
# /Volumes/<catalog>/<schema>/<volume> paths; keep them in sync if you edit here.

from pyspark.sql.utils import AnalysisException

CATALOG = "it_ticket_medallion"

# (schema, volume, note printed after the volume name), in creation order.
LAYER_VOLUMES = [
    ("bronze", "raw_delta", ""),
    ("silver", "clean_delta", ""),
    ("gold", "aggregated_delta", ""),
    ("gold", "ticket_aggregates_csv", " (for CSV export)"),
]
# Unique schema names in first-seen order: bronze, silver, gold.
LAYER_SCHEMAS = list(dict.fromkeys(schema for schema, _, _ in LAYER_VOLUMES))

print("Starting full cleanup and setup of Unity Catalog structure...\n")

# Step 1: Drop the entire catalog if it exists (complete cleanup).
# Because of `IF EXISTS`, a missing catalog is not an error. Any
# AnalysisException raised here is a genuine failure (e.g. insufficient
# privileges), so report it instead of claiming the environment is clean:
# data from a previous run may still be present afterwards.
try:
    spark.sql(f"DROP CATALOG IF EXISTS {CATALOG} CASCADE")
    print(f"Existing catalog '{CATALOG}' dropped (including all schemas, volumes, and data)")
except AnalysisException as e:
    print(f"WARNING: could not drop catalog '{CATALOG}' - existing data may remain.\n  Reason: {e}")

# Step 2: Re-create everything fresh
print("\nCreating new catalog, schemas, and volumes...")

spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
print(f"Catalog created: {CATALOG}")

for schema in LAYER_SCHEMAS:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{schema}")
print(f"Schemas created: {', '.join(LAYER_SCHEMAS)}")

for schema, volume, _ in LAYER_VOLUMES:
    spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{schema}.{volume}")
print("Volumes created:")
for schema, volume, note in LAYER_VOLUMES:
    print(f"  - {schema}.{volume}{note}")

print("\nUnity Catalog structure is now clean and ready!")
print("You can now safely run the Bronze → Silver → Gold pipeline cells multiple times.")

In [0]:
# Read Raw Data
import pandas as pd
from pyspark.sql import SparkSession

# The source CSV sits in the workspace file tree; it is parsed with pandas
# first and the resulting frame is then handed to Spark.
# NOTE(review): this absolute path is tied to one user's workspace folder -
# consider lifting it into a config cell so other users can run the notebook.
csv_path = "/Workspace/Users/avi@aidamant.com/servicenow-pipeline/servicenow_incidents_10k.csv"

pandas_df = pd.read_csv(csv_path)
raw_df = spark.createDataFrame(pandas_df)

# Sanity checks on the raw Spark frame: size, inferred schema, and a preview.
raw_row_count = raw_df.count()
print(f"Raw row count: {raw_row_count}")
raw_df.printSchema()
raw_preview_df = raw_df.limit(20)
display(raw_preview_df)

In [0]:
# Bronze Layer
# Persist the raw frame unchanged into the Bronze volume as a Delta table.
bronze_volume_path = "/Volumes/it_ticket_medallion/bronze/raw_delta"

print("Overwriting Bronze layer...")
bronze_writer = (
    raw_df
    .repartition(4)  # Forces ~4 files
    .write
    .format("delta")
    .mode("overwrite")
)
bronze_writer.save(bronze_volume_path)
print("✓ Bronze overwritten successfully")

# Load the table back from the volume to confirm what actually landed there.
bronze_check_df = spark.read.format("delta").load(bronze_volume_path)
print(f"Bronze row count: {bronze_check_df.count()}")

print("✓ Bronze Delta in volume")
display(bronze_check_df.limit(10))

In [0]:
# Silver Layer
from pyspark.sql.functions import col, to_timestamp

# Relies on `bronze_volume_path` defined in the Bronze cell.
bronze_df = spark.read.format("delta").load(bronze_volume_path)

# Normalization + cleaning: lower-case every column name and map space, dot
# and hyphen to underscores.
# Source names are backtick-quoted before being passed to `col`. Unquoted,
# Spark parses "a.b" as field `b` of a struct column `a`, so a flat column
# whose name contains a dot (exactly the case the renaming handles) would fail
# to resolve. Embedded backticks are escaped by doubling them.
silver_df = bronze_df.select([
    col("`" + c.replace("`", "``") + "`").alias(
        c.lower().replace(" ", "_").replace(".", "_").replace("-", "_")
    )
    for c in bronze_df.columns
])

# Keep only rows that carry a ticket number.
silver_df = silver_df.filter(col("number").isNotNull())

# Parse the open/close timestamps.
# NOTE(review): assumes values shaped like "3/15/2024 9:05"; strings that do not
# match the pattern will not parse - confirm the format against the source CSV.
silver_df = (
    silver_df
    .withColumn("opened_at", to_timestamp(col("opened_at"), "M/d/yyyy H:mm"))
    .withColumn("closed_at", to_timestamp(col("closed_at"), "M/d/yyyy H:mm"))
)

# Cast the ranking columns to int, skipping any that are absent.
numeric_cols = ["priority", "impact", "urgency"]
for c in numeric_cols:
    if c in silver_df.columns:
        silver_df = silver_df.withColumn(c, col(c).cast("int"))

print("Overwriting Silver layer...")
silver_volume_path = "/Volumes/it_ticket_medallion/silver/clean_delta"
silver_df.write.format("delta").mode("overwrite").save(silver_volume_path)
print("✓ Silver overwritten successfully")

print(f"Silver row count: {spark.read.format('delta').load(silver_volume_path).count()}")

display(silver_df.limit(10))

In [0]:
# Gold Layer
# Imported here as well so this cell does not silently depend on the Silver
# cell's import having run in the current session.
from pyspark.sql.functions import col

# Relies on `silver_volume_path` defined in the Silver cell.
silver_df = spark.read.format("delta").load(silver_volume_path)

# Ticket counts per (state, priority), largest first. `state` and `priority`
# act as tie-breakers so groups with equal counts are emitted in the same
# order on every run; otherwise the exported CSV's row order among ties can vary.
gold_df = (
    silver_df.groupBy("state", "priority").count()
    .withColumnRenamed("count", "ticket_count")
    .orderBy(col("ticket_count").desc(), col("state"), col("priority"))
)

print("Overwriting Gold layer...")
gold_volume_path = "/Volumes/it_ticket_medallion/gold/aggregated_delta"
gold_df.write.format("delta").mode("overwrite").save(gold_volume_path)
print("✓ Gold Delta layer saved")

print(f"Gold row count: {spark.read.format('delta').load(gold_volume_path).count()}")

display(gold_df)

# Export Gold as single CSV into the dedicated volume.
# coalesce(1) yields one part file; Spark still writes a directory of files
# whose data file is named part-00000-*.csv.
gold_csv_volume_path = "/Volumes/it_ticket_medallion/gold/ticket_aggregates_csv"

gold_df.coalesce(1).write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv(gold_csv_volume_path)

print("✓ Gold CSV exported successfully!")
print(f"Location: {gold_csv_volume_path}")
print("Find & download in Catalog Explorer:")
print("   it_ticket_medallion → gold → ticket_aggregates_csv → part-00000-*.csv")
print("   Right-click the CSV file → Download")