# 2020 Chicago Taxi and Weather Conditions Data Modeling

### Data Engineering Capstone Project
 

#### Project Summary
In this project we explore taxi demand in Chicago in relation to temperature and weather data.

We would like to explore the following questions:



The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

# Step 1: Scope the Project and Gather Data

This project uses the public Chicago Taxi Trips dataset.
We take trips from 2018 onward and combine them with daily weather data and weather conditions recorded at Chicago O'Hare International Airport.
The data is then processed and made ready to load for further analysis.

The Chicago taxi dataset is public and can be found at: https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew

The weather dataset can be found here: https://www.ncdc.noaa.gov/cdo-web/datasets

The ETL process runs on an AWS EMR Spark cluster, and data is loaded into the Data Lake only in the last step. All processing steps happen in memory.

The data is then saved to an S3 Data Lake in Parquet format, which makes it easy to query with tools such as Presto, AWS Athena, or AWS Glue.

The last step can be changed if needed, for example to write the data directly to a Redshift cluster.
<br>In that case, however, all target tables must be created beforehand.

The final tables have a format that supports running predefined queries.
<br>This means it could be beneficial to load the data into a NoSQL database such as Cassandra, which is designed for exactly that kind of query.


The Chicago taxi dataset is described in **section 4**; a general outline of the whole dataset is shown below.

![dataset-info](./assets/dataset.png)

![columns](./assets/dataset-columns.png)

# Chicago Community Areas

Chicago is officially divided into 77 community areas (zones). More can be learned via this link: https://en.wikipedia.org/wiki/Community_areas_in_Chicago

The data can be populated with real area names; for many purposes, however, keeping the numeric mapping makes more sense.

![columns](./assets/chicago-areas.png)

#### Local cluster setup prerequisites
Some packages may need to be installed first if the notebook is run on a local cluster.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=0b296db2c8d0ce2baf7660a963ae55a7ea99fa7747303f68d13e01d4165216f4
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully i

In [9]:
!apt-get install openjdk-8-jdk-headless -qq

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 32.)
debconf: falling back to frontend: Readline
Extracting templates from packages: 100%
Preconfiguring packages ...
Selecting previously unselected package multiarch-support.
(Reading database ... 18426 files and directories currently installed.)
Preparing to unpack .../multiarch-support_2.27-3ubuntu1_amd64.deb ...
Unpacking multiarch-support (2.27-3ubuntu1) ...
Setting up multiarch-support (2.27-3ubuntu1) ...
Selecting previously unselected package libxau6:amd64.
(Reading database ... 18429 files and directories currently installed.)
Preparing to unpack .../00-libxau6_1%3a1.0.8-1_amd64.deb ...
Unpacking libxau6:amd64 (1:1.0.8-1) ...
Selecting previously unselected package libbsd0:amd64.
Preparing to unpack .../01-libbsd0_0.8.7-1ubuntu0.1_amd64.deb ...
Unpacking libbsd0:amd64 (

In [3]:
!apt-get install wget -y




wget is already the newest version (1.19.4-1ubuntu2.2).
0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.


#### Data can be downloaded via official API

In [78]:
!wget -O /storage/taxi_trips_100m_rows.csv "https://data.cityofchicago.org/resource/wrvz-psew.csv?%24where=trip_start_timestamp%20>=%20'2018-01-01T00:00:00'&%24limit=100000000"

--2020-02-21 10:32:09--  https://data.cityofchicago.org/resource/wrvz-psew.csv?%24where=trip_start_timestamp%20%3E=%20'2018-01-01T00:00:00'&%24limit=100000000
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.68.26, 52.206.140.199
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘/storage/taxi_trips_100m_rows.csv’

/storage/taxi_trips     [ <=>                ]  16.38G  11.2MB/s    in 24m 49s 

2020-02-21 10:58:08 (11.3 MB/s) - ‘/storage/taxi_trips_100m_rows.csv’ saved [17584118950]
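
The query string in the `wget` call above is a SoQL filter in URL-encoded form. As a minimal sketch, the same URL can be assembled with Python's standard library, which makes the filter easier to read and change. The helper name `build_trips_url` and its parameters are illustrative and not part of the original notebook; `$offset` is included on the assumption that the endpoint supports SoQL paging, in case the export should be split into several smaller requests.

```python
from urllib.parse import urlencode

# CSV endpoint of the Taxi Trips dataset, as used in the wget call above
BASE_URL = "https://data.cityofchicago.org/resource/wrvz-psew.csv"


def build_trips_url(start="2018-01-01T00:00:00", limit=100000000, offset=0):
    """Build the download URL (hypothetical helper, not from the notebook)."""
    params = {
        "$where": "trip_start_timestamp >= '%s'" % start,  # keep trips from `start` on
        "$limit": limit,    # maximum number of rows in this response
        "$offset": offset,  # rows to skip; assumes SoQL paging is available
    }
    # urlencode percent-encodes "$", quotes and comparison operators
    return "%s?%s" % (BASE_URL, urlencode(params))


print(build_trips_url())
```

The resulting string can be passed to `wget -O ...` exactly like the hand-written URL.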



# Step 1a. Loading Taxi Data

The Chicago Taxi dataset is very large (around 170 GB on the current website), so we use a subset of it, downloaded with a row limit of 100 million, that currently has a size of around 11 GB.

The data contains only trips from 2018 until February 2020 (the latest date currently available is 5 February), and for exploratory purposes is limited to 40 million rows.

The weather dataset contains weather recordings from 1953 until today. Here, we use only the subset from 2018 until February 2020.

Given the large amount of data, storing it in S3 buckets is generally a very good idea, especially if the ETL pipelines run on an EMR cluster, as this allows faster data retrieval and loading within the AWS internal network.

In [1]:
# Data Buckets. Change the name of the bucket where data is stored

#trips_data = "./storage/taxi_trips_100m_rows.csv"
trips_data = 's3://taxi-big-data-for-capstone/taxi_trips_100m_rows.csv'
weather_data = "s3a://taxi-big-data-sample/chicago_weather_extended_data.csv" #"./storage/chicago_weather_extended_data.csv" 

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1582290877179_0002,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

# Step 2: Explore and Assess the Data

To perform analysis and ETL, the datasets must be loaded first.

Next, we check whether the data is correct and whether some columns have missing data.

Additionally, the first 5 rows of each dataset, its schema, and its column list are shown.


In [2]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format



In [3]:
# If run outside of an EMR cluster, create the Spark session by hand.
# Credentials are read from the standard AWS environment variables.
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID', None)
AWS_SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', None)

def create_spark_session():
    """returns: Spark Session object
    
    Description: Factory function that creates Spark Session objects.
    
    """
    # Hadoop options need the "spark.hadoop." prefix when set through Spark,
    # and the S3A connector reads fs.s3a.access.key / fs.s3a.secret.key.
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)\
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)\
        .getOrCreate()
    return spark

spark = create_spark_session()

In [3]:
taxi_trips = spark.read.option("header", "true").option("inferschema", "true").csv(trips_data)
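
`inferschema` makes Spark go through the CSV an extra time to guess the column types, which is noticeable on a file of this size. A possible alternative, sketched below, is to pass the schema explicitly as a DDL string, which `DataFrameReader.schema` accepts. The column names and order follow `taxi_trips.columns`, and the types are copied from the `printSchema()` output further down; the types of the last two columns are assumed to mirror their `pickup_` counterparts, because that output is truncated. `TAXI_COLUMNS` and `TAXI_SCHEMA_DDL` are names made up for this sketch.

```python
# (column name, Spark SQL type) pairs, in the same order as in the CSV file
TAXI_COLUMNS = [
    ("trip_id", "STRING"), ("taxi_id", "STRING"),
    ("trip_start_timestamp", "TIMESTAMP"), ("trip_end_timestamp", "TIMESTAMP"),
    ("trip_seconds", "INT"), ("trip_miles", "DOUBLE"),
    ("pickup_census_tract", "BIGINT"), ("dropoff_census_tract", "BIGINT"),
    ("pickup_community_area", "INT"), ("dropoff_community_area", "INT"),
    ("fare", "DOUBLE"), ("tips", "DOUBLE"), ("tolls", "DOUBLE"),
    ("extras", "DOUBLE"), ("trip_total", "DOUBLE"),
    ("payment_type", "STRING"), ("company", "STRING"),
    ("pickup_centroid_latitude", "DOUBLE"), ("pickup_centroid_longitude", "DOUBLE"),
    ("pickup_centroid_location", "STRING"),
    ("dropoff_centroid_latitude", "DOUBLE"),
    ("dropoff_centroid_longitude", "DOUBLE"),  # assumed, schema output is truncated
    ("dropoff_centroid_location", "STRING"),   # assumed, schema output is truncated
]

# DDL string of the form "trip_id STRING, taxi_id STRING, ..."
TAXI_SCHEMA_DDL = ", ".join("%s %s" % (name, dtype) for name, dtype in TAXI_COLUMNS)
print(TAXI_SCHEMA_DDL)

# Intended use inside the notebook, where `spark` and `trips_data` exist:
# taxi_trips = spark.read.option("header", "true").schema(TAXI_SCHEMA_DDL).csv(trips_data)
```

It is worth verifying on a small sample that timestamps still parse as expected before running this on the full file.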


Next, let's look at the first rows to see what the data actually looks like.

In [5]:
taxi_trips.limit(5).toPandas()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location
0,3485f158f4e4151a50a3666701f1d6480712e344,8dd990653cc2793d96b80ae45e928afc9a590b21b491ee...,2018-04-26 10:15:00,2018-04-26 10:15:00,300,0.7,17031080000.0,17031080000.0,8,8,...,0.0,5.25,Cash,KOAM Taxi Association,41.890922,-87.618868,POINT (-87.6188683546 41.8909220259),41.892042,-87.631864,POINT (-87.6318639497 41.8920421365)
1,c9ea510fdf7256077b5f33daa9f3362b631a19d6,d59a55fcbd9ef11987cadd1a2dc93149edace06e23c37a...,2018-04-20 14:30:00,2018-04-20 14:30:00,420,0.7,17031080000.0,17031080000.0,8,8,...,0.0,6.0,Cash,Taxi Affiliation Services,41.898332,-87.620763,POINT (-87.6207628651 41.8983317935),41.892508,-87.626215,POINT (-87.6262149064 41.8925077809)
2,e0203090822169816112bb888a0bf20de23f1705,78a7a28fe001257044d5095fcb1ff7156a47411ff93cc5...,2018-04-08 01:00:00,2018-04-08 01:15:00,780,5.7,,,32,6,...,1.0,17.75,Cash,Star North Management LLC,41.878866,-87.625192,POINT (-87.6251921424 41.8788655841),41.944227,-87.655998,POINT (-87.6559981815 41.9442266014)
3,dc27a74dce2a3fea125264863be02fea6e7e276a,c0250f358cae01c5319aeb7b39827e53f9a2259eb32e4c...,2018-04-19 12:15:00,2018-04-19 12:15:00,540,0.1,17031320000.0,17031080000.0,32,8,...,0.0,10.0,Credit Card,Blue Ribbon Taxi Association Inc.,41.884987,-87.620993,POINT (-87.6209929134 41.8849871918),41.893216,-87.637844,POINT (-87.6378442095 41.8932163595)
4,4549d2d605d9563a7e0e50c4b9d0b3336eb8cd4a,135f786cdda9db8da848725a7af2922b6886fbd43c609a...,2018-04-09 07:45:00,2018-04-09 08:45:00,3720,15.5,17031070000.0,17031980000.0,7,76,...,0.0,43.25,Cash,Star North Management LLC,41.922083,-87.634156,POINT (-87.6341560931 41.922082541),41.979071,-87.90304,POINT (-87.9030396611 41.9790708201)


In [5]:
def check_total_rows(df):
    """
    Count the number of all records.
    """ 
    count_rows = df.count()
    print ('Total data count is '+str(count_rows))


In [82]:
check_total_rows(taxi_trips)

Total data count is 38282870


The schema of the dataset looks like the following.
It matches the schema listed on the official website.

In [8]:
taxi_trips.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- taxi_id: string (nullable = true)
 |-- trip_start_timestamp: timestamp (nullable = true)
 |-- trip_end_timestamp: timestamp (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- pickup_census_tract: long (nullable = true)
 |-- dropoff_census_tract: long (nullable = true)
 |-- pickup_community_area: integer (nullable = true)
 |-- dropoff_community_area: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- extras: double (nullable = true)
 |-- trip_total: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- company: string (nullable = true)
 |-- pickup_centroid_latitude: double (nullable = true)
 |-- pickup_centroid_longitude: double (nullable = true)
 |-- pickup_centroid_location: string (nullable = true)
 |-- dropoff_centroid_latitude: double (nullable = true)
 |-- dropoff_cen

In [9]:
# The dataset contains the following columns
taxi_trips.columns

['trip_id',
 'taxi_id',
 'trip_start_timestamp',
 'trip_end_timestamp',
 'trip_seconds',
 'trip_miles',
 'pickup_census_tract',
 'dropoff_census_tract',
 'pickup_community_area',
 'dropoff_community_area',
 'fare',
 'tips',
 'tolls',
 'extras',
 'trip_total',
 'payment_type',
 'company',
 'pickup_centroid_latitude',
 'pickup_centroid_longitude',
 'pickup_centroid_location',
 'dropoff_centroid_latitude',
 'dropoff_centroid_longitude',
 'dropoff_centroid_location']

In [4]:
# Check every column for missing values
import pyspark.sql.functions as F


def check_missing_data(df, num_rows=1000, columns=None):
    """
    Prints the number of missing values found in each column,
    based on a sample of `num_rows` rows.
    If no columns are specified, all columns are checked.
    """

    if columns is None:
        columns = df.columns

    # `column` rather than `col`, to avoid shadowing pyspark.sql.functions.col
    for column in columns:
        count = df.select(column).limit(num_rows).where(F.col(column).isNull()).count()
        print("Column %s contains missing data: %s" % (column, count))



In [83]:
check_missing_data(taxi_trips)

Column trip_id contains missing data: 0
Column taxi_id contains missing data: 0
Column trip_start_timestamp contains missing data: 0
Column trip_end_timestamp contains missing data: 0
Column trip_seconds contains missing data: 0
Column trip_miles contains missing data: 0
Column pickup_census_tract contains missing data: 314
Column dropoff_census_tract contains missing data: 323
Column pickup_community_area contains missing data: 38
Column dropoff_community_area contains missing data: 65
Column fare contains missing data: 0
Column tips contains missing data: 0
Column tolls contains missing data: 0
Column extras contains missing data: 0
Column trip_total contains missing data: 0
Column payment_type contains missing data: 0
Column company contains missing data: 0
Column pickup_centroid_latitude contains missing data: 38
Column pickup_centroid_longitude contains missing data: 38
Column pickup_centroid_location contains missing data: 38
Column dropoff_centroid_latitude contains missing data

As stated on the official website, `Census Tract` is not shown for some trips, and the column can also be blank for trips outside of Chicago.
<br> Some of the coordinate data is missing, and we need to decide whether such rows are preserved or cleaned.
<br> From a sample of 1000 rows, we can observe that well over 10% of rows have missing data: 314 lack a pickup census tract and 38 lack pickup coordinates.

For analysis this may not be crucial, but for Machine Learning jobs this data may become vital.

We use different schemas for different departments later on.

In [84]:
# Show the summary of all columns
def show_summary(df, num_rows=1000):
    """
    Shows summary data (count, mean, stddev, min, max) for each column,
    computed on a sample of `num_rows` rows.
    """
    for column in df.columns:
        print("Summary for column %s" % column)
        # use the dataframe passed in, not the global taxi_trips
        df.limit(num_rows).select(column).describe().show()

show_summary(taxi_trips)

Summary for column trip_id
+-------+--------------------+
|summary|             trip_id|
+-------+--------------------+
|  count|                1000|
|   mean|                null|
| stddev|                null|
|    min|006e531f6de2d7022...|
|    max|ff2868907eaf6633b...|
+-------+--------------------+

Summary for column taxi_id
+-------+--------------------+
|summary|             taxi_id|
+-------+--------------------+
|  count|                1000|
|   mean|                null|
| stddev|                null|
|    min|008ca9f6e7dff925f...|
|    max|fe3f654450472e6aa...|
+-------+--------------------+

Summary for column trip_start_timestamp
+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

Summary for column trip_end_timestamp
+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

Summary for column trip_seconds
+-------+-----------------+
|summary|     trip_seconds|
+-------+-----------------+
|  count| 

#### Let us show unique companies that work in Chicago
There is also a separate dataset collected by the Uber and Lyft companies
that contains only trip information, without the taxi_id and company_name columns.

This data can also be used to enrich our dataset.

In [85]:
taxi_trips.select("company").distinct().toPandas()

Unnamed: 0,company
0,3556 - 36214 RC Andrews Cab
1,Service Taxi Association
2,Metro Group
3,Chicago Taxicab
4,FlashCab
5,4053 - 40193 Adwar H. Nikola
6,Taxi Affiliation Services
7,American United Taxi Affiliation
8,5 Star Taxi
9,5006 - 39261 Salifu Bawa


#### Now we check how many taxi cabs are registered in Chicago at the moment

In [86]:
number_of_cabs = taxi_trips.select("taxi_id").distinct().count()
print("There are %s cabs registered in Chicago city at the moment" % number_of_cabs)

There are 6645 cabs registered in Chicago city at the moment


# Step 2a: Loading Temperature Data

The Chicago temperature data is a dataset of temperatures recorded each day at the **CHICAGO OHARE INTERNATIONAL AIRPORT, IL US** station since 1954.
This perfectly fits our requirement to gather weather conditions for each day in the city of Chicago.

The weather dataset can be found here: https://www.ncdc.noaa.gov/cdo-web/datasets

We use a subset of this dataset for the period from January 2018 until today, as we are interested in the most recent events.

The dataset is quite small and can easily be stored locally.

Weather data for one day looks like the following:
![Weather](./assets/Chicago_weather_sample.png)

In [5]:
# Step 2a: Loading temperature data for Chicago for the period from January 2018 until today
weather_df = spark.read.option("header", "true").option("inferschema", "true").csv(weather_data)


In [6]:
weather_df.limit(5).toPandas()

Unnamed: 0,STATION,DATE,AWND,AWND_ATTRIBUTES,PRCP,PRCP_ATTRIBUTES,SNOW,SNOW_ATTRIBUTES,SNWD,SNWD_ATTRIBUTES,...,WT04,WT04_ATTRIBUTES,WT05,WT05_ATTRIBUTES,WT06,WT06_ATTRIBUTES,WT08,WT08_ATTRIBUTES,WT09,WT09_ATTRIBUTES
0,USW00094846,2018-01-01,4.8,",,W",0.0,",,W,2400",0.0,",,W",30.0,",,W,2400",...,,,,,,,,,,
1,USW00094846,2018-01-02,5.0,",,W",0.0,",,W,2400",0.0,",,W",30.0,",,W,2400",...,,,,,,,,,,
2,USW00094846,2018-01-03,5.5,",,W",0.0,"T,,W,2400",3.0,",,W",30.0,",,W,2400",...,,,,,,,1.0,",,W",,
3,USW00094846,2018-01-04,5.5,",,W",0.0,",,W,2400",0.0,",,W",30.0,",,W,2400",...,,,,,,,,,,
4,USW00094846,2018-01-05,4.8,",,W",0.0,",,W,2400",0.0,",,W",30.0,",,W,2400",...,,,,,,,,,,


In [7]:
weather_df.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- AWND: double (nullable = true)
 |-- AWND_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- PRCP_ATTRIBUTES: string (nullable = true)
 |-- SNOW: double (nullable = true)
 |-- SNOW_ATTRIBUTES: string (nullable = true)
 |-- SNWD: double (nullable = true)
 |-- SNWD_ATTRIBUTES: string (nullable = true)
 |-- TAVG: double (nullable = true)
 |-- TAVG_ATTRIBUTES: string (nullable = true)
 |-- TMAX: double (nullable = true)
 |-- TMAX_ATTRIBUTES: string (nullable = true)
 |-- TMIN: double (nullable = true)
 |-- TMIN_ATTRIBUTES: string (nullable = true)
 |-- WDF2: double (nullable = true)
 |-- WDF2_ATTRIBUTES: string (nullable = true)
 |-- WDF5: double (nullable = true)
 |-- WDF5_ATTRIBUTES: string (nullable = true)
 |-- WSF2: double (nullable = true)
 |-- WSF2_ATTRIBUTES: string (nullable = true)
 |-- WSF5: double (nullable = true)
 |-- WSF5_ATTRIBUTES: string (nullable = true)
 |--

The data contains too many columns; we only want to use the weather conditions and the temperature for each day.
<br>This may help us predict taxi demand, price, possible delays due to heavy traffic, etc.
<br>We need to preprocess this data for future use.

In [107]:
check_missing_data(weather_df)

Column STATION contains missing data: 0
Column DATE contains missing data: 0
Column AWND contains missing data: 2
Column AWND_ATTRIBUTES contains missing data: 2
Column PRCP contains missing data: 1
Column PRCP_ATTRIBUTES contains missing data: 1
Column SNOW contains missing data: 1
Column SNOW_ATTRIBUTES contains missing data: 1
Column SNWD contains missing data: 1
Column SNWD_ATTRIBUTES contains missing data: 1
Column TAVG contains missing data: 0
Column TAVG_ATTRIBUTES contains missing data: 0
Column TMAX contains missing data: 1
Column TMAX_ATTRIBUTES contains missing data: 1
Column TMIN contains missing data: 1
Column TMIN_ATTRIBUTES contains missing data: 1
Column WDF2 contains missing data: 2
Column WDF2_ATTRIBUTES contains missing data: 2
Column WDF5 contains missing data: 3
Column WDF5_ATTRIBUTES contains missing data: 3
Column WSF2 contains missing data: 2
Column WSF2_ATTRIBUTES contains missing data: 2
Column WSF5 contains missing data: 3
Column WSF5_ATTRIBUTES contains miss

**Note!** Missing data in this dataset is expected, as many columns contain weather condition flags with `1` when the condition occurred and `null` when conditions were normal.
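
To make the `1`/`null` convention concrete, the sketch below turns the weather-type columns of one record into booleans using plain Python. The sample values are taken from the 2018-01-03 row shown earlier, and only the `WT` columns visible in that preview are used. The helper name `condition_flags` is made up for this illustration, and empty strings stand in for `null`, as they would when the CSV is read with `csv.DictReader`.

```python
def condition_flags(record):
    """Map every WTxx column to True (condition occurred) or False (blank)."""
    return {
        name: value not in ("", None)
        for name, value in record.items()
        # weather-type columns start with "WT"; skip the *_ATTRIBUTES companions
        if name.startswith("WT") and not name.endswith("_ATTRIBUTES")
    }


# 2018-01-03 from the preview above: only WT08 is set
sample = {"DATE": "2018-01-03", "WT04": "", "WT05": "", "WT06": "",
          "WT08": "1.0", "WT08_ATTRIBUTES": ",,W", "WT09": ""}

flags = condition_flags(sample)
print(flags)
```

Treating blanks as `False` rather than as missing values keeps these columns out of any later missing-data cleanup.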


In [108]:
check_total_rows(weather_df)

Total data count is 779


# Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

The conceptual data model extends the existing Taxi Trips dataset with the weather and temperature data collected for each day at Chicago O'Hare Airport. The weather data additionally contains weather conditions, which may be useful for predicting various travel patterns in the city of Chicago.

The initial Taxi Trips dataset contains **23** columns and the weather dataset **40** columns.

The combined data provides enough information to support many use cases.


The following use cases can be targeted with the created data models:

* As a Taxi Driver, I would be able to choose a better location for passenger pickups, depending on the hour of the day, the season, and the weather conditions at the given moment.

* As a Data Analyst, I would be able to analyze data for each trip and each driver. I would be able to compute driver ratings based on the number of trips per week/month and the amount of tips collected.

* As a Machine Learning engineer, I would be able to use the data to predict the best places for passenger pickups, whether tips will be given, whether the passenger flow will increase on a given day, etc.

* Additionally, the information collected for each taxi cab can help companies decide whether any repair work should be performed.

* Payment patterns can be identified, e.g. for short or long trips.

* Ride-sharing opportunities can be explored, including the areas where they are most likely.


Many other use cases can be supported as well.


#### 3.2 Mapping Out Data Pipelines

The weather data has already been downloaded for the period of interest.
The dataset is quite small, as we target the period from 2018 until 21 February 2020.

The taxi data is also prefiltered to contain only the period of interest. We take into account a part of the dataset of around 38 million rows (38,282,870 in the row count from Step 2).

* After the data is loaded, we ensure that the dataframes are valid. The validity of the schema has already been checked in Step 2.
* First, we need to convert the date and time column to DateType.
* To ease joins between tables, we extract the day, year, hour, etc. This allows us to easily join dataframes on time periods.
* Next, each taxi trip is enriched with this data.
* Community area data is extracted next. Each location coordinate point is rounded to a precision of about 15 meters, which allows us to collect the most popular passenger pickup points in the city of Chicago.
* Extraction of data for each taxi cab is performed next. This data contains a lot of useful information for any department and can be used to create a helpful API for drivers or passengers, for example to help them estimate the cost of a ride.
* During the next step, we process the weather data and split it into two tables that should be used separately.
* Weather condition data is a helpful dataset that can be used to enrich other data `on-demand`.
* The main taxi dataset is stored with the rows that have hidden location data included, as these may provide a lot of insight later on. We simply cannot throw away 30% of the data.
* For a wide range of analytic and `ad-hoc` use cases, we store a preprocessed taxi trips dataset with no missing values in a separate table.
* Finally, the main fact table is created: the preprocessed taxi trips dataset with no missing values is joined with the weather data.

The next section contains a step-by-step guide through the ETL pipeline in much more detail.

# Step 4: Run Pipelines to Model the Data



## Step 4a. Taxi Data ETL

#### Data checks
Before each step we check whether the dataframes are valid.
Note, however, that if the full data is used (more than 100 GB), these checks may drastically slow down the whole pipeline,
so they should be performed for final tests only.

In [6]:
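def check_dataframe_correctness(df):
    """
    Raises ValueError if the dataframe has no rows; otherwise prints a confirmation.
    """
    # fetching a few rows is enough to tell whether the dataframe is empty
    check = df.limit(5)
    if check.count() == 0:
        raise ValueError("Dataframe cannot be empty. Check if Spark Session is still running")
    else:
        print("Dataframe is available")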


In [20]:
check_dataframe_correctness(taxi_trips)
check_dataframe_correctness(weather_df)

Dataframe is available
Dataframe is available


Next, we extract useful information from the timestamp field and join this data with the *Taxi Data*.
<br>We will use this data for easier joining with the *Weather Data* on the **(year, dayofyear)** columns.
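
As a small illustration of the join key, the plain-Python sketch below derives `(year, dayofyear)` from a trip timestamp and from a weather date and shows that both land on the same key. It also computes the week number and Unix timestamp for the same trip, assuming ISO week numbering and UTC timestamps. The timestamp is the first sample trip from the preview in Step 2; the function name `join_key` is made up for this sketch, and the pipeline itself uses the equivalent Spark functions.

```python
from datetime import date, datetime, timezone


def join_key(day):
    """Return (year, day of year) for a date or datetime."""
    return day.year, day.timetuple().tm_yday


trip_start = datetime(2018, 4, 26, 10, 15, tzinfo=timezone.utc)  # sample trip
weather_day = date(2018, 4, 26)                                   # weather record

print(join_key(trip_start))          # (2018, 116)
print(join_key(weather_day))         # (2018, 116)
print(trip_start.isocalendar()[1])   # 17, ISO week number
print(int(trip_start.timestamp()))   # 1524737700
```

Because the weather data has one record per day, joining on this key attaches exactly one weather record to each trip.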

In [7]:
from pyspark.sql.functions import *

# `ts` rather than `col`, so that pyspark.sql.functions.col is not shadowed
ts = taxi_trips.trip_start_timestamp
time_df = taxi_trips.select(
    year(ts).alias('year'), month(ts).alias('month'), dayofmonth(ts).alias('day'),
    dayofyear(ts).alias('dayofyear'), hour(ts).alias('hour'), minute(ts).alias('min'),
    weekofyear(ts).alias('week_no'), unix_timestamp(ts).alias('unix_ts'), "trip_id")


In [89]:
time_df.limit(5).toPandas()

Unnamed: 0,year,month,day,dayofyear,hour,min,week_no,unix_ts,trip_id
0,2018,4,26,116,10,15,17,1524737700,3485f158f4e4151a50a3666701f1d6480712e344
1,2018,4,20,110,14,30,16,1524234600,c9ea510fdf7256077b5f33daa9f3362b631a19d6
2,2018,4,8,98,1,0,14,1523149200,e0203090822169816112bb888a0bf20de23f1705
3,2018,4,19,109,12,15,16,1524140100,dc27a74dce2a3fea125264863be02fea6e7e276a
4,2018,4,9,99,7,45,15,1523259900,4549d2d605d9563a7e0e50c4b9d0b3336eb8cd4a


In [8]:
# Now we will enrich main dataframe with extracted data
taxi_trips = taxi_trips.join(time_df, on="trip_id")


In [91]:
taxi_trips.columns

['trip_id',
 'taxi_id',
 'trip_start_timestamp',
 'trip_end_timestamp',
 'trip_seconds',
 'trip_miles',
 'pickup_census_tract',
 'dropoff_census_tract',
 'pickup_community_area',
 'dropoff_community_area',
 'fare',
 'tips',
 'tolls',
 'extras',
 'trip_total',
 'payment_type',
 'company',
 'pickup_centroid_latitude',
 'pickup_centroid_longitude',
 'pickup_centroid_location',
 'dropoff_centroid_latitude',
 'dropoff_centroid_longitude',
 'dropoff_centroid_location',
 'year',
 'month',
 'day',
 'dayofyear',
 'hour',
 'min',
 'week_no',
 'unix_ts']

### Community Areas Table

This table contains a collection of coordinates in each community area.

The coordinates can easily be plotted, or grouped to find the pickup point closest to a taxi driver or the taxi cab closest to a passenger.

This table has no unique PRIMARY KEY, so it would need an auto-increment id if added to a relational database.

Next, we round the coordinate points,
<br>as for most use cases we only need to know a location with a precision of 20-50 meters.
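
To see what a given number of decimal places means on the ground, the sketch below converts the rounding step into meters using a spherical-Earth approximation. The mean Earth radius of 6,371 km and the latitude of 41.88° (roughly downtown Chicago, taken from the coordinates in the previews above) are assumptions of this sketch, and the function name is made up for illustration.

```python
import math

EARTH_RADIUS_M = 6371000  # mean Earth radius, spherical approximation


def rounding_step_meters(precision, latitude_deg):
    """Size in meters of one rounding step of 10**-precision degrees.

    Returns (north_south, east_west); the east-west size shrinks with
    the cosine of the latitude.
    """
    step_rad = math.radians(10 ** -precision)
    north_south = EARTH_RADIUS_M * step_rad
    east_west = north_south * math.cos(math.radians(latitude_deg))
    return north_south, east_west


ns, ew = rounding_step_meters(4, 41.88)
print("4 decimals: %.1f m north-south, %.1f m east-west" % (ns, ew))
```

With four decimal places the grid cell comes out at roughly 11 m north-south by a little over 8 m east-west, which is within the precision targeted here; three decimal places give cells about ten times larger.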


In [10]:
# we use the round function from pyspark.sql.functions here (it shadows the Python built-in)

from pyspark.sql.functions import col, round

PRECISION = 4  # individual street precision, 15 meters
community_areas_df = taxi_trips.select(
                col("pickup_community_area").alias("community_area"), 
                round(taxi_trips["pickup_centroid_latitude"],PRECISION).alias("centroid_latitude"),
                round(taxi_trips["pickup_centroid_longitude"],PRECISION).alias("centroid_longitude")
            )
# dropDuplicates returns a new dataframe, so the result has to be assigned back
community_areas_df = community_areas_df.dropDuplicates(subset = ["centroid_latitude", "centroid_longitude"])
community_areas_df

DataFrame[community_area: int, centroid_latitude: double, centroid_longitude: double]

In [93]:
community_areas_df.limit(5).toPandas()

Unnamed: 0,community_area,centroid_latitude,centroid_longitude
0,8,41.8992,-87.6262
1,32,41.881,-87.6327
2,28,41.8853,-87.6428
3,8,41.8983,-87.6208
4,6,41.9442,-87.656


### Cabs Table

This table collects the data relevant for each taxi cab, so that it can easily be grouped and searched by individual cab or driver.

In [94]:
cabs_columns = ['trip_id', 'taxi_id',                          # trip and cab identifiers
                'trip_start_timestamp', 'trip_end_timestamp',  # trip time
                'trip_seconds', 'trip_miles',                  # trip duration and distance
                'fare', 'tips', 'tolls', 'extras',             # cost data
                'trip_total', 'payment_type',                  # total cost and payment method
                'company'                                      # company where taxi cab is registered
                ]

In [95]:
cabs_df = taxi_trips.select(*[cabs_columns])


In [44]:
# What the cabs data looks like
cabs_df.limit(5).toPandas()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,fare,tips,tolls,extras,trip_total,payment_type,company
0,000040bde022bec832268c32707fd802c0a3d956,adb0da2862924ceb2be34ffd2eab0a097a8e6594c39415...,2018-06-14 10:00:00,2018-06-14 10:15:00,600,0.8,7.25,3.0,0.0,0.0,10.25,Credit Card,Star North Management LLC
1,000b5c4c37a2c154b6f703803068625c67f81017,0423b61dd9d4f4da3f4490e3dfed085a321aa78d527286...,2019-09-15 00:00:00,2019-09-15 00:15:00,1020,0.1,11.0,0.0,0.0,0.0,11.0,Cash,Taxi Affiliation Services
2,000d50f70b61120f8342e039410a6ce077db84d2,839c8a07639d9477fe43f3e2b7be014a3deead1930460c...,2019-08-24 23:45:00,2019-08-24 23:45:00,478,1.35,7.25,0.0,0.0,0.0,7.25,Cash,Chicago Carriage Cab Corp
3,000f9ef866c0f120314ceb0d9290e71e7c23fd6a,e93472aad9e00c0a523e1a861ae897303548d713ba63d3...,2019-08-01 05:30:00,2019-08-01 06:00:00,1657,2.64,15.25,0.0,0.0,0.0,15.25,Cash,Flash Cab
4,0010ea18098781e411e7ea4f92e400e7058de94c,93930278eb7df9672cdc277c92e59dae0f82116868ce1c...,2018-06-15 20:45:00,2018-06-15 20:45:00,420,0.0,6.25,0.0,0.0,1.5,7.75,Cash,Taxi Affiliation Services


### General Taxi Trips dataset

This table contains the enriched Chicago taxi dataset and can be used for a vast number of use cases.

Next, we drop the columns that carry no relevant information or that only duplicate data found in other columns.

In [27]:
# The dataset has columns that contain POINT objects as string values. These can be filtered out for now

taxi_trips.select(*["pickup_centroid_location", "dropoff_centroid_location"]).limit(5).toPandas()

Unnamed: 0,pickup_centroid_location,dropoff_centroid_location
0,POINT (-87.6327464887 41.8809944707),POINT (-87.6209929134 41.8849871918)
1,POINT (-87.642648998 41.8792550844),POINT (-87.6207628651 41.8983317935)
2,POINT (-87.6207628651 41.8983317935),POINT (-87.6317173661 41.9146162864)
3,POINT (-87.6207628651 41.8983317935),POINT (-87.6209929134 41.8849871918)
4,POINT (-87.6317173661 41.9146162864),POINT (-87.6308650266 41.9058577688)


In [28]:
taxi_trips.limit(5).toPandas()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,dropoff_centroid_longitude,dropoff_centroid_location,year,month,day,dayofyear,hour,min,week_no,unix_ts
0,000040bde022bec832268c32707fd802c0a3d956,adb0da2862924ceb2be34ffd2eab0a097a8e6594c39415...,2018-06-14 10:00:00,2018-06-14 10:15:00,600,0.8,17031839100,17031320100,32,32,...,-87.620993,POINT (-87.6209929134 41.8849871918),2018,6,14,165,10,0,24,1528970400
1,000b5c4c37a2c154b6f703803068625c67f81017,0423b61dd9d4f4da3f4490e3dfed085a321aa78d527286...,2019-09-15 00:00:00,2019-09-15 00:15:00,1020,0.1,17031281900,17031081300,28,8,...,-87.620763,POINT (-87.6207628651 41.8983317935),2019,9,15,258,0,0,37,1568505600
2,000d50f70b61120f8342e039410a6ce077db84d2,839c8a07639d9477fe43f3e2b7be014a3deead1930460c...,2019-08-24 23:45:00,2019-08-24 23:45:00,478,1.35,17031081300,17031071500,8,7,...,-87.631717,POINT (-87.6317173661 41.9146162864),2019,8,24,236,23,45,34,1566690300
3,000f9ef866c0f120314ceb0d9290e71e7c23fd6a,e93472aad9e00c0a523e1a861ae897303548d713ba63d3...,2019-08-01 05:30:00,2019-08-01 06:00:00,1657,2.64,17031081300,17031320100,8,32,...,-87.620993,POINT (-87.6209929134 41.8849871918),2019,8,1,213,5,30,31,1564637400
4,0010ea18098781e411e7ea4f92e400e7058de94c,93930278eb7df9672cdc277c92e59dae0f82116868ce1c...,2018-06-15 20:45:00,2018-06-15 20:45:00,420,0.0,17031071500,17031080202,7,8,...,-87.630865,POINT (-87.6308650266 41.9058577688),2018,6,15,166,20,45,24,1529095500


Let's exclude these columns from the full list of available columns.

In [9]:
def diff(first, second):
    """
    Return the items of `first` that are not in `second`, preserving order.
    
    Use case: filtering several columns out of a larger list of columns.
    """
    second = set(second)
    return [item for item in first if item not in second]

In [10]:
general_taxi_columns = diff(taxi_trips.columns, ["pickup_centroid_location", "dropoff_centroid_location", "min", "week_no", "unix_ts"])
general_taxi_columns


['trip_id', 'taxi_id', 'trip_start_timestamp', 'trip_end_timestamp', 'trip_seconds', 'trip_miles', 'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area', 'dropoff_community_area', 'fare', 'tips', 'tolls', 'extras', 'trip_total', 'payment_type', 'company', 'pickup_centroid_latitude', 'pickup_centroid_longitude', 'dropoff_centroid_latitude', 'dropoff_centroid_longitude', 'year', 'month', 'day', 'dayofyear', 'hour']

In [11]:
# Filter out POINT columns and columns that do not add value to the data

filtered_taxi_trips = taxi_trips.select(*general_taxi_columns)

In [32]:
# Check and show the columns available
filtered_taxi_trips.limit(5).toPandas()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,year,month,day,dayofyear,hour
0,000040bde022bec832268c32707fd802c0a3d956,adb0da2862924ceb2be34ffd2eab0a097a8e6594c39415...,2018-06-14 10:00:00,2018-06-14 10:15:00,600,0.8,17031839100,17031320100,32,32,...,Star North Management LLC,41.880994,-87.632746,41.884987,-87.620993,2018,6,14,165,10
1,000b5c4c37a2c154b6f703803068625c67f81017,0423b61dd9d4f4da3f4490e3dfed085a321aa78d527286...,2019-09-15 00:00:00,2019-09-15 00:15:00,1020,0.1,17031281900,17031081300,28,8,...,Taxi Affiliation Services,41.879255,-87.642649,41.898332,-87.620763,2019,9,15,258,0
2,000d50f70b61120f8342e039410a6ce077db84d2,839c8a07639d9477fe43f3e2b7be014a3deead1930460c...,2019-08-24 23:45:00,2019-08-24 23:45:00,478,1.35,17031081300,17031071500,8,7,...,Chicago Carriage Cab Corp,41.898332,-87.620763,41.914616,-87.631717,2019,8,24,236,23
3,000f9ef866c0f120314ceb0d9290e71e7c23fd6a,e93472aad9e00c0a523e1a861ae897303548d713ba63d3...,2019-08-01 05:30:00,2019-08-01 06:00:00,1657,2.64,17031081300,17031320100,8,32,...,Flash Cab,41.898332,-87.620763,41.884987,-87.620993,2019,8,1,213,5
4,0010ea18098781e411e7ea4f92e400e7058de94c,93930278eb7df9672cdc277c92e59dae0f82116868ce1c...,2018-06-15 20:45:00,2018-06-15 20:45:00,420,0.0,17031071500,17031080202,7,8,...,Taxi Affiliation Services,41.914616,-87.631717,41.905858,-87.630865,2018,6,15,166,20


In [99]:
single_day_trip_count=filtered_taxi_trips.filter((filtered_taxi_trips['dayofyear']==165) & (filtered_taxi_trips['year']==2018)).count()
print(" %s trips happened on 14 June of 2018" % single_day_trip_count)

 68849 trips happened on 14 June of 2018


In [51]:
# [Optional!] Casting the trip end timestamp to a date

from pyspark.sql.types import DateType

taxi_trips = taxi_trips.withColumn("record_date", taxi_trips["trip_end_timestamp"].cast(DateType()))

In [52]:
# [Optional!] Convert column to unix timestamp
from pyspark.sql.functions import unix_timestamp

taxi_trips = taxi_trips.withColumn(
    "start_time", unix_timestamp("trip_start_timestamp", "yyyy-MM-dd HH:mm:ss")
        ).withColumn(
    "end_time", unix_timestamp("trip_end_timestamp", "yyyy-MM-dd HH:mm:ss")
)

## Step 4b. Weather Data ETL

Next, we split the weather data into two tables: one with the most useful measurements (temperature, wind, precipitation, snow) and one with additional information about weather conditions.

In [12]:
from pyspark.sql.functions import year, dayofyear, month

def extract_time_data(df, date_column, unique_column):
    """
    Extract year, dayofyear and month from a timestamp field.
    
    Returns: new dataset that can be joined by @unique_column
    """
    col = date_column
    # month() returns the calendar month; weekofyear() would return the week number instead
    days_df = df.select(year(col).alias('year'), dayofyear(col).alias('dayofyear'), month(col).alias('month'), unique_column)
    return days_df

Now we can join the extracted time data with the weather data.

In [13]:
days = extract_time_data(weather_df, date_column="DATE", unique_column="DATE")
weather_df = weather_df.join(days, on="DATE")
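
To make the join keys concrete, here is a minimal plain-Python sketch (not Spark code) of the three values that `extract_time_data` is meant to derive from a date. The helper name `date_parts` is hypothetical and exists only for this illustration.

```python
from datetime import date

def date_parts(d):
    """Return (year, dayofyear, month) for a date, mirroring the Spark columns."""
    return d.year, d.timetuple().tm_yday, d.month

# 14 June 2018 is the 165th day of the year, matching the taxi rows shown earlier.
print(date_parts(date(2018, 6, 14)))  # (2018, 165, 6)
```

The same `(year, dayofyear)` pair is later used to match each taxi trip with the weather of that day.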

Here we define the sets of columns that make up the separate tables.
<br>The tables themselves are created next, as separate dataframes.

In [14]:
join_columns = ["year", "dayofyear", "month"]
weather_temp_columns = ["DATE", "AWND", "PRCP", "SNOW", "SNWD", "TMAX", "TMIN", "TAVG"] + join_columns
weather_conditions = ["DATE", "WT01", "WT02", "WT03", "WT04", "WT05", "WT09"] + join_columns

The final **temperature** table looks like this.

In [15]:
temperature_df = weather_df.select(*[weather_temp_columns])

In [18]:
temperature_df.limit(5).toPandas()

Unnamed: 0,DATE,AWND,PRCP,SNOW,SNWD,TMAX,TMIN,TAVG,year,dayofyear,month
0,2018-01-01,4.8,0.0,0.0,30.0,-17.1,-22.7,-18.9,2018,1,1
1,2018-01-02,5.0,0.0,0.0,30.0,-13.2,-22.7,-19.1,2018,2,1
2,2018-01-03,5.5,0.0,3.0,30.0,-8.2,-14.3,-11.4,2018,3,1
3,2018-01-04,5.5,0.0,0.0,30.0,-11.0,-17.7,-13.9,2018,4,1
4,2018-01-05,4.8,0.0,0.0,30.0,-11.6,-18.2,-15.2,2018,5,1


The weather conditions data contains many null values. This is expected,
<br>as a null indicates that no special condition was recorded, i.e. the weather was normal that day.

In [114]:

weather_df.select(*[weather_conditions]).limit(5).toPandas()

Unnamed: 0,DATE,WT01,WT02,WT03,WT04,WT05,WT09,year,dayofyear,month
0,2018-01-01,,,,,,,2018,1,1
1,2018-01-02,,,,,,,2018,2,1
2,2018-01-03,1.0,,,,,,2018,3,1
3,2018-01-04,,,,,,,2018,4,1
4,2018-01-05,,,,,,,2018,5,1
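
If a downstream consumer prefers explicit 0/1 flags to nulls, the conversion is straightforward. The plain-Python sketch below illustrates the idea on rows shaped like the output above. `to_flags` is a hypothetical helper, and treating a null as "condition not reported" is an assumption about how the source encodes these columns.

```python
def to_flags(row, condition_columns):
    """Replace missing condition values with 0 and reported ones with 1."""
    flags = dict(row)
    for name in condition_columns:
        flags[name] = 0 if row.get(name) is None else 1
    return flags

conditions = ["WT01", "WT02", "WT03", "WT04", "WT05", "WT09"]
rows = [
    {"DATE": "2018-01-02", "WT01": None, "WT02": None},
    {"DATE": "2018-01-03", "WT01": 1.0, "WT02": None},
]
flagged = [to_flags(r, conditions) for r in rows]
print(flagged[1]["WT01"], flagged[1]["WT02"])  # 1 0
```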


In [16]:
# Renaming several columns to have better representation of data
conditions_df = weather_df.selectExpr("DATE", "year", "dayofyear", "month", "WT01 as FOG", "WT02 as HEAVY_FOG", "WT03 as THNDR", "WT04 as ICE", "WT05 as HAIL", "WT09 as HVSNOW")

In [20]:
conditions_df.printSchema()

root
 |-- DATE: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- FOG: double (nullable = true)
 |-- HEAVY_FOG: double (nullable = true)
 |-- THNDR: double (nullable = true)
 |-- ICE: double (nullable = true)
 |-- HAIL: double (nullable = true)
 |-- HVSNOW: double (nullable = true)



In [21]:
conditions_df.limit(5).toPandas()

Unnamed: 0,DATE,year,dayofyear,month,FOG,HEAVY_FOG,THNDR,ICE,HAIL,HVSNOW
0,2018-01-01,2018,1,1,,,,,,
1,2018-01-02,2018,2,1,,,,,,
2,2018-01-03,2018,3,1,1.0,,,,,
3,2018-01-04,2018,4,1,,,,,,
4,2018-01-05,2018,5,1,,,,,,


**Possible improvements**

The loaded data was implicitly cast to appropriate types, which would allow us to extract day, weekday, hour, etc.
<br>However, the weather data we use has coarse (daily) granularity, so this can be left for future improvements, or for a dataset that is updated every 15 minutes.
<br>For our goals, the given dataset is more than enough.

### Creating Main Fact Table

To create the Fact table, we join the temperature dataframe with the overall taxi dataframe.
<br>We also skip rows with missing data, so we are left only with rows that are completely filled in.

We use SQL queries in this section, as this simplifies working with a large number of columns at once.

In [17]:
filtered_taxi_trips.createOrReplaceTempView("trips")
temperature_df.createOrReplaceTempView("temps")

Next, we filter out missing location data.
<br> **At least 30%** of the Census Tract data is hidden from the dataset for privacy reasons, so filtering out every row with a missing tract would drastically reduce the amount of data available for analysis.
<br> We are interested in suggesting locations with block-level or street-level precision.
<br> For now, we will not filter on the `census` columns.
<br> Additional filters can be added on demand with:
```sql
WHERE pickup_census_tract IS NOT NULL 
AND dropoff_census_tract IS NOT NULL 
AND dropoff_community_area IS NOT NULL
```

In [18]:
taxi_trips_cleaned = filtered_taxi_trips.filter((filtered_taxi_trips['pickup_centroid_latitude'].isNotNull()) & \
                            (filtered_taxi_trips['pickup_centroid_longitude'].isNotNull()) & \
                            (filtered_taxi_trips['dropoff_centroid_latitude'].isNotNull()) & \
                            (filtered_taxi_trips['dropoff_centroid_longitude'].isNotNull())                         )

In [None]:
# The same filtering expressed in SQL. In my experience it is more efficient on a big cluster,
# but it may fail on a small cluster due to out-of-memory or DAG task exceptions

taxi_trips_cleaned = spark.sql("""SELECT * FROM trips 
                    WHERE pickup_centroid_latitude IS NOT NULL
                    AND pickup_centroid_longitude IS NOT NULL
                    AND dropoff_centroid_latitude IS NOT NULL
                    AND dropoff_centroid_longitude IS NOT NULL
                """)

In [19]:
taxi_trips_cleaned.limit(5).show()

+--------------------+--------------------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+--------------------+------------------------+-------------------------+-------------------------+--------------------------+----+-----+---+---------+----+
|             trip_id|             taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|pickup_census_tract|dropoff_census_tract|pickup_community_area|dropoff_community_area| fare|tips|tolls|extras|trip_total|payment_type|             company|pickup_centroid_latitude|pickup_centroid_longitude|dropoff_centroid_latitude|dropoff_centroid_longitude|year|month|day|dayofyear|hour|
+--------------------+--------------------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----

**Note!** Run the checks only if you really need to. They trigger a reshuffle of the data and count missing values over all rows.
<br> This may take a very long time if the dataset is big.

In [None]:
# Checking the number of rows before processing further
check_total_rows(taxi_trips_cleaned)

# Here we can see if dataset was cleaned from missing values
check_missing_data(taxi_trips_cleaned)

As we can see, if all missing values were cleaned, the dataset would be reduced to 60% of the originally available data.

Cleaning only missing coordinate data will keep 85-90% of the dataset.
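
A retention figure like the ones above is simply the share of rows that survive a filter. As a rough plain-Python illustration (the helper `retained_fraction` and the sample rows are hypothetical and are not part of the notebook's own helpers):

```python
def retained_fraction(rows, required_columns):
    """Share of rows in which every required column is present (not None)."""
    if not rows:
        return 0.0
    kept = sum(
        1 for row in rows
        if all(row.get(col) is not None for col in required_columns)
    )
    return kept / len(rows)

sample = [
    {"pickup_centroid_latitude": 41.88, "pickup_census_tract": 17031839100},
    {"pickup_centroid_latitude": 41.90, "pickup_census_tract": None},
    {"pickup_centroid_latitude": None, "pickup_census_tract": None},
    {"pickup_centroid_latitude": 41.91, "pickup_census_tract": 17031081300},
]
# Requiring only coordinates keeps more rows than also requiring the census tract.
print(retained_fraction(sample, ["pickup_centroid_latitude"]))  # 0.75
print(retained_fraction(sample, ["pickup_centroid_latitude", "pickup_census_tract"]))  # 0.5
```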


In [66]:
taxi_trips_cleaned.columns

['trip_id',
 'taxi_id',
 'trip_start_timestamp',
 'trip_end_timestamp',
 'trip_seconds',
 'trip_miles',
 'pickup_census_tract',
 'dropoff_census_tract',
 'pickup_community_area',
 'dropoff_community_area',
 'fare',
 'tips',
 'tolls',
 'extras',
 'trip_total',
 'payment_type',
 'company',
 'pickup_centroid_latitude',
 'pickup_centroid_longitude',
 'dropoff_centroid_latitude',
 'dropoff_centroid_longitude',
 'year',
 'month',
 'day',
 'dayofyear',
 'hour']

In [67]:
temperature_df.columns

['DATE',
 'AWND',
 'PRCP',
 'SNOW',
 'SNWD',
 'TMAX',
 'TMIN',
 'TAVG',
 'year',
 'dayofyear']

In [45]:
fact_table = taxi_trips_cleaned.join(temperature_df, on = ["year","dayofyear"])

In [46]:
fact_table.limit(5).toPandas()

Unnamed: 0,year,dayofyear,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,...,day,hour,DATE,AWND,PRCP,SNOW,SNWD,TMAX,TMIN,TAVG
0,2018,112,0c56851ee59396edbf08703fd101cad2661b61b3,5e00738ed97ac63a803381fb963c3b05e2b2f332cd707f...,2018-04-22 16:45:00,2018-04-22 16:45:00,360,0.0,17031081300,17031081600,...,22,16,2018-04-22,4.7,0.0,0.0,0.0,16.7,5.6,10.1
1,2018,112,168038b6eb015565e2769d9e79f817434318861a,341d6e8bdcbbd41081743ae65c0b6cc962bb42c225971b...,2018-04-22 12:00:00,2018-04-22 12:15:00,300,0.0,17031081500,17031839100,...,22,12,2018-04-22,4.7,0.0,0.0,0.0,16.7,5.6,10.1
2,2018,112,20bf0fd9f7efc533b3e54dd9b1f0703b57969b55,f7bec4705b1d00364fdc119b2662a2b68a51e9d88c5f95...,2018-04-22 10:45:00,2018-04-22 11:00:00,540,2.0,17031080300,17031839100,...,22,10,2018-04-22,4.7,0.0,0.0,0.0,16.7,5.6,10.1
3,2018,112,235f71e1f7a0310c2bccbc6dba2330f57e99d2fe,97d540f40171ccb4795908136c5d5139aa925e1aedcc41...,2018-04-22 12:00:00,2018-04-22 12:00:00,720,0.0,17031839100,17031330100,...,22,12,2018-04-22,4.7,0.0,0.0,0.0,16.7,5.6,10.1
4,2018,112,2b88cec89c290f6beb44c0467a8666d5424a91bf,8b07f9156e568a37d362463c84dbd1118b4eeb753bae50...,2018-04-22 14:00:00,2018-04-22 14:45:00,2700,17.5,17031081403,17031980000,...,22,14,2018-04-22,4.7,0.0,0.0,0.0,16.7,5.6,10.1
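
The join keeps only trips whose `(year, dayofyear)` pair has a matching weather row. The plain-Python sketch below illustrates that inner-join behaviour on a few hand-written rows. `inner_join_on_day` is a hypothetical helper, it assumes at most one weather row per day, and the second trip is chosen so that it has no weather match.

```python
def inner_join_on_day(trips, temps):
    """Inner join two lists of dicts on the (year, dayofyear) key."""
    # Index the weather rows by day; assumes one weather row per day.
    temps_by_day = {(t["year"], t["dayofyear"]): t for t in temps}
    joined = []
    for trip in trips:
        weather = temps_by_day.get((trip["year"], trip["dayofyear"]))
        if weather is not None:  # trips without weather data are dropped
            joined.append({**trip, **weather})
    return joined

trips = [
    {"trip_id": "a", "year": 2018, "dayofyear": 112},
    {"trip_id": "b", "year": 2018, "dayofyear": 300},
]
temps = [{"year": 2018, "dayofyear": 112, "TAVG": 10.1}]
result = inner_join_on_day(trips, temps)
print(len(result), result[0]["TAVG"])  # 1 10.1
```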


Finally, the Fact Table can be created that summarizes all data of interest

In [None]:
# The same join as above, expressed in Spark SQL.
# USING keeps a single copy of the join columns, as the DataFrame join does.
taxi_trips_cleaned.createOrReplaceTempView("new_trips")
temperature_df.createOrReplaceTempView("temps")

fact_table = spark.sql("""SELECT * FROM new_trips nt 
                 INNER JOIN temps USING (year, dayofyear)""")


In [None]:
fact_table.limit(5).show()

In [48]:
check_dataframe_correctness(fact_table)

Dataframe is available


## Step 4c. Saving data in S3 Data Lake

The data lake is located in an S3 bucket, and each folder name corresponds to a separate data model.

In [50]:
# s3 output data location

data_lake_name = "/storage/chicago-extended-taxi-data-lake-capstone/"

The extended Chicago taxi data can be used for machine learning without filtering on the hidden `Census tract` fields (pickup location, dropoff location).

In [75]:
# We partition the data by year and day of year
taxi_trips.write.mode("append").partitionBy(["year","dayofyear"]).parquet(data_lake_name + "/data/taxi_trips/")

Collected data for each taxi cab, with detailed information for each ride

In [86]:
cabs_df.write.mode("append").partitionBy(["taxi_id"]).parquet(data_lake_name + "/data/cabs/")

Collection of popular pickup locations in each Community Area

In [87]:
community_areas_df.write.mode("append").partitionBy("community_area").parquet(data_lake_name + "/data/locations/")

Temperature and conditions data

In [61]:
temperature_df.write.mode("overwrite").partitionBy(["year","month"]).parquet(data_lake_name + "/data/temperature/")

In [63]:
conditions_df.write.mode("overwrite").partitionBy(["year","month"]).parquet(data_lake_name + "/data/weather_conditions/")

In [64]:
weather_df.write.mode("overwrite").partitionBy(["year","month"]).parquet(data_lake_name + "/data/full_weather_data/")

Final Data Lake Fact Table

In [65]:
fact_table.write.mode('append').partitionBy(["year","dayofyear"]).parquet(data_lake_name + "/data/extended_chicago_taxi_dataset/")
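
Spark's `partitionBy` writes each partition into a `column=value` subdirectory under the target path. The sketch below only builds such paths as strings to show the resulting layout. `partition_path` is a hypothetical helper, and the file names inside each directory are left out.

```python
import posixpath

def partition_path(base, table, **partitions):
    """Build the Hive-style directory for one partition of a table."""
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return posixpath.join(base.rstrip("/"), "data", table, *parts)

base = "/storage/chicago-extended-taxi-data-lake-capstone/"
print(partition_path(base, "extended_chicago_taxi_dataset", year=2018, dayofyear=165))
# /storage/chicago-extended-taxi-data-lake-capstone/data/extended_chicago_taxi_dataset/year=2018/dayofyear=165
```

Query engines that understand this layout can skip directories that do not match a filter on the partition columns.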

#### Step 4.2. Data Quality Checks

Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

The number of rows loaded is checked in each section, as well as before and after processing and joining the dataframes.

Additionally, datasets are checked for missing values, and datasets that must not contain missing values are cleaned accordingly.

Data integrity is checked during the first step, which shows an overall summary of each column.

Example calls are shown below. They can be run for the final dataset or for any other dataset of interest.

In [None]:
check_dataframe_correctness(fact_table)

check_missing_data(taxi_trips_cleaned)

check_total_rows(fact_table)
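
The helper functions called above are defined outside this section. For readers without that context, the following plain-Python sketch shows the kind of checks involved: a row-count check and a per-column missing-value count. The names `assert_has_rows` and `count_missing` are hypothetical and are not the notebook's own implementations.

```python
def assert_has_rows(rows, name):
    """Fail loudly if a dataset came out of the pipeline empty."""
    if len(rows) == 0:
        raise ValueError(f"Data quality check failed: {name} has no rows")
    return len(rows)

def count_missing(rows, columns):
    """Count None values per column."""
    return {col: sum(1 for row in rows if row.get(col) is None) for col in columns}

fact_rows = [
    {"trip_id": "a", "TAVG": 10.1},
    {"trip_id": "b", "TAVG": None},
]
print(assert_has_rows(fact_rows, "fact_table"))       # 2
print(count_missing(fact_rows, ["trip_id", "TAVG"]))  # {'trip_id': 0, 'TAVG': 1}
```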

#### 4.3 Data dictionary 

The next cell ensures that the tables below are rendered correctly.

In [24]:
%%html
<style>
  table {margin-left: 0 !important;}
</style>

### Main Fact Table
Cleaned Taxi Trip data joined with Weather Data (temperature and amount of Precipitation, Snow, etc.)

Columns of **extended_chicago_taxi_rides** table:

 |column| type| description
 |:-----|:----|:-----------
 |trip_id| string|  A unique identifier for the trip.
 |taxi_id| string| A unique identifier for the taxi.
 |trip_start_timestamp| timestamp| When the trip started, rounded to the nearest 15 minutes.
 |trip_end_timestamp| timestamp| When the trip ended, rounded to the nearest 15 minutes.
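 |year| integer| Year when the trip started
 |month| integer| Month when the trip started
 |day| integer| Day of the month when the trip started
 |dayofyear| integer| Day of the year when the trip started; used together with `year` to join the weather data
 |hour| integer| Hour when the trip started. As timestamps are rounded to 15 minutes, more precision is not needed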
 |trip_seconds| integer| Time of the trip in seconds.
 |trip_miles| double|  Distance of the trip in miles.
 |pickup_census_tract| long| The Census Tract where the trip began.
 |dropoff_census_tract| long | The Census Tract where the trip ended.
 |pickup_community_area| integer | The Community Area where the trip began
 |dropoff_community_area| integer | The Community Area where the trip ended. 
 |fare| double | The fare for the trip.
 |tips| double | The tip for the trip. Cash tips generally will not be recorded.
 |tolls| double | The tolls for the trip.
 |extras| double |Extra charges for the trip.
 |trip_total| double | Total cost of the trip, the total of the previous columns.
 |payment_type| string| Type of payment for the trip.
 |company| string | The taxi company.
 |pickup_centroid_latitude| double | The latitude of the center of the pickup census tract.
 |pickup_centroid_longitude| double | The longitude of the center of the pickup census tract.
 |dropoff_centroid_latitude| double | The latitude of the center of the dropoff census tract.
 |dropoff_centroid_longitude| double | The longitude of the center of the dropoff census tract.
 |AWND | double    | Average wind speed
 |PRCP | double    | Precipitation
 |SNOW | double    | Snowfall
 |SNWD | double    | Snow depth
 |TMAX | double    | Maximum temperature
 |TMIN | double    | Minimum temperature
 |TAVG | double    | Average temperature

### 1st Dimension Table
This table contains information about taxi cabs, the trips they performed and general information about each trip.

Columns of **cabs** table:

 |column| type| description
 |:-----|:----|:-----------
 |taxi_id| string| A unique identifier for the taxi.
 |trip_id| string|  A unique identifier for the trip.
 |trip_start_timestamp| timestamp| When the trip started, rounded to the nearest 15 minutes.
 |trip_end_timestamp| timestamp| When the trip ended, rounded to the nearest 15 minutes.
 |trip_seconds| integer| Time of the trip in seconds.
 |trip_miles| double|  Distance of the trip in miles.
 |pickup_community_area| integer | The Community Area where the trip began
 |dropoff_community_area| integer | The Community Area where the trip ended. 
 |fare| double | The fare for the trip.
 |tip| integer | The tip for the trip. Cash tips generally will not be recorded.
 |additional_charges| double | The tolls for the trip.
 |extra| double |Extra charges for the trip.
 |trip_total| double | Total cost of the trip, the total of the previous columns.
 |payment_type| string| Type of payment for the trip.
 |taxi_company| string | The taxi company.
 
 
### 2nd Dimension Table
Table that collects popular pickup locations in each community area of Chicago.

It may be used by taxi drivers to choose a better pickup location for a given time of day and the current weather conditions.

Columns of **community_areas** table:

 |column| type| description
 |:-----|:----|:-----------
 | community_area| integer | The Community Area id
 | centroid_latitute | double | Latitude of the location inside the Community Area (rounded to 20 meters)
 | centroid_longitude | double | Longitude of the location inside the Community Area (rounded to 20 meters)


### 3rd Dimension Table for Machine Learning Team
Table with taxi data that still includes rows with missing `Census Tract` and `Location` values.

Columns of **taxi_data** table:

 |column| type| description
 |:-----|:----|:-----------
 |trip_id| string|  A unique identifier for the trip.
 |taxi_id| string| A unique identifier for the taxi.
 |trip_start_timestamp| timestamp| When the trip started, rounded to the nearest 15 minutes.
 |trip_end_timestamp| timestamp| When the trip ended, rounded to the nearest 15 minutes.
 |trip_seconds| integer| Time of the trip in seconds.
 |trip_miles| double|  Distance of the trip in miles.
 |pickup_census_tract| long| The Census Tract where the trip began.
 |dropoff_census_tract| long | The Census Tract where the trip ended.
 |pickup_community_area| integer | The Community Area where the trip began
 |dropoff_community_area| integer | The Community Area where the trip ended. 
 |fare| double | The fare for the trip.
 |tips| double | The tip for the trip. Cash tips generally will not be recorded.
 |tolls| double | The tolls for the trip.
 |extras| double |Extra charges for the trip.
 |trip_total| double | Total cost of the trip, the total of the previous columns.
 |payment_type| string| Type of payment for the trip.
 |company| string | The taxi company.
 |pickup_centroid_latitude| double | The latitude of the center of the pickup census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.
 |pickup_centroid_longitude| double | The longitude of the center of the pickup census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.
 |pickup_centroid_location| string | The location of the center of the pickup census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.
 |dropoff_centroid_latitude| double | The latitude of the center of the dropoff census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.
 |dropoff_centroid_longitude| double | The longitude of the center of the dropoff census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.
 |dropoff_centroid_location| string | The location of the center of the dropoff census tract or the community area if the census tract has been hidden for privacy. This column often will be blank for locations outside Chicago.

### Secondary Weather Fact Table
Table with weather data and weather conditions.

Joined table of **1st Weather Dimension Table** and **2nd Weather Dimension Table**, both described below.

The table schema is left out for readability and to avoid duplication.

Columns of the **weather** table: the columns of the **temperature** and **conditions** tables, joined on the DATE column.

This table is not of particular interest for our needs, as we will mostly work with the tables defined below.

### 1st Weather Dimension Table
Table that contains temperature and precipitation data

Columns of **temperatures** table:

|column | type  | description |
|:-------|:-------|:-|
|DATE | timestamp | Date of measurements
|AWND | double    | Average wind speed
|PRCP | double    | Precipitation
|SNOW | double    | Snowfall
|SNWD | double    | Snow depth
|TMAX | double    | Maximum temperature
|TMIN | double    | Minimum temperature
|TAVG | double    | Average temperature


### 2nd Weather Dimension Table
Table with weather conditions data for each day

Columns of **conditions** table:

|column | type  | description |
|:------|:-------|:-|
|DATE| timestamp| Date of measurements
|FOG| double | Fog during the day
|HEAVY_FOG| double | Heavy or freezing fog during the day
|THNDR| double | Thunder during the day
|ICE| double | Ice pellets, sleet, snow pellets, or small hail
|HAIL| double | Hail during the day
|HVSNOW| double | Blowing or drifting snow


#### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Technologies
* Python: a strong choice for exploratory data analysis and for integrating various technologies.
* Spark (EMR cluster): for running ETL pipelines on large datasets (20 GB).
* S3: fast and highly available storage. It integrates easily with Redshift or a NoSQL database such as Cassandra (our data models can be tweaked for the exact queries that need to be run on the data).

This example was run on an EMR cluster as well as on a local cluster.

EMR clusters provide better scalability and require much less effort to set up the infrastructure correctly.

#### Update Cycles

Taxi Trips: the official dataset is updated monthly, so it makes sense to update the data on a monthly basis and append it to the existing tables.

However, if a similar dataset were available 24/7, it would make sense to update it at least daily (or weekly), to provide as many useful insights as possible for passengers, cab drivers and taxi companies, as well as for Chicago's official transport institutions.

Weather: can be updated daily; however, monthly updates are enough (and match the updates to the main taxi dataset), as the dataset is small but *wide* (with many columns that provide additional information).


#### Scenarios
**The data was increased by 100x**
<br>To keep queries fast, more nodes should be added to the Spark cluster to handle the load.
<br>Another option is to set up a streaming solution to ingest and buffer the incoming trip data.
<br>This can also be achieved by introducing additional Airflow pipelines that process only the next incoming batch of data (e.g. on an hourly or daily schedule) and store it in S3, where it can be queried further by a Spark cluster (EMR) with fewer nodes.
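
As a rough illustration of processing only the next incoming batch, the plain-Python sketch below lists the `(year, dayofyear)` partitions that a daily job would still need to process. The helper `pending_partitions` and the idea of tracking a "last processed" date are assumptions made for this example, not part of the pipeline described here.

```python
from datetime import date, timedelta

def pending_partitions(last_processed, today):
    """List (year, dayofyear) pairs after last_processed, up to and including today."""
    days = (today - last_processed).days
    pending = []
    for offset in range(1, days + 1):
        d = last_processed + timedelta(days=offset)
        pending.append((d.year, d.timetuple().tm_yday))
    return pending

print(pending_partitions(date(2019, 12, 30), date(2020, 1, 1)))
# [(2019, 365), (2020, 1)]
```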

**The data populates a dashboard that must be updated on a daily basis by 7am every day**
<br>The solution is similar to the previous one.
<br>A nightly job, for example, can be triggered in Airflow to run the necessary pipeline and populate the dashboard.

**The database needed to be accessed by 100+ people**
<br>Using a distributed database that can handle a large number of concurrent connections is the way to go here.

A good fit here would be Redshift with several nodes, with at least 16 CPUs each.
<br> Cassandra might be even better, as the data model defined in this project would allow queries to be run on demand, and the database will scale automatically due to the distributed nature of Cassandra (provided a cluster with at least 10 nodes is set up).
