# ETL

                                                                              Team 1

In [1]:
# Importing the required libraries

# Standard library
import configparser  # Importing the configparser library for reading configuration files
import json  # Importing the json library for reading JSON files
import os  # Reading database credentials from environment variables
import warnings  # Importing the warnings module for handling warnings
from io import StringIO
from urllib.parse import quote_plus  # Percent-encoding credentials placed in the database URL

# Third-party
import boto3  # Importing the boto3 library for interacting with AWS services
import pandas as pd  # Importing the pandas library for data manipulation and analysis
import pandas_profiling  # Importing the pandas_profiling library for generating data profiling reports
import requests  # Importing the requests library for making HTTP requests
from botocore.exceptions import NoCredentialsError  # Importing the NoCredentialsError exception from the botocore library
from sqlalchemy import create_engine  # Importing the create_engine function from the sqlalchemy library for database interaction

warnings.filterwarnings("ignore")  # Ignore warning messages

# Load the raw inputs from S3:
#   - trip records: first object in bucket 'datalz2' whose key starts with 'Yellow_Taxi_Trip_Data'
#   - zone lookup:  first object in bucket 'datalz1' whose key starts with 'taxi_zones'
taxi_bucket_name = 'datalz2'
zone_bucket_name = 'datalz1'
taxi_file_pattern = 'Yellow_Taxi_Trip_Data'
zone_file_pattern = 'taxi_zones'
aws_profile_name = 'datapro'

# Initialize a session using the named AWS profile
session = boto3.Session(profile_name=aws_profile_name)
s3 = session.client('s3')


def read_first_csv_from_s3(s3_client, bucket, key_prefix):
    """Read the first object listed under ``key_prefix`` in ``bucket`` as a CSV.

    Only the first page returned by ``list_objects_v2`` is inspected (no
    pagination). The object body is decoded as UTF-8 before parsing.

    Args:
        s3_client: a boto3 S3 client.
        bucket: bucket name.
        key_prefix: key prefix to list; the chosen key must also contain it.

    Returns:
        pandas.DataFrame parsed from the first matching object.

    Raises:
        FileNotFoundError: if the listing has no matching key. Raising here
            replaces the old behaviour of leaving the target variable
            undefined and failing with a NameError several cells later.
    """
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key_prefix)
    for entry in response.get('Contents', []):
        key = entry['Key']
        if key_prefix in key:
            obj = s3_client.get_object(Bucket=bucket, Key=key)
            return pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))
    raise FileNotFoundError(f"No object with prefix {key_prefix!r} found in s3://{bucket}")


# Trip records and zone lookup
dfs = read_first_csv_from_s3(s3, taxi_bucket_name, taxi_file_pattern)
tzdfs = read_first_csv_from_s3(s3, zone_bucket_name, zone_file_pattern)
            


  import pandas_profiling  # Importing the pandas_profiling library for generating data profiling reports


In [4]:
##-------##---------##--------##--------##---------##---------##-----------##

# Normalize column names first (lowercase, spaces -> underscores) and give the
# ID columns their warehouse names. Doing this before any filtering means the
# range filter below works whether the source CSV headers are 'RatecodeID' or
# 'ratecodeid'. It also makes re-running this cell harmless: lowercasing and
# renaming are idempotent.
dfs = (
    dfs
    .rename(columns=lambda name: name.lower().replace(' ', '_'))
    .rename(columns={
        'vendorid': 'vendor_id',
        'ratecodeid': 'rate_code_id',
        'pulocationid': 'pu_location_id',
        'dolocationid': 'do_location_id',
    })
)

# Dropping duplicate rows, then rows with any missing value
dfs = dfs.drop_duplicates().reset_index(drop=True)
dfs = dfs.dropna(how='any')

# Keep only codes allowed by the data dictionary:
#   - passenger_count must not be greater than 9
#   - rate_code_id must not be greater than 6
#   - payment_type must not be greater than 6
dfs = dfs.loc[(dfs['rate_code_id'] <= 6) & (dfs['passenger_count'] <= 9) & (dfs['payment_type'] <= 6)]



In [5]:
##-------##---------##--------##--------##---------##---------##-----------##

# Standardize the column dtypes.
# tpep_pickup_datetime / tpep_dropoff_datetime arrive as object dtype. Parse
# them to datetime so the `.dt` accessors used by the datetime dimensions work.
# `assign` returns a new frame, so this never writes into a slice of the
# filtered frame produced by the cleaning step.
dfs = dfs.assign(
    tpep_pickup_datetime=pd.to_datetime(dfs['tpep_pickup_datetime']),
    tpep_dropoff_datetime=pd.to_datetime(dfs['tpep_dropoff_datetime']),
)

