In [68]:
import configparser
from pyspark.sql.types import ArrayType, IntegerType, DateType
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, count, monotonically_increasing_id, substring, ceil
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from zipfile import ZipFile
import glob
import datetime
import os
import boto3
import pandas as pd
import logging

In [69]:
# Timer for logging
time = datetime.datetime.now()
def get_time():
    """Return the whole seconds elapsed since the previous call and restart the timer.

    The module-level ``time`` holds the timestamp of the previous call (or of
    the moment this cell ran, for the first call) and is overwritten on every
    invocation.

    Returns:
        int: elapsed time in seconds, truncated to a whole number.
    """
    global time
    new_time = datetime.datetime.now()
    # timedelta.seconds only holds the sub-day remainder (0..86399), so an
    # interval longer than a day would wrap around; total_seconds() covers the
    # full span. int() keeps the original integer return type.
    seconds = int((new_time - time).total_seconds())
    time = new_time
    return seconds

# Setting logging attributes
# The root logger defaults to WARNING, which silently discards every
# logging.info() call made in this notebook; set the threshold to INFO
# explicitly so the ETL progress messages actually reach ETL.log.
logging.basicConfig(filename='ETL.log', filemode='w', format='%(message)s - %(created)f', level=logging.INFO)

In [70]:
# AWS credentials: read them from the local dl.cfg file and export them as
# environment variables for this process.
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_settings = config['DEFAULT']
for credential_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[credential_name] = aws_settings[credential_name]

In [45]:
# Build (or reuse) the Spark session. The hadoop-aws package is requested so
# the s3a:// paths read later in the notebook can be resolved.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)
logging.info('Spark session created.')

In [72]:
# Population data
# Stage the census population-estimate CSV with pandas, keep the rows whose
# state code lies between 1 and 56 (presumably the states plus DC - confirm
# against the source file), keep the 2016-2019 estimates and persist the
# result as parquet.
logging.info('Starting population data read.')
get_time()
population_pd = pd.read_csv("http://www2.census.gov/programs-surveys/popest/datasets/2010-2019/national/totals/nst-est2019-alldata.csv")
# pandas DataFrame.count() returns per-column non-null counts (a Series), not
# a row count; len() gives the number of rows the log message promises.
logging.info('Population data staged. Time taken: {} Row count: {}'.format(get_time(), len(population_pd)))

population_staging = spark.createDataFrame(population_pd)
logging.info('Cleaning Data and selecting necessary columns.')

population_dimension = (
    population_staging
    .filter(population_staging['state'].between(1, 56))
    .select('state', 'name', 'popestimate2016', 'popestimate2017', 'popestimate2018', 'popestimate2019')
    .withColumnRenamed('popestimate2016', 'population_2016')
    .withColumnRenamed('popestimate2017', 'population_2017')
    .withColumnRenamed('popestimate2018', 'population_2018')
    .withColumnRenamed('popestimate2019', 'population_2019')
)
population_dimension.createOrReplaceTempView('population_dimension')
logging.info('Population dimension dataframe and view created. Row count: {}'.format(population_dimension.count()))

# Persist the dimension, replacing any previous output.
population_dimension.write.parquet("population", "overwrite")

In [73]:
# This cell could be utilised for ensuring proper data load:
# it displays the first row of population_dimension.
population_dimension.head()

Row(state=1, name='Alabama', population_2016=4863525, population_2017=4874486, population_2018=4887681, population_2019=4903185)

In [75]:
# State codes: stage the raw text file from S3 (each line lands in a single
# `value` column, which the UDFs below split apart).
logging.info('Getting state codes data.')
state_codes_path = "s3a://de-capstone-anil/state_codes.txt"
state_codes_staging = spark.read.text(state_codes_path)
@udf
def split_state_code(row):
    """Return the text after the last single space in `row` (the whole string if it has no space)."""
    _, _, trailing_token = row.rpartition(' ')
    return trailing_token
@udf
def split_state_name(row):
    """Return everything before the last single space in `row` ('' if it has no space)."""
    leading_text, _, _ = row.rpartition(' ')
    return leading_text
