# NYC_Yellow_Taxi_Data_Pipeline


In [1]:
# importing the necessary libraries
import os
from datetime import datetime
from time import time

import pandas as pd
from sqlalchemy import create_engine

In [2]:
#  creating the database connection
def connect(user,password,host,port,db_name):
    '''Create a SQLAlchemy engine for a PostgreSQL database and check it is reachable.

    Parameters
    ----------
    user, password
        Credentials interpolated into the connection URL.
    host, port
        Server address interpolated into the connection URL.
    db_name
        Name of the database to connect to.

    Returns
    -------
    The engine produced by ``create_engine``. It is still returned when the
    connectivity check fails (the error is printed). ``None`` is returned
    only when the engine could not be created at all, so callers that need
    a usable engine should check for ``None``.
    '''
    # Bind the name up front so the final ``return`` cannot raise
    # UnboundLocalError (and hide the real error) when create_engine fails.
    engine=None
    try:
        engine=create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db_name}')
        # Open one connection purely as a reachability probe and release it
        # straight away instead of leaving it checked out of the pool.
        with engine.connect():
            pass
    except Exception as e:
        print(e)
    else:
        print('Successfully connected to the database')
    return engine

### Logging File 

In [3]:
# Timestamp layout shared by the log messages, plus the moment this cell ran
# rendered in that layout.
date_format = "%Y-%m-%d %H:%M:%S"
current_time = datetime.strftime(datetime.now(), date_format)

In [4]:
def log(message):
    '''Append ``message`` exactly as given to ``log.txt`` in the working directory.'''
    handle=open('log.txt', 'a')
    try:
        handle.write(message)
    finally:
        # Always release the file, even if the write fails.
        handle.close()

## Data Ingestion and Preparation

In [5]:
# Extract dataset
def extract(chunk=100000):
    '''Open ``yellow_tripdata_2016-02.csv`` from the working directory for chunked reading.

    Parameters
    ----------
    chunk : int
        Number of rows per chunk, passed to ``pd.read_csv`` as ``chunksize``.

    Returns
    -------
    The chunk iterator returned by ``pd.read_csv``; each item is a DataFrame
    of at most ``chunk`` rows.

    Raises
    ------
    Whatever ``pd.read_csv`` raises (for example ``FileNotFoundError``). The
    error is printed and then re-raised so the caller sees the real cause
    rather than an UnboundLocalError from a missing return value.
    '''
    # os.getcwd()/os.path.join replace the IPython-only ``%pwd`` magic and the
    # hard-coded backslash, so the path is also valid outside a notebook and
    # on non-Windows systems.
    filepath=os.path.join(os.getcwd(),'yellow_tripdata_2016-02.csv')
    try:
        return pd.read_csv(filepath,chunksize=chunk)
    except Exception as e:
        print(e)
        raise


In [6]:
# transform dataset
def transform(dataframe):
    '''Parse the pickup and dropoff timestamp columns into datetimes, in place.

    ``dataframe`` is modified directly and nothing is returned. If a
    conversion fails the error is printed and the remaining work is skipped.
    '''
    datetime_columns=('tpep_pickup_datetime','tpep_dropoff_datetime')
    try:
        for column in datetime_columns:
            dataframe[column]=pd.to_datetime(dataframe[column])
    except Exception as error:
        print(error)

In [7]:
# ingestion
# connect() requires all five settings; calling it with no arguments raises
# TypeError. The settings are read from the standard libpq environment
# variables so no credentials are stored in the notebook. A missing
# PGUSER / PGPASSWORD / PGDATABASE fails here with a KeyError naming it.
engine=connect(
    user=os.environ['PGUSER'],
    password=os.environ['PGPASSWORD'],
    host=os.environ.get('PGHOST','localhost'),
    port=os.environ.get('PGPORT','5432'),
    db_name=os.environ['PGDATABASE'],
)
def ingestion(dataframe):
    '''Append ``dataframe`` to the ``nyc_yellow_taxi`` table through the module-level ``engine``.

    The DataFrame index is not written. ``if_exists='append'`` adds the rows
    to the table when it already exists.
    '''
    dataframe.to_sql(name='nyc_yellow_taxi', con=engine, if_exists='append',index=False)

