### AIT 614 - Big Data Essentials <br>
#### Project Title: FHWA Bridge Conditions Analysis Using Big Data Techniques
#### 1. Spark with Python
#### TEAM 4
<hr>

Course Section #: AIT 614 - 003 <br>
#### Team Members
1. Aryan Patel Kolagani - G01517560 <br>
2. Rithvik Madhavaram - G01501806 <br>
3. Chetan Muppavarapu - G01504057 <br>
4. Srivaths Nrusimha Rao Chengal - G01512113 <br>
5. Vaibhav Hasu - G01517039 <br>

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace



# Load the raw FHWA bridge-conditions CSV from DBFS.
# Databricks notebooks predefine `spark`; getOrCreate() hands back that same
# session there, and creates one if this script is ever run outside a notebook.
spark = SparkSession.builder.getOrCreate()

# No schema or inferSchema option is set, so every column is read as a string.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("dbfs:/FileStore/shared_uploads/akolagan@gmu.edu/FHWA_Bridge_Conditions_Dataset.csv")
)

# Columns to convert to numeric; "$", "," and "%" are stripped before the cast.
columns_to_clean = [
    "Repair_Cost_USD", "Deck_Condition", "Daily_Traffic", "Age_Years",
    "Deterioration_Rate", "Failure_Probability", "Truck_Percentage",
    "Repair_Time_Days", "Length_Meters",
]

# Append a double-typed `<name>_clean` companion for every column listed above.
# One select() is used instead of a withColumn() per column: each withColumn()
# call stacks another projection onto the query plan, which the PySpark docs
# warn against doing in a loop.
# NOTE(review): values that still do not parse as a number are expected to
# become null (Spark's non-ANSI cast behaviour) - confirm the cluster setting.
df = df.select(
    "*",
    *[
        regexp_replace(col(col_name), "[$,%]", "").cast("double").alias(f"{col_name}_clean")
        for col_name in columns_to_clean
    ],
)

# Drop rows that are null in any of the key cleaned columns.
df_clean = df.dropna(subset=[
    "Deck_Condition_clean", "Daily_Traffic_clean", "Age_Years_clean", "Repair_Cost_USD_clean"
])

# Remove the original string columns, then give each cleaned column its
# original name back. Both steps are driven by `columns_to_clean` so the drop
# list and the rename list can never drift apart.
df_clean = df_clean.drop(*columns_to_clean)
for col_name in columns_to_clean:
    df_clean = df_clean.withColumnRenamed(f"{col_name}_clean", col_name)

# Exploratory data queries - each one prints its result to the cell output.

# a. First 10 rows of the cleaned dataset, restricted to the headline columns
preview_columns = [
    "Bridge_ID", "Material", "Age_Years",
    "Deck_Condition", "Daily_Traffic", "Repair_Cost_USD",
]
df_clean.select(*preview_columns).show(10)

# b. Number of bridges per Deck_Condition value, sorted by that value
bridges_per_condition = df_clean.groupBy("Deck_Condition").count()
bridges_per_condition.orderBy("Deck_Condition").show()

# c. Mean Repair_Cost_USD per Material, sorted by Material
mean_cost_by_material = df_clean.groupBy("Material").avg("Repair_Cost_USD")
mean_cost_by_material.orderBy("Material").show()

# d. Bridges with Age_Years above 60 and Deck_Condition of 4 or lower,
#    oldest first (top 10 shown)
is_older_than_60 = col("Age_Years") > 60
has_poor_deck = col("Deck_Condition") <= 4
old_poor_bridges = df_clean.filter(is_older_than_60 & has_poor_deck).select(
    "Bridge_ID", "Material", "Age_Years", "Deck_Condition"
)
old_poor_bridges.orderBy(col("Age_Years").desc()).show(10)

# e. The 10 bridges with the highest Daily_Traffic
busiest_first = df_clean.orderBy(col("Daily_Traffic").desc())
busiest_first.select("Bridge_ID", "State", "Daily_Traffic", "Deck_Condition").show(10)

# Save the cleaned dataset to DBFS for reuse by the ML and dashboard teams.
# Spark resolves paths against the DBFS root, so the destination must use the
# `dbfs:/` scheme (as the input path does). The `/dbfs/...` form is the local
# FUSE mount for non-Spark file APIs; passing it to Spark would write to
# `dbfs:/dbfs/FileStore/...` instead of the intended FileStore location.
# Spark writes a directory of part files at this path, not a single CSV file;
# "overwrite" replaces whatever is already there.
df_clean.write.mode("overwrite").csv("dbfs:/FileStore/cleaned_fhwa_bridge_data.csv", header=True)

# Print the schema to verify the final column names and types.
df_clean.printSchema()

+---------+---------+---------+--------------+-------------+------------------+
|Bridge_ID| Material|Age_Years|Deck_Condition|Daily_Traffic|   Repair_Cost_USD|
+---------+---------+---------+--------------+-------------+------------------+
|   100000|    Steel|     17.0|           6.0|      12899.0| 4315877.039064087|
|   100001|Composite|     52.0|           6.0|      47458.0|1881782.6492399995|
|   100002|Composite|     35.0|           3.0|      68505.0|3648371.4340184284|
|   100003|   Timber|     60.0|           4.0|      10563.0|1828414.1550813331|
|   100004|  Masonry|     90.0|           8.0|     157293.0|1867925.1355395797|
|   100005|    Steel|     76.0|           6.0|      99947.0| 2699402.113745118|
|   100006|   Timber|     25.0|           8.0|       9686.0|1496286.4476785637|
|   100007|    Steel|     86.0|           5.0|     133008.0|1959640.3036547482|
|   100008|Composite|     20.0|           4.0|     177541.0|3547937.3828598824|
|   100009| Concrete|     81.0|         