## Stop Sign

### Set-up

In [1]:
# Load libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, udf, stddev
from pyspark.sql.types import FloatType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Start session
spark = SparkSession\
        .builder\
        .config("spark.driver.memory", "4g")\
        .config("spark.executor.memory", "4g")\
        .getOrCreate()

23/06/08 16:20:28 WARN Utils: Your hostname, Shaolongs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.167 instead (on interface en0)
23/06/08 16:20:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/08 16:20:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/08 16:20:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Read data
path = "/Users/shaolongxue/Documents/MSBA/3_Spring_Quarter/BAX423_Big_Data_Analytics/Final Project/Data/US_Traffic_v02.csv"
data = spark.read.format('csv').option('header', 'true').load(path)

In [4]:
# Select relevant features
cols_to_remove = ["Start_Lat", "Start_Lng", "End_Lat", "End_Lng", "Description", 
                  "Number", "Street", "Country", "Timezone", "Airport_Code", "Zipcode", 
                  "Weather_Timestamp", "Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", 
                  "Astronomical_Twilight", "Weather_Condition", "Wind_Direction"]

df = data.select([col for col in data.columns if col not in cols_to_remove])

df = df.na.drop()

# Cast numeric columns to appropriate types
double_columns = ["Distance(mi)", "Temperature(F)", "Wind_Chill(F)", 
                   "Humidity(%)", "Pressure(in)", "Visibility(mi)", 
                   "Wind_Speed(mph)", "Precipitation(in)", "Duration"]

integer_columns = ["Population", "Start_Year", "Start_Month"]

for column in double_columns:
    df = df.withColumn(column, col(column).cast("double"))

for column in integer_columns:
    df = df.withColumn(column, col(column).cast("integer"))

# Remove one outlier row wither Side is "N"
df = df.where(df.Side != 'N')
# Remove outliers where duration is larger than 7 days
df = df.where(df.Duration <= 168)

# Encode binary categorical columns
binary_columns = ['Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop']

for column in binary_columns:
    df = df.withColumn(column, when(col(column) == "true", 1).otherwise(0))

df = df.withColumn("Side", when(col("Side") == "L", 1).otherwise(0))

# Convert values in Start_TOD_Category column [Necessary for SMOTE]
df = df.withColumn("Start_TOD_Category",
                    when(df["Start_TOD_Category"] == "Midnight", "1")
                   .when(df["Start_TOD_Category"] == "Early Morning", "2")
                   .when(df["Start_TOD_Category"] == "Late Morning", "3")
                   .when(df["Start_TOD_Category"] == "Early Afternoon", "4")
                   .when(df["Start_TOD_Category"] == "Late Afternoon", "5")
                   .when(df["Start_TOD_Category"] == "Evening", "6"))

# First convert the categories from 'string' to 'index'
indexer = StringIndexer(inputCol="Start_TOD_Category", outputCol="Start_TOD_Category_index")
df = indexer.fit(df).transform(df)

indexer = StringIndexer(inputCol="Start_Weekday", outputCol="Start_Weekday_index")
df = indexer.fit(df).transform(df)

# Then one-hot encode these indices
encoder = OneHotEncoder(inputCols=["Start_TOD_Category_index", "Start_Weekday_index"],
                        outputCols=["Start_TOD_Category_vec", "Start_Weekday_vec"])
model = encoder.fit(df)
df = model.transform(df)

23/06/08 16:20:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### EDA

In [6]:
# Calculate summary stats
df.groupBy("Stop").count().show()

[Stage 8:=====>                                                   (1 + 10) / 11]

+----+-------+
|Stop|  count|
+----+-------+
|   1|  37234|
|   0|1708302|
+----+-------+



                                                                                

In [7]:
df.show(10)

+-------+--------+--------------------+--------------------+--------------------+----+------------+--------------+-----+--------------+-------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+----------+----------+-----------+-------------+------------------+--------+------------------------+-------------------+----------------------+-----------------+
|     ID|Severity|          Start_Time|            End_Time|        Distance(mi)|Side|        City|        County|State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Population|Start_Year|Start_Month|Start_Weekday|Start_TOD_Category|Duration|Start_TOD_Category_index|Start_Weekday_index|Start_TOD_Category_vec

### Severity

#### Address imbalance, calculate PS, model eval, matching, calculate ATE

