In [None]:
pip install pyspark

In [1]:
import os
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_251"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

In [2]:
# Importing pyspark
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Accident').getOrCreate()

In [None]:
df = spark.read.csv('US_Accidents_June20.csv', header = True,inferSchema = True)

In [None]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable =

# Droping unwanted columns

In [None]:
# drop columns
columns_to_drop = ['End_Lat', 'End_Lng','Wind_Chill(F)','Number', 'ID']
df = df.drop(*columns_to_drop)
df.columns
print("Total columns : ",df.count)

Total columns :  <bound method DataFrame.count of DataFrame[Source: string, TMC: double, Severity: int, Start_Time: string, End_Time: string, Start_Lat: double, Start_Lng: double, Distance(mi): double, Description: string, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, Timezone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(F): double, Humidity(%): double, Pressure(in): double, Visibility(mi): double, Wind_Direction: string, Wind_Speed(mph): double, Precipitation(in): double, Weather_Condition: string, Amenity: boolean, Bump: boolean, Crossing: boolean, Give_Way: boolean, Junction: boolean, No_Exit: boolean, Railway: boolean, Roundabout: boolean, Station: boolean, Stop: boolean, Traffic_Calming: boolean, Traffic_Signal: boolean, Turning_Loop: boolean, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]>


In [None]:
# percentage of null values 
print(df.count()/3513617)

1.0


In [None]:
# calculating null in TMC
df.groupBy('TMC').count().orderBy('count')
# droping nan values in TMC
df.na.drop(subset=["TMC"])

DataFrame[Source: string, TMC: double, Severity: int, Start_Time: string, End_Time: string, Start_Lat: double, Start_Lng: double, Distance(mi): double, Description: string, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, Timezone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(F): double, Humidity(%): double, Pressure(in): double, Visibility(mi): double, Wind_Direction: string, Wind_Speed(mph): double, Precipitation(in): double, Weather_Condition: string, Amenity: boolean, Bump: boolean, Crossing: boolean, Give_Way: boolean, Junction: boolean, No_Exit: boolean, Railway: boolean, Roundabout: boolean, Station: boolean, Stop: boolean, Traffic_Calming: boolean, Traffic_Signal: boolean, Turning_Loop: boolean, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]

# Filling missing data

In [None]:
from pyspark.sql.functions import mean
# taking mean values for temperature, pressure, humidity,visibility,Wind_Speed and Precipitation
# for temperature
temp_mean = df.select(mean(df['Temperature(F)'])).collect()
mean_temp = temp_mean[0][0]
# filling null value with average of temperature data
df.na.fill(mean_temp,["Temperature(F)"]).show()

+--------+-----+--------+-------------------+-------------------+---------+----------+------------+--------------------+--------------------+----+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  Source|  TMC|Severity|         Start_Time|           End_Time|Start_Lat| Start_Lng|Distance(mi)|         Description|              Street|Side|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station| Stop

In [None]:
# for humidity
df.na.fill(df.select(mean(df['Humidity(%)'])).collect()[0][0],['Humidity(%)'])
# for visibility
df.na.fill(df.select(mean(df['Visibility(mi)'])).collect()[0][0],['Visibility(mi)'])
# for pressure
df.na.fill(df.select(mean(df['Pressure(in)'])).collect()[0][0],['Pressure(in)'])
# for wind speed
df.na.fill(df.select(mean(df['Wind_Speed(mph)'])).collect()[0][0],['Wind_Speed(mph)'])
# for precipitation
df.na.fill(df.select(mean(df['Precipitation(in)'])).collect()[0][0],['Precipitation(in)'])

DataFrame[Source: string, TMC: double, Severity: int, Start_Time: string, End_Time: string, Start_Lat: double, Start_Lng: double, Distance(mi): double, Description: string, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, Timezone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(F): double, Humidity(%): double, Pressure(in): double, Visibility(mi): double, Wind_Direction: string, Wind_Speed(mph): double, Precipitation(in): double, Weather_Condition: string, Amenity: boolean, Bump: boolean, Crossing: boolean, Give_Way: boolean, Junction: boolean, No_Exit: boolean, Railway: boolean, Roundabout: boolean, Station: boolean, Stop: boolean, Traffic_Calming: boolean, Traffic_Signal: boolean, Turning_Loop: boolean, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]

In [None]:
# droping row with null values
df.na.drop().count()

899960

In [None]:
# describe
df.describe().show()

In [None]:
# droping other unwanted data
column_to_drop = ['Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']
df = df.drop(*column_to_drop)
df.columns

