# Citi Bikes in New York City
### Data Engineering Capstone Project

#### Project Summary
This project designs an analytical dataware house and corresponding ETLs for the **bicycle sharing system *Citi Bikes*** in New York City. The project combines **bike trip data** provided *Citi Bikes* with **weather data of New York City**. The data warehouse is designed to enable data analysts and data scientist of *Citi Bikes* to perform different analysis on usage data especially in relation to weather impact.

---

This project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

The scope of this project is to design a hypothetical single-source-of-truth database for usage data of the **bicycle sharing system *Citi Bikes*** in New York City. Using Python and Spark **trip data provided by *City Bikes* on S3** and ***weather data provided by the *Open-Meteo* API*** is merged into a relational database. Potential data scientists using the database should be able to answer various different questions: Does bad weather influence rental numbers? What are the popular routes taken on a given day of the week by customers? 

#### Data 

The data model consists of bike trip data and weather data.

#### 1.) Trip data

*Citi Bikes* provides its bike trip as open source data (https://ride.citibikenyc.com/system-data). For this project the 2016 data is used. The data is stored in monthly batches on a public S3 bucket. Regarding to content the data contains all bike trips taken by *Citi Bikes* customers in 2016 in New York City. Informations on these trips include - among other things - demographical customer data, start point and end point of the trip. Each monthly batch contains more than around 500,000 bike trips.

#### 2.) Weather data

As weather usually has a big impact on bike usage, daily 2016 weather data for New York City is included the database. The data is queried from the open-source weather API *Open-Meteo* (https://open-meteo.com/). It includes daily information on like maximum temperature or hours of precipitation.

#### Setup

Importing all needed packages and defining reoccuring generic functions.

In [1]:
# Import of need packages
import pandas as pd
from datetime import datetime
import os  
import glob
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col

# Generic function returning current time for logging
def current_time():
    """
    Return current time in hh:mm:ss
    """
    current_time = datetime.now().strftime("%H:%M:%S")
    return(current_time)

#### Bike trip data

The 2016 *Citi Bikes* trip data is queried from S3 and safed as a .csv in /stage.

In [2]:
# list of all 2016 New York City citibike tripdata files on S3. 
# In order to keep the script running smoothly only data from Q1 is loaded.

trip_data_urls = ['https://s3.amazonaws.com/tripdata/201601-citibike-tripdata.zip',
                  'https://s3.amazonaws.com/tripdata/201602-citibike-tripdata.zip',
                  'https://s3.amazonaws.com/tripdata/201603-citibike-tripdata.zip',
                 ]

print(current_time(), '- Starting to parse', len(trip_data_urls), 'S3 urls')

# Create a staging-folder if it not already exists
os.makedirs('stage/trip_data', exist_ok=True)  

# Looping all S3 files: Reading, unzip and save as a .csv in /stage
for elem in trip_data_urls:
    
    # read file from S3
    print(current_time(), "- Reading", elem)
    df = pd.read_csv(elem, compression='zip', header=0, sep=',', quotechar='"')
    
    # write file to .csv in stage
    write_to_path = 'stage/trip_data/' + elem.split('/')[-1].split('.')[0] + '.csv'
    print(current_time(), "- Writing", write_to_path)
    df.to_csv(write_to_path)
    
print(current_time(), '- Finished to parse', len(trip_data_urls), 'S3 urls')    

16:35:28 - Starting to parse 3 S3 urls
16:35:28 - Reading https://s3.amazonaws.com/tripdata/201601-citibike-tripdata.zip
16:35:31 - Writing stage/trip_data/201601-citibike-tripdata.csv
16:35:38 - Reading https://s3.amazonaws.com/tripdata/201602-citibike-tripdata.zip
16:35:42 - Writing stage/trip_data/201602-citibike-tripdata.csv
16:35:50 - Reading https://s3.amazonaws.com/tripdata/201603-citibike-tripdata.zip
16:35:55 - Writing stage/trip_data/201603-citibike-tripdata.csv
16:36:08 - Finished to parse 3 S3 urls


Lets inspect the df of the last interation to get an idea what the raw trip data looks like.

In [3]:
print(df.head())

   tripduration          starttime           stoptime  start station id  \
0          1491  3/1/2016 06:52:42  3/1/2016 07:17:33                72   
1          1044  3/1/2016 07:05:50  3/1/2016 07:23:15                72   
2           714  3/1/2016 07:15:05  3/1/2016 07:26:59                72   
3           329  3/1/2016 07:26:04  3/1/2016 07:31:34                72   
4          1871  3/1/2016 07:31:30  3/1/2016 08:02:41                72   

  start station name  start station latitude  start station longitude  \
0   W 52 St & 11 Ave               40.767272               -73.993929   
1   W 52 St & 11 Ave               40.767272               -73.993929   
2   W 52 St & 11 Ave               40.767272               -73.993929   
3   W 52 St & 11 Ave               40.767272               -73.993929   
4   W 52 St & 11 Ave               40.767272               -73.993929   

   end station id          end station name  end station latitude  \
0             427       Bus Slip & State 

#### Weather data

The daily 2016 weather for NYC is queried from the *Open-Meteo API* and saved as a .csv in /stage.

In [4]:
# API url of daily 2016 weather data of New York city

try:
    print(current_time(), "- Reading weather API")
    url_api = "https://archive-api.open-meteo.com/v1/era5?latitude=40.71&longitude=-74.01&start_date=2016-01-01&end_date=2016-12-31&daily=temperature_2m_max,precipitation_sum,precipitation_hours&timezone=America%2FNew_York"
    
    # request data from api in .json format 
    json_weather_nyc = requests.get(url_api, verify=False).json()
    df_weather_nyc = pd.DataFrame.from_records(json_weather_nyc['daily'])

except:
    raise Exception("Downloading data from API failed")

16:36:08 - Reading weather API




In [5]:
# Create a staging-folder if it not already exists
os.makedirs('stage/weather_data', exist_ok=True)  

# Writing data to stage
write_to_path = 'stage/weather_data/2016-nyc-weather.csv'
print(current_time(), "- Writing", write_to_path)
df_weather_nyc.to_csv(write_to_path)

16:36:09 - Writing stage/weather_data/2016-nyc-weather.csv


Inspect the df to get an idea what the raw weather data looks like.

In [6]:
print(df_weather_nyc.head())

   precipitation_hours  precipitation_sum  temperature_2m_max        time
0                  0.0                0.0                 6.2  2016-01-01
1                  0.0                0.0                 4.2  2016-01-02
2                  0.0                0.0                 6.5  2016-01-03
3                  0.0                0.0                -0.4  2016-01-04
4                  0.0                0.0                -1.4  2016-01-05


### Step 2: Explore and Assess the Data

In step 2 we explore our two data sets, look for possible data quality issues are examined and clean the data if needed. 

In [7]:
# start spark session

try:
    spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()
    print(current_time(), "- Spark session created or already running")
except:
    raise Exception("Spark session could not been created")

16:36:19 - Spark session created or already running


#### Trip data

As the trip data sets can be quite large Spark is utilized to explore and clean the data sets.

In [8]:
# read and combine all .csv-files of the trip data into one spark df
print(current_time(), "- Started reading staging trip data into spark dataframe")

df_trip = spark.read \
    .option("header",True) \
    .csv('stage/trip_data/')

print(current_time(), "- Finished reading", df_trip.count(), "rows from trip staging data into spark dataframe")

# Inspect the data structure of the trip data
df_trip.printSchema()

16:36:19 - Started reading staging trip data into spark dataframe
16:36:26 - Finished reading 1990273 rows from trip staging data into spark dataframe
root
 |-- _c0: string (nullable = true)
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)



In [9]:
# Inspect summary of "real" trip data in the trip data
df_trip.describe(['tripduration','starttime', 'stoptime','start station id', 'end station id']).show()

# Inspect summary of station data
df_trip.describe(['start station name','start station latitude', 'start station longitude','end station name','end station latitude', 'start station longitude']).show()

# Inspect summary of customer data within the trip data
df_trip.describe(['bikeid','usertype', 'birth year','gender']).show()

+-------+-----------------+-----------------+-----------------+------------------+-----------------+
|summary|     tripduration|        starttime|         stoptime|  start station id|   end station id|
+-------+-----------------+-----------------+-----------------+------------------+-----------------+
|  count|          1990273|          1990273|          1990273|           1990273|          1990273|
|   mean|935.4001752523397|             null|             null| 843.9573872529045|830.3775070053204|
| stddev|8735.268620734047|             null|             null|1004.2269454016665|992.2558578543673|
|    min|              100|1/1/2016 00:00:41|1/1/2016 00:06:51|               116|              116|
|    max|             9998|3/9/2016 23:59:56|4/9/2016 19:22:49|                83|               83|
+-------+-----------------+-----------------+-----------------+------------------+-----------------+

+-------+------------------+----------------------+-----------------------+---------------

Inspection of the data shows that **trip duration is in a reasonable range** (100 seconds to ca. 2.5 hours). Also **missing data** seems to be no problem according to the counts. Only for *birth year* data is missing. However this seems to be no data quality issue as *Citi Bikes* also serves anonymous customers.

The only problem we can identify is **unreasonably low *birth year*** (e.g. 1885). By now the customer would over 130 years old. Therefore all *birth years* below 1916 are set to missing as these data points are most likely wrong and would skew later analysis.

In [10]:
# How many people are over 100 years old?
print(current_time(),'Count birth year below 1916 before cleaning:',
      df_trip.select('birth year').where(df_trip['birth year'] < 1916).count())

# How many birth dates are missing?
print(current_time(),'Count birth year missing before cleaning:',
      df_trip.filter((df_trip["birth year"] == "") | df_trip["birth year"].isNull() | isnan(df_trip["birth year"])).count())

# Cleaning the data
df_trip = df_trip.withColumn("birth year", when(df_trip['birth year'] < 1916, None) \
    .otherwise(df_trip['birth year'])
    )

# Numbers of people with birth data < 1916 should be 0 now
print(current_time(),'Count birth year below 1916 after cleaning:',
      df_trip.select('birth year').where(df_trip['birth year'] < 1916).count())

# Numbers of missing data points should be increased
print(current_time(),'Count birth year missing after cleaning:',
      df_trip.filter((df_trip["birth year"] == "") | df_trip["birth year"].isNull() | isnan(df_trip["birth year"])).count())

16:37:59 Count birth year below 1916 before cleaning: 766
16:38:05 Count birth year missing before cleaning: 147614
16:38:12 Count birth year below 1916 after cleaning: 0
16:38:17 Count birth year missing after cleaning: 148380


#### Weather data

The weather data only contains daily weather data for one year and one location. So the dataset is quite small and can be assessed in Python.

In [11]:
# read .csv-file of the weather data into pandas df
df_weather_nyc = pd.read_csv('stage/weather_data/2016-nyc-weather.csv')  

# Inspect the data
print(df_weather_nyc.describe())

       Unnamed: 0  precipitation_hours  precipitation_sum  temperature_2m_max
count  366.000000           366.000000         366.000000          366.000000
mean   182.500000             2.918033           2.454918           17.057650
std    105.799338             4.618468           5.403547           10.109954
min      0.000000             0.000000           0.000000           -9.600000
25%     91.250000             0.000000           0.000000            9.000000
50%    182.500000             0.000000           0.000000           16.800000
75%    273.750000             5.000000           2.000000           26.500000
max    365.000000            24.000000          44.000000           35.000000


Inspection of the data shows that **data seems to be in reasonable ranges**. For example minimum and maximums of *precipitation_hours* range from 0 to 24 hours and *temperature_2m_max* seems to be in reasonable celsius ranges.

Still data is additionality checked for possible missing values skewing the data as well as completeness of all 366 days of 2016.

In [12]:
# checking for missing values
print(current_time(), "- Checking weather df for missing-values")
if df_weather_nyc.isnull().values.any() == False:
    print(current_time(), "- Check passed")
else:
    print(current_time(), "- Check failed")
    raise Exception("df has unexpected missing values. Check api query.")

16:38:24 - Checking weather df for missing-values
16:38:24 - Check passed


In [13]:
# checking if the daily weather data has as many days as the year (2016 = 366 days)
print(current_time(), "- Checking weather df for completeness")
if df_weather_nyc.time.nunique() == 366:
    print(current_time(), "- Check passed")
else:
    print(current_time(), "- Check failed")
    raise Exception("Number of unique days in df does not match days of the year. Check for missing days or duplicates genereated in the api query.")

16:38:24 - Checking weather df for completeness
16:38:24 - Check passed


### Step 3: Define the Data Model

After exploring and cleaning the data a data is transformed into a relational data model.

#### 3.1 Conceptual Data Model

To allow users of the database to carry out flexible analysis an relational database model with one fact table (trip_f) and five dimension tables is designed.

![Data-Model](data-model.png "Data Model")

The fact table *trip_f* and the dimension tables *station_d*, *bike_d* and *gender_d* are derived from the trip data. The dimension table *weather_d* is constructed from the weather data. The dimension table *calendar_d* is derived from a combination of trip data and weather data. 


#### 3.2 Mapping Out Data Pipelines

Within the data pipelines  the following steps are taken or already have been executed.

1. Loading data from S3 and API and saving the raw data in /stage as csv-files. One file for the weather data and monthly files for the trip data.
2. Loading multiple csv-files from /stage into aggregated two data frames (trip and weather) with Python/PySpark
3. Checking and cleaning the data with Python/PySpark
4. Transforming the refined data into data frames representing one fact tables and five dimension tables with PySpark
5. Saving each table as a parquet-file in /dwh
6. Performing a number of quality checks on the parquet-files to ensure integrity of the data with PySpark

![etl](etl.png "ETL")


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [14]:
# A spark session is already active. Create a temporary spark view in order to run spark.sql queries on the trip data

df_trip.createOrReplaceTempView("trip_v")

In [15]:
# define the station dimension table from all unique stations a trip had as start point or end point
# for some station_ids mutiple station_names exist, they are kept unique with a window function and sub-selects

df_station_d = spark.sql("""
    SELECT station_id,
           station_name,
           station_lat,
           station_long,
           insert_ts
    FROM (
        SELECT *, row_number( ) over (partition by station_id order by station_name desc) as row_num
        FROM (
            SELECT DISTINCT CAST(station_id as INTEGER) as station_id,
                   station_name,
                   ROUND(CAST(station_lat as FLOAT),6) as station_lat,
                   ROUND(CAST(station_long as FLOAT),6) as station_long,
                   NOW() as insert_ts

            FROM (
                SELECT `Start Station ID` AS station_id, 
                       `Start Station Name` AS station_name,
                       `Start Station Latitude` AS station_lat,
                       `Start Station Longitude` AS station_long
                FROM trip_v 

                UNION ALL

                SELECT `End Station ID` AS station_id, 
                       `End Station Name` AS station_name,
                       `End Station Latitude` AS station_lat,
                       `End Station Longitude` AS station_long
                FROM trip_v 
            )
            WHERE station_id IS NOT NULL    
        )
    )
    WHERE row_num = 1
    ORDER BY station_id DESC
    """)

# inspect the table
df_station_d.show()
df_station_d.printSchema()

+----------+--------------------+-----------+------------+--------------------+
|station_id|        station_name|station_lat|station_long|           insert_ts|
+----------+--------------------+-----------+------------+--------------------+
|      3246|Montague St & Cli...|  40.694283|    -73.9923|2023-01-13 16:38:...|
|      3244|University Pl & E...|  40.731438|    -73.9949|2023-01-13 16:38:...|
|      3243|     E 58 St & 1 Ave|  40.758923|  -73.962265|2023-01-13 16:38:...|
|      3242|Schermerhorn St &...|   40.69103|   -73.99184|2023-01-13 16:38:...|
|      3241|Monroe St & Tompk...|  40.686203|  -73.944695|2023-01-13 16:38:...|
|      3240|NYCBS Depot BAL -...|        0.0|         0.0|2023-01-13 16:38:...|
|      3238|     E 80 St & 2 Ave|  40.773914|    -73.9544|2023-01-13 16:38:...|
|      3237|      21 St & 41 Ave|  40.753834|   -73.94268|2023-01-13 16:38:...|
|      3236|  W 42 St & Dyer Ave|  40.758984|    -73.9938|2023-01-13 16:38:...|
|      3235|E 41 St & Madison...|  40.75

In [16]:
# define the bike dimension table from all unique stations booked for a trip 
df_bike_d = spark.sql("""
    SELECT  CAST(bikeid as INTEGER) AS bike_id,
            MAX(to_timestamp(stoptime, "mm/dd/yyyy HH:mm:ss")) AS bike_trip_last_ts,
            CAST(COUNT(*) as INTEGER) AS bike_trip_cnt,
            NOW() as insert_ts

    FROM trip_v 
    GROUP BY bike_id
    ORDER BY bike_id ASC
    """)

# inspect the table
df_bike_d.show()
df_bike_d.printSchema()

+-------+-------------------+-------------+--------------------+
|bike_id|  bike_trip_last_ts|bike_trip_cnt|           insert_ts|
+-------+-------------------+-------------+--------------------+
|  14529|2016-01-31 21:14:22|          294|2023-01-13 16:39:...|
|  14530|2016-01-31 20:37:29|          218|2023-01-13 16:39:...|
|  14531|2016-01-31 14:07:39|          219|2023-01-13 16:39:...|
|  14532|2016-01-31 17:01:34|          208|2023-01-13 16:39:...|
|  14533|2016-01-31 22:10:29|          179|2023-01-13 16:39:...|
|  14534|2016-01-31 19:20:16|          180|2023-01-13 16:39:...|
|  14535|2016-01-31 19:00:43|          280|2023-01-13 16:39:...|
|  14536|2016-01-31 21:09:36|          205|2023-01-13 16:39:...|
|  14537|2016-01-31 12:18:56|          267|2023-01-13 16:39:...|
|  14538|2016-01-31 18:48:18|          212|2023-01-13 16:39:...|
|  14539|2016-01-31 20:58:01|          253|2023-01-13 16:39:...|
|  14540|2016-01-31 17:11:34|          190|2023-01-13 16:39:...|
|  14541|2016-01-31 20:13

In [17]:
# define the station dimension table from the gender codes in the trip data
df_gender_d = spark.sql("""
    SELECT  DISTINCT CAST(`Gender` AS integer) as gender_id,
            CASE  WHEN `Gender` = 0 THEN NULL
                    WHEN `Gender` = 1 THEN 'm'
                    WHEN `Gender` = 2 THEN 'f'
                    END AS gender_name_short, 
            CASE  WHEN `Gender` = 0 THEN NULL
                    WHEN `Gender` = 1 THEN 'male'
                    WHEN `Gender` = 2 THEN 'female'
                    END AS gender_name_long,
            NOW() as insert_ts
                             
    FROM trip_v 
    ORDER BY gender_id ASC
    """)

# inspect the table
df_gender_d.show()
df_gender_d.printSchema()

+---------+-----------------+----------------+--------------------+
|gender_id|gender_name_short|gender_name_long|           insert_ts|
+---------+-----------------+----------------+--------------------+
|        0|             null|            null|2023-01-13 16:39:...|
|        1|                m|            male|2023-01-13 16:39:...|
|        2|                f|          female|2023-01-13 16:39:...|
+---------+-----------------+----------------+--------------------+

root
 |-- gender_id: integer (nullable = true)
 |-- gender_name_short: string (nullable = true)
 |-- gender_name_long: string (nullable = true)
 |-- insert_ts: timestamp (nullable = false)



In [18]:
# define the fact table for trips, remove variables that have been casted to dimension tables
df_trip_f = spark.sql("""
    SELECT  ROW_NUMBER () OVER (ORDER BY starttime) as trip_id,
            to_timestamp(starttime, "mm/dd/yyyy HH:mm:ss") as start_ts,
            DATE(to_timestamp(starttime, "mm/dd/yyyy HH:mm:ss")) as start_dt, 
            to_timestamp(stoptime, "mm/dd/yyyy HH:mm:ss") as stop_ts,
            DATE(to_timestamp(stoptime, "mm/dd/yyyy HH:mm:ss")) as stop_dt,
            CAST(tripduration AS LONG) as trip_duration_in_seconds,
            CAST(`Start Station ID` AS INTEGER) as start_station_id, 
            CAST(`End Station ID` AS INTEGER) as end_station_id,
            CAST(bikeid AS integer) as bike_id,
            usertype as user_type,
            CAST(`Birth Year` AS integer) as user_birth_year,
            CAST(`Gender` AS integer) as gender_id,
            NOW() as insert_ts            
            
    FROM trip_v 
    ORDER BY start_ts ASC
    """)

# inspect the table
df_trip_f.show()
df_trip_f.printSchema()

+-------+-------------------+----------+-------------------+----------+------------------------+----------------+--------------+-------+----------+---------------+---------+--------------------+
|trip_id|           start_ts|  start_dt|            stop_ts|   stop_dt|trip_duration_in_seconds|start_station_id|end_station_id|bike_id| user_type|user_birth_year|gender_id|           insert_ts|
+-------+-------------------+----------+-------------------+----------+------------------------+----------------+--------------+-------+----------+---------------+---------+--------------------+
| 509479|2016-01-01 00:00:08|2016-01-01|2016-01-01 00:07:49|2016-01-01|                     461|             480|           524|  23292|Subscriber|           1966|        1|2023-01-13 16:39:...|
|1070353|2016-01-01 00:00:23|2016-01-01|2016-01-01 00:02:52|2016-01-01|                     149|             479|           449|  22887|Subscriber|           1991|        1|2023-01-13 16:39:...|
|1070354|2016-01-01 00:00

In [19]:
# load weather data into Spark
print(current_time(), "- Started reading staging weather data into spark dataframe")

df_weather = spark.read \
    .option("header",True) \
    .csv('stage/weather_data/')

print(current_time(), "- Finished reading", df_trip.count(), "rows from trip weather data into spark dataframe")

# Inspect columns
df_weather.printSchema()

# Create a temopary spark view in order to run spark.sql queries on the weather data
df_weather.createOrReplaceTempView("weather_v")

16:40:06 - Started reading staging weather data into spark dataframe
16:40:06 - Finished reading 1990273 rows from trip weather data into spark dataframe
root
 |-- _c0: string (nullable = true)
 |-- precipitation_hours: string (nullable = true)
 |-- precipitation_sum: string (nullable = true)
 |-- temperature_2m_max: string (nullable = true)
 |-- time: string (nullable = true)



In [20]:
# Define the weather dimension table
df_weather_d = spark.sql("""
    SELECT  DISTINCT DATE(time) as cal_dt, 
            CAST(precipitation_hours AS FLOAT),
            CAST(precipitation_sum  AS FLOAT) AS precipitation_sum_in_mm,
            CAST(temperature_2m_max AS FLOAT) AS temperature_max_in_celsius,
            NOW() as insert_ts            
            
    FROM weather_v 
    """)

# Inspect the table
df_weather_d.show()
df_weather_d.printSchema()

+----------+-------------------+-----------------------+--------------------------+--------------------+
|    cal_dt|precipitation_hours|precipitation_sum_in_mm|temperature_max_in_celsius|           insert_ts|
+----------+-------------------+-----------------------+--------------------------+--------------------+
|2016-01-16|                9.0|                    7.7|                      10.0|2023-01-13 16:40:...|
|2016-08-25|                0.0|                    0.0|                      29.5|2023-01-13 16:40:...|
|2016-07-03|                0.0|                    0.0|                      27.3|2023-01-13 16:40:...|
|2016-09-29|                9.0|                    3.7|                      18.2|2023-01-13 16:40:...|
|2016-01-14|                1.0|                    0.1|                       2.7|2023-01-13 16:40:...|
|2016-02-15|               11.0|                    7.8|                       4.7|2023-01-13 16:40:...|
|2016-12-15|                0.0|                    0.0

In [21]:
# Define the calendar dimension table using unique dates from weather data and trip data
df_calendar_d = spark.sql("""
    SELECT  DISTINCT cal_dt,
            EXTRACT(year from cal_dt) as cal_year,
            EXTRACT(quarter from cal_dt) as cal_quarter,
            EXTRACT(month from cal_dt) as cal_month,
            EXTRACT(day from cal_dt) as cal_day,
            EXTRACT(dayofweek from cal_dt) as cal_day_of_week,
            NOW() as insert_ts            
            
    FROM (
        SELECT DISTINCT DATE(time) as cal_dt
        FROM weather_v 

        UNION ALL
        
        SELECT DISTINCT DATE(to_timestamp(starttime, "mm/dd/yyyy HH:mm:ss")) as cal_dt
        FROM trip_v
        
        UNION ALL
        
        SELECT DISTINCT DATE(to_timestamp(stoptime, "mm/dd/yyyy HH:mm:ss")) as cal_dt
        FROM trip_v    

    )
    
    ORDER BY cal_dt ASC
    """)

# Inspect the table
df_calendar_d.show()
df_calendar_d.printSchema()

+----------+--------+-----------+---------+-------+---------------+--------------------+
|    cal_dt|cal_year|cal_quarter|cal_month|cal_day|cal_day_of_week|           insert_ts|
+----------+--------+-----------+---------+-------+---------------+--------------------+
|2016-01-01|    2016|          1|        1|      1|              6|2023-01-13 16:40:...|
|2016-01-02|    2016|          1|        1|      2|              7|2023-01-13 16:40:...|
|2016-01-03|    2016|          1|        1|      3|              1|2023-01-13 16:40:...|
|2016-01-04|    2016|          1|        1|      4|              2|2023-01-13 16:40:...|
|2016-01-05|    2016|          1|        1|      5|              3|2023-01-13 16:40:...|
|2016-01-06|    2016|          1|        1|      6|              4|2023-01-13 16:40:...|
|2016-01-07|    2016|          1|        1|      7|              5|2023-01-13 16:40:...|
|2016-01-08|    2016|          1|        1|      8|              6|2023-01-13 16:40:...|
|2016-01-09|    2016|

In [22]:
# Create the dwh-folder if it not already exists
os.makedirs('dwh', exist_ok=True)  

# List all data frames that should be written into a parqurt-file
list_dwh_df=[
    {"df": df_bike_d, "table_name": 'bike_d'},
    {"df": df_station_d, "table_name": 'station_d'},    
    {"df": df_calendar_d, "table_name": 'calendar_d'},
    {"df": df_trip_f, "table_name": 'trip_f'},
    {"df": df_gender_d, "table_name": 'gender_d'},
    {"df": df_weather_d, "table_name": 'weather_d'},
]

print(current_time(), '- Starting to write', len(list_dwh_df), 'parquet-files')
    
# loop over list and dict to write to parquet-files
for elem in list_dwh_df:
    print(current_time(), "- Reading", elem['df'])
    
    # target path
    parquet_path = 'dwh/' + elem['table_name'] + '.parquet'
    
    print(current_time(), "- Writing", parquet_path)    
    elem['df'].write.mode("overwrite").parquet(parquet_path)

print(current_time(), '- Finished to write', len(list_dwh_df), 'parquet-files')

16:40:36 - Starting to write 6 parquet-files
16:40:36 - Reading DataFrame[bike_id: int, bike_trip_last_ts: timestamp, bike_trip_cnt: int, insert_ts: timestamp]
16:40:36 - Writing dwh/bike_d.parquet
16:40:59 - Reading DataFrame[station_id: int, station_name: string, station_lat: float, station_long: float, insert_ts: timestamp]
16:40:59 - Writing dwh/station_d.parquet
16:41:39 - Reading DataFrame[cal_dt: date, cal_year: int, cal_quarter: int, cal_month: int, cal_day: int, cal_day_of_week: int, insert_ts: timestamp]
16:41:39 - Writing dwh/calendar_d.parquet
16:42:09 - Reading DataFrame[trip_id: int, start_ts: timestamp, start_dt: date, stop_ts: timestamp, stop_dt: date, trip_duration_in_seconds: bigint, start_station_id: int, end_station_id: int, bike_id: int, user_type: string, user_birth_year: int, gender_id: int, insert_ts: timestamp]
16:42:09 - Writing dwh/trip_f.parquet
16:42:52 - Reading DataFrame[gender_id: int, gender_name_short: string, gender_name_long: string, insert_ts: times

#### 4.2 Data Quality Checks

The tables have now been saved into 6 parquet-files. To ensure integrity and data quality each file is read again and checked for existance, completeness and integrity of the keys.

In [23]:
# The dictionary in the list includes and defines values for...
# - table name (does the table exist?) 
# - minimum expected rows (has data been written to the table?)
# - key name and key data type (does the key column exists and has the right data type and is it unique?)

list_qualitiy_check=[
    {"table_name": 'bike_d', "min_rows": 10, "key": 'bike_id', "key_dtype": 'int'},
    {"table_name": 'calendar_d', "min_rows": 366, "key": 'cal_dt', "key_dtype": 'date'},
    {"table_name": 'station_d', "min_rows": 10, "key": 'station_id', "key_dtype": 'int'},        
    {"table_name": 'trip_f', "min_rows": 1000, "key": 'trip_id', "key_dtype": 'int'},
    {"table_name": 'gender_d', "min_rows": 3, "key": 'gender_id', "key_dtype": 'int'},
    {"table_name": 'weather_d', "min_rows": 366, "key": 'cal_dt', "key_dtype": 'date'}
]

# start quality check
print(current_time(), '- Starting qualtiy check for', len(list_qualitiy_check), 'tables')

# loop over defined tables
for elem in list_qualitiy_check:
    
    # read parquet-file
    file_name = 'dwh/' + elem['table_name'] + '.parquet'
    
    # Qualtiy Check 1/4: Does the file exist
    print(current_time(), '- Checking', file_name)    
    df_qc=spark.read.parquet(file_name)
    print(current_time(), '- 1/4 passed -', file_name, 'exists and has succesfully been read')  
    
    # Qualtiy Check 2/4: File contains defined minimum rows
    row_count = 0 # reset count
    row_count = df_qc.count()
    
    if row_count >= elem['min_rows']:
        print(current_time(), '- 2/4 passed -', 'Rows in table:', row_count, '| Rows needed to pass:', elem['min_rows'])  
        
    else:
        print(current_time(), '- 2/4 failed')          
        raise Exception('Minimum rows for table not fulfilled')
        
    # Qualtiy Check 3/4: Key is unique
    if df_qc.select(elem['key']).distinct().count() == df_qc.select(elem['key']).count():
        print(current_time(), '- 3/4 passed -', 'Key', elem["key"], 'is unique')  
        
    else:
        print(current_time(), '- 3/4 failed')          
        raise Exception('Key is not unique')          

    # Qualtiy Check 4/4: Key has right datatype
    df_qc_dtype_key = '' # reset
    df_qc_dtype_key = dict(df_qc.dtypes)[elem['key']]
    
    if elem['key_dtype'] == df_qc_dtype_key:
        print(current_time(), '- 4/4 passed -', 'Key dtype in table:', df_qc_dtype_key, '| Key dtype expected:', elem['key_dtype'])  
        
    else:
        print(current_time(), '- 4/4 failed')          
        raise Exception('Key dtype does not match',df_qc_dtype_key,elem['key_dtype'])    
        
print(current_time(), '- Qualitiy check successful for all', len(list_qualitiy_check), 'elements')


16:43:06 - Starting qualtiy check for 6 tables
16:43:06 - Checking dwh/bike_d.parquet
16:43:06 - 1/4 passed - dwh/bike_d.parquet exists and has succesfully been read
16:43:07 - 2/4 passed - Rows in table: 7771 | Rows needed to pass: 10
16:43:10 - 3/4 passed - Key bike_id is unique
16:43:10 - 4/4 passed - Key dtype in table: int | Key dtype expected: int
16:43:10 - Checking dwh/calendar_d.parquet
16:43:10 - 1/4 passed - dwh/calendar_d.parquet exists and has succesfully been read
16:43:11 - 2/4 passed - Rows in table: 366 | Rows needed to pass: 366
16:43:13 - 3/4 passed - Key cal_dt is unique
16:43:13 - 4/4 passed - Key dtype in table: date | Key dtype expected: date
16:43:13 - Checking dwh/station_d.parquet
16:43:13 - 1/4 passed - dwh/station_d.parquet exists and has succesfully been read
16:43:13 - 2/4 passed - Rows in table: 490 | Rows needed to pass: 10
16:43:15 - 3/4 passed - Key station_id is unique
16:43:15 - 4/4 passed - Key dtype in table: int | Key dtype expected: int
16:43:15 

#### 4.3 Data dictionary 

Brief documentation of the data included in the dwh-files.

#### 4.3.1 Fact table trip_f

The fact table trip_f contains all valid trips registered by Citi Bike New York including information on start point, end point and the user.

|column |dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|trip_id|integer|Unique identifier of a trip|Primary key|Calculated on trip data|
|start_ts|timestamp|Timestamp when a user rented and started a trip with a bike||Original trip data|
|start_dt|date|Date when a user rented and started a trip with a bike|Foreign key calendar_d.cal_dt and weather_d.cal_dt|Original trip data|
|stop_ts|timestamp|Timestamp when a user ended a trip with a bike||Original trip data|
|stop_dt|date|Date when a user ended a trip with a bike |Foreign key calendar_d.cal_dt and weather_d.cal_dt|Original trip data|
|trip_duration_in_seconds|long|Duration of the trip in seconds||Original trip data|
|start_station_id|integer|Station identifier of the station where a user started a trip|Foreign key station_d.station_id|Original trip data|
|end_station_id|integer|Station identifier of the station where a user ended a trip|Foreign key station_d.station_id|Original trip data|
|bike_id|integer|Identifier of the bike the user rented for the trip|Foreign key bike_d.bike_id|Original trip data|
|user_type|string|User type is defined as customer (24-hour pass or 3-day pass user) or subscriber (Annual Member)||Original trip data|
|user_birth_year|integer|Year of birth of the user||Original trip data|
|gender_id|integer|Identifier of user gender|Foreign key gender_d.gender_id|Original trip data|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### 4.3.2 Dimension table bike_d

The dimension table bike_d contains meta information on usage of the individual bikes.

|column|dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|bike_id|integer|Unique identifier of the bike|Primary key|Original trip data|
|bike_trip_last_ts|timestamp|Timestamp when the last trip was taken with the bike||Calculated on trip data|
|bike_trip_cnt|integer|Number of trips that have been taken with the bike||Calculated on trip data|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### 4.3.3 Dimension table station_d

The dimension table station_d contains meta information on all bike sharing stations of Citi Bikes.

|column|dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|station_id|integer|Unique identifier of the station|Primary key|Original trip data|
|station_name|string|Name of the station||Original trip data|
|station_lat|float|Latitude of the location of the station||Original trip data|
|station_long|float|Longitude of the location of the station||Original trip data|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### 4.3.4 Dimension table calendar_d

The dimension table station_d contains meta information on all dates used in this project. It can be joined e.g. on the fact table to get information on which day of the week or which quarter a trip was taken.

|column |dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|cal_dt|date|Unique calendar date|Primary key|From trip data and weather data|
|cal_year|integer|Year of the calendar date||Calculated from cal_dt|
|cal_quarter|integer|Quarter of the calendar date||Calculated from cal_dt|
|cal_month|integer|Month of the calendar date||Calculated from cal_dt|
|cal_day|integer|Calendar day of the calendar date||Calculated from cal_dt|
|cal_day_of_week|integer|Day of the week of the calendar date||Calculated from cal_dt|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### 4.3.5 Dimension table gender_d

The dimension table gender_d resolves the gender_id into speaking variables.

|column|dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|gender_id|integer|Unique identifier of gender|Primary key|Original trip data|
|gender_name_short|string|Abbreviation of the gender (m = male and f = female)||Calculated in ETL|
|gender_name_long|string|Full name of the gender||Calculated in ETL|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### 4.3.6 Dimension table weather_d

The dimension table weather_d contains daily weather information for New York City.

|column |dtype|description|key|origin|
|:---|:---|:---|:---|:---|
|cal_dt|date|Calendar date of measurement|Primary key|Original weather data|
|precipitation_hours|float|The number of hours with rain||Original weather data|
|precipitation_sum_in_mm|float|Sum of daily precipitation in mm (including rain, showers and snowfall)||Original weather data|
|temperature_max_in_celsius|float|Maximum daily air temperature at 2 meters above ground in celsius||Original weather data|
|insert_ts|timestamp|Technical timestamp marking the point in time where data was inserted into the table||Calculated in ETL|

#### Step 5: Complete Project Write Up

The data warehouse would now be useable for a data scientist. For example he or she could evaluate on which day of the week the most trips are taken.

In [24]:
# Read in needed tables with Spark and create views
calendar_d = spark.read.parquet('dwh/calendar_d.parquet')
calendar_d.createOrReplaceTempView("calendar_d")

trip_f = spark.read.parquet('dwh/trip_f.parquet')
trip_f.createOrReplaceTempView("trip_f")

# Aggregate the trip data on daily basis
trip_f_daily =  spark.sql("""
    SELECT  t.start_dt
            , COUNT(t.trip_id) as cnt_trips
    FROM   trip_f t
    GROUP BY 1
    """
    )
trip_f_daily.createOrReplaceTempView("trip_f_daily")


# First analysis
df_analysis_day_of_week = spark.sql("""
    SELECT  cal.cal_day_of_week
            , ROUND(AVG(cnt_trips),0) as avg_trips_per_day_of_week
            , MIN(cnt_trips) as min_trips_per_day_of_week
            , MAX(cnt_trips) as max_trips_per_day_of_week
    FROM   trip_f_daily t
    LEFT JOIN calendar_d cal
        ON t.start_dt = cal.cal_dt
    GROUP BY cal.cal_day_of_week
    ORDER BY 2 DESC
    """)

# Show result
df_analysis_day_of_week.show()

+---------------+-------------------------+-------------------------+-------------------------+
|cal_day_of_week|avg_trips_per_day_of_week|min_trips_per_day_of_week|max_trips_per_day_of_week|
+---------------+-------------------------+-------------------------+-------------------------+
|              6|                  74833.0|                    65852|                    84256|
|              2|                  68358.0|                    59199|                    79880|
|              7|                  65589.0|                    46657|                    85077|
|              1|                  61284.0|                    46256|                    78947|
|              5|                  61025.0|                    39363|                    76870|
|              3|                  59379.0|                    49746|                    74431|
|              4|                  56674.0|                    46470|                    64582|
+---------------+-----------------------

The results show that the on average most trips are taken on saturdays. In a next step we could analyse how minus degrees impact average rental numbers.

In [25]:
# read in needed tables with Spark and create views
weather_d = spark.read.parquet('dwh/weather_d.parquet')
weather_d.createOrReplaceTempView("weather_d")

# Seconds analysis
df_analysis_temperature = spark.sql("""
    SELECT  CASE WHEN w.temperature_max_in_celsius < 0 then True else False end as is_freezing
            , ROUND(AVG(cnt_trips),0) as avg_trips
    FROM   trip_f_daily t
    LEFT JOIN weather_d w
        ON t.start_dt = w.cal_dt
    GROUP BY 1
    ORDER BY 2 DESC
    """)

# Show result
df_analysis_temperature.show()

+-----------+---------+
|is_freezing|avg_trips|
+-----------+---------+
|      false|  65560.0|
|       true|  60883.0|
+-----------+---------+



When the maximum temperature on a given day stays below 0 degrees (-> "is_freezing == True") slightly less people are renting bikes on average.

#### 5.1 Tool and technology choice

**Data storage**
- Data has been **read from S3  and an API**
- Raw input data is safed as a **csv-file in /stage** in order to have the raw data for backup-cases (e.g. outage of the API)
- Refined dwh table data is safed as **parquet-files in /dwh** in order to define data types

**ETL Tools**
- For data processing, cleaning and processing **PySpark** as the project works with a large dataset. Spark excels in processing big data as it utilizes streaming and parallelization and also can be hosted on the cloud. 
- For smaller datasets within the ETL **base Python3 and Pandas** is used to inspect and transform data.



#### 5.2 Data updates

Currently data is provided monthly by *Citi Bikes* and - theoretically - daily by *Open-Meteo*. Therefore, the data should be updated monthly in order to keep the database up to date. However, in cooperation with *Citi Bikes* maybe a higher frequency of updates ranging from weekly to real-time could be realized. This would enable data scientists and user users to provide faster feedback to *Citi Bikes* and therefore enable *Citi Bikes* to faster react to current situations or trends.

#### 5.3 Design changes under different scenarios

#### 5.3.1 The data was increased by 100x
- Currently the data pipeline is executed in local Spark or even standalone Python. To handle larger data volumes the ETL could be **migrated to cloud services such as AWS EMR** and utilize their far superior computing power.
- The data should be stored in a real **database such as AWS Redshift** to utilize its computing power and take advantages of its architecture (like MPP or columnar storage). Therefore, correct distribution keys and sort keys would need to be defined.
- In addition, the data pipeline could be reconfigured from the current **full load to a incremental load** (e.g. daily or hourly). This would lead to less data that needs to be processed in every ETL iteration.


#### 5.3.2 The pipelines would be run on a daily basis by 7 am every day
- First of all: As mentioned above the ETL should be migrated from **full load to incremental load** in order to reduce daily load on the ETL infrastructure
- An **orchestration tool like Apache Airflow** should be used to control and steer the daily pipelines. Apache Airflow would allow easier debugging, recovery processes in case a data source goes offline and the opportunity to integrate new data sources in a simple way

#### 5.3.3 The database needed to be accessed by 100+ people
- Currently our "database" is a collection of local files. The files are hard to share and a lot of user working on them could lead to high load on the system. A solution would be to host the relational data base on a **cloud database like AWS Redshift**. Redshift allows a high number of active database connections and users at the same time. In addition it is easily scalable if the number of users should increase further.
- Furthermore **schemas and user roles** in redshift would allow us to define which users have access to which data. Which would allow us to provide users access to particular parts of the database further improving load balancing.