Successfully connected to the database


In [8]:
# loading into the PostgreSQL database
def load(chunk,batch=10):
    """Load up to ``batch`` chunks of ``chunk`` rows into the database, logging progress.

    Parameters
    ----------
    chunk : int
        Rows per chunk; forwarded to ``extract`` so the figures written to
        the log match what is actually read.
    batch : int
        Maximum number of chunks to load.

    Returns
    -------
    None. Progress is appended to the log through ``log``. Any exception
    raised while extracting, transforming or ingesting is written to the log
    instead of being propagated.
    """
    def timestamp():
        # Evaluated on every call so each log entry carries the time it was
        # written, not the time an earlier notebook cell happened to run.
        return datetime.now().strftime(date_format)

    planned_rows=chunk*batch
    log(f'\n{timestamp()}\t{planned_rows:,} of NYC Yellow Taxi Trip datasets to be loaded.\n\n\n')
    try:
        log(message=130*"-"+"\n")
        log('|\tBatch\t| Data Loaded \t| Remaining Data  |\t Start Time \t|\t End Time \t|\t Time Spent(seconds) \t|\n')
        log(message=130*"-"+"\n")
        loaded_rows=0
        data=extract(chunk)
        overall_start_time=time()
        try:
            # zip() stops as soon as the range is exhausted, so no chunk
            # beyond ``batch`` is read from the file only to be discarded.
            for count,dataframe in zip(range(1,batch+1),data):
                batch_started=datetime.now()
                transform(dataframe)
                ingestion(dataframe)
                batch_finished=datetime.now()

                # Count real rows: the last chunk of a file may be short.
                loaded_rows+=len(dataframe)
                remaining_rows=max(planned_rows-loaded_rows,0)
                # Subtract datetime objects rather than HH:MM:SS strings so
                # the duration keeps sub-second precision and cannot go
                # negative when a batch spans midnight.
                elapsed_seconds=(batch_finished-batch_started).total_seconds()
                start_time=batch_started.strftime("%H:%M:%S")
                end_time=batch_finished.strftime("%H:%M:%S")
                message=f'|\t {count} \t|\t {loaded_rows:,} \t|\t {remaining_rows:,} \t|\t {start_time} \t|\t {end_time} \t|\t {elapsed_seconds:.2f}  \t|\n'
                log(message)
                log(message=124*"-"+"\n")
        finally:
            # Release the CSV file handle whether or not every batch succeeded.
            data.close()
    except Exception as e:
        log(f'\n{timestamp()}\t An error occurred:{e}')
    else:
        elapsed_minutes=(time()-overall_start_time)/60
        log(f'\n{timestamp()}\t{loaded_rows:,} of datasets have been successfully loaded in {elapsed_minutes:.3f} minutes.\n')

In [9]:
load(100000)

## Building Reporting Pipelines

## operations_and_performance

In [10]:
# Operations and performance report: run the summary query against
# nyc_yellow_taxi and, only if it succeeds, append the result to op_and_perf_rpt.
try:
    reporting='''
    --How many trips were recorded in the dataset
    
    SELECT CURRENT_DATE AS ingestion_date, * FROM 

        (SELECT total_trips FROM

            (SELECT COUNT (*) AS total_trips FROM nyc_yellow_taxi) AS records) AS a,
  
    --What is the average trip distance for all trips

        (SELECT avg_trip_distance FROM
            (SELECT AVG(trip_distance) AS avg_trip_distance from nyc_yellow_taxi) as avg_distance) AS b,
            
    --Which Vendor has the highest number of trips

        (SELECT highest_trip_vendor  FROM
            (SELECT 
                (CASE
                    WHEN "VendorID"= 1 THEN 'Creative Mobile Technologies'
                    ELSE 'VeriFone Inc.'
                END) AS highest_trip_vendor, 
            count(*) AS trip_number FROM nyc_yellow_taxi
            GROUP BY  highest_trip_vendor 
            ORDER BY 2 DESC
            LIMIT 1) AS trip_vendor) AS c ,
            
    --Which Vendor has the lowest number of trips

        (SELECT lowest_trip_vendor  FROM
            (SELECT 
                (CASE
                    WHEN "VendorID"= 1 THEN 'Creative Mobile Technologies'
                    ELSE 'VeriFone Inc.'
                END) AS lowest_trip_vendor, 
            count(*) AS trip_number FROM nyc_yellow_taxi
            GROUP BY  lowest_trip_vendor 
            ORDER BY 2 ASC
            LIMIT 1) AS trip_vendor) AS d ,
            
    --What is the average passenger count per trip

        (SELECT avg_passenger_count FROM
            (SELECT round(AVG(passenger_count),0) as avg_passenger_count from nyc_yellow_taxi) AS passenger) AS e;
    '''

    df_OP=pd.read_sql(sql=reporting,con=engine)

