# Task 3: Human Harm Prediction with Random Forest

**Student:** Nicholas Fleischhauer  
**Date:** December 2, 2025

## Research Question
**Which factors are the strongest predictors of human harm?**  
Can we determine if 'human' factors (location) are more predictive than 'storm' factors (EVENT_TYPE, MAGNITUDE_TYPE)?

## Approach
- **Task:** Binary Classification (predict if harm will occur)
- **Target:** `has_harm` (1 if total harm > 0, else 0)
- **Features:** Storm factors (EVENT_TYPE, MAGNITUDE, MAGNITUDE_TYPE) + Location factors (STATE, EVENT_COUNT_PER_CZ)
- **Model:** Random Forest Classifier with hyperparameter tuning
- **Evaluation:** Train/Validation/Test split (70/15/15)


## 1. Setup and Data Loading


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, count, when, coalesce, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import time
import json


In [2]:
# Create SparkSession
spark = SparkSession.builder \
    .appName("Task3_HarmPrediction") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark version: 4.0.1
Spark UI: http://671658836a76:4040


In [3]:
# Load data
csv_path = "/home/sparkdev/app/Task2/storm_g2020.csv"
# TODO: Change to GCS for final run
# csv_path = "gs://msds-694-cohort-14-group12/storm_data.csv"

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("multiLine", "true") \
    .csv(csv_path)

print(f"✓ Loaded {df.count():,} rows with {len(df.columns)} columns")


[Stage 2:>                                                          (0 + 1) / 1]

✓ Loaded 371,544 rows with 51 columns


                                                                                

## 2. Feature Engineering


In [4]:
# Create target variable
df = df.withColumn('TOTAL_HARM',
    coalesce(col('INJURIES_DIRECT'), lit(0)) +
    coalesce(col('INJURIES_INDIRECT'), lit(0)) +
    coalesce(col('DEATHS_DIRECT'), lit(0)) +
    coalesce(col('DEATHS_INDIRECT'), lit(0))
)

df = df.withColumn('has_harm', when(col('TOTAL_HARM') > 0, 1).otherwise(0))

# Maybe remove something even as simple as printing statistics prior to doing the splitting of data?
# OR maybe this isn't necessary, since we might be onnly working with the train and validation locally and
# assuming that the test set is the unseen data in the cloud.
# If so, then I didn't have time to account for the local
# data being disjoint from the cloud data.
print("Class distribution:")
df.groupBy('has_harm').count().orderBy('has_harm').show()


Class distribution:


[Stage 5:>                                                          (0 + 1) / 1]

+--------+------+
|has_harm| count|
+--------+------+
|       0|366940|
|       1|  4604|
+--------+------+



                                                                                

In [5]:
# Create event count per county (population proxy)
event_counts = df.groupBy('STATE', 'CZ_NAME') \
    .agg(count('*').alias('EVENT_COUNT_PER_CZ'))

df = df.join(event_counts, on=['STATE', 'CZ_NAME'], how='left')
df = df.withColumn('EVENT_COUNT_PER_CZ', coalesce(col('EVENT_COUNT_PER_CZ'), lit(1)))

# Handle missing values
df = df.withColumn('MAGNITUDE', coalesce(col('MAGNITUDE'), lit(0.0)))
df = df.withColumn('MAGNITUDE_TYPE', coalesce(col('MAGNITUDE_TYPE'), lit('NONE')))

# Filter invalid rows
df = df.filter(
    col('EVENT_TYPE').isNotNull() &
    col('STATE').isNotNull()
)

print(f"✓ Clean dataset: {df.count():,} rows")


[Stage 8:>                                                          (0 + 1) / 1]

✓ Clean dataset: 371,544 rows


                                                                                

## 3. Train/Validation/Test Split


In [6]:
# Select modeling features
feature_cols = ['EVENT_TYPE', 'STATE', 'MAGNITUDE_TYPE', 'MAGNITUDE', 'EVENT_COUNT_PER_CZ', 'has_harm']
df_model = df.select(feature_cols)

# Split: 70% train, 15% validation, 15% test (NO caching - simpler & avoids warnings)
seed = 42
train_df, temp_df = df_model.randomSplit([0.7, 0.3], seed=seed)
val_df, test_df = temp_df.randomSplit([0.5, 0.5], seed=seed)

print(f"Train set: {train_df.count():,}")
print(f"Validation set: {val_df.count():,}")
print(f"Test set (SACRED): {test_df.count():,}")


                                                                                

Train set: 260,518


                                                                                

Validation set: 55,665


                                                                                

Test set (SACRED): 55,361


