# This notebook is made for a part of Coventry University's coursework.
Dataset is accessible on https://www.kaggle.com/sobhanmoosavi/us-accidents

## This is part one of the coursework - Data Cleaning

Produced by Sunggu Choi

In [2]:
#Library Importing

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *

from pyspark.sql.types import *
from datetime import date, timedelta, datetime
import time


In [3]:
#Initialization of a Spark Session
sc = SparkSession.builder.appName("dataCleaning")\
      .config("spark.memory.fraction", 0.8) \
      .config("spark.executor.memory", "4g") \
      .config("spark.driver.memory", "4g")\
      .config("spark.sql.shuffle.partitions" , "800") \
      .config("spark.memory.offHeap.enabled",'true')\
      .config("spark.memory.offHeap.size","4g")\
      .getOrCreate()

In [6]:
#Load dataset
df = sc.read.csv('US_Accidents_June20.csv', header=True)

df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- TMC: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: string (nullable = true)
 |-- Start_Lng: string (nullable = true)
 |-- End_Lat: string (nullable = true)
 |-- End_Lng: string (nullable = true)
 |-- Distance(mi): string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature(F): string (nullable = true)
 |-- Wind_Chill(F): string (nullable = 

In [9]:
df.show(1)

+---+--------+-----+--------+-------------------+-------------------+---------+----------+-------+-------+------------+--------------------+------+------+----+------+----------+-----+-------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|  Source|  TMC|Severity|         Start_Time|           End_Time|Start_Lat| Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|Number|Street|Side|  City|    County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_E

In [4]:
#Drop the columns
df2 = df.drop('Source','TMC','End_Time','End_Lat','End_Lng','Distance(mi)','Description','Number','Street','Weather_Timestamp','Side','Zipcode','Country','Timezone','Airport_Code','Wind_Direction','Wind_Speed(mph)','Weather_Condition')
print(df2)

DataFrame[ID: string, Severity: string, Start_Time: string, Start_Lat: string, Start_Lng: string, City: string, County: string, State: string, Temperature(F): string, Wind_Chill(F): string, Humidity(%): string, Pressure(in): string, Visibility(mi): string, Precipitation(in): string, Amenity: string, Bump: string, Crossing: string, Give_Way: string, Junction: string, No_Exit: string, Railway: string, Roundabout: string, Station: string, Stop: string, Traffic_Calming: string, Traffic_Signal: string, Turning_Loop: string, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]


### Dropping columns
Since some columns are unable to use for data processing: 
'Source','TMC','End_Time','End_Lat','End_Lng','Distance(mi)','Description','Number','Street','Side','Zipcode','Country','Timezone','Airport_Code

In [5]:
#Rename the columns
df2.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- Start_Lat: string (nullable = true)
 |-- Start_Lng: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Temperature(F): string (nullable = true)
 |-- Wind_Chill(F): string (nullable = true)
 |-- Humidity(%): string (nullable = true)
 |-- Pressure(in): string (nullable = true)
 |-- Visibility(mi): string (nullable = true)
 |-- Precipitation(in): string (nullable = true)
 |-- Amenity: string (nullable = true)
 |-- Bump: string (nullable = true)
 |-- Crossing: string (nullable = true)
 |-- Give_Way: string (nullable = true)
 |-- Junction: string (nullable = true)
 |-- No_Exit: string (nullable = true)
 |-- Railway: string (nullable = true)
 |-- Roundabout: string (nullable = true)
 |-- Station: string (nullable = true)
 |-- Stop: string (nullable = true)
 |-- Traffic_Calming: s

In [6]:
#Update column
df3 = df2.withColumnRenamed('Start_Time', 'Time')
print(df3)


DataFrame[ID: string, Severity: string, Time: string, Start_Lat: string, Start_Lng: string, City: string, County: string, State: string, Temperature(F): string, Wind_Chill(F): string, Humidity(%): string, Pressure(in): string, Visibility(mi): string, Precipitation(in): string, Amenity: string, Bump: string, Crossing: string, Give_Way: string, Junction: string, No_Exit: string, Railway: string, Roundabout: string, Station: string, Stop: string, Traffic_Calming: string, Traffic_Signal: string, Turning_Loop: string, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]


In [7]:
from pyspark.sql.functions import isnan, when, count, col

df3.select([count(when(isnan(c), c)).alias(c) for c in df3.columns]).show()

+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|Time|Start_Lat|Start_Lng|City|County|State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+------

In [8]:
df3.select([count(when(isnan('City'),True))]).show()

+------------------------------------------+
|count(CASE WHEN isnan(City) THEN true END)|
+------------------------------------------+
|                                         0|
+------------------------------------------+



In [9]:
#Count number of Null Values
df3.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df3.columns]).show()

+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|Time|Start_Lat|Start_Lng|City|County|State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+------

In [10]:
#Fixing the data
df4 = df3.fillna({'City':'Empty','Temperature(F)':0,'Wind_Chill(F)':0,'Humidity(%)':0,'Pressure(in)':0,'Visibility(mi)':0,'Precipitation(in)':0,'Sunrise_Sunset':'Day','Civil_Twilight':'Day','Nautical_Twilight':'Day','Astronomical_Twilight':'Day'})

df4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df3.columns]).show()

+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Severity|Time|Start_Lat|Start_Lng|City|County|State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---+--------+----+---------+---------+----+------+-----+--------------+-------------+-----------+------------+--------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+------

In [11]:
#Save the dataframe as CSV
df4.toPandas().to_csv('pySparkout.csv')