# ETL

## Importart libraries

First, we need to import needed libraries and some methods from the `utils` and `etl` directories to stablish the database connection and consuming functions from `extract.py`, `transform.py` and `load.py` files.

In [1]:
import pandas as pd
from utils import connection as con
from etl import extract as ex
from etl import load
from etl import transform as t

## Database conexion

__SQLAlchemy__ is a toolkit and ORM for Python. A new connection ca be stablished by creating a new `engine`. 

In [2]:
conn_transport_db= con.connect_transport_db()
conn_dw_transport= con.connect_dw_transport()

## OLTP entities

These are the entities required from our OLTP DB to be stored in the OLAP DB:

### Dimensions

- drivers
- vehicles
- users
- locations
- user_payment_methods
- payment_types
- prices

### Facts

- payments
- trips
- stops

At first glance, it's not necessary to import all entities into DataFrames because some of them require a custom query to be imported.

In [3]:
df_dimension_users = ex.extract_table('users', conn_transport_db)[['user_id', 'rating']]
df_dimension_locations = ex.extract_table('locations', conn_transport_db)
df_fact_stops = ex.extract_table('stops', conn_transport_db)

## Transform

### dimension_vehicles

- driver_id
- brand
- model
- color
- year
- tax token
- insurance_due_date_id
- rating

### fact_trips

- trip_id
- user_id
- driver_id
- pickup_spot
- destination
- initial_price
- total_distance
- final_price
- driver_rating
- user_rating
- start_time_id
- end_time_id
- cancelation_time_id
- reservation_date_id
- sub_total
- discount
- total
- payment_time_id
- refund_time_id
- payment_type
- brand
- unit
- price
- location_id

### dimension_time

- time_id
- year
- quarter
- month
- day
- day_of_week
- day_of_year
- week_of_year
- is_weekend
- hour
- minutes
- seconds

Transform process involves some data conversion and manipulation.

There are multiple columns storing `Date` and `DateTime` data-type. This means that, at some point you might find some duplicated data, specially with Date data-type.

- insurance_due_date
- start_time
- end_time
- reservation_date
- cancelation_time
- payment_time
- refund_time

It's easier for the database to look for an _integer_ value within a dimension table instead of scanning a whole column with a complex data-type such as `Date` or `DateTime`.

This is the why we are creating a _dimension_time_ table to store Date or DateTime data-type in a particular way.

Also, we are combining multiple dataframes with Date or DateTime data-types, so we can drop duplicated rows before storing the data into its destination table.

In [4]:
query = """
select distinct
	DATE_FORMAT(insurance_due_date, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(insurance_due_date) as year,
	quarter(insurance_due_date) as quarter,
    month(insurance_due_date) as month,
    day(insurance_due_date) as day,
    dayofweek(insurance_due_date) as day_of_week,
    dayofyear(insurance_due_date) as day_of_year,
    weekofyear(insurance_due_date) as week_of_year,
    case
		when dayofweek(insurance_due_date) in (1, 7) then true
        else false
	end as is_weekend,
    hour(insurance_due_date) as hour,
    minute(insurance_due_date) as minutes,
    second(insurance_due_date) as seconds
from vehicles
where insurance_due_date is not null;
"""

df_dimension_time_insurance = t.transform_time(conn_transport_db, query)
df_dimension_time_insurance

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20250418000000,2025,2,4,18,6,108,16,0,0,0,0
1,20250521000000,2025,2,5,21,4,141,21,0,0,0,0
2,20250810000000,2025,3,8,10,1,222,32,1,0,0,0
3,20240911000000,2024,3,9,11,4,255,37,0,0,0,0


In [5]:
query = """
select distinct
	DATE_FORMAT(start_time, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(start_time) as year,
	quarter(start_time) as quarter,
    month(start_time) as month,
    day(start_time) as day,
    dayofweek(start_time) as day_of_week,
    dayofyear(start_time) as day_of_year,
    weekofyear(start_time) as week_of_year,
    case
		when dayofweek(start_time) in (1, 7) then true
        else false
	end as is_weekend,
    hour(start_time) as hour,
    minute(start_time) as minutes,
    second(start_time) as seconds
from trips
where start_time is not null;
"""