['Source',
 'TMC',
 'Severity',
 'Start_Time',
 'End_Time',
 'Start_Lat',
 'Start_Lng',
 'Distance(mi)',
 'Description',
 'Street',
 'Side',
 'City',
 'County',
 'State',
 'Zipcode',
 'Country',
 'Timezone',
 'Airport_Code',
 'Weather_Timestamp',
 'Temperature(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Direction',
 'Wind_Speed(mph)',
 'Precipitation(in)',
 'Weather_Condition',
 'Amenity',
 'Bump',
 'Crossing',
 'Give_Way',
 'Junction',
 'No_Exit',
 'Railway',
 'Roundabout',
 'Station',
 'Stop',
 'Traffic_Calming',
 'Traffic_Signal',
 'Turning_Loop',
 'Sunrise_Sunset']

In [None]:
# generating dummy for source
from pyspark.sql.functions import when
df = df.withColumn("e_source_A", when(df.Source == "MapQuest", 1).otherwise(0)).withColumn("e_source_B", when(df.Source == "MapQuest-Bing", 1).otherwise(0)).withColumn("e_source_c", when(df.Source == "Bing", 1).otherwise(0))

In [None]:
df.groupBy('Source').count().show()

+-------------+-------+
|       Source|  count|
+-------------+-------+
|         Bing|1034799|
|     MapQuest|2414301|
|MapQuest-Bing|  64517|
+-------------+-------+



In [None]:
# df.drop('Source').collect()
df.columns

['Source',
 'TMC',
 'Severity',
 'Start_Time',
 'End_Time',
 'Start_Lat',
 'Start_Lng',
 'Distance(mi)',
 'Description',
 'Street',
 'Side',
 'City',
 'County',
 'State',
 'Zipcode',
 'Country',
 'Timezone',
 'Airport_Code',
 'Weather_Timestamp',
 'Temperature(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Direction',
 'Wind_Speed(mph)',
 'Precipitation(in)',
 'Weather_Condition',
 'Amenity',
 'Bump',
 'Crossing',
 'Give_Way',
 'Junction',
 'No_Exit',
 'Railway',
 'Roundabout',
 'Station',
 'Stop',
 'Traffic_Calming',
 'Traffic_Signal',
 'Turning_Loop',
 'Sunrise_Sunset',
 'e_source_A',
 'e_source_B',
 'e_source_c']

In [None]:
# Taking time and date
# from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format

# df2 = df.withColumnRenamed("Month",df.select(dayofmonth(df['Start_Time'])))
# df2 = df.withColumnRenamed("Hour",df.select(hour(df['Start_Time'])))
# df2 = df.withColumnRenamed("Year",df.select(dayofyear(df['Start_Time']))).show()


In [None]:
# calculating duration

from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('End_Time', format=timeFmt)
            - F.unix_timestamp('Start_Time', format=timeFmt))
df = df.withColumn("Duration", timeDiff)
df.columns

['Source',
 'TMC',
 'Severity',
 'Start_Time',
 'End_Time',
 'Start_Lat',
 'Start_Lng',
 'Distance(mi)',
 'Description',
 'Street',
 'Side',
 'City',
 'County',
 'State',
 'Zipcode',
 'Country',
 'Timezone',
 'Airport_Code',
 'Weather_Timestamp',
 'Temperature(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Direction',
 'Wind_Speed(mph)',
 'Precipitation(in)',
 'Weather_Condition',
 'Amenity',
 'Bump',
 'Crossing',
 'Give_Way',
 'Junction',
 'No_Exit',
 'Railway',
 'Roundabout',
 'Station',
 'Stop',
 'Traffic_Calming',
 'Traffic_Signal',
 'Turning_Loop',
 'Sunrise_Sunset',
 'e_source_A',
 'e_source_B',
 'e_source_c',
 'Duration']

# Data visualization

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.graph_objects as go
# converting pyspark to pandas
data = df.toPandas()
# data = pd.read_csv('US_Accidents_June20.csv')

NameError: ignored

In [None]:
# visualize traffic region
state_count_acc = pd.value_counts(data['State'])

fig = go.Figure(data=go.Choropleth(
    locations=state_count_acc.index,
    z = state_count_acc.values.astype(float),
    locationmode = 'USA-states',
    colorscale = 'Reds',
    colorbar_title = "Count Accidents",))

fig.update_layout(
    title_text = '2016 - 2019 US Traffic Accident Dataset by State',
    geo_scope='usa',)

fig.show()

NameError: ignored

