# Data Processing Code

This Jupyter notebook processes the data from DBFS.

## This section imports the raw datasets and pre-processes them for analysis

In [0]:
from pyspark.sql import SparkSession, functions as F
from functools import reduce

# -----------------------------------------------
# Spark session: getOrCreate() reuses an already
# active session and only builds a new one when
# none exists.
# -----------------------------------------------
spark = (
    SparkSession.builder
    .appName("SouthernRuttingPreprocessing")
    .getOrCreate()
)

# -----------------------------------------------
# Root folder that every input CSV path below is
# built from (plain string concatenation, so the
# trailing slash matters).
# -----------------------------------------------
base_path = "/Volumes/workspace/mlrutting-3/mlrutting-3/"

In [0]:
# -----------------------------------------------
# Step 1: Load CSV datasets
# -----------------------------------------------
def _load_csv(file_name):
    """Read one CSV located under ``base_path``.

    The first row is used as the header and column types are inferred
    by Spark.

    Args:
        file_name: File name relative to ``base_path`` (e.g. "temp.csv").

    Returns:
        A Spark DataFrame for that file.
    """
    return (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(base_path + file_name)
    )


# One DataFrame per input file; the variable names are used by later cells.
humidity_df = _load_csv("humidity.csv")
precip_df = _load_csv("precipitation.csv")
rutting_df = _load_csv("rutting.csv")
solar_df = _load_csv("solar.csv")
temp_df = _load_csv("temp.csv")
traffic_df = _load_csv("traffic_volume.csv")
wind_df = _load_csv("wind.csv")
grid_df = _load_csv("merra_grid_section.csv")

print("All CSVs loaded successfully")

In [0]:
# -----------------------------------------------
# Step 2: Keep only the higher COUNT columns in traffic data
#         (remove COUNT01–COUNT04)
# -----------------------------------------------
# NOTE(review): these names are NOT zero-padded while the dropped names below
# are ("COUNT5" vs "COUNT01"). Nothing in this section uses count_cols; confirm
# the real header spelling in traffic_volume.csv before relying on it.
count_cols = [f"COUNT{i}" for i in range(5, 14)]

# Columns to remove. DataFrame.drop() silently ignores names that are not
# present, so compare the schema before/after to report what really happened
# instead of printing an unconditional success message.
low_count_cols = [f"COUNT{i:02d}" for i in range(1, 5)]
columns_before = traffic_df.columns
traffic_df = traffic_df.drop(*low_count_cols)
dropped_cols = [c for c in columns_before if c not in traffic_df.columns]

if dropped_cols:
    print(f"Filtered traffic_volume.csv: dropped columns {dropped_cols}")
else:
    # Happens when the cell is re-run, or when the CSV header uses a
    # different spelling than COUNT01..COUNT04.
    print(f"⚠️ None of {low_count_cols} found in traffic data; nothing dropped")

# Preview the first rows as the cell output.
traffic_df.take(5)


In [0]:
# -----------------------------------------------
# Step 3: Merge all climate-related datasets
# -----------------------------------------------
climate_dfs = [humidity_df, precip_df, solar_df, temp_df, wind_df,grid_df]

merged_climate = reduce(
    lambda left, right: left.join(right, on=["MERRA_ID", "YEAR", "MONTH"], how="outer"),
    climate_dfs
)
print("✅ Climate datasets merged")

In [0]:
# -----------------------------------------------
# Step 4: Merge climate + traffic datasets
# -----------------------------------------------
merged_climate_traffic = merged_climate.join(
    traffic_df, on=["SHRP_ID", "YEAR", "MONTH"], how="left"
)

print("✅ Added traffic volume data (COUNT5–COUNT13)")

In [0]:
# -----------------------------------------------
# Step 5: Merge with MERRA grid section to attach SHRP_ID, state, lat/lon, elevation
# -----------------------------------------------
merged_climate_traffic = merged_climate_traffic.join(
    grid_df, on="MERRA_ID", how="left"
)

print("✅ Added SHRP_ID and grid info to climate + traffic data")

# Now the merged dataset has SHRP_ID — use that as the join key for rutting

In [0]:
# -----------------------------------------------
# Step 5: Prepare rutting dataset (extract YEAR, MONTH)
# -----------------------------------------------
rutting_df = rutting_df.withColumn("Survey_Date", F.to_timestamp("Survey_Date"))
rutting_df = rutting_df.withColumn("YEAR", F.year("Survey_Date"))
rutting_df = rutting_df.withColumn("MONTH", F.month("Survey_Date"))

In [0]:
# -----------------------------------------------
# Step 6: Merge with rutting data using SHRP_ID, YEAR, and MONTH
# -----------------------------------------------
rutting_df = rutting_df.withColumn("Survey_Date", F.to_timestamp("Survey_Date"))
rutting_df = rutting_df.withColumn("YEAR", F.year("Survey_Date"))
rutting_df = rutting_df.withColumn("MONTH", F.month("Survey_Date"))

final_df = rutting_df.join(
    merged_climate_traffic,
    on=["SHRP_ID", "YEAR", "MONTH"],
    how="left"
)

print("✅ Final merged dataset (Rutting + Climate + Traffic + Grid)")

In [0]:
# -----------------------------------------------
# Step 6: Merge everything together
# -----------------------------------------------
final_df = rutting_df.join(
    merged_climate_traffic,
    on=["MERRA_ID","SHRP_ID", "YEAR", "MONTH"],
    how="left"
)

print("✅ All datasets merged into final DataFrame")

In [0]:
# -----------------------------------------------
# Step 7: Filter by Southern states
# -----------------------------------------------
# Two-letter codes compared against the STATE column.
southern_states = [
    "DE", "MD", "DC", "VA", "WV",
    "KY", "TN", "NC", "SC", "GA", "FL",
    "AL", "MS", "AR", "LA", "OK", "TX",
]

# Without a STATE column there is nothing to filter on: warn and leave
# final_df untouched. (The membership test is an exact, case-sensitive
# match on the column name.)
if "STATE" not in final_df.columns:
    print("⚠️ No STATE column found — skipping Southern state filtering")
else:
    is_southern = F.col("STATE").isin(southern_states)
    final_df = final_df.filter(is_southern)
    print("✅ Filtered dataset to include only Southern states")