# RIDE SERVICE ANALYTICS (ETL FRAMEWORK)

## IMPORT LIBRARIES

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time

## ETL PIPELINE (EXTRACT TRANSFORM LOAD)

### EXTRACT

In [None]:
def extract_data(url):
    data = pd.read_csv(url)
    return data


### TRANSFORM

#### CLEAN DUPLICATES

In [None]:
def drop_duplicates(data):
    drop_d = data.drop_duplicates()
    return drop_d


#### EXPORT BAD RECORDS

In [None]:
def get_nulls(data):
    get_nulls_df = data[data.isnull().any(axis=1)]
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    file_name = f"/content/drive/MyDrive/Null_Records_{timestamp}.csv"
    get_nulls_df.to_csv(file_name)
    return get_nulls_df


#### CLEAN NULL VALUES

In [None]:
def drop_nulls(data):
    drop_n = data.dropna()
    return drop_n


#### BUILD DATE-TIME DIMENSIONS

In [None]:
def datetime_dim(data, feature, name):
    datetime_dim_df = data[[feature]].drop_duplicates().reset_index(drop=True)
    datetime_dim_df[feature] = pd.to_datetime(datetime_dim_df[feature])
    datetime_dim_df[f"{name}_Day"] = datetime_dim_df[feature].dt.day
    datetime_dim_df[f"{name}_Month"] = datetime_dim_df[feature].dt.month
    datetime_dim_df[f"{name}_Quarter"] = datetime_dim_df[feature].dt.quarter
    datetime_dim_df[f"{name}_Year"] = datetime_dim_df[feature].dt.year
    datetime_dim_df[f"{name}_Hour"] = datetime_dim_df[feature].dt.hour
    datetime_dim_df[f"{name}_Minute"] = datetime_dim_df[feature].dt.minute
    datetime_dim_df[f"{name}_Second"] = datetime_dim_df[feature].dt.second
    return datetime_dim_df


#### NORMALIZE LOCATION BASED DIMENSIONS

In [None]:
def loc_dim(data, feature_list, feature_name):
    loc_dim_df = data[feature_list].drop_duplicates().reset_index(drop=True)
    loc_dim_df[feature_name] = loc_dim_df.index
    return loc_dim_df


#### NORMALIZE SIMPLE FEATURE DIMENSIONS

In [None]:
def norm_dim(data, feature_name, new_feature, feature_dict):
    norm_dim_df = data[[feature_name]].drop_duplicates().reset_index(drop=True)
    norm_dim_df[new_feature] = norm_dim_df[feature_name].map(feature_dict)
    return norm_dim_df


#### ACTUAL TRANSFORMATION

