<a href="https://colab.research.google.com/github/FanusArefaine/US_Accidents---Pyspark/blob/main/US_Accidents_Preprocessing___PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Apache Spark Machine Learning**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"
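
If either path above is wrong, `findspark.init()` in the next cell fails with an error that can be hard to trace back to the environment. A minimal sketch of a pre-check, assuming only the standard library (the helper name `missing_paths` is hypothetical and not part of the notebook):

```python
import os


def missing_paths(names):
    """Return the environment variables that are unset or do not point to a directory."""
    missing = []
    for name in names:
        path = os.environ.get(name)
        # An unset variable or a path that is not an existing directory is a problem
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


problems = missing_paths(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Check these variables before calling findspark.init():", problems)
```

An empty list means both directories exist, which is a necessary but not sufficient condition for Spark to start.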

In [4]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc

In [5]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

In [6]:
import pandas as pd
from pyspark.sql.functions import col, split, to_timestamp

In [7]:

# Importing the dataset from the mounted Google Drive
acc_df = spark.read.csv('/content/drive/My Drive/colab_notes/datasets/US_Accidents_June20.csv', inferSchema=True, header=True)

# Caching the data in memory
acc_df.cache()

# Showing the first 10 rows of the dataset
acc_df.show(10)

+----+--------+-----+--------+-------------------+-------------------+---------+----------+-------+-------+------------+--------------------+------+--------------------+----+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID|  Source|  TMC|Severity|         Start_Time|           End_Time|Start_Lat| Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|Number|              Street|Side|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Conditi

## **Data Profiling**

In [8]:
# Number of records and features in the DataFrame
print("Number of records: ", acc_df.count())
print("Number of features: ", len(acc_df.columns))

Number of records:  3513617
Number of features:  49


In [9]:
# Checking the schema of the dataset

acc_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- TMC: double (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (n

In [10]:
# Normalizing Zipcode: keep only the part before the hyphen, so ZIP+4 values become 5-digit ZIP codes

acc_df = acc_df.withColumn("Zipcode", split("Zipcode", "-").getItem(0))
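
The cell above keeps the text before the first hyphen but does not check that the result is a well-formed ZIP code. The sketch below mirrors that per-value logic in plain Python and adds a format check; the helper names and the regular expression are illustrative assumptions, not part of the notebook.

```python
import re

# Five digits, optionally followed by a hyphen and a four-digit ZIP+4 suffix
ZIP_PATTERN = re.compile(r"\d{5}(-\d{4})?")


def normalize_zip(zipcode):
    """Return the text before the first hyphen, or None for a missing value."""
    if zipcode is None:
        return None
    return zipcode.split("-")[0]


def is_valid_zip(zipcode):
    """Return True only when the whole value is a 5-digit ZIP or a ZIP+4 code."""
    return zipcode is not None and ZIP_PATTERN.fullmatch(zipcode) is not None
```

For a hypothetical value such as `"12345-6789"`, `normalize_zip` returns `"12345"`, which is the value the Spark expression `split("Zipcode", "-").getItem(0)` would keep for that row.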

In [11]:
acc_df.show(10)

+----+--------+-----+--------+-------------------+-------------------+---------+----------+-------+-------+------------+--------------------+------+--------------------+----+------------+----------+-----+-------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID|  Source|  TMC|Severity|         Start_Time|           End_Time|Start_Lat| Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|Number|              Street|Side|        City|    County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Ame

In [12]:
# Checking the percentage of null values in every column

def null_percentage(df):
  """Print the percentage of null values in each column of a PySpark DataFrame.

  Input: PySpark DataFrame -> df
  Output: prints the percentage of null values in each feature
  """

  # Use the DataFrame's own row count instead of a hard-coded total,
  # so the percentages stay correct after rows have been dropped
  total = df.count()

  # `c` is used as the loop variable to avoid shadowing the imported `col` function
  for c in df.columns:
    per_null = round(df.filter(df[c].isNull()).count() / total * 100, 2)
    print(c, " - ", per_null, "% null values")


null_percentage(acc_df)

ID  -  0.0 % null values
Source  -  0.0 % null values
TMC  -  29.45 % null values
Severity  -  0.0 % null values
Start_Time  -  0.0 % null values
End_Time  -  0.0 % null values
Start_Lat  -  0.0 % null values
Start_Lng  -  0.0 % null values
End_Lat  -  70.55 % null values
End_Lng  -  70.55 % null values
Distance(mi)  -  0.0 % null values
Description  -  0.0 % null values
Number  -  64.4 % null values
Street  -  0.0 % null values
Side  -  0.0 % null values
City  -  0.0 % null values
County  -  0.0 % null values
State  -  0.0 % null values
Zipcode  -  0.03 % null values
Country  -  0.0 % null values
Timezone  -  0.11 % null values
Airport_Code  -  0.19 % null values
Weather_Timestamp  -  1.23 % null values
Temperature(F)  -  1.87 % null values
Wind_Chill(F)  -  53.17 % null values
Humidity(%)  -  1.98 % null values
Pressure(in)  -  1.59 % null values
Visibility(mi)  -  2.16 % null values
Wind_Direction  -  1.68 % null values
Wind_Speed(mph)  -  12.94 % null values
Precipitation(in)  -  5

In [13]:
# Some columns have a large share of null values (see the percentages above).
# They are dropped here so that the row-wise null removal in the next step does not discard most of the records.

# Columns with a significant share of null values

cols_null = ['TMC', 'End_Lat', 'End_Lng', 'Number', 'Wind_Chill(F)', 'Precipitation(in)']

# Dropping the above columns from the dataset

acc_df = acc_df.drop(*cols_null)
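
The notebook picks the sparse columns by hand and does not state a cutoff. As a sketch of how the same choice could be made explicit, the snippet below applies an assumed threshold of 20% to the non-zero percentages printed by `null_percentage` above. The threshold and the helper name `sparse_columns` are assumptions; `Precipitation(in)` is left out because its percentage is cut off in the output.

```python
# Null percentages copied from the null_percentage output above
# (only columns whose value is fully visible there)
null_pct = {
    "TMC": 29.45,
    "End_Lat": 70.55,
    "End_Lng": 70.55,
    "Number": 64.4,
    "Zipcode": 0.03,
    "Timezone": 0.11,
    "Airport_Code": 0.19,
    "Weather_Timestamp": 1.23,
    "Temperature(F)": 1.87,
    "Wind_Chill(F)": 53.17,
    "Humidity(%)": 1.98,
    "Pressure(in)": 1.59,
    "Visibility(mi)": 2.16,
    "Wind_Direction": 1.68,
    "Wind_Speed(mph)": 12.94,
}

NULL_THRESHOLD = 20.0  # assumed cutoff in percent, not stated in the notebook


def sparse_columns(percentages, threshold):
    """Return the columns whose null percentage exceeds the threshold, in input order."""
    return [name for name, pct in percentages.items() if pct > threshold]


cols_above_threshold = sparse_columns(null_pct, NULL_THRESHOLD)
print(cols_above_threshold)
```

With this cutoff the selection matches the hand-picked list for every column whose percentage is visible, and `Wind_Speed(mph)` at 12.94% is kept.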


In [14]:
# Dropping the remaining rows that contain a null value in any column

acc_df = acc_df.dropna()

# Re-checking the percentage of null values in all columns

null_percentage(acc_df)

ID  -  0.0 % null values
Source  -  0.0 % null values
Severity  -  0.0 % null values
Start_Time  -  0.0 % null values
End_Time  -  0.0 % null values
Start_Lat  -  0.0 % null values
Start_Lng  -  0.0 % null values
Distance(mi)  -  0.0 % null values
Description  -  0.0 % null values
Street  -  0.0 % null values
Side  -  0.0 % null values
City  -  0.0 % null values
County  -  0.0 % null values
State  -  0.0 % null values
Zipcode  -  0.0 % null values
Country  -  0.0 % null values
Timezone  -  0.0 % null values
Airport_Code  -  0.0 % null values
Weather_Timestamp  -  0.0 % null values
Temperature(F)  -  0.0 % null values
Humidity(%)  -  0.0 % null values
Pressure(in)  -  0.0 % null values
Visibility(mi)  -  0.0 % null values
Wind_Direction  -  0.0 % null values
Wind_Speed(mph)  -  0.0 % null values
Weather_Condition  -  0.0 % null values
Amenity  -  0.0 % null values
Bump  -  0.0 % null values
Crossing  -  0.0 % null values
Give_Way  -  0.0 % null values
Junction  -  0.0 % null values
No_E

In [None]:
# As shown above, the dataset no longer contains null values

# Cleaned dataset with the remaining columns

acc_df.show()

+----+--------+--------+-------------------+-------------------+---------+----------+------------+--------------------+--------------------+----+------------+----------+-----+-------+-------+----------+------------+-------------------+--------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID|  Source|Severity|         Start_Time|           End_Time|Start_Lat| Start_Lng|Distance(mi)|         Description|              Street|Side|        City|    County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station| Stop|Traffic_Calming|Traffic_Signal|Turning_Loop

In [None]:
# Saving the PySpark DataFrame as several CSV part files (one per partition) inside the "US_accidents.csv" directory

acc_df.write.csv("US_accidents.csv", header=True)

In [None]:
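# Saving the PySpark DataFrame as a single CSV part file: coalesce(1) merges all partitions into one,
# and Spark writes that file inside the "US_accident.csv" directory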

acc_df.coalesce(1).write.option("header", "true").csv("US_accident.csv")
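
Even with `coalesce(1)`, Spark creates a directory named `US_accident.csv` that holds one `part-*.csv` file next to marker files such as `_SUCCESS`. A minimal sketch, using only the standard library, of copying that part file to an ordinary file path (the helper name and the target file name are hypothetical):

```python
import glob
import os
import shutil


def copy_single_part_file(spark_output_dir, target_path):
    """Copy the only part-*.csv file in a Spark output directory to target_path."""
    part_files = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    # Refuse to guess when the directory holds zero or several part files
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")
    shutil.copyfile(part_files[0], target_path)
    return target_path
```

After the write above has finished, a call such as `copy_single_part_file("US_accident.csv", "US_accident_single.csv")` would leave the Spark output directory untouched and produce one regular CSV file.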

In [None]:
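# Saving the PySpark DataFrame to a single CSV file by first converting it to a pandas DataFrame
# Note: toPandas() collects every row on the driver, so the whole dataset must fit in its memory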

acc_df.toPandas().to_csv("sample_file.csv", header=True)