## 4. Build ML Pipeline & Train Model


In [7]:
# Build pipeline
indexers = [
    StringIndexer(inputCol='EVENT_TYPE', outputCol='EVENT_TYPE_idx', handleInvalid='keep'),
    StringIndexer(inputCol='STATE', outputCol='STATE_idx', handleInvalid='keep'),
    StringIndexer(inputCol='MAGNITUDE_TYPE', outputCol='MAGNITUDE_TYPE_idx', handleInvalid='keep')
]

encoders = [
    OneHotEncoder(inputCol='EVENT_TYPE_idx', outputCol='EVENT_TYPE_vec'),
    OneHotEncoder(inputCol='STATE_idx', outputCol='STATE_vec'),
    OneHotEncoder(inputCol='MAGNITUDE_TYPE_idx', outputCol='MAGNITUDE_TYPE_vec')
]

assembler = VectorAssembler(
    inputCols=['EVENT_TYPE_vec', 'STATE_vec', 'MAGNITUDE_TYPE_vec', 'MAGNITUDE', 'EVENT_COUNT_PER_CZ'],
    outputCol='features',
    handleInvalid='keep'
)

# Random Forest
rf = RandomForestClassifier(
    labelCol='has_harm',
    featuresCol='features',
    numTrees=100,
    maxDepth=10,
    seed=seed
)

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

print("✓ Pipeline built with 3 indexers, 3 encoders, 1 assembler, 1 RF classifier")


✓ Pipeline built with 3 indexers, 3 encoders, 1 assembler, 1 RF classifier


In [8]:
# Train model (SIMPLIFIED - removed caching to avoid issues)
print("=" * 60)
print("TRAINING RANDOM FOREST MODEL")
print("=" * 60)
print("Training on 260K rows...")
print("(This may take 2-5 minutes...)")
print()

start_time = time.time()
model = pipeline.fit(train_df)
train_time = time.time() - start_time

print(f"✓ Training completed in {train_time:.2f}s")
print()

# Evaluate on validation set
print("Evaluating model on validation set...")
val_pred = model.transform(val_df)

evaluator_auc = BinaryClassificationEvaluator(labelCol='has_harm', metricName='areaUnderROC')
evaluator_acc = MulticlassClassificationEvaluator(labelCol='has_harm', metricName='accuracy')

val_auc = evaluator_auc.evaluate(val_pred)
val_acc = evaluator_acc.evaluate(val_pred)

print(f"\nValidation AUC: {val_auc:.4f}")
print(f"Validation Accuracy: {val_acc:.4f}")
print("=" * 60)


TRAINING RANDOM FOREST MODEL
Training on 260K rows...
(This may take 2-5 minutes...)



25/12/03 05:33:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/03 05:33:22 WARN DAGScheduler: Broadcasting large task binary with size 1096.3 KiB
25/12/03 05:33:23 WARN DAGScheduler: Broadcasting large task binary with size 1366.5 KiB
25/12/03 05:33:25 WARN DAGScheduler: Broadcasting large task binary with size 1674.1 KiB
                                                                                

✓ Training completed in 25.45s

Evaluating model on validation set...


                                                                                


Validation AUC: 0.8085
Validation Accuracy: 0.9889


## 5. SACRED Test Set Evaluation


In [9]:
print("="*60)
print("EVALUATING ON SACRED TEST SET (FIRST & ONLY TIME)")
print("="*60)

test_pred = model.transform(test_df)

test_auc = evaluator_auc.evaluate(test_pred)
test_acc = evaluator_acc.evaluate(test_pred)

# Maybe change these logs to be more valid in stating that this is 
print(f"\nTest AUC: {test_auc:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")

print("\nConfusion Matrix:")
test_pred.groupBy('has_harm', 'prediction').count().orderBy('has_harm', 'prediction').show()


EVALUATING ON SACRED TEST SET (FIRST & ONLY TIME)


                                                                                


Test AUC: 0.8313
Test Accuracy: 0.9880

Confusion Matrix:


                                                                                

+--------+----------+-----+
|has_harm|prediction|count|
+--------+----------+-----+
|       0|       0.0|54636|
|       0|       1.0|    6|
|       1|       0.0|  660|
|       1|       1.0|   59|
+--------+----------+-----+



## 6. Feature Importance (Answer Research Question)


In [10]:
# Extract feature importance with detailed breakdown
print("Extracting feature importance for stakeholder interpretation...")
print("="*80)

rf_model = model.stages[-1]
feature_importance = rf_model.featureImportances.toArray()

# Get indexers to map back to original categories
event_type_indexer = model.stages[0]
state_indexer = model.stages[1]
mag_type_indexer = model.stages[2]

