In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression as SparkLinearRegression
from pyspark.ml.regression import RandomForestRegressor as SparkRFRegressor
from pyspark.ml.classification import LogisticRegression as SparkLogisticRegression
from pyspark.ml.classification import RandomForestClassifier as SparkRFClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

# ---------------------------------------------------------
# 1. Load nemsis_subset
# ---------------------------------------------------------
df_spark = spark.table("nemsis_subset")
print("Raw rows:", df_spark.count())
display(df_spark.limit(5))

# ---------------------------------------------------------
# 2. Rename columns
# ---------------------------------------------------------
rename_map = {
    "PcrKey": "incident_id",
    "eDispatch_01": "dispatch_center_id",
    "eDispatch_02": "dispatch_call_id",
    "eResponse_05": "service_requested_type",
    "eResponse_07": "unit_transport_capability",
    "eResponse_23": "response_misc_field",
    "eScene_01": "first_unit_on_scene_flag",
    "eScene_06": "num_patients_at_scene",
    "eScene_07": "mass_casualty_flag",
    "eScene_08": "scene_factor_1",
    "eScene_09": "primary_symptom_or_scene_descr",
    "eSituation_01": "chief_complaint_location",
    "eSituation_02": "possible_injury_flag",
    "eSituation_07": "primary_impression",
    "eSituation_08": "secondary_impression",
    "eSituation_13": "trauma_score_or_severity",
    "eSituation_18": "provider_narrative_1",
    "eSituation_20": "provider_narrative_2",
    "eOutcome_01": "hospital_destination_code",
    "eOutcome_02": "ed_disposition",
    "eOutcome_11": "hospital_admit_time",
    "eOutcome_16": "hospital_discharge_time",
    "eOutcome_18": "long_term_outcome",
    "ePatient_15": "patient_age_value",
    "ePatient_16": "patient_age_units",
    "eResponse_08": "crew_size_or_delay_type",
    "eResponse_12": "response_additional_mode",
}

for old, new in rename_map.items():
    if old in df_spark.columns:
        df_spark = df_spark.withColumnRenamed(old, new)

Raw rows: 10127


PcrKey,eDispatch_01,eDispatch_02,eResponse_05,eResponse_07,eResponse_23,eScene_01,eScene_06,eScene_07,eScene_08,eScene_09,eSituation_01,eSituation_02,eSituation_07,eSituation_08,eSituation_13,eSituation_18,eSituation_20,eOutcome_01,eOutcome_02,eOutcome_11,eOutcome_16,eOutcome_18,ePatient_15,ePatient_16,eResponse_08,eResponse_12
761495,2301061,2302001,2205001,2207023,2223001,9923003,2707005,9923001,7701003,Y92.01,Not Recorded,7701003,7701003,7701003,7701003,Not Applicable,Unknown,7701003,7701003,Not Applicable,Not Applicable,Not Applicable,7701003,7701003,,
22298602,2301003,2302007,2205001,2207015,2223001,9923003,2707005,9923001,7701001,Y92.29,Not Applicable,9922001,2807011,2808011,2813003,Not Applicable,Unknown,7701003,7701003,Not Applicable,Not Applicable,Not Applicable,37,2516009,,2212015.0
61958750,2301061,2302001,2205001,2207017,2223001,9923003,2707005,9923001,7701001,Y92.12,Not Recorded,9922001,2807015,7701003,2813005,Not Applicable,Unknown,7701003,7701003,Not Applicable,Not Applicable,Not Applicable,78,2516009,,2212015.0
108615464,2301061,2302001,2205001,2207015,2223001,9923003,2707003,7701003,7701003,Y92.01,Not Recorded,7701003,7701003,7701003,7701003,Not Applicable,Unknown,7701003,7701003,Not Applicable,Not Applicable,Not Applicable,7701003,7701003,,
113783964,2301071,2302003,2205007,2207017,2223005,9923003,2707005,7701003,7701003,Y92.23,Not Recorded,7701003,7701003,7701003,2813005,Not Applicable,Unknown,7701003,7701003,Not Applicable,Not Applicable,Not Applicable,91,2516009,,2212015.0


