# Spark ELT

In [13]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, DateType, FloatType
from pyspark.sql.functions import *

# Build (or reuse) the Spark session, requesting the hadoop-aws package
# through the spark.jars.packages setting.
session_builder = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
)
spark = session_builder.getOrCreate()

# The SparkContext is needed for context-level calls such as package installs.
sc = spark.sparkContext

# S3 prefixes: where the raw inputs are read from and where results are written.
raw_data_s3_dir = "s3://kallibek-data-engineering-capstone-proj/raw_data/"
output_s3_dir = "s3://kallibek-data-engineering-capstone-proj/output/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Install pandas for this Spark context. The install call raises when the
# package is already present (e.g. when this cell is re-run), so the error
# is reported and the notebook carries on.
try:
    sc.install_pypi_package("pandas==0.25.1")
except Exception as e:
    print(e)

# Import and configure pandas outside the install's try block: previously an
# "already installed" error skipped both the import and the display option.
try:
    import pandas as pd
except ImportError as e:
    print(e)
else:
    # Show every column when a pandas frame is printed.
    pd.set_option('display.max_columns', None)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!

In [15]:
# A helper that previews a DataFrame and verifies that it is not empty
def quality_check(df):
    """
    Print the first five rows of a dataframe, then validate its row count.

    The rows are printed before the check, so an empty dataframe still
    shows its (empty) preview ahead of the error.

    Input: df - pyspark DataFrame object

    Raises: ValueError (a subclass of Exception) if the dataframe has
            fewer than one row.
    """
    row_count = df.count()
    # Only five rows are collected to the driver for the preview.
    print(df.limit(5).toPandas())
    if row_count < 1:
        raise ValueError("DataFrame has no rows")
    print('count of rows: ', row_count)
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 1. Load data from S3

## 1.1 Load NYC Vehicle Collisions