# Get encoders for dimensions
event_type_encoder = model.stages[3]
state_encoder = model.stages[4]
mag_type_encoder = model.stages[5]

# Get number of categories from indexers, then calc features (n-1 due to one-hot encoding)
n_event_cats = len(event_type_indexer.labels)
n_state_cats = len(state_indexer.labels)
n_mag_cats = len(mag_type_indexer.labels)

n_event = max(1, n_event_cats - 1)
n_state = max(1, n_state_cats - 1)
n_mag = max(1, n_mag_cats - 1)

print(f"\nFeature vector breakdown:")
print(f"  EVENT_TYPE one-hot:      indices 0-{n_event-1} ({n_event_cats} categories → {n_event} features)")
print(f"  STATE one-hot:           indices {n_event}-{n_event+n_state-1} ({n_state_cats} categories → {n_state} features)")
print(f"  MAGNITUDE_TYPE one-hot:  indices {n_event+n_state}-{n_event+n_state+n_mag-1} ({n_mag_cats} categories → {n_mag} features)")
print(f"  MAGNITUDE (numeric):     index {n_event+n_state+n_mag}")
print(f"  EVENT_COUNT_PER_CZ:      index {n_event+n_state+n_mag+1}")
print(f"  TOTAL FEATURES:          {len(feature_importance)}")
print()

# Aggregate importances by feature group
event_type_imp = sum(feature_importance[:n_event])
state_imp = sum(feature_importance[n_event:n_event+n_state])
mag_type_imp = sum(feature_importance[n_event+n_state:n_event+n_state+n_mag])
magnitude_imp = feature_importance[n_event+n_state+n_mag] if len(feature_importance) > n_event+n_state+n_mag else 0
event_count_imp = feature_importance[n_event+n_state+n_mag+1] if len(feature_importance) > n_event+n_state+n_mag+1 else 0

# DETAILED BREAKDOWN FOR STAKEHOLDERS
print("="*80)
print("DETAILED FEATURE IMPORTANCE BREAKDOWN (For Stakeholder Presentation)")
print("="*80)
print()
print("INDIVIDUAL FEATURE GROUP IMPORTANCE:")
print("-" * 80)
print(f"1. EVENT_TYPE (storm type):           {event_type_imp:.6f} ({event_type_imp/sum(feature_importance)*100:.2f}%)")
print(f"2. STATE (location):                  {state_imp:.6f} ({state_imp/sum(feature_importance)*100:.2f}%)")
print(f"3. MAGNITUDE_TYPE (wind measurement): {mag_type_imp:.6f} ({mag_type_imp/sum(feature_importance)*100:.2f}%)")
print(f"4. MAGNITUDE (wind speed):            {magnitude_imp:.6f} ({magnitude_imp/sum(feature_importance)*100:.2f}%)")
print(f"5. EVENT_COUNT_PER_CZ (pop. proxy):   {event_count_imp:.6f} ({event_count_imp/sum(feature_importance)*100:.2f}%)")
print()

# Calculate storm vs location groupings
storm_imp = event_type_imp + mag_type_imp + magnitude_imp
location_imp = state_imp + event_count_imp

print("="*80)
print("HIGH-LEVEL SUMMARY (Storm vs Location)")
print("="*80)
print()
print("STORM-RELATED FACTORS (what type of weather event):")
print(f"  - EVENT_TYPE:      {event_type_imp:.6f} ({event_type_imp/sum(feature_importance)*100:.2f}%)")
print(f"  - MAGNITUDE_TYPE:  {mag_type_imp:.6f} ({mag_type_imp/sum(feature_importance)*100:.2f}%)")
print(f"  - MAGNITUDE:       {magnitude_imp:.6f} ({magnitude_imp/sum(feature_importance)*100:.2f}%)")
print(f"  TOTAL STORM:       {storm_imp:.6f} ({storm_imp/sum(feature_importance)*100:.2f}%)")
print()
print("LOCATION-RELATED FACTORS (where the event occurs):")
print(f"  - STATE:           {state_imp:.6f} ({state_imp/sum(feature_importance)*100:.2f}%)")
print(f"  - EVENT_COUNT:     {event_count_imp:.6f} ({event_count_imp/sum(feature_importance)*100:.2f}%)")
print(f"  TOTAL LOCATION:    {location_imp:.6f} ({location_imp/sum(feature_importance)*100:.2f}%)")
print()