except Exception as error:
    # A failed query is reported and nothing is written to the report table.
    print(error)

else:
    df_OP.to_sql(name='op_and_perf_rpt', con=engine, if_exists='append',index=False)
    print('Operations and performance reporting table generated')

Operations and performance reporting table generated


## customer_demographics_and_preferences

In [11]:
# Customer demographics and preferences report: run the summary query against
# nyc_yellow_taxi and, only if it succeeds, append the result to cust_demograf_pref.
# The first SQL comment previously said "trip amount"; the query averages tip_amount.
try:
    reporting='''
    --What is the average tip amount given by passengers
    
    SELECT CURRENT_DATE AS ingestion_date,* FROM
        (SELECT avg_tip_amount FROM 
            (SELECT AVG(tip_amount) as avg_tip_amount FROM nyc_yellow_taxi) AS tip_amount) AS a,

    --What is the average trip distance by passengers

        (SELECT avg_trip_distance_by_passenger FROM
            (SELECT AVG(trip_distance) as avg_trip_distance_by_passenger FROM nyc_yellow_taxi) 
            AS trip_distance_by_passenger) AS b,
            
    --How many trips were flagged as 'store and forward'

        (SELECT store_and_forward_trips FROM
            (SELECT COUNT(*) AS store_and_forward_trips
            FROM nyc_yellow_taxi
            WHERE store_and_fwd_flag <> 'N') AS store_and_forward) AS c,
    
    --How many trips were shared rides (passenger count > 1)
    
    (SELECT shared_ride_count FROM
            (SELECT COUNT(*) AS shared_ride_count
            FROM nyc_yellow_taxi
            WHERE passenger_count > 1 
            AND trip_distance>0) AS shared_ride) AS d;
    '''

    df_CDP=pd.read_sql(reporting,con=engine)
    
except Exception as e:
    # A failed query is reported and nothing is written to the report table.
    print(e)
    
else:
    df_CDP.to_sql('cust_demograf_pref', con=engine, if_exists='append',index=False)
    print('Customer, Demographics and Preferences reporting table generated')

Customer, Demographics and Preferences reporting table generated


## financial_performance_and_trends

In [12]:
# Financial performance report: run the summary query against nyc_yellow_taxi
# and, only if it succeeds, append the result to financial_perf.
try:
    reporting='''
    
    --What is the average fare amount per trip
    
    SELECT CURRENT_DATE AS ingestion_date, * FROM 
        (SELECT avg_fare_amount FROM 
            (SELECT AVG(fare_amount) as avg_fare_amount FROM nyc_yellow_taxi) AS fare_amount) AS a,
            
    --How much revenue was generated from tolls and surcharges combined
    
        (SELECT tolls_and_surcharges_revenue FROM
            (SELECT (sum(tolls_amount+improvement_surcharge)) as tolls_and_surcharges_revenue
            FROM nyc_yellow_taxi) AS total_tolls_and_surcharges_revenue) AS b,
            
    --What is the average total amount paid by passengers
    
        (SELECT avg_total_amount FROM
            (SELECT AVG(total_amount) as avg_total_amount
            FROM nyc_yellow_taxi) AS total_amount) AS c;
    '''

    finan_perf=pd.read_sql(sql=reporting,con=engine)

except Exception as error:
    # A failed query is reported and nothing is written to the report table.
    print(error)

else:
    finan_perf.to_sql(name='financial_perf',con=engine,if_exists='append',index=False)
    print('Financial Performance reporting table generated')

Financial Performance reporting table generated