# ID/code columns that are stored as plain integers
id_columns = ['vendor_id', 'passenger_count', 'rate_code_id',
              'payment_type', 'pu_location_id', 'do_location_id']

# Fill missing values with the sentinel -1, then cast to int.
# Assigning the result back is what actually updates `dfs`. Calling
# fillna(inplace=True) on a column selection is a chained in-place operation,
# and it is a no-op under pandas Copy-on-Write.
# Rows with any NA were already dropped during cleaning, so the fill is defensive.
dfs[id_columns] = dfs[id_columns].fillna(-1).astype(int)

##-------##---------##--------##--------##---------##---------##-----------##

# Defining RDS MySQL details and creating the SQLAlchemy engine

# Credentials are read from the MYSQL_USERNAME / MYSQL_PASSWORD environment
# variables (KeyError if unset), so they are not stored in the notebook.
# NOTE(review): a password was previously hardcoded in this cell; it remains in
# the notebook's history and should be rotated.
mysql_username = os.environ['MYSQL_USERNAME']
mysql_password = os.environ['MYSQL_PASSWORD']

# Defining RDS hostname, port and schema
mysql_host = 'taxidw.cabnnjliqhn5.us-east-1.rds.amazonaws.com'
mysql_port = '3306'
mysql_database = 'taxi_dw'  # Schema

# Creating the db engine. Username and password are percent-encoded so that
# characters such as '@', ':' or '/' cannot corrupt the connection URL.
engine = create_engine(
    f'mysql+pymysql://{quote_plus(mysql_username)}:{quote_plus(mysql_password)}'
    f'@{mysql_host}:{mysql_port}/{mysql_database}'
)


In [6]:
##-------##---------##--------##--------##---------##---------##-----------##

# Creating the datetime dimension tables:
#   - pickup_datetime_dim
#   - dropoff_datetime_dim


def build_datetime_dim(trips, ts_column, prefix, id_column):
    """Build a datetime dimension from the distinct values of ``trips[ts_column]``.

    Args:
        trips: trip DataFrame; ``ts_column`` must already be datetime64
            (the ``.dt`` accessor is used).
        ts_column: timestamp column to derive the dimension from.
        prefix: prefix for the derived columns ('pickup' / 'dropoff').
        id_column: name of the 1-based sequential id column to add.

    Returns:
        DataFrame with columns, in order: ``ts_column``, ``{prefix}_hour``,
        ``{prefix}_day`` (weekday name), ``{prefix}_month``, ``{prefix}_year``,
        ``{prefix}_weekday`` (Monday=0), ``id_column``.
    """
    dim = trips[[ts_column]].drop_duplicates().copy().reset_index(drop=True)
    parts = dim[ts_column].dt
    dim[f'{prefix}_hour'] = parts.hour
    dim[f'{prefix}_day'] = parts.day_name()
    dim[f'{prefix}_month'] = parts.month
    dim[f'{prefix}_year'] = parts.year
    dim[f'{prefix}_weekday'] = parts.weekday
    dim[id_column] = dim.index + 1
    return dim


pickup_datetime_dim = build_datetime_dim(dfs, 'tpep_pickup_datetime', 'pickup', 'pu_datetime_id')
dropoff_datetime_dim = build_datetime_dim(dfs, 'tpep_dropoff_datetime', 'dropoff', 'do_datetime_id')

##-------##---------##--------##--------##---------##---------##-----------##

# Time-of-day buckets applied to the pickup/dropoff hour:
#   'Morning'    06:00 - 11:59
#   'Afternoon'  12:00 - 15:59
#   'Evening'    16:00 - 21:59
#   'Late night' 22:00 - 05:59

def time_of_day(x):
    """Map an hour of the day to a coarse time-of-day label.

    Membership is tested with ``in range(...)``, so only integral values can
    match a daytime bucket. Everything else falls through to 'Late night',
    including hours 22-23 and 0-5 and any non-integral value.
    """
    buckets = (
        (range(6, 12), 'Morning'),
        (range(12, 16), 'Afternoon'),
        (range(16, 22), 'Evening'),
    )
    for hours, label in buckets:
        if x in hours:
            return label
    return 'Late night'

