In [0]:
from pyspark.sql.functions import current_timestamp, col, regexp_replace, split
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

raw_path = "/Volumes/workspace/default/raw_data/raw_marketing_events.csv"

# 1) Read as text: one row per physical line, in a single string column "value".
#    The lines are parsed by hand below rather than with the CSV reader.
df_text = spark.read.text(raw_path)

# 2) Remove surrounding quotes (a leading and/or trailing `"` on each line).
#    The source file path is captured here, directly on the file-backed
#    DataFrame, instead of relying on the hidden `_metadata` column still
#    being resolvable after several projections.
#    Blank / whitespace-only lines are dropped so they cannot turn into rows
#    with an empty event_time and NULL numeric fields.
df_lines = (
    df_text
    .select(
        regexp_replace(col("value"), '^"|"$', "").alias("line"),
        col("_metadata.file_path").alias("source_file"),
    )
    .filter(col("line").rlike(r"\S"))
)

# 3) Separate header and data rows.
#    first() returns None for an empty DataFrame; fail with a clear message
#    instead of an opaque "'NoneType' object is not subscriptable".
first_row = df_lines.first()
if first_row is None:
    raise ValueError(
        f"No non-blank lines found in {raw_path}; cannot locate the header row"
    )
header = first_row["line"]
# Every line identical to the header text is treated as a header and removed.
data_lines = df_lines.filter(col("line") != header)

# 4) Split into columns (plain comma split; fields are addressed by position).
parts = split(col("line"), ",")

# 5) Build the typed frame and add ingestion metadata (UC-friendly).
#    event_time is left as the raw string; the numeric fields are cast.
#    current_timestamp() is evaluated when the query runs, so the value shown
#    by display() can differ from the one stored by a later write.
df_raw = data_lines.select(
    parts.getItem(0).alias("event_time"),
    parts.getItem(1).cast("int").alias("customer_id"),
    parts.getItem(2).alias("channel"),
    parts.getItem(3).cast("double").alias("spend"),
    parts.getItem(4).cast("int").alias("conversion"),
    parts.getItem(5).cast("double").alias("revenue"),
    current_timestamp().alias("ingest_time"),
    col("source_file"),
)

display(df_raw)


event_time,customer_id,channel,spend,conversion,revenue,ingest_time,source_file
2025-01-05 10:12:00,1001,Search,25.5,1,120.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-05 11:00:00,1002,Social,10.0,0,0.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 09:30:00,1003,TV,80.0,1,200.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 12:10:00,1001,Email,2.0,1,30.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 12:10:00,1001,Email,2.0,1,30.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-07 15:45:00,1004,Search,18.0,0,0.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-08 08:20:00,1005,Social,12.0,1,60.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-08 08:20:00,1005,social,12.0,1,60.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-09 20:05:00,1006,TV,90.0,0,0.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-10 14:00:00,1007,Email,3.0,1,40.0,2026-01-15T21:53:42.082Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv


In [0]:
# Path-based location where the bronze Delta data is stored.
bronze_path = "/Volumes/workspace/default/raw_data/bronze_marketing_events"

# Configure the writer first, then persist: "overwrite" replaces whatever is
# currently stored at bronze_path with the contents of df_raw.
bronze_writer = df_raw.write.format("delta").mode("overwrite")
bronze_writer.save(bronze_path)

print("✅ Bronze Delta saved at:", bronze_path)


✅ Bronze Delta saved at: /Volumes/workspace/default/raw_data/bronze_marketing_events


In [0]:
# Load the data back from bronze_path so the check reflects what was
# actually persisted, not the in-memory df_raw.
bronze_reader = spark.read.format("delta")
df_bronze = bronze_reader.load(bronze_path)

# Sanity check: report the number of stored rows, then show them.
bronze_row_count = df_bronze.count()
print("Row count:", bronze_row_count)
display(df_bronze)


Row count: 10


event_time,customer_id,channel,spend,conversion,revenue,ingest_time,source_file
2025-01-05 10:12:00,1001,Search,25.5,1,120.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-05 11:00:00,1002,Social,10.0,0,0.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 09:30:00,1003,TV,80.0,1,200.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 12:10:00,1001,Email,2.0,1,30.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-06 12:10:00,1001,Email,2.0,1,30.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-07 15:45:00,1004,Search,18.0,0,0.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-08 08:20:00,1005,Social,12.0,1,60.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-08 08:20:00,1005,social,12.0,1,60.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-09 20:05:00,1006,TV,90.0,0,0.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
2025-01-10 14:00:00,1007,Email,3.0,1,40.0,2026-01-15T21:54:19.160Z,dbfs:/Volumes/workspace/default/raw_data/raw_marketing_events.csv
