In [0]:
data = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/mkatika@gmu.edu/data_457__1_.csv")

In [0]:
data = data.withColumnRenamed("29 - Average Daily Traffic", "Average_Daily_Traffic") \
          .withColumnRenamed("49 - Structure Length (ft.)", "Structure_Length_ft") \
          .withColumnRenamed("CAT29 - Deck Area (sq. ft.)", "Deck_Area_sq_ft") \
          .withColumnRenamed("CAT10 - Bridge Condition", "Bridge_Condition")

In [0]:
data.select("Average_Daily_Traffic", "Structure_Length_ft", "Deck_Area_sq_ft","Bridge_Condition").show()


+---------------------+-------------------+---------------+----------------+
|Average_Daily_Traffic|Structure_Length_ft|Deck_Area_sq_ft|Bridge_Condition|
+---------------------+-------------------+---------------+----------------+
|          0.081369151|        0.047372452|    0.027325572|               1|
|          0.077845176|        0.053973232|    0.029509697|               1|
|          0.080762565|        0.405492141|    0.430442709|               1|
|          0.025967649|        0.089748733|     0.04353149|               1|
|          0.096822646|        0.006236096|    0.015751157|               1|
|          0.068168689|        0.010539368|    0.021221594|               1|
|          0.067273252|        0.143357281|    0.076987655|               1|
|          0.057885615|        0.113416724|    0.061418171|               1|
|          0.057885615|         0.04810182|    0.027414673|               1|
|          0.158001155|        0.001786952|    0.010980796|               1|

In [0]:
data.printSchema()


root
 |-- 8 - Structure Number: string (nullable = true)
 |-- 27 - Year Built: string (nullable = true)
 |-- Average_Daily_Traffic: string (nullable = true)
 |-- Structure_Length_ft: string (nullable = true)
 |-- Bridge_Condition: string (nullable = true)
 |-- Bridge Age (yr): string (nullable = true)
 |-- Deck_Area_sq_ft: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Age Range: string (nullable = true)
 |-- Traffic Category: string (nullable = true)
 |-- 43A - Main Span Material_Concrete Continuous: string (nullable = true)
 |-- 43A - Main Span Material_Other: string (nullable = true)
 |-- 43A - Main Span Material_Prestressed Concrete: string (nullable = true)
 |-- 43A - Main Span Material_Prestressed Concrete Continuous: string (nullable = true)
 |-- 43A - Main Span Material_Steel: string (nullable = true)
 |-- 43A - Main Span Material_Steel Continuous: string (nullable = true)
 |-- 43B - Main Span Design_Box Beam or Girders - Multiple: string (nullable = true)
 |-

In [0]:
from pyspark.sql.functions import col
data = data.withColumn("Average_Daily_Traffic", col("Average_Daily_Traffic").cast("double"))
data = data.withColumn("Age", col("Age").cast("double"))
data = data.withColumn("Structure_Length_ft", col("Structure_Length_ft").cast("double"))
data = data.withColumn("Deck_Area_sq_ft", col("Deck_Area_sq_ft").cast("double"))


In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

data = data.withColumn("Bridge_Condition", col("Bridge_Condition").cast("double"))

# Defining the features (including categorical and numerical columns)
features = ['Average_Daily_Traffic', 'Age', 'Structure_Length_ft', 'Deck_Area_sq_ft']

# Definining the target label 
target_label = "Bridge_Condition"

# Assemble the feature vector
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Creating the Random Forest Regressor
rf_regressor = RandomForestRegressor(labelCol=target_label, featuresCol="features", numTrees=100, maxDepth=5)

# Creating a pipeline to combine feature engineering and model training
pipeline = Pipeline(stages=[assembler, rf_regressor])

# Splitting the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Fitting the model on the training data
model = pipeline.fit(train_data)

# Making the predictions on the test data
predictions = model.transform(test_data)


In [0]:
# Displays the predictions with actual and predicted values
predictions.select("Bridge_Condition", "prediction", "features").show(10, truncate=False)


+----------------+------------------+------------------------------------------+
|Bridge_Condition|prediction        |features                                  |
+----------------+------------------+------------------------------------------+
|2.0             |1.1664520549050121|[0.027151935,67.0,0.094781372,0.101106758]|
|1.0             |1.169998365967218 |[0.017908723,59.0,0.113416724,0.117343919]|
|2.0             |1.1260349392421327|[0.021663778,99.0,0.047044236,0.020068492]|
|2.0             |1.2903065532465465|[0.021663778,39.0,0.078370592,0.046737959]|
|2.0             |1.1635477462459416|[0.041883304,73.0,0.068560592,0.05109348] |
|2.0             |1.1721047381190624|[0.042172155,71.0,0.051821597,0.038267026]|
|1.0             |1.1828754919340876|[0.020797227,59.0,0.054337916,0.040274685]|
|2.0             |1.7873057059728479|[0.021663778,16.0,0.041391634,0.031784659]|
|2.0             |1.1617306717411389|[0.03870595,69.0,0.095948361,0.080273389] |
|1.0             |1.25386501