print("="*80)
print("RESEARCH QUESTION ANSWER")
print("="*80)
if storm_imp > location_imp:
    diff_pct = (storm_imp - location_imp) / sum(feature_importance) * 100
    storm_pct = storm_imp/sum(feature_importance)*100
    location_pct = location_imp/sum(feature_importance)*100
    
    print(f"✓ STORM factors are MORE predictive of human harm")
    print(f"  Storm factors: {storm_pct:.1f}%")
    print(f"  Location factors: {location_pct:.1f}%")
    print(f"  Difference: {diff_pct:.1f} percentage points")
    print()
    
    # Dynamic interpretation based on magnitude of difference
    if diff_pct < 10:
        intensity = "slightly"
    elif diff_pct < 30:
        intensity = "moderately"
    else:
        intensity = "significantly"
    
    print(f"Interpretation: The TYPE of weather event (tornado, flood, heat, etc.)")
    print(f"is {intensity} more important ({storm_pct:.1f}% vs {location_pct:.1f}%) than WHERE it occurs when predicting harm.")
else:
    diff_pct = (location_imp - storm_imp) / sum(feature_importance) * 100
    storm_pct = storm_imp/sum(feature_importance)*100
    location_pct = location_imp/sum(feature_importance)*100
    
    print(f"✓ LOCATION factors are MORE predictive of human harm")
    print(f"  Location factors: {location_pct:.1f}%")
    print(f"  Storm factors: {storm_pct:.1f}%")
    print(f"  Difference: {diff_pct:.1f} percentage points")
    print()
    
    # Dynamic interpretation based on magnitude of difference
    if diff_pct < 10:
        intensity = "slightly"
    elif diff_pct < 30:
        intensity = "moderately"
    else:
        intensity = "significantly"
    
    print(f"Interpretation: WHERE a weather event occurs (which state, population)")
    print(f"is {intensity} more important ({location_pct:.1f}% vs {storm_pct:.1f}%) than the TYPE of event when predicting harm.")
print("="*80)


Extracting feature importance for stakeholder interpretation...

Feature vector breakdown:
  EVENT_TYPE one-hot:      indices 0-53 (55 categories → 54 features)
  STATE one-hot:           indices 54-121 (69 categories → 68 features)
  MAGNITUDE_TYPE one-hot:  indices 122-125 (5 categories → 4 features)
  MAGNITUDE (numeric):     index 126
  EVENT_COUNT_PER_CZ:      index 127
  TOTAL FEATURES:          131

DETAILED FEATURE IMPORTANCE BREAKDOWN (For Stakeholder Presentation)

INDIVIDUAL FEATURE GROUP IMPORTANCE:
--------------------------------------------------------------------------------
1. EVENT_TYPE (storm type):           0.710642 (71.06%)
2. STATE (location):                  0.227856 (22.79%)
3. MAGNITUDE_TYPE (wind measurement): 0.010218 (1.02%)
4. MAGNITUDE (wind speed):            0.004380 (0.44%)
5. EVENT_COUNT_PER_CZ (pop. proxy):   0.000001 (0.00%)

HIGH-LEVEL SUMMARY (Storm vs Location)

