## Traffic Crash Analysis

### Data importing and pre-processing

In [0]:
import requests
from pyspark.sql import SparkSession

# Make the GET request
resp = requests.get('https://data.cityofchicago.org/resource/85ca-t3if.json?$query=SELECT%20crash_record_id%2C%20crash_date_est_i%2C%20crash_date%2C%20posted_speed_limit%2C%20traffic_control_device%2C%20device_condition%2C%20weather_condition%2C%20lighting_condition%2C%20first_crash_type%2C%20trafficway_type%2C%20lane_cnt%2C%20alignment%2C%20roadway_surface_cond%2C%20road_defect%2C%20report_type%2C%20crash_type%2C%20intersection_related_i%2C%20private_property_i%2C%20hit_and_run_i%2C%20damage%2C%20date_police_notified%2C%20prim_contributory_cause%2C%20sec_contributory_cause%2C%20street_no%2C%20street_direction%2C%20street_name%2C%20beat_of_occurrence%2C%20photos_taken_i%2C%20statements_taken_i%2C%20dooring_i%2C%20work_zone_i%2C%20work_zone_type%2C%20workers_present_i%2C%20num_units%2C%20most_severe_injury%2C%20injuries_total%2C%20injuries_fatal%2C%20injuries_incapacitating%2C%20injuries_non_incapacitating%2C%20injuries_reported_not_evident%2C%20injuries_no_indication%2C%20injuries_unknown%2C%20crash_hour%2C%20crash_day_of_week%2C%20crash_month%2C%20latitude%2C%20longitude%2C%20location%20ORDER%20BY%20crash_date%20DESC%2C%20crash_record_id%20ASC')

# Create a Spark session
spark = SparkSession.builder.appName("SENG550").getOrCreate()

# Create a Spark DataFrame from the response text
df2 = spark.read.json(spark.sparkContext.parallelize([resp.text]))


In [0]:
# Show the DataFrame
df2.show(5)

+------------------+------------------+--------------------+----------------+-----------------+----------+-----------+--------------------+--------------------+-------------+--------------------+--------------------+---------+--------------------+-------------+--------------+-----------------------+----------------------+---------------------------+-----------------------------+--------------+----------------+----------------------+------------+--------------------+--------------------+-------------+--------------------+---------+--------------+------------------+-----------------------+------------------+--------------------+-----------+--------------------+----------------------+------------------+----------------+-----------+---------+----------------------+---------------+-----------------+-----------+--------------+-----------------+
|         alignment|beat_of_occurrence|          crash_date|crash_date_est_i|crash_day_of_week|crash_hour|crash_month|     crash_record_id|          

### Create RDD of wanted features

In [0]:
wanted_columns = df2.select("Crash_type","num_units","Weather_condition","Crash_date","Most_severe_injury","Longitude","Latitude")
wanted_columns.show(1)
rdd_of_features = wanted_columns.rdd.map(lambda row:[row[0],row[1],row[2],row[3],row[4],row[5],row[6]])


+--------------------+---------+-----------------+--------------------+--------------------+-------------+------------+
|          Crash_type|num_units|Weather_condition|          Crash_date|  Most_severe_injury|    Longitude|    Latitude|
+--------------------+---------+-----------------+--------------------+--------------------+-------------+------------+
|NO INJURY / DRIVE...|        4|            CLEAR|2023-12-10T00:40:...|NO INDICATION OF ...|-87.739147556|41.918734001|
+--------------------+---------+-----------------+--------------------+--------------------+-------------+------------+
only showing top 1 row



### Remove all rows where the content of one of the fields is unknown

In [0]:
print(rdd_of_features.count())
#row[0] = Crash_type, row[2] = Weather_condition,  row[4]= Most_severe_injury
cleaned_data_rdd = rdd_of_features.filter(lambda row: row[0]!="UNKNOWN"  and row[2]!="UNKNOWN"  and row[4]!="UNKNOWN" and row[5] != None and row[6] != None and row[0] != None and row[1] != None and row[2] != None and row[3] != None and row[4] != None)
print(cleaned_data_rdd.count())

1000
919


### Create Dataframe from RDD and get it ready for regression

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

cleaned_data_df = spark.createDataFrame(cleaned_data_rdd)