In [0]:
# Create an evaluator for MSE
evaluator_mse = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mse")

# Calculate MSE
mse = evaluator_mse.evaluate(predictions)

print(f"Mean Squared Error (MSE): {mse}")

Mean Squared Error (MSE): 0.23658667492379346


In [0]:
# Create an evaluator for MAE
evaluator_mae = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mae")

# Calculate MAE
mae = evaluator_mae.evaluate(predictions)

print(f"Mean Absolute Error (MAE): {mae}")

Mean Absolute Error (MAE): 0.38845112742692495


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define a parameter grid for RandomForest
param_grid = (ParamGridBuilder()
              .addGrid(rf_regressor.numTrees, [50, 100, 150])
              .addGrid(rf_regressor.maxDepth, [5, 10,15])
              .build())

# Set up CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_mse,
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters
cv_model = crossval.fit(train_data)

# Use the best model found during cross-validation
best_model = cv_model.bestModel

# Make predictions with the best model
best_predictions = best_model.transform(test_data)

# Show predictions
best_predictions.select("prediction", "Bridge_Condition", "Age", "Average_Daily_Traffic").show(10)



In [0]:
# Create an evaluator for MSE
evaluator_mse = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mse")

# Calculate MSE
mse = evaluator_mse.evaluate(best_predictions)