In [0]:
# ---------------------------------------------------------
# 3. Synthetic time index and pseudo-hours
# ---------------------------------------------------------
# Sort by incident_id (or any stable key) and add a sequential index
w_order = Window.orderBy("incident_id")
df_spark = df_spark.withColumn("seq_idx", F.row_number().over(w_order) - 1)

# assume N incidents per pseudo-hour
INCIDENTS_PER_HOUR = 20
df_spark = df_spark.withColumn(
    "pseudo_hour",
    (F.col("seq_idx") / INCIDENTS_PER_HOUR).cast(IntegerType())
)

df_spark = df_spark.withColumn(
    "pseudo_day",
    (F.col("pseudo_hour") / 24).cast(IntegerType())
).withColumn(
    "pseudo_hour_of_day",
    (F.col("pseudo_hour") % 24).cast(IntegerType())
)

display(df_spark.select("incident_id", "seq_idx",
                        "pseudo_hour", "pseudo_day",
                        "pseudo_hour_of_day").limit(10))



incident_id,seq_idx,pseudo_hour,pseudo_day,pseudo_hour_of_day
761495,0,0,0,0
22298602,1,0,0,0
61958750,2,0,0,0
108615464,3,0,0,0
113783964,4,0,0,0
120114659,5,0,0,0
158329806,6,0,0,0
161891691,7,0,0,0
172785857,8,0,0,0
199244042,9,0,0,0


In [0]:
# ---------------------------------------------------------
# 4. Age + case‑mix / scene features at incident level
# ---------------------------------------------------------
df_spark = df_spark.withColumn(
    "age_years",
    F.col("patient_age_value").cast(DoubleType())
)

df_spark = df_spark.withColumn(
    "is_pediatric",
    F.when(F.col("age_years") < 18, 1).otherwise(0)
).withColumn(
    "is_geriatric",
    F.when(F.col("age_years") >= 65, 1).otherwise(0)
)

df_spark = df_spark.withColumn(
    "num_patients_at_scene_num",
    F.col("num_patients_at_scene").cast(DoubleType())
)

df_spark = df_spark.withColumn(
    "possible_injury_bin",
    F.when(F.col("possible_injury_flag").isNotNull(), 1).otherwise(0)
)

df_spark = df_spark.withColumn(
    "trauma_score",
    F.col("trauma_score_or_severity").cast(DoubleType())
)

trauma_stats = df_spark.select("trauma_score") \
                       .where(F.col("trauma_score").isNotNull()) \
                       .approxQuantile("trauma_score", [0.75], 0.01)
trauma_thr = trauma_stats[0] if trauma_stats else None

if trauma_thr is not None:
    df_spark = df_spark.withColumn(
        "trauma_high",
        F.when(F.col("trauma_score") >= F.lit(trauma_thr), 1).otherwise(0)
    )
else:
    df_spark = df_spark.withColumn("trauma_high", F.lit(0))



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# 5. Age + case‑mix / scene features at incident level (robust casting)

# age_years as double, tolerate bad strings
df_spark = df_spark.withColumn(
    "age_years",
    F.expr("try_cast(patient_age_value as double)")
)

df_spark = df_spark.withColumn(
    "is_pediatric",
    F.when(F.col("age_years") < 18, 1).otherwise(0)
).withColumn(
    "is_geriatric",
    F.when(F.col("age_years") >= 65, 1).otherwise(0)
)

# num_patients_at_scene as double, tolerate bad strings
df_spark = df_spark.withColumn(
    "num_patients_at_scene_num",
    F.expr("try_cast(num_patients_at_scene as double)")
)

# any non‑NULL possible_injury_flag -> 1
df_spark = df_spark.withColumn(
    "possible_injury_bin",
    F.when(F.col("possible_injury_flag").isNotNull(), 1).otherwise(0)
)