In [8]:
fraction = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
covariates = ['Side', 'Wind_Chill(F)', 'Pressure(in)', 'Wind_Speed(mph)', 'Amenity', 'Bump', 'Crossing', 
              'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Precipitation(in)', 'Traffic_Calming', 
              'Traffic_Signal', 'Turning_Loop', 'Population', 'Start_Weekday_vec', 'Start_TOD_Category_vec']
selected_columns = ["Severity", "Stop"] + covariates

In [9]:
from sklearn.neighbors import NearestNeighbors

for i in fraction:
    print("\n")
    print("Fraction = ", i)
    df_major = df.filter(F.col("Stop") == 0).sample(withReplacement=False, fraction=i, seed=1) 
    df_minor = df.filter(F.col("Stop") == 1)
    df_balanced = df_minor.union(df_major)
    df_ps = df_balanced.select(selected_columns)

    # Define the covariates
    assembler = VectorAssembler(inputCols=covariates, outputCol="features")
    train_data, test_data = df_ps.randomSplit([0.8, 0.2], seed=1)
    lr = LogisticRegression(featuresCol='features', labelCol='Stop', maxIter=10)
    pipeline = Pipeline(stages=[assembler, lr])
    model = pipeline.fit(train_data)

    # Predict and add the propensity scores to the DataFrame
    ps = model.transform(test_data).select('Severity', 'Stop', 'features', 'probability')

    # Extract the probability of treatment (i.e., the propensity score)
    extract_prob = udf(lambda x: float(x[1]), FloatType())
    ps = ps.withColumn("propensity_score", extract_prob('probability'))

    # Evaluate model performance on the test data
    auc = BinaryClassificationEvaluator(labelCol='Stop', rawPredictionCol='probability', metricName='areaUnderROC')
    AUC = round(auc.evaluate(ps), 3)
    print('AUC =', AUC)

    ps_pd = ps.toPandas()

    # Create two dataframes for treatment and control groups
    df_treatment = ps_pd[ps_pd['Stop'] == 1]
    df_control = ps_pd[ps_pd['Stop'] == 0]

    # Fit nearest neighbors model to control group
    nbrs = NearestNeighbors(n_neighbors=1).fit(df_control[['propensity_score']])

    # Find nearest neighbors in control group for each treatment case
    distances, indices = nbrs.kneighbors(df_treatment[['propensity_score']])

    # Create dataframe of distances and indices
    matches = pd.DataFrame({'distance': distances.flatten(), 'control_index': indices.flatten(),
                            'treatment_index': df_treatment.index})

    # Merge data from treatment and control cases into the matches dataframe
    matched_pairs = matches.merge(df_treatment, left_on='treatment_index', right_index=True) \
        .merge(df_control, left_on='control_index', right_index=True, suffixes=('_treatment', '_control'))
    
    matched_pairs['Severity_treatment'] = matched_pairs['Severity_treatment'].astype(float)
    matched_pairs['Severity_control'] = matched_pairs['Severity_control'].astype(float)

    matched_pairs['treatment_effect'] = matched_pairs['Severity_treatment'] - matched_pairs['Severity_control']
    average_treatment_effect = matched_pairs['treatment_effect'].mean()
    print("ATE =", round(average_treatment_effect, 4))



Fraction =  0.05


23/06/08 16:25:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/08 16:25:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

AUC = 0.724


                                                                                

ATE = -0.0033


Fraction =  0.1


                                                                                

AUC = 0.721


                                                                                

ATE = 0.0038


Fraction =  0.2


                                                                                

AUC = 0.723


                                                                                

ATE = -0.0252


Fraction =  0.3


                                                                                

AUC = 0.722


                                                                                

ATE = -0.0007


Fraction =  0.4


                                                                                

AUC = 0.722


                                                                                

ATE = 0.0128


Fraction =  0.5


                                                                                

AUC = 0.722


                                                                                

ATE = -0.0015


Fraction =  0.6


                                                                                

AUC = 0.723


                                                                                

ATE = -0.0058


Fraction =  0.7


                                                                                

AUC = 0.723


                                                                                

ATE = -0.0073


Fraction =  0.8


                                                                                

AUC = 0.723


                                                                                

ATE = 0.0064


#### Summary

