# Udacity Data Engineering Capstone Project 
#### by KONE AZIZ R.

#### Project Summary
The purpose of the data engineering capstone project is to combine the data engineering skills learned during the Udacity Data Engineering journey.
This project works with NYC taxi trip records.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
from datetime import datetime
import configparser
from sqlalchemy import create_engine
import sql_queries as sq
%reload_ext sql

In [2]:
config = configparser.ConfigParser()
# Read the configuration file where the Redshift connection information is stored
with open('db.cfg') as cfg_file:
    config.read_file(cfg_file)

#### Getting our Redshift data warehouse configuration

In [3]:
DWH_HOST               = config.get("CLUSTER", "HOST")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")
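
For reference, `db.cfg` is expected to contain a `[CLUSTER]` section with the five keys read above. The sketch below parses an in-memory example instead of the real file; every value in it is a placeholder rather than the actual cluster setting, and the `sample_*` names are illustrative only. It also shows `getint`, which returns the port as an integer instead of a string.

```python
import configparser

# Hypothetical contents of db.cfg; every value here is a placeholder.
SAMPLE_CFG = """
[CLUSTER]
HOST = example-cluster.abc123.us-west-2.redshift.amazonaws.com
DB_NAME = dwh
DB_USER = dwhuser
DB_PASSWORD = change-me
DB_PORT = 5439
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

cluster = sample_config["CLUSTER"]
sample_host = cluster.get("HOST")
sample_port = cluster.getint("DB_PORT")  # parsed as int instead of str

print(sample_host, sample_port)
```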

In [4]:
# Create a direct connection to our Redshift database
conn_string=f'postgresql://{DWH_DB_USER}:{DWH_DB_PASSWORD}@{DWH_HOST}:{DWH_PORT}/{DWH_DB}'
db_engine = create_engine(conn_string)
db_conn_url = f'jdbc:postgresql://{DWH_HOST}:{DWH_PORT}/{DWH_DB}'
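
One caveat when building the URL by string formatting: if the user name or password contains characters such as `@`, `/` or `:`, the URL is no longer parsed correctly. A minimal sketch of percent-encoding the credentials with the standard library follows. The helper name `build_conn_string` and the sample credentials are hypothetical and not part of the project.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

def build_conn_string(user, password, host, port, db):
    """Hypothetical helper: build a postgresql:// URL with escaped credentials."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"

# Made-up credentials containing characters that are special inside a URL
example_url = build_conn_string("dwhuser", "p@ss/w:rd", "localhost", 5439, "dwh")
parts = urlsplit(example_url)

print(example_url)
print(parts.hostname, parts.port)      # host and port are still parsed correctly
print(unquote_plus(parts.password))    # decoding gives back the original password
```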

In [5]:
# Start the spark Session
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
from pyspark.sql import types

conf = SparkConf()  # create the configuration
# The PostgreSQL JDBC driver lets Spark communicate with Redshift and other PostgreSQL-compatible databases.
conf.set("spark.jars", "/home/workspace/postgresql-42.5.0.jar")
spark = SparkSession.builder \
        .config(conf=conf) \
        .appName("Udacity Data engineer capstone project") \
        .enableHiveSupport() \
        .getOrCreate()

### Data Gathering

In [68]:
# These files only need to be downloaded once
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet
#!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

In [69]:
%%sh
mv taxi+_zone_lookup.csv personal_data/taxi_zone_lookup.csv
mv yellow_tripdata_2022-01.parquet personal_data/
mv green_tripdata_2022-01.parquet personal_data/

In [6]:
payment_type_data = [
        ('1','Credit card'),
        ('2','Cash'),
        ('3','No charge'),
        ('4','Dispute'),
        ('5','Unknown'),
        ('6','Voided trip')
        ]

Load all data into Spark

In [7]:
DATA_DIR = 'personal_data'
YEAR = 2022  #datetime.now().strftime('%Y')
MONTH = '01' #datetime.now().strftime('%m')

df_yellow = spark.read.parquet(f'{DATA_DIR}/yellow_tripdata_{YEAR}-{MONTH}.parquet')
df_green = spark.read.parquet(f'{DATA_DIR}/green_tripdata_{YEAR}-{MONTH}.parquet')

taxi_lookup_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])
df_taxi_lookup = spark\
                        .read.option("header",True) \
                        .csv(f'{DATA_DIR}/taxi_zone_lookup.csv' , schema=taxi_lookup_schema)


df_pt = spark.createDataFrame(data=payment_type_data, schema = ["TypeId","TypeLabel"])

In [8]:
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [9]:
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [10]:
df_pt.printSchema()

root
 |-- TypeId: string (nullable = true)
 |-- TypeLabel: string (nullable = true)



In [11]:
df_taxi_lookup.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [12]:
def get_createSqlStatement(df,tbname):
    return pd.io.sql.get_schema(df.limit(5).toPandas(), name=tbname, con=db_engine)

### Data Cleaning - Data Transformation

In [13]:
df_yellow = df_yellow\
.withColumnRenamed('tpep_pickup_datetime' , 'pickup_datetime') \
.withColumnRenamed('tpep_dropoff_datetime' , 'dropoff_datetime')\
.withColumn('taxi_type' , F.lit('yellow'))

In [14]:
df_green = df_green\
.withColumnRenamed('lpep_pickup_datetime' , 'pickup_datetime') \
.withColumnRenamed('lpep_dropoff_datetime' , 'dropoff_datetime') \
.withColumn('taxi_type' , F.lit('green'))

We want to combine the green and yellow trip DataFrames into one.
1. Keep only the columns they have in common
2. Apply a union on the two DataFrames

In [15]:
common_columns = []
for column in df_yellow.columns:
    if column in df_green.columns:
        common_columns.append(column)

In [16]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'taxi_type']

In [17]:
df_taxi_trips = df_green.select(common_columns).unionAll(df_yellow.select(common_columns))
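
Because the union matches columns by position, it is worth checking that the shared columns also share a type. The sketch below does this in plain Python on `(name, type)` pairs, which is the shape returned by a Spark DataFrame's `dtypes` attribute. Only a few columns from the schemas printed earlier are listed, and `type_mismatches` is a hypothetical helper name.

```python
# A subset of the (column, type) pairs from the schemas printed earlier
yellow_dtypes = [("VendorID", "bigint"), ("passenger_count", "double"),
                 ("payment_type", "bigint"), ("airport_fee", "double")]
green_dtypes = [("VendorID", "bigint"), ("passenger_count", "double"),
                ("ehail_fee", "int"), ("payment_type", "double")]

def type_mismatches(left, right):
    """Return {column: (left_type, right_type)} for shared columns whose types differ."""
    right_types = dict(right)
    return {name: (ltype, right_types[name])
            for name, ltype in left
            if name in right_types and right_types[name] != ltype}

# Order-preserving intersection of the column names
shared = [name for name, _ in yellow_dtypes if name in dict(green_dtypes)]

print(shared)
print(type_mismatches(yellow_dtypes, green_dtypes))
```

With the real DataFrames the call would be `type_mismatches(df_yellow.dtypes, df_green.dtypes)`. In the printed schemas `payment_type` is `long` for yellow trips and `double` for green trips, which is consistent with the generated `fact_trips` DDL showing it as `FLOAT(53)`.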

In [18]:
print(get_createSqlStatement(df_taxi_trips,'fact_trips'))
print(get_createSqlStatement(df_taxi_lookup,'dim_location'))
print(get_createSqlStatement(df_pt,'dim_payment_type'))


CREATE TABLE fact_trips (
	"VendorID" BIGINT, 
	pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	taxi_type TEXT
)



CREATE TABLE dim_location (
	"LocationID" INTEGER, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)



CREATE TABLE dim_payment_type (
	"TypeId" TEXT, 
	"TypeLabel" TEXT
)



### Data Ingestion

In [19]:
def loadIntoSpark(tbname):
    """
        Load a Redshift (PostgreSQL-compatible) table into a Spark DataFrame.
        - tbname: The table we want to load into Spark
    """
    df = spark.read\
    .format("jdbc")\
    .option("url",db_conn_url)\
    .option("driver", "org.postgresql.Driver")\
    .option("dbtable", tbname) \
    .option("user", f'{DWH_DB_USER}')\
    .option("password", f'{DWH_DB_PASSWORD}')\
    .load()
    return df

In [20]:
def insertInto(df,table,partition=5,batchsize=10000,mode='append'):
    """
        Insert data into a Redshift table
        - df : The Spark DataFrame
        - table : The target table
        - partition : Number of partitions, default 5
        - batchsize : Batch size, default 10000
        - mode : Insertion mode. Can be overwrite / append / ignore / errorifexists or error
    """
    try:
        # To save time, only the first 100 rows of each table are loaded
        df = df.limit(100)
        print(f'There are {loadIntoSpark(table).count()} rows in {table} before insertion')
        df.write\
            .format("jdbc")\
            .option("url", db_conn_url) \
            .option("driver", "org.postgresql.Driver")\
            .option("dbtable", table) \
            .option("user", f'{DWH_DB_USER}')\
            .option("password", f'{DWH_DB_PASSWORD}')\
            .option("numPartitions", partition)\
            .option("batchsize", batchsize)\
            .mode(mode).save()
        print(f'There are {loadIntoSpark(table).count()} rows in {table} after insertion')
    except Exception as e:
        # Report the underlying error instead of silently swallowing it
        print(f'Insertion failed ! {e}')

Insert Data into tables

In [25]:
insertInto(df_taxi_trips,'fact_trips',partition=5,batchsize=10000,mode='overwrite')

There are 0 rows in fact_trips before insertion
There are 100 rows in fact_trips after insertion


In [26]:
insertInto(df_taxi_lookup,'dim_location',partition=5,batchsize=10000,mode='overwrite')

There are 0 rows in dim_location before insertion
There are 100 rows in dim_location after insertion


In [27]:
insertInto(df_pt,'dim_payment_type',partition=5,batchsize=10000,mode='overwrite')

There are 0 rows in dim_payment_type before insertion
There are 6 rows in dim_payment_type after insertion


For the time dimension, we need to:
1. Load the existing data into a DataFrame
2. Combine it with the newly created DataFrame and exclude the data that already exists

In [28]:
# Create a temporary table and extract time info from it
df_taxi_trips = df_taxi_trips.limit(100)
df_taxi_trips.createOrReplaceTempView('tmp_trips')  # registerTempTable is deprecated

In [29]:
# Loading the existing time dimension data
df_existing_time = loadIntoSpark('dim_time')
df_existing_time.createOrReplaceTempView('tp_dim_time')  # registerTempTable is deprecated

In [30]:
# Creating the Time dimension table
df_time = spark.sql("""
SELECT distinct
       date(X.the_date)                                           AS date,
       EXTRACT(year FROM X.the_date)                              AS year,
       EXTRACT(quarter FROM X.the_date)                           AS quarter,
       EXTRACT(month FROM X.the_date)                             AS month,
       EXTRACT(day FROM X.the_date)                               AS day,
       EXTRACT(week FROM X.the_date)                              AS week,
       -- Spark's weekday() returns 0 = Monday ... 6 = Sunday, so the weekend is 5 and 6
       CASE WHEN weekday(X.the_date)  IN (5, 6) THEN true ELSE false END AS is_weekend
FROM 
(SELECT pickup_datetime as the_date FROM tmp_trips
UNION 
SELECT dropoff_datetime as the_date FROM tmp_trips) X
WHERE date(X.the_date) NOT IN (SELECT date FROM tp_dim_time)
""")

In [31]:
print(get_createSqlStatement(df_time,'dim_time'))


CREATE TABLE dim_time (
	date DATE, 
	year INTEGER, 
	quarter INTEGER, 
	month INTEGER, 
	day INTEGER, 
	week INTEGER, 
	is_weekend BOOLEAN
)
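
The attributes stored in `dim_time` can be cross-checked outside Spark. The plain-Python sketch below derives the same fields with the standard library and skips dates that are already known, mirroring the two steps described for the time dimension. Python's `date.weekday()` counts Monday as 0 and Sunday as 6, the same convention as Spark SQL's `weekday`. The function name `time_dimension_row` and the sample timestamps are illustrative assumptions.

```python
from datetime import date, datetime

def time_dimension_row(ts):
    """Hypothetical helper: derive dim_time-style attributes from a timestamp."""
    d = ts.date()
    return {
        "date": d,
        "year": d.year,
        "quarter": (d.month - 1) // 3 + 1,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],      # ISO 8601 week number
        "is_weekend": d.weekday() >= 5,  # 5 = Saturday, 6 = Sunday
    }

# Dates already present in the dimension (stand-in for tp_dim_time)
existing_dates = {date(2022, 1, 1)}

# Made-up pickup/dropoff timestamps
timestamps = [datetime(2022, 1, 1, 0, 35), datetime(2022, 1, 2, 9, 0),
              datetime(2022, 1, 3, 18, 30)]

# Keep one row per new calendar date, skipping dates that already exist
new_rows = {}
for ts in timestamps:
    if ts.date() not in existing_dates:
        new_rows[ts.date()] = time_dimension_row(ts)

for row in new_rows.values():
    print(row)
```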




In [32]:
insertInto(df_time,'dim_time',partition=5,batchsize=10000,mode='append')

There are 0 rows in dim_time before insertion
There are 3 rows in dim_time after insertion


### Some tests 

In [34]:
%sql $conn_string

'Connected: dwhuser@dwh'

1- Number of trips by day and taxi type

In [35]:
%%sql
SELECT 
    t.date, 
    p.taxi_type , 
    count(p.*) nb_trips 
from fact_trips p 
inner join dim_time t on p.pickup_datetime = t.date
group by  
t.date, 
p.taxi_type

 * postgresql://dwhuser:***@dwh.c3ujdkixlmmf.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


date,taxi_type,nb_trips
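
The empty result above may come from the join condition rather than from the data: `pickup_datetime` is a timestamp while `dim_time.date` is a date, so an equality between them can only match trips that start exactly at midnight. The sketch below reproduces the effect with the standard library's `sqlite3` as a stand-in for Redshift. This is an assumption made for illustration: SQLite stores these values as text, and the rows are made up. On Redshift the counterpart of `date(...)` would be a cast such as `CAST(p.pickup_datetime AS DATE)`.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE fact_trips (pickup_datetime TEXT, taxi_type TEXT);
CREATE TABLE dim_time (date TEXT);
INSERT INTO fact_trips VALUES ('2022-01-01 00:35:40', 'green'),
                              ('2022-01-01 08:10:00', 'green'),
                              ('2022-01-02 12:00:00', 'yellow');
INSERT INTO dim_time VALUES ('2022-01-01'), ('2022-01-02');
""")

QUERY = """
SELECT t.date, p.taxi_type, COUNT(*) AS nb_trips
FROM fact_trips p
INNER JOIN dim_time t ON {condition}
GROUP BY t.date, p.taxi_type
ORDER BY t.date, p.taxi_type
"""

# Joining the raw timestamp to the date matches nothing
raw_join = con.execute(QUERY.format(condition="p.pickup_datetime = t.date")).fetchall()

# Truncating the timestamp to a date first restores the matches
date_join = con.execute(QUERY.format(condition="date(p.pickup_datetime) = t.date")).fetchall()

print(raw_join)
print(date_join)
```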


2- Number of trips by day and pickup location

In [36]:
%%sql
SELECT 
    t.date, 
    l.zone,
    count(p.*) nb_trips 
from fact_trips p 
inner join dim_time t on p.pickup_datetime = t.date
inner join dim_location l on l.LocationID = p.PULocationID
group by t.date , l.zone

 * postgresql://dwhuser:***@dwh.c3ujdkixlmmf.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


date,zone,nb_trips


3- Average, max and min fare amount by taxi type

In [37]:
%%sql
SELECT 
    p.taxi_type , 
    AVG(p.fare_amount) avg_amount ,
    MAX(p.fare_amount) max_amount ,
    MIN(p.fare_amount) min_amount 
from fact_trips p 
group by p.taxi_type

 * postgresql://dwhuser:***@dwh.c3ujdkixlmmf.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


taxi_type,avg_amount,max_amount,min_amount
green,13.155,61.5,2.5


**The results shown in the tests above are not representative, because the data was limited to 100 rows before loading.**
**Remove `.limit(100)` and rerun all the tests to get results for the full dataset.**