df_dimension_time_start = t.transform_time(conn_transport_db, query)
df_dimension_time_start

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240101000001,2024,1,1,1,2,1,1,0,0,0,1
1,20240102011001,2024,1,1,2,3,2,1,0,1,10,1
2,20240103022001,2024,1,1,3,4,3,1,0,2,20,1
3,20240105044001,2024,1,1,5,6,5,1,0,4,40,1
4,20240107060016,2024,1,1,7,1,7,1,1,6,0,16


In [6]:
query = """
select distinct
	DATE_FORMAT(end_time, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(end_time) as year,
	quarter(end_time) as quarter,
    month(end_time) as month,
    day(end_time) as day,
    dayofweek(end_time) as day_of_week,
    dayofyear(end_time) as day_of_year,
    weekofyear(end_time) as week_of_year,
    case
		when dayofweek(end_time) in (1, 7) then true
        else false
	end as is_weekend,
    hour(end_time) as hour,
    minute(end_time) as minutes,
    second(end_time) as seconds
from trips
where end_time is not null;
"""

df_dimension_time_end = t.transform_time(conn_transport_db, query)
df_dimension_time_end

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240101001001,2024,1,1,1,2,1,1,0,0,10,1
1,20240102012201,2024,1,1,2,3,2,1,0,1,22,1
2,20240103023201,2024,1,1,3,4,3,1,0,2,32,1
3,20240105045501,2024,1,1,5,6,5,1,0,4,55,1
4,20240107061616,2024,1,1,7,1,7,1,1,6,16,16


In [7]:
query = """
select distinct
	DATE_FORMAT(reservation_date, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(reservation_date) as year,
	quarter(reservation_date) as quarter,
    month(reservation_date) as month,
    day(reservation_date) as day,
    dayofweek(reservation_date) as day_of_week,
    dayofyear(reservation_date) as day_of_year,
    weekofyear(reservation_date) as week_of_year,
    case
		when dayofweek(reservation_date) in (1, 7) then true
        else false
	end as is_weekend,
    hour(reservation_date) as hour,
    minute(reservation_date) as minutes,
    second(reservation_date) as seconds
from trips
where reservation_date is not null;
"""

df_dimension_time_reservation = t.transform_time(conn_transport_db, query)
df_dimension_time_reservation

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240101231541,2024,1,1,1,2,1,1,0,23,15,41
1,20240104034401,2024,1,1,4,5,4,1,0,3,44,1
2,20240106050501,2024,1,1,6,7,6,1,1,5,5,1


In [8]:
query = """
select distinct
	DATE_FORMAT(cancelation_time, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(cancelation_time) as year,
	quarter(cancelation_time) as quarter,
    month(cancelation_time) as month,
    day(cancelation_time) as day,
    dayofweek(cancelation_time) as day_of_week,
    dayofyear(cancelation_time) as day_of_year,
    weekofyear(cancelation_time) as week_of_year,
    case
		when dayofweek(cancelation_time) in (1, 7) then true
        else false
	end as is_weekend,
    hour(cancelation_time) as hour,
    minute(cancelation_time) as minutes,
    second(cancelation_time) as seconds
from trips
where cancelation_time is not null;
"""

df_dimension_time_cancelation = t.transform_time(conn_transport_db, query)
df_dimension_time_cancelation

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240104033001,2024,1,1,4,5,4,1,0,3,30,1
1,20240106055001,2024,1,1,6,7,6,1,1,5,50,1


In [9]:
query = """
select distinct
	DATE_FORMAT(payment_time, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(payment_time) as year,
	quarter(payment_time) as quarter,
    month(payment_time) as month,
    day(payment_time) as day,
    dayofweek(payment_time) as day_of_week,
    dayofyear(payment_time) as day_of_year,
    weekofyear(payment_time) as week_of_year,
    case
		when dayofweek(payment_time) in (1, 7) then true
        else false
	end as is_weekend,
    hour(payment_time) as hour,
    minute(payment_time) as minutes,
    second(payment_time) as seconds
from payments
where payment_time is not null;
"""

