In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.appName('app_name') \
.master('local[*]') \
.config('spark.sql.execution.arrow.pyspark.enabled', True) \
.config('spark.sql.session.timeZone', 'UTC') \
.config('spark.driver.memory','32G') \
.config('spark.ui.showConsoleProgress', True) \
.config('spark.sql.repl.eagerEval.enabled', True) \
.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import Row
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql import functions as fn
from pyspark.ml import feature, regression, evaluation, Pipeline
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt

In [2]:
# Do not delete or change this cell
import os
def is_databricks():
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.
def get_training_filename(data_file_name):    
    if is_databricks():
        full_path_name = "/FileStore/tables/%s" % data_file_name
    else:
        full_path_name = data_file_name
    return full_path_name

In [3]:
# Reading the csv file in us_acci dataframe
us_traffic = spark.read.csv(get_training_filename('US_Accidents.csv'), header = True, inferSchema = True)

In [4]:
# Shape of the spark dataframe

print('Shape is ',(us_traffic.count(),len(us_traffic.columns)))

Shape is  (2845342, 47)


In [5]:
# Renaming the columns names which contain brackets for ease of usage

us_traffic = us_traffic.select('*').withColumnRenamed('Distance(mi)','Distance')\
    .withColumnRenamed('Temperature(F)', 'Temperature').withColumnRenamed('Wind_Chill(F)', 'Wind_Chill')\
    .withColumnRenamed('Humidity(%)', 'Humidity').withColumnRenamed('Pressure(in)', 'Pressure')\
    .withColumnRenamed('Visibility(mi)', 'Visibility').withColumnRenamed('Wind_Speed(mph)', 'Wind_Speed')\
    .withColumnRenamed('Precipitation(in)', 'Precipitation')

In [6]:
# Schema of dataframe with datatypes of all columns