The average treatment effect of having a stop sign nearby on accident severity (measured on a scale of 1 to 4, with 4 being the most severe) is -0.003 unit. 

### Duration

#### Address imbalance, calculate PS, model eval, matching, calculate ATE

In [11]:
fraction = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
covariates = ['Side', 'Wind_Chill(F)', 'Pressure(in)', 'Wind_Speed(mph)', 'Amenity', 'Bump', 'Crossing', 
              'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Precipitation(in)', 'Traffic_Calming', 
              'Traffic_Signal', 'Turning_Loop', 'Population', 'Start_Weekday_vec', 'Start_TOD_Category_vec']
selected_columns = ["Duration", "Stop"] + covariates

In [12]:
from sklearn.neighbors import NearestNeighbors

for i in fraction:
    print("\n")
    print("Fraction = ", i)
    df_major = df.filter(F.col("Stop") == 0).sample(withReplacement=False, fraction=i, seed=1) 
    df_minor = df.filter(F.col("Stop") == 1)
    df_balanced = df_minor.union(df_major)
    df_ps = df_balanced.select(selected_columns)

    # Define the covariates
    assembler = VectorAssembler(inputCols=covariates, outputCol="features")
    train_data, test_data = df_ps.randomSplit([0.8, 0.2], seed=1)
    lr = LogisticRegression(featuresCol='features', labelCol='Stop', maxIter=10)
    pipeline = Pipeline(stages=[assembler, lr])
    model = pipeline.fit(train_data)

    # Predict and add the propensity scores to the DataFrame
    ps = model.transform(test_data).select('Duration', 'Stop', 'features', 'probability')

    # Extract the probability of treatment (i.e., the propensity score)
    extract_prob = udf(lambda x: float(x[1]), FloatType())
    ps = ps.withColumn("propensity_score", extract_prob('probability'))

    # Evaluate model performance on the test data
    auc = BinaryClassificationEvaluator(labelCol='Stop', rawPredictionCol='probability', metricName='areaUnderROC')
    AUC = round(auc.evaluate(ps), 3)
    print('AUC =', AUC)

    ps_pd = ps.toPandas()

    # Create two dataframes for treatment and control groups
    df_treatment = ps_pd[ps_pd['Stop'] == 1]
    df_control = ps_pd[ps_pd['Stop'] == 0]

    # Fit nearest neighbors model to control group
    nbrs = NearestNeighbors(n_neighbors=1).fit(df_control[['propensity_score']])

    # Find nearest neighbors in control group for each treatment case
    distances, indices = nbrs.kneighbors(df_treatment[['propensity_score']])

    # Create dataframe of distances and indices
    matches = pd.DataFrame({'distance': distances.flatten(), 'control_index': indices.flatten(),
                            'treatment_index': df_treatment.index})

    # Merge data from treatment and control cases into the matches dataframe
    matched_pairs = matches.merge(df_treatment, left_on='treatment_index', right_index=True) \
        .merge(df_control, left_on='control_index', right_index=True, suffixes=('_treatment', '_control'))
    
    matched_pairs['treatment_effect'] = matched_pairs['Duration_treatment'] - matched_pairs['Duration_control']
    average_treatment_effect = matched_pairs['treatment_effect'].mean()
    print("ATE =", round(average_treatment_effect, 4))



Fraction =  0.05


                                                                                

AUC = 0.723


                                                                                

ATE = 0.5207


Fraction =  0.1


                                                                                

AUC = 0.72


                                                                                

ATE = 0.4219


Fraction =  0.2


                                                                                

AUC = 0.721


                                                                                

ATE = 0.3575


Fraction =  0.3


                                                                                

AUC = 0.721


                                                                                

ATE = 0.388


Fraction =  0.4


                                                                                

AUC = 0.723


                                                                                

ATE = 0.5165


Fraction =  0.5


                                                                                

AUC = 0.723


                                                                                

ATE = 0.4512


Fraction =  0.6


                                                                                

AUC = 0.723


                                                                                

ATE = 0.4366


Fraction =  0.7


                                                                                

AUC = 0.722


                                                                                

ATE = 0.441


Fraction =  0.8


                                                                                

AUC = 0.723


                                                                                

ATE = 0.4415


#### Summary

The average treatment effect of having a stop sign nearby on accident duration (measured in hours) is 0.51 hour. 