##-------##---------##--------##--------##---------##---------##-----------##

# Add the time-of-day label, fix the column order, load each datetime dimension
# into RDS, then read the full table back so the fact table can use the
# database-assigned ids.


def load_and_fetch_datetime_dim(dim, ts_column, prefix, id_column, table):
    """Finalize one datetime dimension, append it to ``table`` and read it back.

    Adds ``{prefix}_timeofday`` (``time_of_day`` of ``{prefix}_hour``) and
    reorders the columns. ``ts_column`` is converted to text. Every column
    except ``id_column`` is appended to ``table``.

    Returns:
        (final, fetched): ``final`` is the reordered dimension with
        ``id_column`` first. ``fetched`` is ``SELECT * FROM table``, with
        ``ts_column`` parsed back to datetime.

    NOTE(review): ``id_column`` is not written, so presumably the table assigns
    its own ids; confirm against the DDL. Because the load appends, re-running
    this cell adds duplicate rows to ``table``.
    """
    timeofday_column = f'{prefix}_timeofday'
    ordered = [id_column, ts_column,
               f'{prefix}_hour', f'{prefix}_day', f'{prefix}_month',
               f'{prefix}_year', f'{prefix}_weekday', timeofday_column]

    final = dim.assign(**{timeofday_column: dim[f'{prefix}_hour'].apply(time_of_day)})[ordered]
    final = final.astype({ts_column: str})

    # Load every column except the id
    final[ordered[1:]].to_sql(table, con=engine, if_exists='append', index=False)

    # Read the whole table back (ids included) and restore the datetime dtype
    fetched = pd.read_sql_query(f"SELECT * FROM {table}", engine)
    fetched[ts_column] = pd.to_datetime(fetched[ts_column])
    return final, fetched


pickup_datetime_dim, pudt_dim = load_and_fetch_datetime_dim(
    pickup_datetime_dim, 'tpep_pickup_datetime', 'pickup', 'pu_datetime_id', 'pickup_datetime_dim')
dropoff_datetime_dim, dodt_dim = load_and_fetch_datetime_dim(
    dropoff_datetime_dim, 'tpep_dropoff_datetime', 'dropoff', 'do_datetime_id', 'dropoff_datetime_dim')

In [7]:
##-------##---------##--------##--------##---------##---------##-----------##

#rate_code_dim

## Creating rate_code_dim: static lookup of rate codes from the data dictionary.
# Built from this hardcoded mapping rather than from the codes present in
# `dfs`, so every listed code gets a row.

rate_code_type = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride"
}

# One row per code, plus a 1-based rc_id, in the order rc_id, code, name
rate_code_dim = pd.DataFrame(list(rate_code_type.items()), columns=['rate_code_id', 'rate_code_name'])
rate_code_dim['rc_id'] = rate_code_dim.index + 1
rate_code_dim = rate_code_dim[['rc_id', 'rate_code_id', 'rate_code_name']]