# Derive the code and name columns from each raw line, then discard the raw text.
raw_state_line = state_codes_staging['value']
state_codes_dimension = (
    state_codes_staging
    .withColumn('state_code', split_state_code(raw_state_line))
    .withColumn('state_name', split_state_name(raw_state_line))
    .drop('value')
)
state_codes_dimension.createOrReplaceTempView('state_codes_dimension')
logging.info('State Codes fetched.')

# Persist the dimension, replacing any previous output.
state_codes_dimension.write.parquet("State_Codes", mode="overwrite")

In [76]:
# This cell could be utilised for ensuring proper data load:
# it displays the first row of state_codes_dimension.
state_codes_dimension.head()

Row(state_code='AL', state_name='Alabama')

In [None]:
# Accidents data
# Stage the accidents CSV from S3, keep only US rows whose Start_Time begins
# with 2016-2019, derive an integer `year` column and persist the selected
# columns as parquet partitioned by state.
logging.info('Starting Accidents data read.')
get_time()
accidents_staging = spark.read.csv("s3a://de-capstone-anil/US_Accidents_May19.csv", header=True)
accidents_staging.createOrReplaceTempView('Accidents_staging_view')
logging.info('Accidents data staged. Time taken: {} Row count: {}'.format(get_time(), accidents_staging.count()))

logging.info('Cleaning Data and selecting necessary columns.')
countries = accidents_staging.groupby('country').count().collect()
logging.info("Selecting only those rows with country set to US.")
for country in countries:
    # groupby().count() names its aggregate column 'count'. The previous
    # country('name') called the Row object instead of indexing it, so the
    # per-country count was never logged correctly.
    logging.info('Country: {} Count: {}'.format(country['country'], country['count']))
accidents_temp = accidents_staging.filter(accidents_staging['country'] == 'US')
logging.info('After filtering, Row count: {}'.format(accidents_temp.count()))

# Reference the column of the frame being filtered rather than borrowing it
# from accidents_staging.
start_time = accidents_temp['Start_Time']
accidents_temp = accidents_temp.filter(
    start_time.startswith('2016')
    | start_time.startswith('2017')
    | start_time.startswith('2018')
    | start_time.startswith('2019')
)
logging.info('Filtered data based on year.')
logging.info('After filtering, Row count: {}'.format(accidents_temp.count()))

# The first four characters of start_time are the year (the filter above
# guarantees they are one of 2016-2019).
accidents_dimension = (
    accidents_temp
    .withColumn('year', substring(accidents_temp['start_time'], 0, 4).cast(IntegerType()))
    .select('id', 'severity', 'start_time', 'end_time', 'description', 'side', 'state', 'weather_condition', 'year')
    .dropDuplicates()
)
accidents_dimension.createOrReplaceTempView('accidents_dimension')
logging.info('Accidents dimension created. ')

# Third positional argument of DataFrameWriter.parquet is partitionBy.
accidents_dimension.write.parquet("Accidents_Dimension", "overwrite", "state")

In [None]:
# Display the first row of the staged accidents data as a load sanity check.
accidents_staging.head()

In [None]:
# Vehicles data: stage the raw text file from S3 and log how long the read
# call took plus how many lines were staged.
logging.info('Starting Vehicles data read.')
get_time()
vehicle_staging = spark.read.text("s3a://de-capstone-anil/vehicles_per_1000.txt")
read_seconds = get_time()
staged_rows = vehicle_staging.count()
logging.info('Vehicles data staged. Time taken: {} Row count: {}'.format(read_seconds, staged_rows))
logging.info('Applying data transformation.')
@udf
def get_state_vehicle_data(row):
    """Extract the state name from one raw line of the vehicles file.

    The line is split on single spaces; the first and last tokens are dropped
    and the remaining tokens are re-joined with single spaces.
    NOTE(review): presumably the first token is a rank/index and the last one
    the per-1000 vehicle count - confirm against the source file.
    """
    tokens = row.split(' ')
    # The unused local `count` was removed; it also shadowed the imported
    # pyspark `count` function inside this UDF.
    return " ".join(tokens[1:-1])

@udf
def get_count_vehicle_data(row):
    """Return the text after the last single space in `row` (the whole string if it has no space)."""
    return row.rpartition(' ')[2]