#_1 = Crash_type, _2 = numUnits, _3 = weather, _4 = time, _5 = injury severity, _6 = longitude, _7 = latitude
print(cleaned_data_df.dtypes)
numeric_cols = ["_2", "_6", "_7"]
for col_name in numeric_cols:    
    cleaned_data_df = cleaned_data_df.withColumn(col_name, col(col_name).cast("double"))
print(cleaned_data_df.dtypes)

string_cols = ["_1", "_3", "_4", "_5"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(cleaned_data_df) for column in string_cols ]

pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(cleaned_data_df).transform(cleaned_data_df)
indexed_df.show(5)

[('_1', 'string'), ('_2', 'string'), ('_3', 'string'), ('_4', 'string'), ('_5', 'string'), ('_6', 'string'), ('_7', 'string')]
[('_1', 'string'), ('_2', 'double'), ('_3', 'string'), ('_4', 'string'), ('_5', 'string'), ('_6', 'double'), ('_7', 'double')]
+--------------------+---+-----+--------------------+--------------------+-------------+------------+--------+--------+--------+--------+
|                  _1| _2|   _3|                  _4|                  _5|           _6|          _7|_1_index|_3_index|_4_index|_5_index|
+--------------------+---+-----+--------------------+--------------------+-------------+------------+--------+--------+--------+--------+
|NO INJURY / DRIVE...|4.0|CLEAR|2023-12-10T00:40:...|NO INDICATION OF ...|-87.739147556|41.918734001|     0.0|     0.0|   679.0|     0.0|
|NO INJURY / DRIVE...|2.0|CLEAR|2023-12-10T00:38:...|NO INDICATION OF ...|-87.640270308|41.869855597|     0.0|     0.0|   678.0|     0.0|
|INJURY AND / OR T...|1.0|CLEAR|2023-12-09T23:00:...|NO 

### Create Labeled Points

Partially taken from lab notebook

In [0]:
indexed_rdd = indexed_df.rdd
print(indexed_rdd.take(1)[0][10])

0.0


In [0]:
from pyspark.mllib.regression import LabeledPoint
def createRDD(values):
    return str(values[5]) +',' +str(values[1]) +',' +str(values[6]) +',' +str(values[7]) +',' +str(values[8]) +',' + str(values[9]) +',' + str(values[10])

def parsePoint(line):
    label_features = line.split(',')
    ret_val = LabeledPoint(label_features[0],label_features[1:])
    return ret_val

indexed_rdd = indexed_df.rdd
nice_rdd = indexed_rdd.map(createRDD)
print(nice_rdd.take(1))
parsedPoints = nice_rdd.map(parsePoint)
firstPoint = parsedPoints.take(1)
firstPointFeatures =firstPoint[0].features 
firstPointLabel = firstPoint[0].label
print (firstPointFeatures, firstPointLabel)
d = len(firstPointFeatures)
print(d)

['-87.739147556,4.0,41.918734001,0.0,0.0,679.0,0.0']
[4.0,41.918734001,0.0,0.0,679.0,0.0] -87.739147556
6


### Normalize features

Taken from lab notebook

In [0]:
def normalizeFeatures(lp):
    """Normalizes features in the LabeledPoint object lp.

    Args:
        lp - LabeledPoint object 

    Returns:
        LabeledPoint: The object contains the label and the normalized features
    """
    normalizedFeatures = list()
    for i in range(0,len(lp.features)):
        feature = (lp.features[i]-broadcastMean.value[i])/broadcastStdev.value[i]
        normalizedFeatures.insert(i,feature)
    return LabeledPoint(lp.label, normalizedFeatures)
    #normalizedAccidents = (lp.features[0]-broadcastMeanAccidents.value)/broadcastStdevAccidents.value
    #normalizedSnoOnGround = (lp.features[1]-broadcastMeanSnoOnGround.value)/broadcastStdevSnoOnGround.value
    #return LabeledPoint(lp.label,[normalizedAccidents,normalizedSnoOnGround])

def getNormalizedRDD(nonNormalizedRDD): 
    """Normalizes the features of the LabeldPoints contained in nonNormalizedRDD.

    Args:
        nonNormalizedRDD - RDD containing non-normalized features 

    Returns:
        returnRDD: RDD containing normalized features
    """


    meanList = list()
    stdevList = list()
    numFeatures = len(nonNormalizedRDD.take(1)[0].features)
    for i in range(0,numFeatures):
        featureRDD = nonNormalizedRDD.map(lambda lp: lp.features[i])
        featureMean = featureRDD.mean()
        featureStdev = featureRDD.stdev()
        meanList.insert(i,featureMean)
        stdevList.insert(i,featureStdev)
    global broadcastMean 
    broadcastMean = sc.broadcast(meanList)
    global broadcastStdev 
    broadcastStdev = sc.broadcast(stdevList)
    returnRDD = nonNormalizedRDD.map(normalizeFeatures)
    return returnRDD