In [None]:
def perform_transform(data):
    # A) Clean Duplicates:
    data = drop_duplicates(data)

    # B) Export Null Records:
    null_records_df = get_nulls(data)

    # C) Clean Null Values:
    data = drop_nulls(data)

    # 1) Pickup DateTime Dimensions:
    pickup_date_time_dim = datetime_dim(data, "tpep_pickup_datetime", "Pickup")

    # 2) Dropoff DateTime Dimensions:
    dropoff_date_time_dim = datetime_dim(data, "tpep_dropoff_datetime", "Dropoff")

    # 3) Pickup Location Dimension:
    pickup_location_dim = loc_dim(data, ["pickup_longitude", "pickup_latitude"], "PickupLocation_ID")

    # 4) Dropoff Location Dimension:
    dropoff_location_dim = loc_dim(data, ["dropoff_longitude", "dropoff_latitude"], "DropoffLocation_ID")

    # 5) RateCode Dimension:
    rate_code_dim = norm_dim(data, "RatecodeID", "RateCode_Type", {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride"
    })

    # 6) Payment Type Dimension:
    payment_type_dim = norm_dim(data, "payment_type", "payment_category", {
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip"
    })

    # 7) Fact Records:
    # fact_records = data.merge(pickup_location_dim, on=["pickup_longitude", "pickup_latitude"])
    # fact_records = fact_records.merge(dropoff_location_dim, on=["dropoff_longitude", "dropoff_latitude"])
    # fact_records = fact_records.merge(rate_code_dim, on="RatecodeID")
    # fact_records = fact_records.merge(payment_type_dim, on="payment_type")
    # fact_records["Trip_ID"] = fact_records.index
    # fact_records = fact_records[["Trip_ID", "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance",
    #                              "PickupLocation_ID", "DropoffLocation_ID", "RatecodeID", "store_and_fwd_flag", "payment_type", "fare_amount",
    #                              "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount"]]

    # 8) Final Records:
    Final_Table = data.copy()
    Final_Table = data.merge(pickup_location_dim, on=["pickup_longitude", "pickup_latitude"])
    Final_Table = Final_Table.merge(dropoff_location_dim, on=["dropoff_longitude", "dropoff_latitude"])
    Final_Table = Final_Table.merge(rate_code_dim, on="RatecodeID")
    Final_Table = Final_Table.merge(payment_type_dim, on="payment_type")
    Final_Table["Trip_ID"] = Final_Table.index
    # Final_Table["Pickup_Day"] = Final_Table["tpep_pickup_datetime"].dt.day
    # Final_Table["Pickup_Month"] = Final_Table["tpep_pickup_datetime"].dt.month
    # Final_Table["Pickup_Quarter"] = Final_Table["tpep_pickup_datetime"].dt.quarter
    # Final_Table["Pickup_Year"] = Final_Table["tpep_pickup_datetime"].dt.year
    # Final_Table["Pickup_Hour"] = Final_Table["tpep_pickup_datetime"].dt.hour
    # Final_Table["Pickup_Minute"] = Final_Table["tpep_pickup_datetime"].dt.minute
    # Final_Table["Pickup_Second"] = Final_Table["tpep_pickup_datetime"].dt.second
    # Final_Table["Dropoff_Day"] = Final_Table["tpep_dropoff_datetime"].dt.day
    # Final_Table["Dropoff_Month"] = Final_Table["tpep_dropoff_datetime"].dt.month
    # Final_Table["Dropoff_Quarter"] = Final_Table["tpep_dropoff_datetime"].dt.quarter
    # Final_Table["Dropoff_Year"] = Final_Table["tpep_dropoff_datetime"].dt.year
    # Final_Table["Dropoff_Hour"] = Final_Table["tpep_dropoff_datetime"].dt.hour
    # Final_Table["Dropoff_Minute"] = Final_Table["tpep_dropoff_datetime"].dt.minute
    # Final_Table["Dropoff_Second"] = Final_Table["tpep_dropoff_datetime"].dt.second
    # Final_Table = Final_Table[["Trip_ID", "VendorID", "tpep_pickup_datetime", "Pickup_Day", "Pickup_Month",
    #                            "Pickup_Quarter", "Pickup_Year", "Pickup_Hour", "Pickup_Minute", "Pickup_Second",
    #                            "tpep_dropoff_datetime", "Dropoff_Day", "Dropoff_Month", "Dropoff_Quarter",
    #                            "Dropoff_Year", "Dropoff_Hour", "Dropoff_Minute", "Dropoff_Second",
    #                              "passenger_count", "trip_distance", "PickupLocation_ID", "pickup_longitude",
    #                              "pickup_latitude", "DropoffLocation_ID", "dropoff_longitude", "dropoff_latitude", "RatecodeID",
    #                              "RateCode_Type", "store_and_fwd_flag", "payment_type", "payment_category", "fare_amount",
    #                              "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount"]]

    return Final_Table

### LOAD