# trauma_score with try_cast
df_spark = df_spark.withColumn(
    "trauma_score",
    F.expr("try_cast(trauma_score_or_severity as double)")
)

# compute trauma_high threshold only on non‑NULL scores
trauma_stats = (
    df_spark.select("trauma_score")
            .where(F.col("trauma_score").isNotNull())
            .approxQuantile("trauma_score", [0.75], 0.01)
)
trauma_thr = trauma_stats[0] if trauma_stats else None

if trauma_thr is not None:
    df_spark = df_spark.withColumn(
        "trauma_high",
        F.when(F.col("trauma_score") >= F.lit(trauma_thr), 1).otherwise(0)
    )
else:
    df_spark = df_spark.withColumn("trauma_high", F.lit(0))

display(df_spark.select("age_years", "num_patients_at_scene_num",
                        "trauma_score").limit(10))




age_years,num_patients_at_scene_num,trauma_score
7701003.0,2707005.0,7701003.0
37.0,2707005.0,2813003.0
78.0,2707005.0,2813005.0
7701003.0,2707003.0,7701003.0
91.0,2707005.0,2813005.0
73.0,2707005.0,2813005.0
7701003.0,2707003.0,7701003.0
75.0,2707005.0,2813003.0
39.0,2707005.0,2813001.0
2.0,2707005.0,2813005.0


In [0]:
# ---------------------------------------------------------
# 6. High‑strain flag (top 25% call_volume)
# ---------------------------------------------------------
q = agg_spark.approxQuantile("call_volume", [0.75], 0.01)[0]
agg_spark = agg_spark.withColumn(
    "high_strain",
    F.when(F.col("call_volume") >= F.lit(q), 1).otherwise(0)
)

display(agg_spark.groupBy("high_strain").count())



high_strain,count
1,506
0,1


