In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

class USAccidentsPipeline:
    """Load the US Accidents CSV and fan it out into five Delta tables.

    The pipeline reads the CSV, drops exact duplicate rows, keeps a fixed
    subset of columns and writes five projections (accidents, weather, time,
    location, county) as Delta tables under a common root path. Every
    projection carries the ``ID`` column.

    Args:
        source_path: Location of the input CSV. Defaults to the path the
            pipeline was originally hard-wired to.
        delta_root: Directory under which one sub-directory per table is
            written. Defaults to the originally hard-wired location.
    """

    DEFAULT_SOURCE_PATH = "s3://accidentsreview/US_Accidents.csv"
    DEFAULT_DELTA_ROOT = "s3://accidentszone/delta_tables/"

    # Columns retained by filter_columns(); everything else in the CSV is dropped.
    COLUMNS_TO_KEEP = (
        "ID", "Severity", "Start_Time", "End_Time",
        "Start_Lat", "Start_Lng",
        "Temperature", "Wind_Chill", "Humidity", "Pressure", "Visibility",
        "Weather_Condition",
        "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit",
        "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming",
        "Traffic_Signal", "Turning_Loop",
        "Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight",
        "Astronomical_Twilight",
        "County", "City", "State", "Zipcode", "Country",
    )

    def __init__(self, source_path=DEFAULT_SOURCE_PATH, delta_root=DEFAULT_DELTA_ROOT):
        """Record table names and I/O locations and obtain a Spark session."""
        self.accidents_delta_table = "accidents"
        self.weather_delta_table = "weather"
        self.time_delta_table = "time"
        self.location_delta_table = "location"
        self.county_delta_table = "county"
        self.source_path = source_path
        self.delta_root = delta_root
        self.spark = SparkSession.builder.appName('US Accidents Review').getOrCreate()

    def read_data(self):
        """Read the source CSV (with header row) and drop exact duplicate rows.

        ``inferSchema`` is not enabled here, so column types are whatever the
        CSV reader assigns by default.

        Returns:
            The de-duplicated DataFrame.
        """
        reader = (
            self.spark.read.format("csv")
            .option("header", True)
            .option("delimiter", ",")
        )
        return reader.load(self.source_path).dropDuplicates()

    def filter_columns(self, df):
        """Return ``df`` restricted to ``COLUMNS_TO_KEEP``, in that order."""
        return df.select(list(self.COLUMNS_TO_KEEP))

    def write_accidents_to_delta(self, df):
        """Write the ID, severity and start/end time columns to the accidents table."""
        accidents_df = df.select("ID", "Severity", "Start_Time", "End_Time")
        self.write_to_delta(accidents_df, self.accidents_delta_table)

    def write_weather_to_delta(self, df):
        """Write the ID and weather measurement columns to the weather table."""
        weather_df = df.select(
            "ID", "Temperature", "Wind_Chill", "Humidity", "Pressure",
            "Visibility", "Weather_Condition",
        )
        self.write_to_delta(weather_df, self.weather_delta_table)

    def write_time_to_delta(self, df):
        """Write the ID, start/end time and day/twilight columns to the time table."""
        time_df = df.select(
            "ID", "Start_Time", "End_Time", "Sunrise_Sunset", "Civil_Twilight",
            "Nautical_Twilight", "Astronomical_Twilight",
        )
        self.write_to_delta(time_df, self.time_delta_table)

    def write_location_to_delta(self, df):
        """Write the ID, coordinates and address columns to the location table."""
        location_df = df.select(
            "ID", "Start_Lat", "Start_Lng", "City", "State", "Zipcode", "Country",
        )
        self.write_to_delta(location_df, self.location_delta_table)

    def write_county_to_delta(self, df):
        """Write the ID and County columns to the county table."""
        county_df = df.select("ID", "County")
        self.write_to_delta(county_df, self.county_delta_table)

    def write_to_delta(self, df, delta_path, mode="overwrite"):
        """Write ``df`` in Delta format to ``<delta_root>/<delta_path>``.

        Args:
            df: DataFrame to persist.
            delta_path: Sub-directory (table name) under ``delta_root``.
            mode: Spark save mode. Defaults to ``"overwrite"`` so the pipeline
                can be re-run; without an explicit mode Spark uses
                ``errorifexists`` and a second run fails once the target
                already holds data.
        """
        # `display` is not imported in this file; it is presumably supplied by
        # the notebook runtime. It echoes the table name as progress output.
        display(delta_path)
        # Tolerate a root given with or without a trailing slash.
        target = self.delta_root.rstrip("/") + "/" + delta_path
        df.write.format("delta").mode(mode).save(target)

    def run(self):
        """Execute the pipeline: read, filter, then write all five tables."""
        df = self.read_data()
        filter_df = self.filter_columns(df)
        self.write_accidents_to_delta(filter_df)
        self.write_weather_to_delta(filter_df)
        self.write_time_to_delta(filter_df)
        self.write_location_to_delta(filter_df)
        self.write_county_to_delta(filter_df)
        
# Build the pipeline with its default settings and execute every stage.
pipeline = USAccidentsPipeline()
pipeline.run()

'accidents''weather''time''location''county'

In [0]:
from pyspark.sql import SparkSession
# Read the accidents CSV with an inferred schema and write it back out as Parquet.
spark = SparkSession.builder.appName('Convert US Accidents to Parquet').getOrCreate()
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("delimiter", ",")
    .load("s3://accidentsreview/US_Accidents.csv")
)
# Spark's default save mode is errorifexists, which makes this cell fail on any
# re-run once the Parquet directory exists; overwrite keeps it re-runnable.
df.write.format("parquet").mode("overwrite").save("s3://accidentsreview/US_Accidents.parquet")

