# Data Engineering Nanodegree Capstone Project
This notebook implements an ETL process that extracts bike-sharing data from CSV files located in a public AWS S3 bucket.

The data is then transformed into a dimensional model using Spark running on an AWS EMR cluster.

Finally, the dimensional tables are written back to the same S3 bucket, so storage can grow without resizing the cluster and the Spark cluster can be shut down to save cost.

For more details about the goal and scope of the project, read the `README.md` file.

## Imports
Import the PySpark SQL functions module.

In [1]:
import pyspark.sql.functions as F


Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 0 | application_1599686625902_0001 | pyspark | idle | Link | Link | ✔ |



SparkSession available as 'spark'.



## Loading Data
Load the CSV files from the S3 bucket `omar-dend`, which is located in `us-west-2`.

In [2]:
st_station_df = spark.read.csv('s3://omar-dend/station.csv', header=True)
st_weather_df = spark.read.csv('s3://omar-dend/weather.csv', header=True)
st_trip_df = spark.read.csv('s3://omar-dend/trip.csv', header=True)
st_status_df = spark.read.csv('s3://omar-dend/status.csv', header=True)
st_city_df = spark.read.csv('s3://omar-dend/city.csv', header=True)


In [3]:
# save counts to ensure later that all rows are present
station_count = st_station_df.count()
weather_count = st_weather_df.count()


## Adding Timestamp Columns
The dataframes loaded from S3 use different string representations of date and time. In this step I add a new column to each of them that holds the parsed timestamp.

In [4]:
st_station_df = st_station_df.withColumn('datetime', F.to_timestamp(st_station_df.installation_date, 'MM/dd/yyyy'))

st_weather_df = st_weather_df.withColumn('datetime', F.to_timestamp(st_weather_df.date, 'MM/dd/yyyy'))

st_trip_df = st_trip_df.withColumn('datetime_start', F.to_timestamp(st_trip_df.start_date, 'MM/dd/yyyy HH:mm'))
st_trip_df = st_trip_df.withColumn('datetime_end', F.to_timestamp(st_trip_df.end_date, 'MM/dd/yyyy HH:mm'))

st_status_df = st_status_df.withColumn('datetime', F.to_timestamp(st_status_df.time, 'yyyy/MM/dd HH:mm:ss'))
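
Before running the job on the full files, it can help to sanity-check each pattern against a sample value. The sketch below does this locally with Python's `datetime.strptime`. The `strptime` directives are my translation of the Spark patterns used above, and the sample strings are hypothetical values written to match each pattern, not rows taken from the CSV files.

```python
from datetime import datetime

# Spark pattern -> (equivalent strptime directive, hypothetical sample value)
PATTERNS = {
    'MM/dd/yyyy': ('%m/%d/%Y', '08/29/2013'),
    'MM/dd/yyyy HH:mm': ('%m/%d/%Y %H:%M', '08/29/2013 14:13'),
    'yyyy/MM/dd HH:mm:ss': ('%Y/%m/%d %H:%M:%S', '2013/08/29 12:06:01'),
}

def parse_samples(patterns):
    """Parse every sample with its directive; raises ValueError on a mismatch."""
    return {spark_pattern: datetime.strptime(sample, directive)
            for spark_pattern, (directive, sample) in patterns.items()}

parsed = parse_samples(PATTERNS)
for spark_pattern, value in parsed.items():
    print(spark_pattern, '->', value)
```

Swapping in a few real values copied from each file is a cheap way to catch a wrong pattern before the cluster is involved.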


## Constructing Dimensional Tables
This is the transformation step of the ETL process. Here the relevant columns are transformed and copied to new dataframes that represent the dimensional tables.

### Time Table
Constructing the `dim_time` table requires extracting the timestamps from all CSV files.

Each cell handles a different file. At the end, duplicates are dropped.

In [5]:
# Start dim_time with the station installation timestamps
time_df = st_station_df.select('datetime')\
        .withColumn('second', F.second('datetime'))\
        .withColumn('minute', F.minute('datetime'))\
        .withColumn('hour', F.hour('datetime'))\
        .withColumn('day', F.dayofmonth('datetime'))\
        .withColumn('week', F.weekofyear('datetime'))\
        .withColumn('month', F.month('datetime'))\
        .withColumn('year', F.year('datetime'))\
        .withColumn('weekday', F.dayofweek('datetime'))

In [6]:
# Append the weather timestamps instead of overwriting time_df
time_df = time_df.union(
    st_weather_df.select('datetime')
        .withColumn('second', F.second('datetime'))
        .withColumn('minute', F.minute('datetime'))
        .withColumn('hour', F.hour('datetime'))
        .withColumn('day', F.dayofmonth('datetime'))
        .withColumn('week', F.weekofyear('datetime'))
        .withColumn('month', F.month('datetime'))
        .withColumn('year', F.year('datetime'))
        .withColumn('weekday', F.dayofweek('datetime'))
)

In [7]:
# Append the trip start timestamps
time_df = time_df.union(
    st_trip_df.select(F.col('datetime_start').alias('datetime'))
        .withColumn('second', F.second('datetime'))
        .withColumn('minute', F.minute('datetime'))
        .withColumn('hour', F.hour('datetime'))
        .withColumn('day', F.dayofmonth('datetime'))
        .withColumn('week', F.weekofyear('datetime'))
        .withColumn('month', F.month('datetime'))
        .withColumn('year', F.year('datetime'))
        .withColumn('weekday', F.dayofweek('datetime'))
)

# Append the trip end timestamps
time_df = time_df.union(
    st_trip_df.select(F.col('datetime_end').alias('datetime'))
        .withColumn('second', F.second('datetime'))
        .withColumn('minute', F.minute('datetime'))
        .withColumn('hour', F.hour('datetime'))
        .withColumn('day', F.dayofmonth('datetime'))
        .withColumn('week', F.weekofyear('datetime'))
        .withColumn('month', F.month('datetime'))
        .withColumn('year', F.year('datetime'))
        .withColumn('weekday', F.dayofweek('datetime'))
)

In [8]:
# Append the status timestamps, then drop duplicates across all sources
time_df = time_df.union(
    st_status_df.select('datetime')
        .withColumn('second', F.second('datetime'))
        .withColumn('minute', F.minute('datetime'))
        .withColumn('hour', F.hour('datetime'))
        .withColumn('day', F.dayofmonth('datetime'))
        .withColumn('week', F.weekofyear('datetime'))
        .withColumn('month', F.month('datetime'))
        .withColumn('year', F.year('datetime'))
        .withColumn('weekday', F.dayofweek('datetime'))
).dropDuplicates()
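
To make the meaning of each derived column concrete, here is a pure-Python sketch that computes the same fields for a single timestamp. It is only an illustration: `time_row` is a hypothetical helper, the sample timestamp is made up, and the two comments reflect my reading of the Spark functions (`F.weekofyear` follows ISO 8601 week numbering, and `F.dayofweek` counts from 1 for Sunday to 7 for Saturday).

```python
from datetime import datetime

def time_row(ts):
    """Return the dim_time fields for one timestamp as a plain dict."""
    return {
        'datetime': ts,
        'second': ts.second,
        'minute': ts.minute,
        'hour': ts.hour,
        'day': ts.day,
        'week': ts.isocalendar()[1],         # ISO 8601 week number, like F.weekofyear
        'month': ts.month,
        'year': ts.year,
        'weekday': ts.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday, like F.dayofweek
    }

row = time_row(datetime(2013, 8, 29, 14, 13))  # hypothetical sample timestamp
print(row)
```

Note that `weekday` here is not the ISO weekday number: a Thursday comes out as 5, not 4.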


### Weather Table
Creating the `dim_weather` table. Duplicates are dropped if there are any; the current dataset does not have duplicates.

In [9]:
weather_df = st_weather_df.select('max_temperature_f', 'mean_temperature_f', 'min_temperature_f',
                                    'max_humidity', 'mean_humidity', 'min_humidity',
                                    'max_wind_Speed_mph', 'mean_wind_speed_mph',
                                    'precipitation_inches',
                                    'events', 'zip_code', 'datetime')\
                                        .dropDuplicates()


### Station Table
Creating `dim_station` table.

A new column for zip codes is added; its value is based on the name of the city. This column makes it possible to combine station data with weather data in analyses.

In [10]:
station_df = st_station_df.select(F.col('id').alias('station_id'),
                                    F.col('name').alias('station_name'),
                                    'lat', 'long', 'dock_count', 'city',
                                    F.col('datetime').alias('installation_datetime') )
station_df = station_df.join(st_city_df, station_df.city == st_city_df.city, 'left')\
                            .drop('city')\
                            .dropDuplicates()
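
The effect of the left join can be sketched in plain Python as a dictionary lookup. This is an illustration under assumptions: it supposes that `city.csv` maps each city name to exactly one `zip_code`, and the station rows, city names, and zip codes below are hypothetical placeholders, not values from the dataset.

```python
def add_zip_code(stations, city_zip):
    """Left-join-style lookup: every station is kept, unknown cities get None.

    The 'city' key is removed afterwards, mirroring the drop('city') above.
    """
    enriched = []
    for station in stations:
        row = {key: value for key, value in station.items() if key != 'city'}
        row['zip_code'] = city_zip.get(station['city'])
        enriched.append(row)
    return enriched

# Hypothetical sample rows and mapping, for illustration only
stations = [
    {'station_id': 1, 'city': 'City A'},
    {'station_id': 2, 'city': 'City B'},
    {'station_id': 3, 'city': 'City C'},
]
city_zip = {'City A': '00001', 'City B': '00002'}

enriched = add_zip_code(stations, city_zip)
print(enriched)
```

A left join keeps the number of rows equal to the number of stations only if each city appears once in the lookup table. A city listed twice would duplicate its stations, which is one of the things the row-count check later in the notebook would catch.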


### Trip Table
Creating `fact_trip` table.

In [11]:
trip_df = st_trip_df.select(F.col('id').alias('trip_id'), 'duration', 'bike_id',
                            'subscription_type',
                            'start_station_id', 'end_station_id',
                            'datetime_start', 'datetime_end')\
                            .dropDuplicates()


### Status Table
Creating `fact_status` table.

In [12]:
status_df = st_status_df.select('station_id', 'bikes_available',
                                'docks_available', 'datetime')\
                                .dropDuplicates()


## Checking That All Station & Weather Rows Are Present
Making sure that no station was dropped by mistake, and that the weather data for every day and every zip code is still present.

In [13]:
station_dim_count = station_df.count()
weather_dim_count = weather_df.count()

if station_dim_count != station_count or weather_dim_count != weather_count:
    raise Exception('Some dimensional rows are missing')
else:
    print('All is good')


All is good
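
If more tables are checked later, the comparison could report which table is off instead of raising a generic error. The sketch below is one possible shape for that, not part of the pipeline: `find_count_mismatches` is a hypothetical helper, and the counts are invented numbers used only to show the output.

```python
def find_count_mismatches(expected, actual):
    """Return {table: (expected, actual)} for every table whose counts differ."""
    return {name: (count, actual.get(name))
            for name, count in expected.items()
            if actual.get(name) != count}

# Hypothetical counts, for illustration only
expected = {'station': 70, 'weather': 3665}
actual = {'station': 70, 'weather': 3660}

mismatches = find_count_mismatches(expected, actual)
if mismatches:
    print('Row counts differ:', mismatches)
else:
    print('All is good')
```

In the notebook, `expected` would be filled from the counts saved right after loading and `actual` from the counts of the dimensional dataframes.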

## Saving Dimensional Tables to S3
Loading the dimensional tables to S3 in Parquet format. This leaves it to S3 to handle growth in both data size and number of users.

Another advantage is that the AWS EMR Spark cluster can be shut down to save cost while the data remains accessible.

In [14]:
station_df.write.mode('overwrite')\
        .parquet('s3://omar-dend/dim_station')


In [15]:
trip_df.write.mode('overwrite')\
        .parquet('s3://omar-dend/fact_trip')


`fact_status` is partitioned by station id

In [16]:
status_df.write.mode('overwrite')\
        .partitionBy('station_id')\
        .parquet('s3://omar-dend/fact_status')


`dim_weather` is partitioned by zip code

In [17]:
weather_df.write.mode('overwrite')\
        .partitionBy('zip_code')\
        .parquet('s3://omar-dend/dim_weather')


`dim_time` is partitioned by year and month

In [18]:
time_df.write.mode('overwrite')\
        .partitionBy('year', 'month')\
        .parquet('s3://omar-dend/dim_time')
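
`partitionBy` writes one subdirectory per distinct value, named in the Hive-style `column=value` form, with nested directories when several columns are given. The sketch below only builds such a prefix as a string to show the resulting layout; `partition_prefix` is a hypothetical helper, and the year and month values are examples.

```python
def partition_prefix(base_path, **partition_values):
    """Build the Hive-style directory prefix for one combination of partition values."""
    parts = [f'{column}={value}' for column, value in partition_values.items()]
    return '/'.join([base_path.rstrip('/')] + parts)

# Keyword order matches the order of columns passed to partitionBy
prefix = partition_prefix('s3://omar-dend/dim_time', year=2013, month=8)
print(prefix)
```

A query that filters on `year` and `month` can then read only the matching directories instead of scanning the whole table.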
