In [1]:
import psycopg2
import pandas as pd

## Acess to database with raw data

In [2]:
conn_params = {
    "host": "localhost",
    "database": "ny_taxi",
    "user": "root",
    "password": "root"
}

In [10]:
df = pd.DataFrame()

with psycopg2.connect(**conn_params) as conn, conn.cursor() as cursor:
    print('connect to the database')
    cursor.execute("SELECT * FROM yellow_taxi_data LIMIT 1 OFFSET 0;")
    column_names = [desc[0] for desc in cursor.description]
    print('Columns: ',column_names)

    df = pd.DataFrame(columns=column_names)

    total_rows = cursor.fetchone()[0]

    start_row = 0
    batch_size = 5000

    print('Quering rows')
    while start_row < total_rows:
        query = f'SELECT * FROM yellow_taxi_data LIMIT {batch_size} OFFSET {start_row};'
        print(f'{batch_size} / {total_rows}')
        cursor.execute(query)
        data = cursor.fetchall()

        new_df = pd.DataFrame(data, columns=column_names)
        df = pd.concat([df, new_df], ignore_index=True)

        start_row += batch_size

connect to the database
Columns:  ['index', 'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']
Quering rows


In [7]:
df.head()

Unnamed: 0,index,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


## Fact Table and Dimension Tables Creation

In [None]:
datetime_dim = df[['tpep_pickup_datetime','tpep_dropoff_datetime']].drop_duplicates().reset_index(drop=True)

datetime_dim['pick_hour'] = datetime_dim['tpep_pickup_datetime'].dt.hour
datetime_dim['pick_day'] = datetime_dim['tpep_pickup_datetime'].dt.day
datetime_dim['pick_month'] = datetime_dim['tpep_pickup_datetime'].dt.month
datetime_dim['pick_year'] = datetime_dim['tpep_pickup_datetime'].dt.year
datetime_dim['pick_weekday'] = datetime_dim['tpep_pickup_datetime'].dt.weekday

datetime_dim['drop_hour'] = datetime_dim['tpep_dropoff_datetime'].dt.hour
datetime_dim['drop_day'] = datetime_dim['tpep_dropoff_datetime'].dt.day
datetime_dim['drop_month'] = datetime_dim['tpep_dropoff_datetime'].dt.month
datetime_dim['drop_year'] = datetime_dim['tpep_dropoff_datetime'].dt.year
datetime_dim['drop_weekday'] = datetime_dim['tpep_dropoff_datetime'].dt.weekday

datetime_dim['datetime_id'] = datetime_dim.index
datetime_dim = datetime_dim[['datetime_id', 'tpep_pickup_datetime', 'pick_hour', 'pick_day', 'pick_month', 'pick_year', 'pick_weekday',
                             'tpep_dropoff_datetime', 'drop_hour', 'drop_day', 'drop_month', 'drop_year', 'drop_weekday']]

In [None]:
passenger_count_dim = df[['passenger_count']].drop_duplicates().reset_index(drop=True)
passenger_count_dim['passenger_count_id'] = passenger_count_dim.index
passenger_count_dim = passenger_count_dim[['passenger_count_id','passenger_count']]

In [None]:
trip_distance_dim = df[['trip_distance']].drop_duplicates().reset_index(drop=True)
trip_distance_dim['trip_distance_id'] = trip_distance_dim.index
trip_distance_dim = trip_distance_dim[['trip_distance_id','trip_distance']]

In [None]:
rate_code_type = {
    1:"Standard rate",
    2:"JFK",
    3:"Newark",
    4:"Nassau or Westchester",
    5:"Negotiated fare",
    6:"Group ride"
}
rate_code_dim = df[['RatecodeID']].drop_duplicates().reset_index(drop=True)
rate_code_dim['rate_code_id'] = rate_code_dim.index
rate_code_dim['rate_code_name'] = rate_code_dim['RatecodeID'].map(rate_code_type)
rate_code_dim = rate_code_dim[['rate_code_id','RatecodeID','rate_code_name']]

In [None]:
payment_type_name = {
    1:"Credit card",
    2:"Cash",
    3:"No charge",
    4:"Dispute",
    5:"Unknown",
    6:"Voided trip"
}
payment_type_dim = df[['payment_type']].drop_duplicates().reset_index(drop=True)
payment_type_dim['payment_type_id'] = payment_type_dim.index
payment_type_dim['payment_type_name'] = payment_type_dim['payment_type'].map(payment_type_name)
payment_type_dim = payment_type_dim[['payment_type_id','payment_type','payment_type_name']]

### Especial part to location transformations

In [None]:
df_locations = pd.read_csv('https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv')
df_locations

In [122]:
pickup_location_dim = pd.DataFrame()
pickup_location_dim['LocationID'] = df[['PULocationID']].drop_duplicates().reset_index(drop=True)
pickup_location_dim['pickup_location_id'] = pickup_location_dim.index
pickup_location_dim = pd.merge(pickup_location_dim, df_locations, on='LocationID')
pickup_location_dim = pickup_location_dim.rename(columns={'LocationID':'PULocationID'})

dropoff_location_dim = pd.DataFrame()
dropoff_location_dim['LocationID'] = df[['DOLocationID']].drop_duplicates().reset_index(drop=True)
dropoff_location_dim['dropoff_location_id'] = dropoff_location_dim.index
dropoff_location_dim = pd.merge(dropoff_location_dim, df_locations, on='LocationID')
dropoff_location_dim = dropoff_location_dim.rename(columns={'LocationID':'DOLocationID'})

### Fact Table Definition

In [129]:
fact_table = df.merge(passenger_count_dim, on='passenger_count') \
             .merge(trip_distance_dim, on='trip_distance') \
             .merge(rate_code_dim, on='RatecodeID') \
             .merge(pickup_location_dim, on='PULocationID') \
             .merge(dropoff_location_dim, on='DOLocationID')\
             .merge(datetime_dim, on=['tpep_pickup_datetime','tpep_dropoff_datetime']) \
             .merge(payment_type_dim, on='payment_type') \
             [['VendorID', 'datetime_id', 'passenger_count_id',
               'trip_distance_id', 'rate_code_id', 'store_and_fwd_flag', 'pickup_location_id', 'dropoff_location_id',
               'payment_type_id', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
               'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']]


In [132]:
fact_table.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1565000 entries, 0 to 1564999
Data columns (total 18 columns):
 #   Column                 Non-Null Count    Dtype  
---  ------                 --------------    -----  
 0   VendorID               1565000 non-null  object 
 1   datetime_id            1565000 non-null  int64  
 2   passenger_count_id     1565000 non-null  int64  
 3   trip_distance_id       1565000 non-null  int64  
 4   rate_code_id           1565000 non-null  int64  
 5   store_and_fwd_flag     1532935 non-null  object 
 6   pickup_location_id     1565000 non-null  int64  
 7   dropoff_location_id    1565000 non-null  int64  
 8   payment_type_id        1565000 non-null  int64  
 9   fare_amount            1565000 non-null  float64
 10  extra                  1565000 non-null  float64
 11  mta_tax                1565000 non-null  float64
 12  tip_amount             1565000 non-null  float64
 13  tolls_amount           1565000 non-null  float64
 14  improvement_surcha