### Distance

#### Address imbalance, calculate PS, model eval, matching, calculate ATE

In [13]:
fraction = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
covariates = ['Side', 'Wind_Chill(F)', 'Pressure(in)', 'Wind_Speed(mph)', 'Amenity', 'Bump', 'Crossing', 
              'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Precipitation(in)', 'Traffic_Calming', 
              'Traffic_Signal', 'Turning_Loop', 'Population', 'Start_Weekday_vec', 'Start_TOD_Category_vec']
selected_columns = ["Distance(mi)", "Stop"] + covariates

In [14]:
from sklearn.neighbors import NearestNeighbors

for i in fraction:
    print("\n")
    print("Fraction = ", i)
    df_major = df.filter(F.col("Stop") == 0).sample(withReplacement=False, fraction=i, seed=1) 
    df_minor = df.filter(F.col("Stop") == 1)
    df_balanced = df_minor.union(df_major)
    df_ps = df_balanced.select(selected_columns)

    # Define the covariates
    assembler = VectorAssembler(inputCols=covariates, outputCol="features")
    train_data, test_data = df_ps.randomSplit([0.8, 0.2], seed=1)
    lr = LogisticRegression(featuresCol='features', labelCol='Stop', maxIter=10)
    pipeline = Pipeline(stages=[assembler, lr])
    model = pipeline.fit(train_data)

    # Predict and add the propensity scores to the DataFrame
    ps = model.transform(test_data).select('Distance(mi)', 'Stop', 'features', 'probability')

    # Extract the probability of treatment (i.e., the propensity score)
    extract_prob = udf(lambda x: float(x[1]), FloatType())
    ps = ps.withColumn("propensity_score", extract_prob('probability'))

    # Evaluate model performance on the test data
    auc = BinaryClassificationEvaluator(labelCol='Stop', rawPredictionCol='probability', metricName='areaUnderROC')
    AUC = round(auc.evaluate(ps), 3)
    print('AUC =', AUC)

    ps_pd = ps.toPandas()

    # Create two dataframes for treatment and control groups
    df_treatment = ps_pd[ps_pd['Stop'] == 1]
    df_control = ps_pd[ps_pd['Stop'] == 0]

    # Fit nearest neighbors model to control group
    nbrs = NearestNeighbors(n_neighbors=1).fit(df_control[['propensity_score']])

    # Find nearest neighbors in control group for each treatment case
    distances, indices = nbrs.kneighbors(df_treatment[['propensity_score']])

    # Create dataframe of distances and indices
    matches = pd.DataFrame({'distance': distances.flatten(), 'control_index': indices.flatten(),
                            'treatment_index': df_treatment.index})

    # Merge data from treatment and control cases into the matches dataframe
    matched_pairs = matches.merge(df_treatment, left_on='treatment_index', right_index=True) \
        .merge(df_control, left_on='control_index', right_index=True, suffixes=('_treatment', '_control'))
    
    matched_pairs['treatment_effect'] = matched_pairs['Distance(mi)_treatment'] - matched_pairs['Distance(mi)_control']
    average_treatment_effect = matched_pairs['treatment_effect'].mean()
    print("ATE =", round(average_treatment_effect, 4))



Fraction =  0.05


                                                                                

AUC = 0.722


                                                                                

ATE = -0.402


Fraction =  0.1


                                                                                

AUC = 0.72


                                                                                

ATE = -0.3131


Fraction =  0.2


                                                                                

AUC = 0.722


                                                                                

ATE = -0.2628


Fraction =  0.3


                                                                                

AUC = 0.723


                                                                                

ATE = -0.4389


Fraction =  0.4


                                                                                

AUC = 0.723


                                                                                

ATE = -0.42


Fraction =  0.5


                                                                                

AUC = 0.723


                                                                                

ATE = -0.3973


Fraction =  0.6


                                                                                

AUC = 0.723


                                                                                

ATE = -0.3984


Fraction =  0.7


                                                                                

AUC = 0.722


                                                                                

ATE = -0.408


Fraction =  0.8


                                                                                

AUC = 0.723


                                                                                

ATE = -0.3985


#### Summary

The average treatment effect of having a stop sign nearby on accident distance impacted (measured in miles) is -0.44 miles. 