# Loading data to RDS. rc_id is not written; presumably the table assigns its
# own ids (NOTE(review): confirm against the DDL). The load appends, so
# re-running this cell adds duplicate rows to rate_code_dim.
df_selected = rate_code_dim[['rate_code_id', 'rate_code_name']]
df_selected.to_sql('rate_code_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS (database-assigned ids included)
sql_query3 = "SELECT * FROM rate_code_dim"
rc_dim = pd.read_sql_query(sql_query3, engine)
rc_dim['rate_code_id'] = rc_dim['rate_code_id'].astype(int)

In [8]:
##-------##---------##--------##--------##---------##---------##-----------##

## passenger_count_dim: one row per distinct passenger_count value in `dfs`

passenger_count_dim = dfs[['passenger_count']].drop_duplicates().copy().reset_index(drop=True)
# 1-based id placed as the first column
passenger_count_dim.insert(0, 'passenger_count_id', passenger_count_dim.index + 1)

# Loading data to RDS. Only the passenger_count value is written;
# NOTE(review): presumably the table generates its own ids; confirm against the DDL.
pass_count_dim = passenger_count_dim[['passenger_count']]
pass_count_dim.to_sql('passenger_count_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS, table: passenger_count_dim
sql_query4 = "SELECT * FROM passenger_count_dim"
pc_dim = pd.read_sql_query(sql_query4, engine)
pc_dim['passenger_count'] = pc_dim['passenger_count'].astype(int)

##-------##---------##--------##--------##---------##---------##-----------##

#vendor_dim

## Creating vendor_dim: static lookup of vendor names (hardcoded).
# Vendor ids in `dfs` that are not listed here will not find a row when the
# fact table is built.

vendor_name = {
    1: "Creative Mobile Technologies, LLC",
    2: "Curb Mobility - formerly VeriFone Inc",
}

# One row per vendor, plus a 1-based vc_id, in the order vc_id, id, name
vendor_dim = pd.DataFrame(list(vendor_name.items()), columns=['vendor_id', 'vendor_name'])
vendor_dim['vc_id'] = vendor_dim.index + 1
vendor_dim = vendor_dim[['vc_id', 'vendor_id', 'vendor_name']]

# Loading data to RDS. vc_id is not written; presumably the table assigns its
# own ids (NOTE(review): confirm against the DDL). The load appends, so
# re-running this cell adds duplicate rows to vendor_dim.
van_dim = vendor_dim[['vendor_id', 'vendor_name']]
van_dim.to_sql('vendor_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS, table: vendor_dim
sql_query5 = "SELECT * FROM vendor_dim"
v_dim = pd.read_sql_query(sql_query5, engine)
v_dim['vendor_id'] = v_dim['vendor_id'].astype(int)

##-------##---------##--------##--------##---------##---------##-----------##

## payment_type_dim: static lookup of payment types (hardcoded)

payment_type_name = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

# (code, label) pairs -> DataFrame. `payment_type` repeats the code as a
# categorical column and sits between the id and the name.
payment_type_list = list(payment_type_name.items())
payment_type_dim = pd.DataFrame(payment_type_list, columns=['payment_type_id', 'payment_type_name'])
payment_type_dim = payment_type_dim.assign(
    payment_type=payment_type_dim['payment_type_id'].astype('category')
)[['payment_type_id', 'payment_type', 'payment_type_name']]

# Loading data to RDS: the code and its label
payment_dim = payment_type_dim[['payment_type', 'payment_type_name']]
payment_dim.to_sql('payment_type_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS, table: payment_type_dim
sql_query6 = "SELECT * FROM payment_type_dim"
pay_dim = pd.read_sql_query(sql_query6, engine)
pay_dim['payment_type'] = pay_dim['payment_type'].astype(int)

##-------##---------##--------##--------##---------##---------##-----------##

## store_and_fwd_flag_dim: static lookup of the Y/N flag (hardcoded)

store_and_fwd_flag_name = {
    1: "Y",
    2: "N"
}
store_and_fwd_flag_list = list(store_and_fwd_flag_name.items())

store_and_fwd_flag_dim = pd.DataFrame(
    store_and_fwd_flag_list,
    columns=['store_and_fwd_flag_id', 'store_and_fwd_flag'],
)
# Human-readable description of each flag value
store_and_fwd_flag_dim['store_and_fwd_flag_name'] = store_and_fwd_flag_dim['store_and_fwd_flag'].map(
    {'Y': 'store and forward trip', 'N': 'not a store and forward trip'}
)

# Loading data to RDS: the flag and its description
store_dim = store_and_fwd_flag_dim[['store_and_fwd_flag', 'store_and_fwd_flag_name']]
store_dim.to_sql('store_and_fwd_flag_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS, table: store_and_fwd_flag_dim
sql_query7 = "SELECT * FROM store_and_fwd_flag_dim"
sflag_dim = pd.read_sql_query(sql_query7, engine)

In [9]:
##-------##---------##--------##--------##---------##---------##-----------##

## taxi_zone_dim: zone lookup read from S3 (tzdfs), renamed to warehouse column names

taxi_zone_dim = tzdfs.rename(columns={
    "OBJECTID": "tz_id",
    "Shape_Leng": "shape_leng",
    "the_geom": "lng_lat",
    "Shape_Area": "shape_area",
    "zone": "zone_name",
    "LocationID": "taxi_zone_location_id",
    "borough": "taxi_zone_borough_name",
})

# Loading data to RDS (tz_id and lng_lat are not written)
taxiz_dim = taxi_zone_dim[[
    'shape_leng',
    'shape_area',
    'zone_name',
    'taxi_zone_location_id',
    'taxi_zone_borough_name',
]]
taxiz_dim.to_sql('taxi_zone_dim', con=engine, if_exists='append', index=False)

## Retrieving the full table from RDS, table: taxi_zone_dim
sql_query8 = "SELECT * FROM taxi_zone_dim"
tz_dim = pd.read_sql_query(sql_query8, engine)
tz_dim['taxi_zone_location_id'] = tz_dim['taxi_zone_location_id'].astype(int)

##-------##---------##--------##--------##---------##---------##-----------##

In [10]:


## Fact-table mapping
#
# Every dimension was loaded with if_exists='append' and read back with
# `SELECT *`, so a dimension table can hold several rows for the same natural
# key (e.g. after the notebook is re-run). Merging against such a table fans
# out, producing one trip row per duplicate dimension row. To keep one fact row
# per trip, each dimension is first reduced to a single row per join key (the
# first row returned by the query is kept).


def _one_row_per_key(dim, key):
    """Return ``dim`` with duplicate values of column ``key`` dropped (first kept)."""
    return dim.drop_duplicates(subset=key, keep='first')


# Fact table merging (left joins keep every trip)
ft = (
    dfs
    .merge(_one_row_per_key(rc_dim, 'rate_code_id'), on='rate_code_id', how='left')
    .merge(_one_row_per_key(pc_dim, 'passenger_count'), on='passenger_count', how='left')
    .merge(_one_row_per_key(v_dim, 'vendor_id'), on='vendor_id', how='left')
    .merge(_one_row_per_key(pay_dim, 'payment_type'), on='payment_type', how='left')
    .merge(_one_row_per_key(sflag_dim, 'store_and_fwd_flag'), on='store_and_fwd_flag', how='left')
    .merge(_one_row_per_key(pudt_dim, 'tpep_pickup_datetime'), on='tpep_pickup_datetime', how='left')
    .merge(_one_row_per_key(dodt_dim, 'tpep_dropoff_datetime'), on='tpep_dropoff_datetime', how='left')
)

# Zone lookups. NOTE(review): these two merges are inner joins (the pandas
# default), unlike the left joins above. Trips whose pu/do location id is
# missing from taxi_zone_dim are therefore dropped from the fact table;
# confirm that is intended.
zones = _one_row_per_key(tz_dim, 'taxi_zone_location_id')

# Pickup zone: merge on 'pu_location_id' = 'taxi_zone_location_id'
ft = ft.merge(zones, left_on='pu_location_id', right_on='taxi_zone_location_id', suffixes=('', '_pu'))
ft = ft.rename(columns={'taxi_zone_id': 'pu_taxi_zone_loc_id'})

# Dropoff zone: columns present on both sides get '_pu' (left) / '_do' (right),
# which yields taxi_zone_location_id_pu / taxi_zone_location_id_do
ft = ft.merge(zones, left_on='do_location_id', right_on='taxi_zone_location_id', suffixes=('_pu', '_do'))
ft = ft.rename(columns={'taxi_zone_id': 'do_taxi_zone_loc_id'})

# Inspection view: each natural key next to the surrogate key it mapped to
fts = ft[['rate_code_id', 'rc_id', 'passenger_count', 'passenger_count_id', 'vendor_id',
          'vc_id', 'payment_type', 'payment_type_id', 'store_and_fwd_flag', 'store_and_fwd_flag_id',
          'do_location_id', 'taxi_zone_location_id_do', 'pu_location_id', 'taxi_zone_location_id_pu',
          'tpep_pickup_datetime', 'pu_datetime_id', 'tpep_dropoff_datetime', 'do_datetime_id']]

##-------##---------##--------##--------##---------##---------##-----------##

## Final fact table: dimension keys plus trip amounts

# Trip amount (measure) columns shared by both selections below
measure_columns = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
                   'improvement_surcharge', 'total_amount', 'congestion_surcharge']

ft_final = ft[['vc_id', 'rc_id', 'passenger_count_id', 'payment_type_id', 'store_and_fwd_flag_id',
               'pu_datetime_id', 'do_datetime_id',
               'taxi_zone_location_id_do', 'taxi_zone_location_id_pu'] + measure_columns]

# Column order used for the fact_table load
fft = ft_final[['vc_id', 'passenger_count_id',
                'taxi_zone_location_id_pu', 'taxi_zone_location_id_do',
                'payment_type_id', 'rc_id', 'store_and_fwd_flag_id',
                'pu_datetime_id', 'do_datetime_id'] + measure_columns]

# Loading the fact rows to RDS (appends)
fft.to_sql('fact_table', con=engine, if_exists='append', index=False)

##-------##---------##--------##--------##---------##---------##-----------##

987609