us_traffic.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Wind_Chill: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Pressure: double (null

In [7]:
# Summary of all numerical variables in the dataset to find the skewness, outliers, mean, median

us_traffic.describe().toPandas()

Unnamed: 0,summary,ID,Severity,Start_Lat,Start_Lng,End_Lat,End_Lng,Distance,Description,Number,...,Pressure,Visibility,Wind_Direction,Wind_Speed,Precipitation,Weather_Condition,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
0,count,2845342,2845342.0,2845342.0,2845342.0,2845342.0,2845342.0,2845342.0,2845342,1101431.0,...,2786142.0,2774796.0,2771567,2687398.0,2295884.0,2774706,2842475,2842475,2842475,2842475
1,mean,,2.137571511614421,36.24520054185866,-97.1146328901268,36.24532079368951,-97.11438709551648,0.7026778946081038,1092.0,8089.408113626728,...,29.47234438158541,9.099391310208052,,7.3950442026081875,0.0070169398802371,,,,,
2,stddev,,0.4787216185938097,5.36379745983604,18.31781912169393,5.363872991165174,18.317632423289847,1.56036082534385,177.90446874657198,18360.09399458918,...,1.0452864968682465,2.7175457451003693,,5.527453950436704,0.0934883117140287,,,,,
3,min,A-1,1.0,24.566027,-124.548074,24.566013,-124.545748,0.0,1039 GOLDEN BEAR - BOT,0.0,...,0.0,0.0,CALM,0.0,0.0,Blowing Dust,Day,Day,Day,Day
4,max,A-999999,4.0,49.00058,-67.113167,49.075,-67.10924200000001,155.186,x DRESSER RANCH PL. LL SLOSO - DIRECTIONS PER ...,9999997.0,...,58.9,140.0,West,1087.0,24.0,Wintry Mix / Windy,Night,Night,Night,Night


In [8]:
# Columns to be dropped initially as a list
drop_col = ['ID','End_Lat','End_Lng','Description','Number','Street','Zipcode','Airport_Code','Country','Weather_Timestamp','Wind_Chill','Turning_Loop']

In [9]:
# Dropping columns from the original dataset

us_traffic = us_traffic.drop(*(drop_col))

In [10]:
# Checking null values in all the columns

from pyspark.sql.functions import isnan, isnull, when, count, col

us_traffic.select([count(when(isnull(c), c)).alias(c) for c in us_traffic.columns]).show()

+--------+----------+--------+---------+---------+--------+----+----+------+-----+--------+-----------+--------+--------+----------+--------------+----------+-------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+--------------+--------------+-----------------+---------------------+
|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance|Side|City|County|State|Timezone|Temperature|Humidity|Pressure|Visibility|Wind_Direction|Wind_Speed|Precipitation|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+--------+----------+--------+---------+---------+--------+----+----+------+-----+--------+-----------+--------+--------+----------+--------------+----------+-------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------

In [11]:
# Replacing same Wind_Direction named with different string N and North same

us_traffic = us_traffic.withColumn('Wind_Direction', when(us_traffic.Wind_Direction == 'E', 'East')\
                   .when(us_traffic.Wind_Direction == 'W', 'West')\
                   .when(us_traffic.Wind_Direction == 'N', 'North')\
                   .when(us_traffic.Wind_Direction == 'S', 'South')\
                   .when(us_traffic.Wind_Direction == 'VAR', 'Variable')\
                   .when(us_traffic.Wind_Direction == 'CALM', 'Calm')\
                   .otherwise(us_traffic.Wind_Direction))

In [12]:
# Replacing same Weather_Condition named with different string Light Rain Shower and Light Rain Showers same

us_traffic = us_traffic.withColumn('Weather_Condition', when(us_traffic.Weather_Condition == 'Light Rain Shower', 'Light Rain Showers')\
                   .when(us_traffic.Weather_Condition == 'Light Snow Shower', 'Light Snow Showers')\
                   .when(us_traffic.Weather_Condition == 'Rain Shower', 'Rain Showers')\
                   .otherwise(us_traffic.Weather_Condition))

In [13]:
# Dropping Null rows from City Column as there are only 30 rows with City = Null

us_traffic = us_traffic.where(col("city").isNotNull())

In [14]:
# Dropping Null rows from Timezone Column 

us_traffic = us_traffic.where(col("Timezone").isNotNull())

In [15]:
# Clipping Temperature extreme values to suppress outliers

lower = -30
upper = 115
us_traffic = us_traffic.withColumn('Temperature', when(us_traffic.Temperature > upper, upper)\
                  .when(us_traffic.Temperature < lower, lower).otherwise(us_traffic.Temperature).alias('Temperature'))

In [16]:
# Distance > 100 rows dropped
us_traffic = us_traffic.where(us_traffic.Distance <100)

In [17]:
# Clipping Visibility extreme values to suppress outliers 

upper = 20
us_traffic =us_traffic.withColumn('Visibility', when(us_traffic.Visibility > upper, upper)\
                  .otherwise(us_traffic.Visibility).alias('Visibility'))

In [18]:
# Clipping Wind_Speed extreme values to suppress outliers 
upper = 40
us_acci = us_traffic.withColumn('Wind_Speed', when(us_traffic.Wind_Speed > upper, upper)\
                  .otherwise(us_traffic.Wind_Speed).alias('Wind_Speed'))

In [19]:
# Replacing Null values in Precipitation with 0

us_traffic= us_traffic.withColumn('Precipitation', when(us_traffic.Precipitation.isNull(), 0).otherwise(us_traffic.Precipitation))

In [20]:
#replacing missing values in categorical attributes with the mode of the corresponding variables

for col_name in ['Wind_Direction', 'Weather_Condition']:
    common = us_traffic.dropna().groupBy(col_name).agg(fn.count('*')).orderBy('count(1)', ascending = False).first()[col_name]
    us_traffic= us_traffic.withColumn(col_name, when(isnull(col_name), common).otherwise(us_traffic[col_name]))

In [21]:
#replacing missing values in numerical attributes with the median of the corresponding variables

for col_name in ['Temperature', 'Humidity', 'Pressure', 'Visibility', 'Wind_Speed']:
    median = us_traffic.dropna().approxQuantile(col_name, [0.5], 0.00)[0]
    us_traffic = us_traffic.withColumn(col_name, when(isnull(col_name), median).otherwise(us_traffic[col_name]))

In [22]:

# Removing Null values in last 4 columns 

us_traffic =us_traffic.filter(us_traffic.Sunrise_Sunset.isNotNull())

us_traffic = us_traffic.filter(us_traffic.Civil_Twilight.isNotNull())

us_traffic = us_traffic.filter(us_traffic.Nautical_Twilight.isNotNull())

us_traffic = us_traffic.filter(us_traffic.Astronomical_Twilight.isNotNull())

In [23]:
# Adding Month of Year, Day of Week , Week of Year and Hour of the Day Column from Start Time

us_traffic = us_traffic.withColumn("Start_Time",to_timestamp(col("Start_Time"))).withColumn("month_of_year", date_format(col("Start_Time"), "MMMM")).withColumn("day_of_week", date_format(col("Start_Time"), "EEEE")).withColumn("hour_day", date_format(col("Start_Time"), "H")).withColumn("week_of_year", date_format(col("Start_Time"), "w"))

In [24]:
# Checking null values in all the columns

from pyspark.sql.functions import isnan, isnull, when, count, col

us_traffic.select([count(when(isnull(c), c)).alias(c) for c in us_traffic.columns]).show()

+--------+----------+--------+---------+---------+--------+----+----+------+-----+--------+-----------+--------+--------+----------+--------------+----------+-------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+--------------+--------------+-----------------+---------------------+-------------+-----------+--------+------------+
|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance|Side|City|County|State|Timezone|Temperature|Humidity|Pressure|Visibility|Wind_Direction|Wind_Speed|Precipitation|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|month_of_year|day_of_week|hour_day|week_of_year|
+--------+----------+--------+---------+---------+--------+----+----+------+-----+--------+-----------+--------+--------+----------+--------------+----------+--

In [25]:
# Shape of the spark dataframe

print('Shape is ',(us_traffic.count(),len(us_traffic.columns)))

Shape is  (2838820, 39)


In [33]:
us_traffic.toPandas().to_csv("Us_clean.csv",header=True,index=False)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37476)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37476)