In [None]:
# df_county = pd.read_csv('https://raw.githubusercontent.com/plotly/datasets/master/laucnty16.csv')

In [None]:
import random
import folium
from folium import plugins
from mpl_toolkits.basemap import Basemap
from matplotlib import animation,rc
from IPython.display import HTML, display
import io
import warnings
warnings.filterwarnings('ignore')
import codecs
import base64
import plotly.express as px
import plotly.figure_factory as ff
import plotly.graph_objects as go

plt.style.use("fivethirtyeight")
plt.rcParams['figure.figsize'] = (8, 6)

In [None]:
# Accident zone via city
data_sever = data.sample(n=10000)[['Start_Lng','Start_Lat','City','Visibility(mi)','Severity']]
data_sever.dropna(inplace=True)

fig = go.Figure(data=go.Scattergeo(
        locationmode = 'USA-states',
        lon = data_sever['Start_Lng'],
        lat = data_sever['Start_Lat'],
        text = data_sever['City'],
        mode = 'markers',
        marker = dict(
            size = data_sever['Visibility(mi)'],
            opacity = 0.8,
            reversescale = True,
            autocolorscale = False,
            symbol = 'circle',
            line = dict(
                width=1,
                color='rgba(102, 102, 102)'
            ),
            colorscale = 'Blues',
            cmin = data_sever['Severity'].max(),
        color = data_sever['Severity'],
        cmax = 1,
            colorbar_title="Severity"
        )))

fig.update_layout(
        title = 'Severity & Visibility of accidents',
        geo = dict(
            scope='usa',
            projection_type='albers usa',
            showland = True,
            landcolor = "rgb(250, 250, 250)",
            subunitcolor = "rgb(217, 217, 217)",
            countrycolor = "rgb(217, 217, 217)",
            countrywidth = 0.7,
            subunitwidth = 0.7
        ),
    )
fig.show()

In [None]:
# Bay area heat map
bay_area_counties=['Alameda' 'San Francisco', 'San Mateo', 'Santa Clara',]
CA=df[df["State"]=='CA']
bay_area=CA[CA['County'].isin(bay_area_counties)]
bay_area['Start_Time']=pd.to_datetime(bay_area['Start_Time'])

fig1 = px.density_mapbox(bay_area, lat='Start_Lat', lon='Start_Lng', radius=5, color_continuous_scale='Reds',
                        mapbox_style="stamen-terrain",)
fig1.update_layout(title = 'Bay Area Accidents Heatmap')

fig1.show()

In [None]:
bay_area['roundlat']=bay_area['Start_Lat'].round(3)
bay_area['roundlon']=bay_area['Start_Lng'].round(3)
hotspot_bayarea=bay_area.groupby(['roundlat', 'roundlon'])['ID'].count().sort_values(ascending=False).reset_index()[:50]
hotspot_bayarea.columns=['lat','lon','count']

m= folium.Map(location=[37.38, -122.08], zoom_start=10,)

for lat, lng, size, in zip(hotspot_bayarea.lat, hotspot_bayarea.lon, hotspot_bayarea['count']):
    folium.CircleMarker(
        location=[lat, lng],
        radius=size/20,
        color='red',
        fill=True,
        fill_color='yellow',
        fill_opacity=0.4
    ).add_to(m)

In [None]:
# Average severity at different hour
df.groupby('Accident_hour')['Severity'].mean().plot(kind='line')
plt.xlabel('Hour of the day')
plt.ylabel('Average Severity')
plt.title('Average severity at different hours')
plt.tight_layout()

In [None]:
# Average severity at different month
df['Accident_month']=df['Start_Time'].dt.month
df.groupby('Accident_month')['Severity'].mean().plot(kind='line')
plt.ylabel('Average Severity')
plt.title('Average Severity by Month')

In [None]:
# 10 most accident states
sns.countplot(df['State'], order=df['State'].value_counts().iloc[:10].index)
plt.xticks(rotation=0)
plt.title("Top 10 states with the most accidents", fontsize=25)
plt.tight_layout()

In [None]:
# Total accident by hour
#convert datetime
df.Start_Time=pd.to_datetime(df.Start_Time)
df.End_Time=pd.to_datetime(df.End_Time)

#Plot the total accidents by years
sns.countplot(df['Start_Time'].dt.hour, hue=df['Severity'])
plt.xticks(rotation=0)
plt.title("Total accidents by hour", fontsize=25)
plt.tight_layout()

In [None]:
# Total accident by different month
#Plot the total accidents by years
sns.countplot(df['Start_Time'].dt.month)
plt.xticks(rotation=0)
plt.title("Total accidents by month", fontsize=25)
plt.tight_layout()