df_dimension_time_payment = t.transform_time(conn_transport_db, query)
df_dimension_time_payment

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240101001001,2024,1,1,1,2,1,1,0,0,10,1
1,20240102012201,2024,1,1,2,3,2,1,0,1,22,1
2,20240103023201,2024,1,1,3,4,3,1,0,2,32,1
3,20240105045501,2024,1,1,5,6,5,1,0,4,55,1
4,20240107061616,2024,1,1,7,1,7,1,1,6,16,16


In [10]:
query = """
select distinct
	DATE_FORMAT(refund_time, '%%Y%%m%%d%%H%%i%%s') AS time_id,
	year(refund_time) as year,
	quarter(refund_time) as quarter,
    month(refund_time) as month,
    day(refund_time) as day,
    dayofweek(refund_time) as day_of_week,
    dayofyear(refund_time) as day_of_year,
    weekofyear(refund_time) as week_of_year,
    case
		when dayofweek(refund_time) in (1, 7) then true
        else false
	end as is_weekend,
    hour(refund_time) as hour,
    minute(refund_time) as minutes,
    second(refund_time) as seconds
from payments
where refund_time is not null;
"""

df_dimension_time_refund = t.transform_time(conn_transport_db, query)
df_dimension_time_refund

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240104033001,2024,1,1,4,5,4,1,0,3,30,1
1,20240106055001,2024,1,1,6,7,6,1,1,5,50,1


In [11]:
query = 'select * from dimension_time;'

df_dimension_time_dw = t.transform_time(conn_dw_transport, query)
df_dimension_time_dw

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds


### Duplicates validation

In the next cell, we are validating the content of the combined DataFrame to drop duplicated rows. Also, there is a data comparation between the combined DataFrame and _dimension_time_ to store only non-existing data into this table. This can be done by using __Panda__'s `merge` method and specifying and `indicator` with a `True` value.

In [12]:
df_dimension_time = pd.concat([
    df_dimension_time_insurance,
    df_dimension_time_start,
    df_dimension_time_end,
    df_dimension_time_reservation,
    df_dimension_time_cancelation,
    df_dimension_time_payment,
    df_dimension_time_refund
], ignore_index=True)

df_dimension_time_clean = df_dimension_time.drop_duplicates()
df_dimension_time_clean = df_dimension_time_clean.astype('Int64')
df_dimension_time_pivot = df_dimension_time_clean.merge(df_dimension_time_dw, how='outer', indicator=True)
df_dimension_time_ready = df_dimension_time_pivot.query('_merge == "left_only"').drop(columns=['_merge'])
df_dimension_time_ready

Unnamed: 0,time_id,year,quarter,month,day,day_of_week,day_of_year,week_of_year,is_weekend,hour,minutes,seconds
0,20240101000001,2024,1,1,1,2,1,1,0,0,0,1
1,20240101001001,2024,1,1,1,2,1,1,0,0,10,1
2,20240101231541,2024,1,1,1,2,1,1,0,23,15,41
3,20240102011001,2024,1,1,2,3,2,1,0,1,10,1
4,20240102012201,2024,1,1,2,3,2,1,0,1,22,1
5,20240103022001,2024,1,1,3,4,3,1,0,2,20,1
6,20240103023201,2024,1,1,3,4,3,1,0,2,32,1
7,20240104033001,2024,1,1,4,5,4,1,0,3,30,1
8,20240104034401,2024,1,1,4,5,4,1,0,3,44,1
9,20240105044001,2024,1,1,5,6,5,1,0,4,40,1


## Load

Only clean and transformed DataFrames are going to be stored into the OLAP database and `df_dimension_time_ready` is going to be our first DataFrame to be loaded into _dimension_time_.

Later on, we are building some queries to transform Date and DateTime data-types considering that we need to repleace them with the correct `time_id` from the table _dimension_time_. All the previous data is gonna be assigned to DataFrames. 

Finally, those new DataFrames are going to be loaded into dimensions and facts tables respectively. 

In [13]:
is_empty = df_dimension_time_ready.empty
if is_empty:
    print('No data in DataFrame df_dimension_time_ready')
else:
    load.load_dw_transportation_service('dimension_time', conn_dw_transport, df_dimension_time_ready)

In [14]:
conn_dw_transport.commit()