In [None]:
# Example usage:
url = "https://raw.githubusercontent.com/darshilparmar/uber-etl-pipeline-data-engineering-project/main/data/uber_data.csv"
df = extract_data(url)
transformed_data = perform_transform(df)
final_data = transformed_data
final_data["tpep_pickup_datetime"] = pd.to_datetime(final_data["tpep_pickup_datetime"])
final_data["tpep_dropoff_datetime"] = pd.to_datetime(final_data["tpep_dropoff_datetime"])
final_data["Pickup_Day"] = final_data["tpep_pickup_datetime"].dt.day
final_data["Pickup_Month"] = final_data["tpep_pickup_datetime"].dt.month
final_data["Pickup_Quarter"] = final_data["tpep_pickup_datetime"].dt.quarter
final_data["Pickup_Year"] = final_data["tpep_pickup_datetime"].dt.year
final_data["Pickup_Hour"] = final_data["tpep_pickup_datetime"].dt.hour
final_data["Pickup_Minute"] = final_data["tpep_pickup_datetime"].dt.minute
final_data["Pickup_Second"] = final_data["tpep_pickup_datetime"].dt.second
final_data["Dropoff_Day"] = final_data["tpep_dropoff_datetime"].dt.day
final_data["Dropoff_Month"] = final_data["tpep_dropoff_datetime"].dt.month
final_data["Dropoff_Quarter"] = final_data["tpep_dropoff_datetime"].dt.quarter
final_data["Dropoff_Year"] = final_data["tpep_dropoff_datetime"].dt.year
final_data["Dropoff_Hour"] = final_data["tpep_dropoff_datetime"].dt.hour
final_data["Dropoff_Minute"] = final_data["tpep_dropoff_datetime"].dt.minute
final_data["Dropoff_Second"] = final_data["tpep_dropoff_datetime"].dt.second
final_data = final_data[["Trip_ID", "VendorID", "tpep_pickup_datetime", "Pickup_Day", "Pickup_Month",
                            "Pickup_Quarter", "Pickup_Year", "Pickup_Hour", "Pickup_Minute", "Pickup_Second",
                            "tpep_dropoff_datetime", "Dropoff_Day", "Dropoff_Month", "Dropoff_Quarter",
                            "Dropoff_Year", "Dropoff_Hour", "Dropoff_Minute", "Dropoff_Second",
                              "passenger_count", "trip_distance", "PickupLocation_ID", "pickup_longitude",
                              "pickup_latitude", "DropoffLocation_ID", "dropoff_longitude", "dropoff_latitude", "RatecodeID",
                              "RateCode_Type", "store_and_fwd_flag", "payment_type", "payment_category", "fare_amount",
                              "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount"]]
final_data.head()

Unnamed: 0,Trip_ID,VendorID,tpep_pickup_datetime,Pickup_Day,Pickup_Month,Pickup_Quarter,Pickup_Year,Pickup_Hour,Pickup_Minute,Pickup_Second,...,store_and_fwd_flag,payment_type,payment_category,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,0,1,2016-03-01,1,3,1,2016,0,0,0,...,N,1,Credit card,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,1,2016-03-01,1,3,1,2016,0,0,0,...,N,1,Credit card,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2,2016-03-01,1,3,1,2016,0,0,0,...,N,1,Credit card,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,3,2,2016-03-01,1,3,1,2016,0,0,0,...,N,1,Credit card,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,4,2,2016-03-01,1,3,1,2016,0,0,0,...,N,1,Credit card,23.5,1.0,0.5,5.06,0.0,0.3,30.36


In [None]:
final_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 38 columns):
 #   Column                 Non-Null Count   Dtype         
---  ------                 --------------   -----         
 0   Trip_ID                100000 non-null  int64         
 1   VendorID               100000 non-null  int64         
 2   tpep_pickup_datetime   100000 non-null  datetime64[ns]
 3   Pickup_Day             100000 non-null  int32         
 4   Pickup_Month           100000 non-null  int32         
 5   Pickup_Quarter         100000 non-null  int32         
 6   Pickup_Year            100000 non-null  int32         
 7   Pickup_Hour            100000 non-null  int32         
 8   Pickup_Minute          100000 non-null  int32         
 9   Pickup_Second          100000 non-null  int32         
 10  tpep_dropoff_datetime  100000 non-null  datetime64[ns]
 11  Dropoff_Day            100000 non-null  int32         
 12  Dropoff_Month          100000 non-null  int32

#### EXECUTION

In [None]:
timestamp = time.strftime("%Y%m%d_%H%M%S")
final_data.to_csv(f"/content/drive/MyDrive/final_data{timestamp}.csv")

----