normalizedSamplePoints = getNormalizedRDD(parsedPoints)
print(normalizedSamplePoints.take(5))

[LabeledPoint(-87.739147556, [4.736608919573033,0.6614447817838677,-0.6602556323122132,-0.27573645879954267,1.9570253721791233,-0.3842492869745239]), LabeledPoint(-87.640270308, [-0.09194748318638288,0.09687504293802254,-0.6602556323122132,-0.27573645879954267,1.952293270256998,-0.3842492869745239]), LabeledPoint(-87.601787319, [-2.506225684566091,-0.5163854822549204,1.5145648913255074,-0.27573645879954267,1.9475611683348726,-0.3842492869745239]), LabeledPoint(-87.712760178, [-0.09194748318638288,1.0677243834214338,-0.6602556323122132,-0.27573645879954267,1.942829066412747,-0.3842492869745239]), LabeledPoint(-87.684592469, [-0.09194748318638288,1.8056911281919992,-0.6602556323122132,-0.27573645879954267,1.9380969644906216,-0.3842492869745239])]


### Apply linear regression

In [0]:
weights = [.8, .2] # train/test split
seed = 42
parsedTrainData, parsedValData = normalizedSamplePoints.randomSplit(weights,seed)
parsedTrainData.cache()
parsedValData.cache()
nTrain = parsedTrainData.count()
nVal = parsedValData.count()

print(nTrain, nVal, nTrain + nVal)
print(normalizedSamplePoints.count())

713 206 919
919


In [0]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500  # iterations
alpha = 1.0  # step
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-1  # regParam
regType = 'l2'  # regType
useIntercept = True  # intercept

In [0]:
firstModel = LinearRegressionWithSGD.train(parsedTrainData,numIters,alpha,miniBatchFrac,initialWeights=None,regParam=reg,regType=regType,intercept=useIntercept)

# weightsLR1 stores the model weights; interceptLR1 stores the model intercept
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept
print(weightsLR1, interceptLR1)



[0.10627993033097366,-0.16034459618798186,-0.2683340789856343,0.15560124587623536,-0.005662953317108078,0.14202668842758057] -79.77840401704017


In [0]:
samplePoints = parsedTrainData.take(5)
print(samplePoints)
for i in range(5):
    samplePrediction = firstModel.predict(samplePoints[i].features)
    print(samplePrediction)

[LabeledPoint(-87.739147556, [4.736608919573033,0.6614447817838677,-0.6602556323122132,-0.27573645879954267,1.9570253721791233,-0.3842492869745239]), LabeledPoint(-87.640270308, [-0.09194748318638288,0.09687504293802254,-0.6602556323122132,-0.27573645879954267,1.952293270256998,-0.3842492869745239]), LabeledPoint(-87.601787319, [-2.506225684566091,-0.5163854822549204,1.5145648913255074,-0.27573645879954267,1.9475611683348726,-0.3842492869745239]), LabeledPoint(-87.712760178, [-0.09194748318638288,1.0677243834214338,-0.6602556323122132,-0.27573645879954267,1.942829066412747,-0.3842492869745239]), LabeledPoint(-87.684592469, [-0.09194748318638288,1.8056911281919992,-0.6602556323122132,-0.27573645879954267,1.9380969644906216,-0.3842492869745239])]
-79.3124486941125
-79.73507482772948
-80.47688279999872
-79.8906916778441
-80.00899385986223


In [0]:
import math
def squaredError(label, prediction):
    sqrError = (label-prediction)*(label-prediction)
    return sqrError

def calcRMSE(labelsAndPreds):
    sqrSum = labelsAndPreds.map(lambda s: squaredError(s[0],s[1])).sum()
    return math.sqrt(sqrSum/labelsAndPreds.count())

labelsAndPreds = parsedValData.map(lambda lp: (lp.label,firstModel.predict(lp.features)))
rmseValLR1 = calcRMSE(labelsAndPreds)

#print(rmseValBase)
print(rmseValLR1)

7.941125795237553