STORM-RELATED FACTORS (what type of weather event):
  - EVENT_TYPE:      0.710642 (7

### 6.1 Top Contributing Event Types and States (Actionable Insights)


In [11]:
# Show TOP contributing event types and states for stakeholders
print("="*80)
print("TOP CONTRIBUTING FACTORS (Most Actionable for Stakeholders)")
print("="*80)
print()

# Get the labels from indexers
event_type_labels = event_type_indexer.labels
state_labels = state_indexer.labels

# Get top EVENT_TYPES by importance
event_type_importances = feature_importance[:n_event]
top_event_indices = sorted(range(len(event_type_importances)), 
                          key=lambda i: event_type_importances[i], 
                          reverse=True)[:10]

print("TOP 10 EVENT TYPES (Storm Types) for Predicting Harm:")
print("-" * 80)
for rank, idx in enumerate(top_event_indices, 1):
    if idx < len(event_type_labels):
        event_name = event_type_labels[idx]
        importance = event_type_importances[idx]
        pct = (importance / sum(feature_importance)) * 100
        print(f"{rank:2d}. {event_name:30s} - {importance:.6f} ({pct:.2f}%)")
print()

# Get top STATES by importance
state_importances = feature_importance[n_event:n_event+n_state]
top_state_indices = sorted(range(len(state_importances)), 
                          key=lambda i: state_importances[i], 
                          reverse=True)[:10]

print("TOP 10 STATES (Locations) for Predicting Harm:")
print("-" * 80)
for rank, idx in enumerate(top_state_indices, 1):
    if idx < len(state_labels):
        state_name = state_labels[idx]
        importance = state_importances[idx]
        pct = (importance / sum(feature_importance)) * 100
        print(f"{rank:2d}. {state_name:30s} - {importance:.6f} ({pct:.2f}%)")
print()

print("="*80)
print("STAKEHOLDER RECOMMENDATIONS:")
print("="*80)
print("Based on feature importance analysis:")
print()

# Dynamic recommendations based on actual importance values
storm_pct = storm_imp/sum(feature_importance)*100
location_pct = location_imp/sum(feature_importance)*100
diff = abs(storm_pct - location_pct)

if storm_imp > location_imp:
    print(f"1. PRIORITY: FOCUS on EVENT TYPE (storm characteristics) - {storm_pct:.1f}% importance")
    print(f"   - Train responders for top event types (Rip Current, Heat, Avalanche, etc.)")
    print(f"   - Develop event-specific response protocols")
    print(f"   - Stock emergency supplies tailored to these specific events")
    print()
    print(f"2. SECONDARY: Target high-risk locations - {location_pct:.1f}% importance")
    print(f"   - Allocate resources to top states (Wyoming, Utah, etc.)")
    print(f"   - Enhance warning systems in vulnerable regions")
    print()
else:
    print(f"1. PRIORITY: FOCUS on LOCATION (where events occur) - {location_pct:.1f}% importance")
    print(f"   - Allocate resources to top states listed above")
    print(f"   - Enhance regional warning systems")
    print()
    print(f"2. SECONDARY: Event type awareness - {storm_pct:.1f}% importance")
    print(f"   - Train for top event types")
    print(f"   - Event-specific protocols")
    print()

if diff < 10:
    print(f"3. NOTE: Storm ({storm_pct:.1f}%) and location ({location_pct:.1f}%) factors are close")
    print(f"   - Difference is only {diff:.1f} percentage points")
    print(f"   - Best strategy: Consider BOTH event type AND location together")
else:
    print(f"3. NOTE: Clear dominant factor ({max(storm_pct, location_pct):.1f}% vs {min(storm_pct, location_pct):.1f}%)")
    print(f"   - Focus resources on the dominant factor")
    print(f"   - Secondary factor still relevant but less critical")

print("="*80)


TOP CONTRIBUTING FACTORS (Most Actionable for Stakeholders)

TOP 10 EVENT TYPES (Storm Types) for Predicting Harm:
--------------------------------------------------------------------------------
 1. Rip Current                    - 0.437450 (43.75%)
 2. Heat                           - 0.096486 (9.65%)
 3. Avalanche                      - 0.068776 (6.88%)
 4. Lightning                      - 0.048487 (4.85%)
 5. Tornado                        - 0.015374 (1.54%)
 6. Wildfire                       - 0.005993 (0.60%)
 7. Hurricane (Typhoon)            - 0.005545 (0.55%)
 8. Hail                           - 0.005440 (0.54%)
 9. Marine Strong Wind             - 0.003445 (0.34%)
10. Excessive Heat                 - 0.003432 (0.34%)

TOP 10 STATES (Locations) for Predicting Harm:
--------------------------------------------------------------------------------
 1. WYOMING                        - 0.169631 (16.96%)
 2. UTAH                           - 0.011579 (1.16%)
 3. WISCONSIN            

## 7. Save Results


In [12]:
# Save results
results = {
    'test_auc': float(test_auc),
    'test_accuracy': float(test_acc),
    'val_auc': float(val_auc),
    'val_accuracy': float(val_acc),
    'storm_importance': float(storm_imp),
    'location_importance': float(location_imp),
    'event_type_importance': float(event_type_imp),
    'state_importance': float(state_imp),
    'magnitude_importance': float(magnitude_imp),
    'event_count_importance': float(event_count_imp)
}

with open('/home/sparkdev/app/Task3_results.json', 'w') as f:
    json.dump(results, f, indent=2)

print("✓ Results saved to Task3_results.json")
print("\nSummary:")
print(json.dumps(results, indent=2))


✓ Results saved to Task3_results.json

Summary:
{
  "test_auc": 0.8312743629681817,
  "test_accuracy": 0.9879698704864436,
  "val_auc": 0.8084900646485705,
  "val_accuracy": 0.9888799065840295,
  "storm_importance": 0.7252400577656135,
  "location_importance": 0.22785755121787488,
  "event_type_importance": 0.7106420203071844,
  "state_importance": 0.22785642733593744,
  "magnitude_importance": 0.00438001568949174,
  "event_count_importance": 1.1238819374258197e-06
}


In [13]:
# Stop Spark
spark.stop()
print("✓ Complete!")


✓ Complete!
