In [0]:
# Databricks notebook: 01_ingest_clean_write_delta.py
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Location of the uploaded raw CSV data; change this if the upload went elsewhere.
RAW_DBFS_PATH = "/Volumes/workspace/default/usaccidents_volume"
# Persistent Delta location underneath the same volume.
DELTA_PATH = "/Volumes/workspace/default/usaccidents_volume/us_accidents_delta"
# Schema (database) and table name used when registering the cleaned data.
DATABASE = "madsc102"
TABLE_NAME = "usaccidents_volume"


# 1) Load the raw CSV export
print('Reading CSV from:', RAW_DBFS_PATH)
# header=True takes column names from the first line; inferSchema=True asks
# Spark to derive column types from the data instead of reading all strings.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_raw = csv_reader.csv(RAW_DBFS_PATH)
# count() is an action, so this line triggers a scan of the input.
raw_row_count = df_raw.count()
print('Raw rows:', raw_row_count)
# Preview a handful of rows in the notebook UI.
display(df_raw.limit(5))


# 2) Quick schema normalization and cleaning
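# Column names typical of the Kaggle US Accidents dataset: Start_Time, End_Time, Start_Lat, Start_Lng, Distance(mi), Severity, City, State.
# Note: the file loaded by this notebook has no End_Time column (see the printed schema), so only Start_Time is available.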


# `df` is the working frame for the cleaning steps; `df_raw` keeps pointing at
# the untouched input so it can still be inspected later.
df = df_raw

# Show the inferred column types between two banner lines.
schema_footer = "------------------------"
print("--- DataFrame Schema ---")
df.printSchema()
print(schema_footer)
import pyspark.sql.functions as F # Assuming F is pyspark.sql.functions

def safe_to_timestamp(col):
    """Build a timestamp expression from the named column.

    Args:
        col: Name of the DataFrame column to convert.

    Returns:
        A Column expression that converts the input using the fixed
        pattern ``yyyy-MM-dd HH:mm:ss``.
    """
    timestamp_pattern = 'yyyy-MM-dd HH:mm:ss'
    source_column = F.col(col)
    return F.to_timestamp(source_column, timestamp_pattern)

# Create typed working columns from the raw fields.
#
# This is a single statement: an earlier pasted fragment repeated the first
# withColumn call and left a backslash continuation running into a comment.
# The chain is wrapped in parentheses so a stray comment or blank line can no
# longer cut it short.
#
# NOTE(review): with Spark's default case-insensitive column resolution,
# 'start_time', 'severity', 'city' and 'state' presumably replace the
# same-named raw columns rather than adding new ones - confirm if the raw
# columns are needed later.
df = (
    df.withColumn('start_time', safe_to_timestamp('Start_Time'))
    .withColumn('latitude', F.col('Start_Lat').cast('double'))
    .withColumn('longitude', F.col('Start_Lng').cast('double'))
    .withColumn('distance_miles', F.col('Distance(mi)').cast('double'))
    .withColumn('severity', F.col('Severity').cast('int'))
    # trim() strips leading/trailing whitespace from the text fields.
    .withColumn('city', F.trim(F.col('City')))
    .withColumn('state', F.trim(F.col('State')))
)


# engineered features
#
# duration_minutes used to be unix_timestamp(start_time) / 60, which is the
# number of minutes since the Unix epoch, not how long the accident lasted.
# A real duration needs an end timestamp, so:
#   * if the frame has an End_Time column, duration = (end - start) in minutes;
#   * otherwise the column is kept (same name, same double type) but left NULL
#     so downstream aggregates are not polluted by epoch values.
has_end_time = any(name.lower() == 'end_time' for name in df.columns)
if has_end_time:
    end_ts = F.to_timestamp(F.col('End_Time'))
    duration_expr = (F.unix_timestamp(end_ts) - F.unix_timestamp('start_time')) / 60.0
else:
    duration_expr = F.lit(None).cast('double')

df = (
    df.withColumn('duration_minutes', duration_expr)
    .withColumn('start_date', F.to_date('start_time'))
    .withColumn('start_hour', F.hour('start_time'))
    # dayofweek() numbers days 1 (Sunday) through 7 (Saturday).
    .withColumn('is_weekend', F.when(F.dayofweek('start_time').isin([1, 7]), True).otherwise(False))
)


# Keep only rows in which every core field is populated.
core_cols = ['start_time', 'latitude', 'longitude', 'severity']
df_clean = df.na.drop(subset=core_cols)
# count() is an action; it runs the cleaning plan to report the surviving rows.
rows_after_drop = df_clean.count()
print('After dropping nulls rows:', rows_after_drop)


# Columns written to the output table, in output order.
# Append further raw column names here to carry them through.
final_columns = [
    'ID',
    'start_time',
    'start_date',
    'start_hour',
    'duration_minutes',
    'severity',
    'city',
    'state',
    'latitude',
    'longitude',
    'distance_miles',
    'is_weekend',
]
final_df = df_clean.select(*final_columns)


# 3 & 4) Write the cleaned data and register it as a managed Delta table
#
# The data is saved with saveAsTable() and no explicit path, so the table is
# created as a managed table and the storage location is chosen by the
# metastore. No path-based write happens here, which is why the log line
# names the table being written and not a filesystem path.
qualified_table = f"{DATABASE}.{TABLE_NAME}"

# Make sure the parent schema/database exists before writing into it.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {DATABASE}")

print(f"Writing data and registering table: {qualified_table}")

# mode("overwrite") replaces any existing table contents; overwriteSchema lets
# the stored schema change when the selected columns change between runs.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(qualified_table)
)

print(f'Table created: {qualified_table}')

# 5) Quick sanity checks against the registered table
table_fqn = f'{DATABASE}.{TABLE_NAME}'

# Row count as read back from the table.
print('Total rows in table:')
table_row_count = spark.table(table_fqn).count()
print(table_row_count)

# Distribution of rows per severity level, ordered by severity.
severity_sql = f"SELECT severity, COUNT(*) as cnt FROM {table_fqn} GROUP BY severity ORDER BY severity"
display(spark.sql(severity_sql))

Reading CSV from: /Volumes/workspace/default/usaccidents_volume
Raw rows: 7061773


ID,Severity,Start_Time,Start_Lat,Start_Lng,Distance(mi),City,County,State,Zipcode,Temperature(F),Humidity(%),Pressure(in),Visibility(mi),Wind_Direction,Wind_Speed(mph),Weather_Condition,Amenity,Bump,Crossing,Give_Way,Junction,No_Exit,Railway,Roundabout,Station,Stop,Traffic_Calming,Traffic_Signal,Turning_Loop,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight,PrecipitationUpdate
3,2,2016-02-08T06:49:27.000Z,39.063148,-84.032608,0.01,Williamsburg,Clermont,OH,45176,36.0,100,29.67,10.0,SW,3.5,Overcast,N,N,N,N,N,N,N,N,N,N,N,Y,N,Night,Night,Day,Day,0.01
4,3,2016-02-08T07:23:34.000Z,39.747753,-84.20558199999998,0.01,Dayton,Montgomery,OH,45417,35.1,96,29.64,9.0,SW,4.6,Mostly Cloudy,N,N,N,N,N,N,N,N,N,N,N,N,N,Night,Day,Day,Day,0.01
5,2,2016-02-08T07:39:07.000Z,39.627781,-84.188354,0.01,Dayton,Montgomery,OH,45459,36.0,89,29.65,6.0,SW,3.5,Mostly Cloudy,N,N,N,N,N,N,N,N,N,N,N,Y,N,Day,Day,Day,Day,0.01
6,3,2016-02-08T07:44:26.000Z,40.10059,-82.92519399999998,0.01,Westerville,Franklin,OH,43081,37.9,97,29.63,7.0,SSW,3.5,Light Rain,N,N,N,N,N,N,N,N,N,N,N,N,N,Day,Day,Day,Day,0.03
7,2,2016-02-08T07:59:35.000Z,39.758274,-84.23050699999997,0.0,Dayton,Montgomery,OH,45417-2476,34.0,100,29.66,7.0,WSW,3.5,Overcast,N,N,N,N,N,N,N,N,N,N,N,N,N,Day,Day,Day,Day,0.01


--- DataFrame Schema ---
root
 |-- ID: integer (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: string (nullable = true)
 |-- Bump: string (nullable = true)
 |-- Crossing: string (nullable = true)
 |-- Give_Way: string (nullable = true)
 |-- Junction: string (nullable = true)
 |-- No_Exit: string (nullable = true)
 |-- Railway: str

severity,cnt
1,65178
2,5680330
3,1137268
4,178997