In [16]:
# Read the raw collisions CSV (with a header row); DROPMALFORMED mode makes
# the reader discard rows it cannot parse.
s3_path_collisions = os.path.join(raw_data_s3_dir, 'vehicle_crash_data/vehicle_crash_data.csv')
collisions_df = (
    spark.read
    .format("csv")
    .options(header="true", mode="DROPMALFORMED")
    .load(s3_path_collisions)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Combine the crash date and crash time strings into a single timestamp.
crash_timestamp = to_timestamp(
    concat_ws(' ', col("CRASH DATE"), col("CRASH TIME")),
    'MM/dd/yyyy HH:mm',
)

# Columns kept from the raw data, renamed; the injury and fatality counts
# are cast to integers.
collision_columns = [
    col("COLLISION_ID").alias("Collision_ID"),
    crash_timestamp.alias("Datetime"),
    col("ZIP CODE").alias("Zipcode"),
    col("LOCATION").alias("Coordinates"),
    col("NUMBER OF PERSONS INJURED").cast(IntegerType()).alias("Injured"),
    col("NUMBER OF PERSONS KILLED").cast(IntegerType()).alias("Killed"),
]

# Year and Month are derived from the timestamp so the table can be
# partitioned by them when it is written out.
collisions_df2 = (
    collisions_df
    .select(*collision_columns)
    .withColumn("Year", year(col("Datetime")))
    .withColumn("Month", month(col("Datetime")))
)
quality_check(collisions_df2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  Collision_ID            Datetime Zipcode            Coordinates  Injured  \
0      4403352 2021-03-29 18:00:00   11221  (40.69173, -73.93517)        0   
1      4402851 2021-03-29 22:02:00    None                   None        2   
2      4402444 2021-03-28 14:20:00    None                   None        1   
3      4402463 2021-03-27 15:52:00    None                   None        0   
4      4401984 2021-03-26 15:20:00    None                   None        1   

   Killed  Year  Month  
0       0  2021      3  
1       0  2021      3  
2       0  2021      3  
3       0  2021      3  
4       0  2021      3  
count of rows:  1766640

## 1.2. Load weather data and filter data for Central Park Weather Station

In [18]:
# Read every gzipped yearly weather file; the files have no header row, so
# Spark names the columns _c0, _c1, ...
s3_path_weather_data = os.path.join(raw_data_s3_dir, 'weather_by_year/*.csv.gz')
weather_df = (
    spark.read
    .format("csv")
    .options(sep=",", header="false")
    .load(s3_path_weather_data)
)

# Name the first four positional columns and parse the date and value.
weather_columns = [
    col("_c0").alias("Station_ID"),
    to_date(col("_c1"), 'yyyyMMdd').alias("Date"),
    col("_c2").alias("Param"),
    col("_c3").cast(FloatType()).alias("Value"),
]

# Keep only the Central Park Weather Station ("USW00094728") rows that have
# a non-null date; the station column is constant afterwards, so it is dropped.
is_central_park = col("Station_ID") == "USW00094728"
weather_df2 = (
    weather_df
    .select(*weather_columns)
    .filter(is_central_park)
    .filter(col("Date").isNotNull())
    .drop("Station_ID")
)

quality_check(weather_df2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

         Date Param  Value
0  2020-01-01  TMAX   50.0
1  2020-01-01  TMIN   11.0
2  2020-01-01  PRCP    0.0
3  2020-01-01  SNOW    0.0
4  2020-01-01  SNWD    0.0
count of rows:  4678

# 2. Transform data

## 2.1. Summary of NYC vehicle accidents by date

In [19]:
# Summarize collisions per calendar day: number of collisions plus the
# total injured and killed, newest day first.
# `count` and `sum` here are the Spark column functions brought in by the
# wildcard import of pyspark.sql.functions, not the Python builtins.
crash_date = to_date(col("Datetime")).cast("date").alias("Date")
daily_totals = [
    count("Collision_ID").alias("Collisions"),
    sum("Injured").alias("Injured"),
    sum("Killed").alias("Killed"),
]
collisions_daily_summary = (
    collisions_df2
    .groupBy(crash_date)
    .agg(*daily_totals)
    .orderBy(col("Date").desc())
)
quality_check(collisions_daily_summary)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

         Date  Collisions  Injured  Killed
0  2021-03-29         184       75       0
1  2021-03-28         144       56       0
2  2021-03-27         272      165       1
3  2021-03-26         302      143       0
4  2021-03-25         231       90       0
count of rows:  3195

## 2.2. Extract columns from crashes data to create time_table

In [20]:
# Extract calendar components from every distinct, non-null collision timestamp.
# Each entry pairs an output column name with the Spark function producing it.
datetime_parts = [
    ('hour', hour),
    ('day', dayofmonth),
    ('week', weekofyear),
    ('month', month),
    ('year', year),
    ('weekday', dayofweek),
]
time_table = (
    collisions_df2
    .select('DATETIME')
    .filter(col('DATETIME').isNotNull())
    .select(
        'DATETIME',
        *[extract(col('DATETIME')).alias(part) for part, extract in datetime_parts]
    )
    .dropDuplicates()
)

quality_check(time_table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

             DATETIME  hour  day  week  month  year  weekday
0 2019-03-10 00:10:00     0   10    10      3  2019        1
1 2019-02-24 15:15:00    15   24     8      2  2019        1
2 2019-03-05 12:40:00    12    5    10      3  2019        3
3 2019-03-15 15:13:00    15   15    11      3  2019        6
4 2019-02-20 17:10:00    17   20     8      2  2019        4
count of rows:  911777

## 2.3. Reshape weather data

Filter main weather params


PRCP = Precipitation (tenths of mm) <br>
SNOW = Snowfall (mm) <br>
SNWD = Snow depth (mm) <br>
TMAX = Maximum temperature (tenths of degrees C) <br>
TMIN = Minimum temperature (tenths of degrees C) <br>

In [21]:
# Keep only the five main daily weather parameters
main_weather_params = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]
weather_df3 = weather_df2.filter(col("Param").isin(main_weather_params))

# Pivot the weather table: one row per Date, one column per parameter.
# The pivot values are passed explicitly so Spark does not have to run an
# extra job to discover them, and so all five columns exist even if a
# parameter is missing from the data (the TMAX/TMIN conversion below needs them).
weather_df4 = weather_df3.groupBy("Date").pivot("Param", main_weather_params).sum("Value")

# Convert TMIN and TMAX from tenths-of-degrees C to C, newest date first.
# Only the temperatures are rescaled; PRCP, SNOW and SNWD keep the units
# they were loaded with.
weather_df5 = (
    weather_df4
    .withColumn("TMAX", col("TMAX") / 10)
    .withColumn("TMIN", col("TMIN") / 10)
    .orderBy(col("Date").desc())
)
quality_check(weather_df5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

         Date  PRCP  SNOW  SNWD  TMAX  TMIN
0  2021-04-02   0.0   0.0   0.0   3.9  -2.2
1  2021-04-01   8.0   0.0   0.0  14.4   3.3
2  2021-03-31  89.0   0.0   0.0  19.4  10.0
3  2021-03-30   0.0   0.0   0.0  16.1   5.6
4  2021-03-29   0.0   0.0   0.0  12.8   7.2
count of rows:  458

# 3. Joins for analytics

In [25]:
# Inner-join the daily collision summary with the daily weather on Date,
# so only days present in both tables remain.
dates_match = weather_df5.Date == collisions_daily_summary.Date
collisions_n_weather_byDate = (
    collisions_daily_summary
    .join(weather_df5, dates_match, how="inner")
    .orderBy(weather_df5.Date.desc())
    # Both inputs carry a Date column; drop the collisions one and keep the weather one.
    .drop(collisions_daily_summary.Date)
)

collisions_n_weather_byDate.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------+------+----------+-----+----+----+----+----+
|Collisions|Injured|Killed|      Date| PRCP|SNOW|SNWD|TMAX|TMIN|
+----------+-------+------+----------+-----+----+----+----+----+
|       184|     75|     0|2021-03-29|  0.0| 0.0| 0.0|12.8| 7.2|
|       144|     56|     0|2021-03-28|193.0| 0.0| 0.0|15.6| 8.9|
|       272|    165|     1|2021-03-27|  0.0| 0.0| 0.0|20.6|10.0|
|       302|    143|     0|2021-03-26| 48.0| 0.0| 0.0|27.8|11.1|
|       231|     90|     0|2021-03-25|  3.0| 0.0| 0.0|20.0| 9.4|
+----------+-------+------+----------+-----+----+----+----+----+
only showing top 5 rows

# 4. Load to S3

In [26]:
# Daily collisions joined with weather, written as CSV (existing output is overwritten)
collisions_n_weather_path = os.path.join(output_s3_dir, 'collisions_n_weather_byDate.csv')
collisions_n_weather_byDate.write.mode('overwrite').csv(collisions_n_weather_path)

# Collisions, written as Parquet partitioned by Year and Month
# NOTE(review): the target is named 'collisions.csv' although the data is
# written as Parquet -- confirm what downstream readers expect before renaming.
collisions_path = os.path.join(output_s3_dir, 'collisions.csv')
collisions_df2.write.mode('overwrite').partitionBy("Year", "Month").parquet(collisions_path)

# Time table, written as Parquet partitioned by year and month
time_table_path = os.path.join(output_s3_dir, 'time/time.parquet')
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(time_table_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Daily pivoted weather table, written as CSV (existing output is overwritten)
weather_daily_path = os.path.join(output_s3_dir, 'weather_daily.csv')
weather_df5.write.mode('overwrite').csv(weather_daily_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…