print(f"Mean Squared Error (MSE): {mse}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-716040670073768>:5[0m
[1;32m      2[0m evaluator_mse [38;5;241m=[39m RegressionEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mBridge_Condition[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124mmse[39m[38;5;124m"[39m)
[1;32m      4[0m [38;5;66;03m# Calculate MSE[39;00m
[0;32m----> 5[0m mse [38;5;241m=[39m evaluator_mse[38;5;241m.[39mevaluate(best_predictions)
[1;32m      7[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mMean Squared Error (MSE): [39m[38;5;132;01m{[39;00mmse[38;5;132;01m}[39;00m[38;5;124m"[39m)

[0;31mNameError[0m: name 'best_predictions' is not defined

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define a parameter grid for RandomForest
param_grid = (ParamGridBuilder()
              .addGrid(rf_regressor.numTrees, [50, 100, 150])
              .addGrid(rf_regressor.maxDepth, [5, 10])
              .build())

# Set up CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_mae,
                          numFolds=3)

# Run cross-validation, and choose the best set of parameters
cv_model = crossval.fit(train_data)

# Use the best model found during cross-validation
best_model = cv_model.bestModel

# Make predictions with the best model
best_predictions = best_model.transform(test_data)

# Show predictions
best_predictions.select("prediction", "Bridge_Condition", "Age", "Average_Daily_Traffic").show(10)


+------------------+----------------+----+---------------------+
|        prediction|Bridge_Condition| Age|Average_Daily_Traffic|
+------------------+----------------+----+---------------------+
| 1.087291575174665|             2.0|67.0|          0.027151935|
|1.1594287963800016|             1.0|59.0|          0.017908723|
|0.9475386267335884|             2.0|99.0|          0.021663778|
|1.3102864253644082|             2.0|39.0|          0.021663778|
|1.1028051430424446|             2.0|73.0|          0.041883304|
| 1.117589662171173|             2.0|71.0|          0.042172155|
|1.1543113696403013|             1.0|59.0|          0.020797227|
|1.8194569830841723|             2.0|16.0|          0.021663778|
|1.0741097716189876|             2.0|69.0|           0.03870595|
|1.2244242279675337|             1.0|48.0|          0.017908723|
+------------------+----------------+----+---------------------+
only showing top 10 rows



In [0]:
# Create an evaluator for MAE
evaluator_mae = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mae")

# Calculate MAE
mae = evaluator_mae.evaluate(best_predictions)

print(f"Mean Absolute Error (MAE): {mae}")

Mean Absolute Error (MAE): 0.37771647231483


In [0]:
# Create an evaluator for MSE
evaluator_rmse = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="rmse")

# Calculate MSE
rmse = evaluator_rmse.evaluate(predictions)

print(f"Mean Squared Error (MSE): {rmse}")

Mean Squared Error (MSE): 0.4864017628707707


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define a parameter grid for RandomForest
param_grid = (ParamGridBuilder()
              .addGrid(rf_regressor.numTrees, [50, 100, 150])
              .addGrid(rf_regressor.maxDepth, [5, 10])
              .build())

# Set up CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_r2
                          ,
                          numFolds=3)

# Run cross-validation, and choose the best set of parameters
cv_model = crossval.fit(train_data)

# Use the best model found during cross-validation
best_model = cv_model.bestModel

# Make predictions with the best model
best_predictions = best_model.transform(test_data)

# Show predictions
best_predictions.select("prediction", "Bridge_Condition", "Age", "Average_Daily_Traffic").show(10)

+------------------+----------------+----+---------------------+
|        prediction|Bridge_Condition| Age|Average_Daily_Traffic|
+------------------+----------------+----+---------------------+
| 1.078047515577768|             2.0|67.0|          0.027151935|
|1.1481259359621372|             1.0|59.0|          0.017908723|
|0.9536288850277712|             2.0|99.0|          0.021663778|
|1.3255595007011474|             2.0|39.0|          0.021663778|
|1.1053868572827028|             2.0|73.0|          0.041883304|
| 1.128734380061941|             2.0|71.0|          0.042172155|
|1.1603490423843532|             1.0|59.0|          0.020797227|
|1.8362174648208682|             2.0|16.0|          0.021663778|
|1.0661196789195218|             2.0|69.0|           0.03870595|
|1.2129738488942714|             1.0|48.0|          0.017908723|
+------------------+----------------+----+---------------------+
only showing top 10 rows



In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Cast target column to double
data = data.withColumn("Bridge_Condition", col("Bridge_Condition").cast("double"))

# Define features and target
features = ['Average_Daily_Traffic', 'Age', 'Structure_Length_ft', 'Deck_Area_sq_ft']
target_label = "Bridge_Condition"

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Define the Gradient-Boosted Trees Regressor
gbt_regressor = GBTRegressor(labelCol=target_label, featuresCol="features", maxIter=100, maxDepth=5)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, gbt_regressor])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Fit the model on training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions_new = model.transform(test_data)

# Display some predictions
predictions_new.select("prediction", "Bridge_Condition", "features").show(10)





+------------------+----------------+--------------------+
|        prediction|Bridge_Condition|            features|
+------------------+----------------+--------------------+
|1.1324436608064887|             2.0|[0.027151935,67.0...|
| 1.224769723175897|             1.0|[0.017908723,59.0...|
|0.8455447344077983|             2.0|[0.021663778,99.0...|
|1.3386361707367473|             2.0|[0.021663778,39.0...|
| 1.140383284398651|             2.0|[0.041883304,73.0...|
|1.1698591698108003|             2.0|[0.042172155,71.0...|
|1.1225640965578854|             1.0|[0.020797227,59.0...|
| 1.810320937954274|             2.0|[0.021663778,16.0...|
|1.0711339153489663|             2.0|[0.03870595,69.0,...|
|1.2478482742664712|             1.0|[0.017908723,48.0...|
+------------------+----------------+--------------------+
only showing top 10 rows



In [0]:
# Initializing the Regression Evaluator
evaluator_mae = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="mse")

# Computing the metrics
mae = evaluator_mae.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)

# Printing the results
print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")



Mean Absolute Error (MAE): 0.38845112742692495
Mean Squared Error (MSE): 0.23658667492379346


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# ParamGrid for GBT
param_grid = (ParamGridBuilder()
              .addGrid(gbt_regressor.maxDepth, [5, 10])
              .addGrid(gbt_regressor.maxIter, [50, 100])
              .build())

# CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator_rmse,
                          numFolds=3)

cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel


In [0]:
# Initialize Regression Evaluator
evaluator_rmse = RegressionEvaluator(labelCol="Bridge_Condition", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(best_model)
print(f"Root Mean Squared Error (RMSE): {rmse}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4057650989922533>:2[0m
[1;32m      1[0m [38;5;66;03m# Initialize Regression Evaluator[39;00m
[0;32m----> 2[0m evaluator_rmse [38;5;241m=[39m RegressionEvaluator(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mBridge_Condition[39m[38;5;124m"[39m, predictionCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mprediction[39m[38;5;124m"[39m, metricName[38;5;241m=[39m[38;5;124m"[39m[38;5;124mrmse[39m[38;5;124m"[39m)
[1;32m      3[0m rmse [38;5;241m=[39m evaluator_rmse[38;5;241m.[39mevaluate(best_model)
[1;32m      4[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mRoot Mean Squared Error (RMSE): [39m[38;5;132;01m{[39;00mrmse[38;5;132;01m}[39;00m[38;5;124m"[39m)

[0;31mNameError[0m: name 'RegressionEvaluator' is not defined