In [None]:
# accident by year
data = [go.Bar(x=[2016,2017,2018,2019,2020],
               y=accidents_years['Year'])]

layout = go.Layout(title='Accidents by year 2016-June 2020',
                   xaxis={'title':'Year'},
                   yaxis={'title':'Number of accidents'},
                   width=700,
                   height=600)


fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(nticks=4)

py.iplot(fig)

In [None]:
# def weather(kind):
#     if 'Rain' in kind or 'Snow' in kind or 'Storm' in kind or 'Thunder' in kind or 'Drizzle' in kind:
#         return 'Slippery'
#     elif 'Fog' in kind or 'Smoke' in kind or 'Haze' in kind or 'Mist'in kind:
#         return 'Vis_obstruct'
#     else:
#         return 'Fair'
    
# weather = df.weather(df["Weather_Condition"])


In [None]:
df = df.withColumn("Amenity_1", when(df.Amenity == "TRUE", 1).otherwise(0))
df = df.withColumn("Bump_1", when(df.Bump == "TRUE", 1).otherwise(0))
df = df.withColumn("Crossing_1", when(df.Crossing == "TRUE", 1).otherwise(0))
df = df.withColumn("Give_Way_1", when(df.Give_Way == "TRUE", 1).otherwise(0))
df = df.withColumn("No_Exit_1", when(df.No_Exit == "TRUE", 1).otherwise(0))
df = df.withColumn("Railway_1", when(df.Railway == "TRUE", 1).otherwise(0))
df = df.withColumn("Roundabout_1", when(df.Roundabout == "TRUE", 1).otherwise(0))
df = df.withColumn("Station_1", when(df.Station == "TRUE", 1).otherwise(0))
df = df.withColumn("Traffic_Calming_1", when(df.Traffic_Calming == "TRUE", 1).otherwise(0))
df = df.withColumn("Traffic_Signal_1", when(df.Traffic_Signal == "TRUE", 1).otherwise(0))
df = df.withColumn("Turning_Loop_1", when(df.Turning_Loop == "TRUE", 1).otherwise(0))
df = df.withColumn("Sunrise_Sunset_1", when(df.Sunrise_Sunset == "DAY", 1).otherwise(0))
df = df.withColumn("Side_1", when(df.Side == "L", 1).otherwise(0))

In [None]:
column_to_drop = ['Source','Amenity','Bump','Crossing','Give_Way','No_Exit','Railway','Roundabout','Station','Traffic_Calming','Traffic_Signal','Turning_Loop','Sunrise_Sunset','Start_Time','End_Time','Description','Street','Side','City','County','State',
 'Zipcode',
 'Country',
 'Timezone','Airport_Code','Weather_Timestamp','Wind_Direction','Weather_Condition']
df = df.drop(*column_to_drop)
df.columns

In [None]:
df.groupBy('Severity').count().show()

# Cleaned data

In [None]:
df.printSchema()

root
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- Stop: boolean (nullable = true)
 |-- e_source_A: integer (nullable = false)
 |-- e_source_B: integer (nullable = false)
 |-- e_source_c: integer (nullable = false)
 |-- Duration: long (nullable = true)
 |-- Amenity_1: integer (nullable = false)
 |-- Bump_1: integer (nullable = false)
 |-- Crossing_1: integer (nullable = false)
 |-- Give_Way_1: integer (nullable = false)
 |-- No_Exit_1: integer (nullable = false)
 |-- Railway_1: integer (nullable = false)
 |-- Roundabo

# Format for MLlib

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['TMC',
 'Start_Lat',
 'Start_Lng',
 'Distance(mi)',
 'Temperature(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Speed(mph)',
 'Precipitation(in)',
 'Junction',
 'Stop',
 'e_source_A',
 'e_source_B',
 'e_source_c',
 'Duration',
 'Amenity_1',
 'Bump_1',
 'Crossing_1',
 'Give_Way_1',
 'No_Exit_1',
 'Railway_1',
 'Roundabout_1',
 'Station_1',
 'Traffic_Calming_1',
 'Traffic_Signal_1',
 'Turning_Loop_1',
 'Sunrise_Sunset_1',
 'Side_1'],outputCol='features')

In [None]:
output = assembler.transform(df)
final_data = output.select('features','Severity')

# Classification for Severity

# train and test split

In [None]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr_churn = LogisticRegression(labelCol='Severity')

In [None]:
# fitted_churn_model = lr_churn.fit(train_churn)