In [0]:
# ---------------------------------------------------------
# 7. Assemble features and time‑ordered train/test split
# ---------------------------------------------------------
feature_cols = [
    "hour_of_day", "day_index",
    "avg_num_patients", "max_num_patients",
    "possible_injury_prop", "trauma_high_prop",
    "pediatric_prop", "geriatric_prop",
    "service_mode_nunique",
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

data_ml = assembler.transform(agg_spark).select(
    "pseudo_hour", "features", "call_volume", "high_strain"
).orderBy("pseudo_hour")

w2 = Window.orderBy("pseudo_hour")
data_ml = data_ml.withColumn("row_idx", F.row_number().over(w2) - 1)

total = data_ml.count()
split_idx = int(total * 0.7)

train_reg = data_ml.filter(F.col("row_idx") < split_idx)
test_reg  = data_ml.filter(F.col("row_idx") >= split_idx)

train_cls = train_reg
test_cls  = test_reg

print("Train hours:", train_reg.count(), " Test hours:", test_reg.count())



Train hours: 354  Test hours: 153


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler

# ---------------------------------------------------------
# 8. Assemble features and time‑ordered train/test split (robust)
# ---------------------------------------------------------
feature_cols = [
    "hour_of_day", "day_index",
    "avg_num_patients", "max_num_patients",
    "possible_injury_prop", "trauma_high_prop",
    "pediatric_prop", "geriatric_prop",
    "service_mode_nunique",
]

# force all features to double with try_cast to avoid invalid numeric values
for c in feature_cols:
    df_expr = f"try_cast({c} as double)"
    agg_spark = agg_spark.withColumn(c, F.expr(df_expr))

# replace any remaining NULLs with 0
agg_spark = agg_spark.fillna({c: 0.0 for c in feature_cols})

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

data_ml = (
    assembler
    .transform(agg_spark)
    .select("pseudo_hour", "features", "call_volume", "high_strain")
    .orderBy("pseudo_hour")
)

w2 = Window.orderBy("pseudo_hour")
data_ml = data_ml.withColumn("row_idx", F.row_number().over(w2) - 1)

total = data_ml.count()
split_idx = int(total * 0.7)

train_reg = data_ml.filter(F.col("row_idx") < split_idx)
test_reg  = data_ml.filter(F.col("row_idx") >= split_idx)

train_cls = train_reg
test_cls  = test_reg

print("Train hours:", train_reg.count(), " Test hours:", test_reg.count())



Train hours: 354  Test hours: 153


In [0]:
# 1) Show schema of data_ml actually used in fit()
data_ml.printSchema()

# 2) Look for any '.' values in all numeric-looking columns BEFORE assembling
display(
    agg_spark.select(
        "call_volume", "high_strain",
        "hour_of_day", "day_index",
        "avg_num_patients", "max_num_patients",
        "possible_injury_prop", "trauma_high_prop",
        "pediatric_prop", "geriatric_prop",
        "service_mode_nunique"
    ).where(
        (F.col("call_volume") == ".") |
        (F.col("high_strain") == ".") |
        (F.col("hour_of_day") == ".") |
        (F.col("day_index") == ".") |
        (F.col("avg_num_patients") == ".") |
        (F.col("max_num_patients") == ".") |
        (F.col("possible_injury_prop") == ".") |
        (F.col("trauma_high_prop") == ".") |
        (F.col("pediatric_prop") == ".") |
        (F.col("geriatric_prop") == ".") |
        (F.col("service_mode_nunique") == ".")
    ).limit(50)
)

# 3) Show a few rows of train_reg that will go into LinearRegression
display(train_reg.limit(20))


root
 |-- pseudo_hour: integer (nullable = true)
 |-- features: vectorudt (nullable = true)
 |-- call_volume: double (nullable = false)
 |-- high_strain: double (nullable = false)
 |-- row_idx: integer (nullable = false)





[0;31m---------------------------------------------------------------------------[0m
[0;31mNumberFormatException[0m                     Traceback (most recent call last)
File [0;32m<command-5387237097138909>, line 5[0m
[1;32m      2[0m data_ml[38;5;241m.[39mprintSchema()
[1;32m      4[0m [38;5;66;03m# 2) Look for any '.' values in all numeric-looking columns BEFORE assembling[39;00m
[0;32m----> 5[0m display(
[1;32m      6[0m     agg_spark[38;5;241m.[39mselect(
[1;32m      7[0m         [38;5;124m"[39m[38;5;124mcall_volume[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mhigh_strain[39m[38;5;124m"[39m,
[1;32m      8[0m         [38;5;124m"[39m[38;5;124mhour_of_day[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mday_index[39m[38;5;124m"[39m,
[1;32m      9[0m         [38;5;124m"[39m[38;5;124mavg_num_patients[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mmax_num_patients[39m[38;5;124m"[39m,
[1;32m     10[0m         [38;5;124m"[39m[38

In [0]:
# In Databricks, after building agg_spark
display(
    agg_spark.select("pseudo_hour", "call_volume",
                     "avg_num_patients", "possible_injury_prop",
                     "pediatric_prop", "geriatric_prop")
)

# simple stats
agg_spark.select("call_volume").summary().show()

# distribution of call_volume
display(
    agg_spark.select("call_volume")
             .groupBy("call_volume")
             .count()
             .orderBy("call_volume")
)




[0;31m---------------------------------------------------------------------------[0m
[0;31mNumberFormatException[0m                     Traceback (most recent call last)
File [0;32m<command-5387237097138910>, line 2[0m
[1;32m      1[0m [38;5;66;03m# In Databricks, after building agg_spark[39;00m
[0;32m----> 2[0m display(
[1;32m      3[0m     agg_spark[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mpseudo_hour[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mcall_volume[39m[38;5;124m"[39m,
[1;32m      4[0m                      [38;5;124m"[39m[38;5;124mavg_num_patients[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mpossible_injury_prop[39m[38;5;124m"[39m,
[1;32m      5[0m                      [38;5;124m"[39m[38;5;124mpediatric_prop[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mgeriatric_prop[39m[38;5;124m"[39m)
[1;32m      6[0m )
[1;32m      8[0m [38;5;66;03m# simple stats[39;00m
[1;32m      9[0m agg_spark[38;5;241m.[39mselect([3

In [0]:


from pyspark.ml.regression import LinearRegression as SparkLinearRegression
from pyspark.ml.regression import RandomForestRegressor as SparkRFRegressor
from pyspark.ml.classification import LogisticRegression as SparkLogisticRegression
from pyspark.ml.classification import RandomForestClassifier as SparkRFClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

# regression evaluators
reg_evaluator_mae = RegressionEvaluator(
    labelCol="call_volume", predictionCol="prediction", metricName="mae"
)
reg_evaluator_r2 = RegressionEvaluator(
    labelCol="call_volume", predictionCol="prediction", metricName="r2"
)

# Linear Regression
lr = SparkLinearRegression(featuresCol="features", labelCol="call_volume")
lr_model = lr.fit(train_reg)
pred_lr = lr_model.transform(test_reg)

print("=== Linear Regression (call volume) ===")
print("MAE:", reg_evaluator_mae.evaluate(pred_lr))
print("R2 :", reg_evaluator_r2.evaluate(pred_lr))

# Random Forest Regressor
rf_reg = SparkRFRegressor(
    featuresCol="features",
    labelCol="call_volume",
    numTrees=200,
    maxDepth=10,
    seed=42
)
rf_reg_model = rf_reg.fit(train_reg)
pred_rf_reg = rf_reg_model.transform(test_reg)

print("=== RF Regressor (call volume) ===")
print("MAE:", reg_evaluator_mae.evaluate(pred_rf_reg))
print("R2 :", reg_evaluator_r2.evaluate(pred_rf_reg))

# Classification
bin_eval = BinaryClassificationEvaluator(
    labelCol="high_strain",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

log_reg = SparkLogisticRegression(
    featuresCol="features",
    labelCol="high_strain",
    maxIter=100
)
log_model = log_reg.fit(train_cls)
pred_log = log_model.transform(test_cls)

print("=== Logistic Regression (high-strain) ===")
try:
    print("ROC-AUC:", bin_eval.evaluate(pred_log))
except Exception as e:
    print("ROC-AUC not defined:", e)
pred_log.groupBy("high_strain", "prediction").count().show()

rf_cls = SparkRFClassifier(
    featuresCol="features",
    labelCol="high_strain",
    numTrees=300,
    maxDepth=10,
    seed=42
)
rf_cls_model = rf_cls.fit(train_cls)
pred_rf_cls = rf_cls_model.transform(test_cls)

print("=== RF Classifier (high-strain) ===")
try:
    print("ROC-AUC:", bin_eval.evaluate(pred_rf_cls))
except Exception as e:
    print("ROC-AUC not defined:", e)
pred_rf_cls.groupBy("high_strain", "prediction").count().show()




[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-5387237097138908>, line 24[0m
[1;32m     16[0m [38;5;66;03m# Random Forest Regressor[39;00m
[1;32m     17[0m rf_reg [38;5;241m=[39m SparkRFRegressor(
[1;32m     18[0m     featuresCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m,
[1;32m     19[0m     labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mcall_volume[39m[38;5;124m"[39m,
[0;32m   (...)[0m
[1;32m     22[0m     seed[38;5;241m=[39m[38;5;241m42[39m
[1;32m     23[0m )
[0;32m---> 24[0m rf_reg_model [38;5;241m=[39m rf_reg[38;5;241m.[39mfit(train_reg)
[1;32m     25[0m pred_rf_reg [38;5;241m=[39m rf_reg_model[38;5;241m.[39mtransform(test_reg)
[1;32m     27[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124m=== RF Regressor (call volume) ===[39m[38;5;124m"[39m)

File [