In [0]:
# Print the schema (column name, type, nullability) of the module-level `df`.
# In this notebook's cell order that is the DataFrame loaded from the CSV with
# inferSchema enabled in the Parquet-conversion cell.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Number: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Wind_Chill: double (nullable = true)
 |-- Humidity: integer (nullable = true)
 |-- Pressure: double (nullable = 

In [0]:
%sql
-- Register the Delta files at this location as the `accidents` table.
-- IF NOT EXISTS keeps the cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS accidents USING DELTA LOCATION "s3://accidentszone/delta_tables/accidents/"

In [0]:
%sql
-- Preview up to 100 rows of the accidents table.
SELECT *
FROM accidents
LIMIT 100

ID,Severity,Start_Time,End_Time
A-218,2,2/16/16 9:41,2/16/16 15:41
A-932,4,3/17/16 7:25,3/17/16 13:25
A-1457,2,4/1/16 20:41,4/2/16 2:41
A-1862,2,4/11/16 17:25,4/11/16 23:25
A-2599,2,4/26/16 18:17,4/27/16 0:17
A-2604,3,4/26/16 19:53,4/27/16 1:53
A-2656,2,4/27/16 16:56,4/27/16 22:56
A-2671,4,4/27/16 18:17,4/28/16 0:17
A-3166,2,5/10/16 17:37,5/10/16 23:37
A-3216,2,5/11/16 15:25,5/11/16 21:25


In [0]:
%sql
-- Register the Delta files at this location as the `county` table.
-- IF NOT EXISTS keeps the cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS county USING DELTA LOCATION "s3://accidentszone/delta_tables/county/"

In [0]:
%sql
-- Preview up to 100 rows of the county table.
SELECT *
FROM county
LIMIT 100

ID,County
A-218,Franklin
A-932,Montgomery
A-1457,Santa Cruz
A-1862,Sacramento
A-2599,Santa Clara
A-2604,Solano
A-2656,Sacramento
A-2671,Sacramento
A-3166,Santa Cruz
A-3216,Marin


In [0]:
%sql
-- Register the Delta files at this location as the `location` table.
-- IF NOT EXISTS keeps the cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS location USING DELTA LOCATION "s3://accidentszone/delta_tables/location/"

In [0]:
%sql
-- Preview up to 100 rows of the location table.
SELECT *
FROM location
LIMIT 100

ID,Start_Lat,Start_Lng,City,State,Zipcode,Country
A-218,39.95085,-82.94428,Columbus,OH,43205,US
A-932,39.818751,-84.167637,Dayton,OH,45414,US
A-1457,36.98743,-121.98838,Santa Cruz,CA,95062,US
A-1862,38.49797,-121.44812,Sacramento,CA,95823,US
A-2599,37.38665,-121.90556,San Jose,CA,95131,US
A-2604,38.10683,-122.22955,Vallejo,CA,94591,US
A-2656,38.62837,-121.22151,Rancho Cordova,CA,95742,US
A-2671,38.532655,-121.4092,Sacramento,CA,95826-4341,US
A-3166,37.0498,-122.06585,Felton,CA,95018-9781,US
A-3216,37.90264,-122.51424,Mill Valley,CA,94941-1917,US


In [0]:
%sql
-- Register the Delta files at this location as the `time` table.
-- IF NOT EXISTS keeps the cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS time USING DELTA LOCATION "s3://accidentszone/delta_tables/time/"

In [0]:
%sql
-- Preview up to 100 rows of the time table.
SELECT *
FROM time
LIMIT 100

ID,Start_Time,End_Time,Sunrise_Sunset,Civil_Twilight,Nautical_Twilight,Astronomical_Twilight
A-218,2/16/16 9:41,2/16/16 15:41,Day,Day,Day,Day
A-932,3/17/16 7:25,3/17/16 13:25,Night,Day,Day,Day
A-1457,4/1/16 20:41,4/2/16 2:41,Night,Night,Night,Day
A-1862,4/11/16 17:25,4/11/16 23:25,Day,Day,Day,Day
A-2599,4/26/16 18:17,4/27/16 0:17,Day,Day,Day,Day
A-2604,4/26/16 19:53,4/27/16 1:53,Day,Day,Day,Day
A-2656,4/27/16 16:56,4/27/16 22:56,Day,Day,Day,Day
A-2671,4/27/16 18:17,4/28/16 0:17,Day,Day,Day,Day
A-3166,5/10/16 17:37,5/10/16 23:37,Day,Day,Day,Day
A-3216,5/11/16 15:25,5/11/16 21:25,Day,Day,Day,Day


In [0]:
%sql
-- Register the Delta files at this location as the `weather` table.
-- IF NOT EXISTS keeps the cell re-runnable once the table is already registered.
CREATE TABLE IF NOT EXISTS weather USING DELTA LOCATION "s3://accidentszone/delta_tables/weather/"

In [0]:
%sql
-- Preview up to 100 rows of the weather table.
SELECT *
FROM weather
LIMIT 100

ID,Temperature,Wind_Chill,Humidity,Pressure,Visibility,Weather_Condition
A-218,32.0,26.4,87.0,29.77,5.0,Overcast
A-932,43.0,37.2,65.0,29.9,10.0,Scattered Clouds
A-1457,53.1,,89.0,30.18,10.0,Clear
A-1862,69.1,,58.0,30.06,10.0,Clear
A-2599,64.0,,46.0,30.06,10.0,Mostly Cloudy
A-2604,55.0,,67.0,29.99,10.0,Clear
A-2656,57.2,,72.0,29.82,10.0,Mostly Cloudy
A-2671,53.6,,82.0,29.85,10.0,Overcast
A-3166,57.0,,81.0,30.0,10.0,Overcast
A-3216,71.6,,64.0,30.01,,Clear