# Split each raw line into a state name and an integer 2017 value.
raw_vehicle_line = vehicle_staging['value']
vehicle_dimension = (
    vehicle_staging
    .withColumn('state', get_state_vehicle_data(raw_vehicle_line))
    .withColumn('2017', get_count_vehicle_data(raw_vehicle_line).cast(IntegerType()))
    .drop('value')
)

# Scale factors relative to 2017, used to derive the other years' columns.
logging.info('Extrapolating the data based on statistics.')
increase_2018_17 = 272.1/268.3
increase_2019_17 = 276/268.3
logging.info('Assuming the count in 2016 decreases by the same fraction as it increased in 2017.')
decrease_2016_17 = 1/increase_2018_17

# Every derived year is the 2017 column multiplied by its factor.
vehicle_dimension = (
    vehicle_dimension
    .withColumn('2016', vehicle_dimension['2017']*decrease_2016_17)
    .withColumn('2018', vehicle_dimension['2017']*increase_2018_17)
    .withColumn('2019', vehicle_dimension['2017']*increase_2019_17)
)

# Persist the dimension, replacing any previous output.
vehicle_dimension.write.parquet("Vehicles_Dimension", mode="overwrite")

In [None]:
# Display the first row of the staged vehicles data as a load sanity check.
vehicle_staging.head()

In [None]:
# Location dimension: one row per distinct (street, city, county, state)
# combination found in the staged accidents view.
logging.info('Creating Location Dimension Table.')
location_query = 'SELECT street, city, county, state FROM accidents_staging_view GROUP BY street, city, county, state'
location_dimension = spark.sql(location_query)
location_rows = location_dimension.count()
logging.info('Number of records: {}'.format(location_rows))

In [None]:
# Joining state data with Accidents dimension
# Full outer join on the two-letter state code, so unmatched rows from either
# side are kept. The join condition now references the column of the frame
# actually being joined (accidents_dimension) instead of its ancestor
# accidents_temp. The no-op drop of the non-existent 'value' column is gone;
# state_codes_dimension already dropped it when it was built.
accidents_states = (
    accidents_dimension
    .join(
        state_codes_dimension,
        on=[accidents_dimension['state'] == state_codes_dimension['state_code']],
        how='full',
    )
    .drop('state_code')
)
logging.info('Added state names.')
# Show the joined result (previously this displayed accidents_temp, which does
# not contain the state names this cell adds).
accidents_states.head(1)

In [None]:
# Creating a Vehicle_Population dimension
# Full outer join of the vehicle figures with the population estimates on the
# state name, then for each year:
#   vehicles_<year> = ceil(<year> vehicle figure * population_<year> / 1000)
Vehicles_Population = vehicle_dimension.join(
    population_dimension,
    on=[vehicle_dimension['state'] == population_dimension['name']],
    how='full',
)
for yr in ('2019', '2018', '2017', '2016'):
    vehicle_figure = vehicle_dimension[yr]
    population_estimate = population_dimension['population_' + yr]
    Vehicles_Population = Vehicles_Population.withColumn(
        'vehicles_' + yr,
        ceil(vehicle_figure*population_estimate/1000),
    )
# The per-year source columns and the vehicle-side state column are no longer needed.
Vehicles_Population = Vehicles_Population.drop('2016', '2017', '2018', '2019').drop(vehicle_dimension['state'])
joined_rows = Vehicles_Population.count()
logging.info('Joined Vehicle dimension with Population dimension. Row count: {}'.format(joined_rows))

Vehicles_Population.write.parquet("Vehicles_Population", mode="overwrite")

In [None]:
# Display the first row of Vehicles_Population as a load sanity check.
Vehicles_Population.head()

In [None]:
# Creating the fact table: count of accident ids per state name, inner-joined
# to the vehicle/population figures for the same state.
accidents_per_state = accidents_states.groupby('state_name').agg({'id': 'count'})
same_state = Vehicles_Population['name'] == accidents_states['state_name']
accidents_fact = (
    accidents_per_state
    .join(Vehicles_Population, on=[same_state], how='inner')
    .drop(Vehicles_Population['name'])
)
logging.info('Created fact table dataframe.')
sample_row = accidents_fact.head()
logging.info('Sample row: {}'.format(sample_row))