# Traffic Crashes in Chicago

The traffic crashes analysis is going to be performed as follows:

1. PySpark **environment setup**
2. Data source and **Spark data abstraction** (DataFrame) **setup**
3. Data set **metadata analysis**:
   1. Display **schema and size** of the DataFrame
   2. Get one or multiple **random samples** from the data set to better understand what the data is all about
   3. Identify **data entities**, **metrics** and **dimensions**
   4. **Columns/fields categorization**
4. Columns groups **basic profiling** to better understand our data set:
   1. **Timing related** columns basic profiling
   2. **Crash related** columns basic profiling
   3. **Location and Condition of Streets related** columns basic profiling
5. **Business Questions** for Traffic Crashes in Chicago
   1. **Which streets in the City of Chicago had the most crashes?**
   2. **What is the Beat (police area code) with the most traffic crashes?**
   3. **At what hour of the day are traffic crashes most likely to happen, and what is the probability of a fatality?**
   4. **How does weather influence traffic crashes?**

Let's go for it:

## 1. PySpark environment setup

In [1]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## 2. Data source and Spark data abstraction (DataFrame) setup

In [2]:
CrashesDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("crashes_data.csv")

## 3. Data set metadata analysis
### A. Display schema and size of the DataFrame

In [3]:
from IPython.display import display, Markdown

CrashesDF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % CrashesDF.count()))

root
 |-- alignment: string (nullable = true)
 |-- beat: integer (nullable = true)
 |-- contributory_cause: string (nullable = true)
 |-- device_condition: string (nullable = true)
 |-- dooring: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- first_crash_type: string (nullable = true)
 |-- hit_and_run: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- injuries_fatal: integer (nullable = true)
 |-- injuries_incapacitating: integer (nullable = true)
 |-- injuries_no_indication: integer (nullable = true)
 |-- injuries_non_incapacitating: integer (nullable = true)
 |-- injuries_reported_not_evident: integer (nullable = true)
 |-- intersection_related: integer (nullable = true)
 |-- lane_count: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lighting_condition: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- most_severe_injury: string (nullable = true)
 |-- num_units: int

This DataFrame has **178241 rows**.

### B. Get one or multiple random samples from the data set

In [4]:
CrashesDF.cache() # keep the DataFrame in memory so later actions reuse it instead of re-reading the CSV
CrashesDF.sample(False, 0.1).take(2)

[Row(alignment='straight_and_level', beat=1131, contributory_cause='improper_lane_usage', device_condition='functioning_properly', dooring=0, dow=5, first_crash_type='rear_end', hit_and_run=0, hour=23, injuries_fatal=0, injuries_incapacitating=0, injuries_no_indication=2, injuries_non_incapacitating=0, injuries_reported_not_evident=0, intersection_related=0, lane_count=2, latitude=41.876918244, lighting_condition='darkness,_lighted_road', longitude=-87.735494772, month=7, most_severe_injury='no_indication_of_injury', num_units=2, posted_speed_limit=35, road_defect='no_defects', roadway_surface_cond='dry', street='S KOSTNER AVE', traffic_control_device='traffic_signal', trafficway_type='divided_-_w/median_(not_raised)', weather_condition='clear', work_zone=0, year=2016, i=2),
 Row(alignment='straight_and_level', beat=223, contributory_cause='unable_to_determine', device_condition='no_controls', dooring=0, dow=5, first_crash_type='parked_motor_vehicle', hit_and_run=0, hour=17, injuries_f
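
`sample(False, 0.1)` samples without replacement and keeps each row independently with probability 0.1, so the sample size is only approximately 10% of the data and changes between runs unless a seed is given. The sketch below mimics that behaviour in plain Python on made-up rows; `bernoulli_sample` is a hypothetical helper written for illustration, not Spark's implementation.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(1000))  # stand-in for the rows of a DataFrame
sample_a = bernoulli_sample(rows, 0.1, seed=42)
sample_b = bernoulli_sample(rows, 0.1, seed=42)
sample_c = bernoulli_sample(rows, 0.1, seed=7)

print(len(sample_a), len(sample_c))  # sizes close to 100, not guaranteed equal
print(sample_a == sample_b)          # True: same seed, same sample
```

In the notebook, passing a seed to the existing call, for example `CrashesDF.sample(False, 0.1, seed=42)`, should make the two displayed rows reproducible across runs.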

### C. Data entities, metrics and dimensions

I've identified the following elements:

* **Entities:** Traffic Crashes (main one which is measured - facts), Police Beat, Contributory Cause, Device Condition & First Crash Type
* **Metrics:** Injuries_Fatal, Injuries_Incapacitating, Injuries_no_indication, Injuries_non_incapacitating, Injuries_reported_not_evident, Intersection_related, Hit_and_run, Hour, Month, Year, Dow, Num_units, Posted_speed_limit
* **Dimensions:** Latitude, Longitude, Road_Defect, Roadway_surface_cond, Street, Traffic_control_device, Trafficway_type, Weather_condition, Work_zone, Lighting_condition, Lane_count, Dooring

### D. Column categorization

The following could be a potential column categorization:

* **Timing related columns:** *Dow*, *Hour*, *Year* and *Month*
* **Crash related columns:** *Contributory_cause*, *First_crash_type*, *Device_condition*, *Num_units*, *Weather_condition*, *Lighting_condition*, *Beat*, *Injuries_fatal*, *Injuries_incapacitating*, *Injuries_no_indication*, *Injuries_non_incapacitating*, *Injuries_reported_not_evident*, *Intersection_related*, *Hit_and_run* & *Dooring*
* **Location and Condition of Streets related columns:** *Posted_speed_limit*, *Latitude*, *Longitude*, *Road_defect*, *Roadway_surface_cond*, *Street*, *Traffic_control_device*, *Trafficway_type*, *Work_zone* & *Lane_count*
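
The categorization can also be written down as a plain dictionary, which makes it easy to check that no column is claimed twice and to see which columns are left out. This is a minimal sketch: `ALL_COLUMNS` is copied from the sample row in section 3.B (in the notebook, `CrashesDF.columns` could be used instead), the group keys are names chosen here for illustration, and the road surface column is assumed to correspond to `roadway_surface_cond`.

```python
from collections import Counter

# Column names as they appear in the sample row (lowercase).
ALL_COLUMNS = [
    "alignment", "beat", "contributory_cause", "device_condition", "dooring",
    "dow", "first_crash_type", "hit_and_run", "hour", "injuries_fatal",
    "injuries_incapacitating", "injuries_no_indication",
    "injuries_non_incapacitating", "injuries_reported_not_evident",
    "intersection_related", "lane_count", "latitude", "lighting_condition",
    "longitude", "month", "most_severe_injury", "num_units",
    "posted_speed_limit", "road_defect", "roadway_surface_cond", "street",
    "traffic_control_device", "trafficway_type", "weather_condition",
    "work_zone", "year", "i",
]

# The three groups described above (group keys are illustrative names).
COLUMN_GROUPS = {
    "timing": ["dow", "hour", "month", "year"],
    "crash": [
        "contributory_cause", "first_crash_type", "device_condition",
        "num_units", "weather_condition", "lighting_condition", "beat",
        "injuries_fatal", "injuries_incapacitating", "injuries_no_indication",
        "injuries_non_incapacitating", "injuries_reported_not_evident",
        "intersection_related", "hit_and_run", "dooring",
    ],
    "location_and_street": [
        "posted_speed_limit", "latitude", "longitude", "road_defect",
        "roadway_surface_cond", "street", "traffic_control_device",
        "trafficway_type", "work_zone", "lane_count",
    ],
}

def uncategorized(all_columns, groups):
    """Return the columns that no group claims, preserving input order."""
    claimed = {c for cols in groups.values() for c in cols}
    return [c for c in all_columns if c not in claimed]

def duplicated(groups):
    """Return the columns that appear in more than one group."""
    counts = Counter(c for cols in groups.values() for c in cols)
    return sorted(c for c, n in counts.items() if n > 1)

print(uncategorized(ALL_COLUMNS, COLUMN_GROUPS))  # ['alignment', 'most_severe_injury', 'i']
print(duplicated(COLUMN_GROUPS))                  # []
```

A group's columns can then be passed directly to the profiling calls, for example `CrashesDF.select(COLUMN_GROUPS["timing"])`.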

## 4. Columns groups basic profiling to better understand our data set
### A. Timing related columns basic profiling

In [5]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print ("Summary of columns Day of Week, Hour, Month and Year:")
CrashesDF.select("dow","hour","month","year").summary().show()

print("Checking for nulls on columns Day of Week, Hour, Month and Year:")
CrashesDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["dow","hour","month","year"]]).show()

print("Checking amount of distinct values in columns Day of Week, Hour, Month and Year:")
CrashesDF.select([countDistinct(c).alias(c) for c in ["dow","hour","month","year"]]).show()

print ("Most and least frequent occurrences for Hour and Day of Week columns:")
HourofOccurrencesDF = CrashesDF.groupBy("hour").agg(count(lit(1)).alias("Total"))
DayOfWeekDF = CrashesDF.groupBy("dow").agg(count(lit(1)).alias("Total"))

leastFreqHour    = HourofOccurrencesDF.orderBy(col("Total").asc()).first()
mostFreqHour     = HourofOccurrencesDF.orderBy(col("Total").desc()).first()
leastFreqDayOfWeek     = DayOfWeekDF.orderBy(col("Total").asc()).first()
mostFreqDayOfWeek      = DayOfWeekDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqHour", "mostFreqHour", "leastFreqDayOfWeek", "mostFreqDayOfWeek", \
       "%d (%d occurrences)" % (leastFreqHour["hour"], leastFreqHour["Total"]), \
       "%d (%d occurrences)" % (mostFreqHour["hour"], mostFreqHour["Total"]), \
       "%d (%d occurrences)" % (leastFreqDayOfWeek  ["dow"], leastFreqDayOfWeek ["Total"]), \
       "%d (%d occurrences)" % (mostFreqDayOfWeek  ["dow"], mostFreqDayOfWeek  ["Total"]))))

Summary of columns Day of Week, Hour, Month and Year:
+-------+------------------+------------------+------------------+------------------+
|summary|               dow|              hour|             month|              year|
+-------+------------------+------------------+------------------+------------------+
|  count|            178241|            178241|            178241|            178241|
|   mean|3.1271031917460066|13.164490773727707| 6.637126138206137|  2017.29660964649|
| stddev|1.9686750715144956|  5.53206252210308|3.5287043333623647|0.8475623815502351|
|    min|                 0|                 0|                 1|              2016|
|    25%|                 1|                 9|                 4|              2017|
|    50%|                 3|                14|                 7|              2017|
|    75%|                 5|                17|                10|              2018|
|    max|                 6|                23|                12|              2019|



| leastFreqHour | mostFreqHour | leastFreqDayOfWeek | mostFreqDayOfWeek |
|----|----|----|----|
| 4 (1979 occurrences) | 16 (13556 occurrences) | 0 (21294 occurrences) | 5 (29086 occurrences) |
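
The null check in the cell above relies on two Spark behaviours: `when(condition, value)` without an `otherwise` yields null whenever the condition is false, and `count(expr)` only counts non-null values. Combined, `count(when(col(c).isNull(), c))` counts exactly the rows where column `c` is null. The plain-Python sketch below mimics that logic on made-up values; the `when` and `count` functions here are simplified stand-ins, not the PySpark functions.

```python
def when(condition, value):
    """Stand-in for Spark's when() without otherwise(): None if the condition fails."""
    return value if condition else None

def count(values):
    """Stand-in for Spark's count(expr): number of non-null values."""
    return sum(1 for v in values if v is not None)

hours = [23, None, 17, None, 4]  # made-up column values, two of them null

null_count = count(when(h is None, "hour") for h in hours)
non_null_count = count(hours)

print(null_count)      # 2
print(non_null_count)  # 3
```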


### B. Crash related columns basic profiling

In [6]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

print ("Summary of columns Police Beat,Number of Units Involved, Fatal Injuries, Incapacitated Injuries and Hit & Run:")
CrashesDF.select("beat", "num_units", "injuries_fatal", "injuries_incapacitating", "hit_and_run").summary().show()

print("Checking for nulls on columns Police Beat,Number of Units Involved, Fatal Injuries, Incapacitated Injuries and Hit & Run:")
CrashesDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["beat", "num_units", "injuries_fatal", "injuries_incapacitating", "hit_and_run"]]).show()

print("Checking amount of distinct values in columns Police Beat,Number of Units Involved, Fatal Injuries, Incapacitated Injuries and Hit & Run:")
CrashesDF.select([countDistinct(c).alias(c) for c in ["beat", "num_units", "injuries_fatal", "injuries_incapacitating", "hit_and_run"]]).show()

print ("Most and least frequent occurrences for Beat, Number of Units Involved, Fatal Injuries and Incapacitated Injuries columns:")
BeatDF = CrashesDF.groupBy("beat").agg(count(lit(1)).alias("Total"))
NumOfUnitsDF   = CrashesDF.groupBy("num_units").agg(count(lit(1)).alias("Total"))
FatalInjuriesDF    = CrashesDF.groupBy("injuries_fatal").agg(count(lit(1)).alias("Total"))
IncapacitatedInjuriesDF      = CrashesDF.groupBy("injuries_incapacitating").agg(count(lit(1)).alias("Total"))

leastFreqBeat    = BeatDF.orderBy(col("Total").asc()).first()
mostFreqBeat     = BeatDF.orderBy(col("Total").desc()).first()
leastFreqNumOfUnits      = NumOfUnitsDF.orderBy(col("Total").asc()).first()
mostFreqNumOfUnits        = NumOfUnitsDF.orderBy(col("Total").desc()).first()
leastFreqFatalInjuries      = FatalInjuriesDF.orderBy(col("Total").asc()).first()
mostFreqFatalInjuries       = FatalInjuriesDF.orderBy(col("Total").desc()).first()
leastFreqIncapacitatedInjuries         = IncapacitatedInjuriesDF.orderBy(col("Total").asc()).first()
mostFreqIncapacitatedInjuries          = IncapacitatedInjuriesDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqBeat", "mostFreqBeat", "leastFreqNumOfUnits", "mostFreqNumOfUnits", \
       "%d (%d occurrences)" % (leastFreqBeat["beat"], leastFreqBeat["Total"]), \
       "%d (%d occurrences)" % (mostFreqBeat["beat"], mostFreqBeat["Total"]), \
       "%s (%d occurrences)" % (leastFreqNumOfUnits["num_units"], leastFreqNumOfUnits["Total"]), \
       "%s (%d occurrences)" % (mostFreqNumOfUnits["num_units"], mostFreqNumOfUnits["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqFatalInjuries", "mostFreqFatalInjuries", "leastFreqIncapacitatedInjuries ", "mostFreqIncapacitatedInjuries ", \
       "%s (%d occurrences)" % (leastFreqFatalInjuries["injuries_fatal"], leastFreqFatalInjuries["Total"]), \
       "%s (%d occurrences)" % (mostFreqFatalInjuries["injuries_fatal"], mostFreqFatalInjuries["Total"]), \
       "%s (%d occurrences)" % (leastFreqIncapacitatedInjuries ["injuries_incapacitating"], leastFreqIncapacitatedInjuries ["Total"]), \
       "%s (%d occurrences)" % (mostFreqIncapacitatedInjuries ["injuries_incapacitating"], mostFreqIncapacitatedInjuries ["Total"]))))

print ("Summary of columns No indication of Injuries, Injuries non Incapacitated, Injuries reported not evident and Intersection related:")
CrashesDF.select("injuries_no_indication", "injuries_non_incapacitating", "injuries_reported_not_evident", "intersection_related").summary().show()

print("Checking for nulls on columns No indication of Injuries, Injuries non Incapacitated, Injuries reported not evident and Intersection related:")
CrashesDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["injuries_no_indication", "injuries_non_incapacitating", "injuries_reported_not_evident", "intersection_related"]]).show()

print("Checking amount of distinct values in columns No indication of Injuries, Injuries non Incapacitated, Injuries reported not evident and Intersection related:")
CrashesDF.select([countDistinct(c).alias(c) for c in ["injuries_no_indication", "injuries_non_incapacitating", "injuries_reported_not_evident", "intersection_related"]]).show()

Summary of columns Police Beat,Number of Units Involved, Fatal Injuries, Incapacitated Injuries and Hit & Run:
+-------+------------------+------------------+--------------------+-----------------------+-------------------+
|summary|              beat|         num_units|      injuries_fatal|injuries_incapacitating|        hit_and_run|
+-------+------------------+------------------+--------------------+-----------------------+-------------------+
|  count|            178241|            178241|              178241|                 178241|             178241|
|   mean|1240.5327057186619| 2.029336684601186|0.001088414001267...|   0.018620856032001615|  0.266824131372692|
| stddev| 711.0807265746752|0.4209477748521104| 0.03559169382988048|    0.15873711547449149|0.44230092906238483|
|    min|               111|                 1|                   0|                      0|                  0|
|    25%|               711|                 2|                   0|                      0|      


| leastFreqBeat | mostFreqBeat | leastFreqNumOfUnits | mostFreqNumOfUnits |
|----|----|----|----|
| 1654 (28 occurrences) | 1834 (2758 occurrences) | 12 (1 occurrences) | 2 (158129 occurrences) |



| leastFreqFatalInjuries | mostFreqFatalInjuries | leastFreqIncapacitatedInjuries  | mostFreqIncapacitatedInjuries  |
|----|----|----|----|
| 3 (2 occurrences) | 0 (178061 occurrences) | 6 (1 occurrences) | 0 (175372 occurrences) |


Summary of columns No indication of Injuries, Injuries non Incapacitated, Injuries reported not evident and Intersection related:
+-------+----------------------+---------------------------+-----------------------------+--------------------+
|summary|injuries_no_indication|injuries_non_incapacitating|injuries_reported_not_evident|intersection_related|
+-------+----------------------+---------------------------+-----------------------------+--------------------+
|  count|                178241|                     178241|                       178241|              178241|
|   mean|    2.0499941090994778|         0.0922683333239827|          0.05804500647999057| 0.22542512665436124|
| stddev|     1.199611655663729|        0.39130870589728234|           0.3073335616306294| 0.41786315768896276|
|    min|                     0|                          0|                            0|                   0|
|    25%|                     1|                          0|                          

### C. Location and Street Condition related columns basic profiling

In [33]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first

print ("Summary of columns Posted Speed Limit and Is the area an active Work Zone :")
CrashesDF.select("posted_speed_limit", "work_zone").summary().show()

print("Checking for nulls on columns Posted Speed Limit and Is the area an active Work Zone:")
CrashesDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["posted_speed_limit", "work_zone"]]).show()

print("Checking amount of distinct values in columns Posted Speed Limit and Is the area an active Work Zone")
CrashesDF.select([countDistinct(c).alias(c) for c in ["posted_speed_limit", "work_zone"]]).show()


Summary of columns Posted Speed Limit and Is the area an active Work Zone :
+-------+------------------+--------------------+
|summary|posted_speed_limit|           work_zone|
+-------+------------------+--------------------+
|  count|            178241|              178241|
|   mean| 28.44888661980128|0.006911989946196442|
| stddev| 6.452489404136152| 0.08285078667252987|
|    min|                 0|                   0|
|    25%|                30|                   0|
|    50%|                30|                   0|
|    75%|                30|                   0|
|    max|                70|                   1|
+-------+------------------+--------------------+

Checking for nulls on columns Posted Speed Limit and Is the area an active Work Zone:
+------------------+---------+
|posted_speed_limit|work_zone|
+------------------+---------+
|                 0|        0|
+------------------+---------+

Checking amount of distinct values in columns Posted Speed Limit and Is the area 

## 5. Business Questions for Traffic Crashes in Chicago

### 1. Which streets in the City of Chicago had the most crashes?

In [10]:
from pyspark.sql.functions import max, min, avg, stddev, sum
from pyspark.sql.types import IntegerType

display(Markdown("**Streets with most Crashes**:"))
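# Note: num_units holds the number of units involved in each crash, so the
# "Nº of Crashes" columns below aggregate units rather than counting crash rows.
CrashesDF.groupBy("street")\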
              .agg(sum("num_units").alias("Total Nº of Crashes"),\
                   sum("injuries_fatal").alias("Total Nº of Fatalities"),\
                   min("num_units").alias("Lowest Nº of Crashes"),\
                   max("num_units").alias("Highest Nº of Crashes"))\
              .sort("Total Nº of Crashes", ascending=False).show()

**Streets with most Crashes**:

+------------------+-------------------+----------------------+--------------------+---------------------+
|            street|Total Nº of Crashes|Total Nº of Fatalities|Lowest Nº of Crashes|Highest Nº of Crashes|
+------------------+-------------------+----------------------+--------------------+---------------------+
|     S WESTERN AVE|               5459|                     2|                   1|                    6|
|      S PULASKI RD|               5347|                     3|                   1|                    8|
|     S ASHLAND AVE|               5194|                     3|                   1|                    5|
|      S HALSTED ST|               4956|                     5|                   1|                    6|
|      S CICERO AVE|               4272|                     3|                   1|                    5|
|       W NORTH AVE|               4233|                     5|                   1|                    7|
|     N WESTERN AVE|               41
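
The totals in this table come from `sum("num_units")`. The profiling in section 4 suggests that `num_units` is the number of units involved in a single crash (its mean is about 2.03), and Beat 1834 appears in 2758 rows there while the Beat table in question 2 reports 5528 for it. Under that reading, the table ranks streets by units involved, and counting rows per street gives the number of crashes. The sketch below shows the difference in plain Python on made-up rows; the data is illustrative only.

```python
from collections import Counter, defaultdict

# Made-up rows for illustration; one row is one crash.
crashes = [
    {"street": "S WESTERN AVE", "num_units": 2},
    {"street": "S WESTERN AVE", "num_units": 3},
    {"street": "S PULASKI RD", "num_units": 2},
    {"street": "S PULASKI RD", "num_units": 2},
    {"street": "S PULASKI RD", "num_units": 1},
]

# Number of crashes per street: count the rows.
crash_count = Counter(row["street"] for row in crashes)

# Units involved per street: sum num_units.
unit_total = defaultdict(int)
for row in crashes:
    unit_total[row["street"]] += row["num_units"]

print(crash_count.most_common())  # [('S PULASKI RD', 3), ('S WESTERN AVE', 2)]
print(dict(unit_total))           # {'S WESTERN AVE': 5, 'S PULASKI RD': 5}
```

Both streets tie on units involved, yet one has more crashes. In the notebook, the row count per group corresponds to the `count(lit(1))` aggregation already used in section 4.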

### 2. What is the Beat (police area code) with the most traffic crashes?

In [9]:
from pyspark.sql.functions import max, min, avg, stddev, sum
from pyspark.sql.types import IntegerType

display(Markdown("**Most traffic crashes by Police Beat**:"))
CrashesDF.groupBy("beat")\
              .agg(sum("num_units").alias("Total Nº of Crashes"),\
                   sum("injuries_fatal").alias("Total Nº of Fatalities"),\
                   (sum("injuries_fatal")/sum("num_units")*100).alias("Fatalities : Nº of Crashes t (%)"))\
              .sort("Total Nº of Crashes", ascending=False).show()

**Most traffic crashes by Police Beat**:

+----+-------------------+----------------------+--------------------------------+
|beat|Total Nº of Crashes|Total Nº of Fatalities|Fatalities : Nº of Crashes t (%)|
+----+-------------------+----------------------+--------------------------------+
|1834|               5528|                     0|                             0.0|
| 114|               4570|                     1|             0.02188183807439825|
| 122|               4474|                     0|                             0.0|
|1831|               4366|                     2|             0.04580852038479157|
| 813|               3329|                     1|            0.030039050765995796|
| 815|               3072|                     4|             0.13020833333333331|
|1232|               2946|                     2|              0.0678886625933469|
| 331|               2923|                     8|             0.27369141293191923|
|2413|               2819|                     1|              0.0354735721887194|
|183

### 3. At what hour of the day are traffic crashes most likely to happen, and what is the probability of a fatality?

In [8]:
from pyspark.sql.functions import max, min, avg, stddev, sum
from pyspark.sql.types import IntegerType

display(Markdown("**Hour of the day is it most likely for traffic crashes & probability that there is a mortal victim**:"))
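# Note: the percentage below divides by sum("num_units") (units involved),
# so it reads as fatalities per unit rather than per crash.
CrashesDF.groupBy("hour")\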
              .agg(sum("num_units").alias("Total Nº of Crashes"),\
                   avg("num_units").alias("Average Nº of Crashes"),\
                   sum("injuries_fatal").alias("Total Nº of Fatalities"),\
                   (sum("injuries_fatal")/sum("num_units")*100).alias("Probability of Fatality (%)"))\
              .sort("hour", ascending=True).show(24)

**Hour of the day is it most likely for traffic crashes & probability that there is a mortal victim**:

+----+-------------------+---------------------+----------------------+---------------------------+
|hour|Total Nº of Crashes|Average Nº of Crashes|Total Nº of Fatalities|Probability of Fatality (%)|
+----+-------------------+---------------------+----------------------+---------------------------+
|   0|               7155|   2.0402053036783574|                     7|         0.0978336827393431|
|   1|               6211|    2.055261416280609|                    21|        0.33810980518435035|
|   2|               5572|    2.041773543422499|                    17|         0.3050969131371142|
|   3|               4415|   2.0336250575771535|                     8|         0.1812004530011325|
|   4|               4040|   2.0414350682162707|                    10|        0.24752475247524752|
|   5|               5111|   2.0106215578284816|                     9|        0.17609078458227354|
|   6|               8561|   2.0115131578947367|                     3|         0.0350426352061675|


### 4. How does weather influence traffic crashes?

In [7]:
from pyspark.sql.functions import max, min, avg, stddev, sum
from pyspark.sql.types import IntegerType

display(Markdown("**Weather Vs. Traffic Crashes**:"))
CrashesDF.groupBy("weather_condition")\
              .agg(sum("num_units").alias("Total Nº of Crashes"),\
                   sum("injuries_fatal").alias("Total Nº of Fatalities"),\
                   (sum("injuries_fatal")/sum("num_units")*100).alias("% of Deaths due to Weather type"))\
              .sort("Total Nº of Crashes", ascending=True).show()

**Weather Vs. Traffic Crashes**:

+--------------------+-------------------+----------------------+-------------------------------+
|   weather_condition|Total Nº of Crashes|Total Nº of Fatalities|% of Deaths due to Weather type|
+--------------------+-------------------+----------------------+-------------------------------+
|severe_cross_wind...|                 66|                     0|                            0.0|
|          sleet/hail|                629|                     0|                            0.0|
|      fog/smoke/haze|                814|                     0|                            0.0|
|               other|               1188|                     1|            0.08417508417508417|
|     cloudy/overcast|              11554|                     3|            0.02596503375454388|
|             unknown|              13817|                     5|            0.03618730549323298|
|                snow|              14755|                     8|           0.054218908844459505|
|                rai
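
The table is sorted by volume, which mostly reflects how common each weather condition is. Ranking by the rate column gives a different picture. The sketch below re-sorts the rows that are fully visible above by fatalities per unit; the tuples are copied from the output, and conditions with very few fatalities (such as `other`, with a single one) produce rates that should be read with caution.

```python
# (weather_condition, units involved, fatalities), copied from the visible rows above.
rows = [
    ("sleet/hail", 629, 0),
    ("fog/smoke/haze", 814, 0),
    ("other", 1188, 1),
    ("cloudy/overcast", 11554, 3),
    ("unknown", 13817, 5),
    ("snow", 14755, 8),
]

def fatality_rate(row):
    """Fatalities per 100 units involved, as in the table's last column."""
    _, units, fatalities = row
    return 100.0 * fatalities / units

# Highest rate first; ties keep their original order.
by_rate = sorted(rows, key=fatality_rate, reverse=True)

for row in by_rate:
    print(f"{row[0]:<16} {fatality_rate(row):.4f}")
```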