In [15]:
query = """
select
  v.driver_id,
  v.brand,
  v.model,
  v.color,
  v.year,
  v.tax_token,
  dt.time_id as insurance_due_date_id,
  d.rating
from transportation_service_db.vehicles v
inner join dw_transportation_service.dimension_time dt on dt.time_id = date_format(v.insurance_due_date, '%%Y%%m%%d%%H%%i%%s')
inner join transportation_service_db.drivers d on d.driver_id = v.driver_id;
"""

df_dimension_vehicles = t.transform_time(conn_dw_transport, query)
df_dimension_vehicles

Unnamed: 0,driver_id,brand,model,color,year,tax_token,insurance_due_date_id,rating
0,1,Toyota,Corolla,Black,2014,,20250418000000,5.0
1,2,Nissan,Versa,White,2013,,20250521000000,3.0
2,3,Mitsubishi,Lancer,Red,2015,,20250810000000,4.0
3,4,Ford,Escape,White,2019,,20240911000000,5.0
4,5,Honda,Acura,Blue,2013,,20240911000000,3.0
5,6,Kia,Rio,Green,2019,,20240911000000,4.0
6,7,Huyndai,Elantra,Black,2021,,20240911000000,5.0


In [16]:
query = """
select
  t.trip_id,
  t.user_id,
  t.driver_id,
  t.initial_price,
  t.total_distance,
  t.final_price,
  t.driver_rating,
  t.user_rating,
  dt1.time_id as start_time_id,
  dt2.time_id as end_time_id,
  dt3.time_id as cancelation_time_id,
  dt4.time_id as reservation_time_id,
  p.sub_total,
  p.discount,
  p.total,
  dt5.time_id as payment_time_id,
  dt6.time_id as refund_time_id,
  pt.type as payment_type,
  pt.brand,
  pr.unit,
  pr.price,
  u.location_id
from transportation_service_db.trips t
inner join transportation_service_db.payments p on p.trip_id = t.trip_id
inner join transportation_service_db.user_payment_methods upm on upm.user_payment_method_id = p.user_payment_method_id
inner join transportation_service_db.payment_types pt on pt.payment_type_id = upm.payment_type_id
inner join transportation_service_db.prices pr on pr.price_id = t.price_id
inner join transportation_service_db.users u on u.user_id = t.user_id
left join dw_transportation_service.dimension_time dt1 on dt1.time_id = date_format(t.start_time, '%%Y%%m%%d%%H%%i%%s')
left join dw_transportation_service.dimension_time dt2 on dt2.time_id = date_format(t.end_time, '%%Y%%m%%d%%H%%i%%s')
left join dw_transportation_service.dimension_time dt3 on dt3.time_id = date_format(t.cancelation_time, '%%Y%%m%%d%%H%%i%%s')
left join dw_transportation_service.dimension_time dt4 on dt4.time_id = date_format(t.reservation_date, '%%Y%%m%%d%%H%%i%%s')
left join dw_transportation_service.dimension_time dt5 on dt5.time_id = date_format(p.payment_time, '%%Y%%m%%d%%H%%i%%s')
left join dw_transportation_service.dimension_time dt6 on dt6.time_id = date_format(p.refund_time, '%%Y%%m%%d%%H%%i%%s');
"""

df_trips = t.transform_time(conn_transport_db, query)
df_trips

Unnamed: 0,trip_id,user_id,driver_id,initial_price,total_distance,final_price,driver_rating,user_rating,start_time_id,end_time_id,...,sub_total,discount,total,payment_time_id,refund_time_id,payment_type,brand,unit,price,location_id
0,1,1,7,4.4,2.3,4.62,5,3,,,...,4.62,0.0,4.62,,,Credit card,Mastercard,Mile,2.0,1
1,2,2,6,0.8,0.4,0.82,4,4,,,...,0.82,0.21,0.61,,,Credit card,Visa,Mile,2.0,1
2,3,3,5,2.4,1.2,2.5,3,5,,,...,2.5,0.75,1.75,,,Cash,,Mile,2.0,1
3,4,4,4,1.54,2.3,1.62,5,4,,,...,1.62,0.0,1.62,,,Voucher,,Kilometer,0.7,2
4,5,5,3,1.26,1.8,1.29,4,5,,,...,1.29,0.19,1.1,,,Gitf card,,Kilometer,0.7,2
5,6,6,2,0.8,0.8,0.83,3,4,,,...,0.83,0.0,0.83,,,App cash,,Kilometer,1.0,3
6,7,7,1,5.0,5.2,5.15,5,5,,,...,5.15,0.0,5.15,,,Credit card,American Express,Kilometer,1.0,3


In [17]:

df_stops_pivot_max = (df_fact_stops[['trip_id', 'order']].groupby('trip_id').max('order')).sort_values(by='trip_id')
df_stops_pivot = df_stops_pivot_max.merge(df_fact_stops, on=['trip_id', 'order'], how='left')[['trip_id', 'destination']]
df_stops_pivot_min = df_fact_stops.query('order == 1')[['trip_id', 'pickup_spot']]
df_stops_pivot_min_max = df_stops_pivot_min.merge(df_stops_pivot, on=['trip_id'], how='inner')
# trips - complete table
df_fact_trips = df_trips.merge(df_stops_pivot_min_max, on='trip_id', how='inner')[[
    'trip_id',
    'user_id',
    'driver_id',
    'pickup_spot',
    'destination',
    'initial_price',
    'total_distance',
    'final_price',
    'driver_rating',
    'user_rating',
    'start_time_id',
    'end_time_id',
    'cancelation_time_id',
    'reservation_time_id',
    'sub_total',
    'discount',
    'total',
    'payment_time_id',
    'refund_time_id',
    'payment_type',
    'brand',
    'unit',
    'price',
    'location_id'
]]
df_fact_trips

Unnamed: 0,trip_id,user_id,driver_id,pickup_spot,destination,initial_price,total_distance,final_price,driver_rating,user_rating,...,sub_total,discount,total,payment_time_id,refund_time_id,payment_type,brand,unit,price,location_id
0,1,1,7,"b""\x00\x00\x00\x00\x01\x01\x00\x00\x00\xbfE'K\...",b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\xee{\xd...,4.4,2.3,4.62,5,3,...,4.62,0.0,4.62,,,Credit card,Mastercard,Mile,2.0,1
1,2,2,6,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x88\x84...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x11\xe3...,0.8,0.4,0.82,4,4,...,0.82,0.21,0.61,,,Credit card,Visa,Mile,2.0,1
2,3,3,5,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\xcd\x95...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x90\x86...,2.4,1.2,2.5,3,5,...,2.5,0.75,1.75,,,Cash,,Mile,2.0,1
3,4,4,4,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\xfdJ\xe...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x94\xdc...,1.54,2.3,1.62,5,4,...,1.62,0.0,1.62,,,Voucher,,Kilometer,0.7,2
4,5,5,3,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00~p>u\xac...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00L\xfa{)<...,1.26,1.8,1.29,4,5,...,1.29,0.19,1.1,,,Gitf card,,Kilometer,0.7,2
5,6,6,2,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00E.8\x83\...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x91\x9d...,0.8,0.8,0.83,3,4,...,0.83,0.0,0.83,,,App cash,,Kilometer,1.0,3
6,7,7,1,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\xfd.l\x...,b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\t5C\xaa...,5.0,5.2,5.15,5,5,...,5.15,0.0,5.15,,,Credit card,American Express,Kilometer,1.0,3


In [18]:
load.load_dw_transportation_service('dimension_locations', conn_dw_transport, df_dimension_locations)

In [19]:
load.load_dw_transportation_service('dimension_users', conn_dw_transport, df_dimension_users)

In [20]:
load.load_dw_transportation_service('dimension_vehicles', conn_dw_transport, df_dimension_vehicles)

In [21]:
load.load_dw_transportation_service('fact_trips', conn_dw_transport, df_fact_trips)

In [22]:
load.load_dw_transportation_service('fact_stops', conn_dw_transport, df_fact_stops)

## Commit transaction and close connection

Once all tables from the OLAP database are loaded with their DataFrames, it's important to commit all changes and close the connection.

In [23]:
conn_transport_db.commit()
conn_transport_db.close()
conn_dw_transport.commit()
conn_dw_transport.close()