## Notebook Configuration

In [1]:
# IMPORTS & CONFIG

# utils
import requests
import sys
import os
import configparser
from watermark import watermark
from loguru import logger
from pathlib import Path

# data science
import pandas as pd
import numpy as np

# config
# Log line layout: green timestamp | level name left-aligned in 8 chars | message,
# with the level and message wrapped in loguru's <level> colour markup.
logger_format = (
    "<green>{time:MMM-D HH:mm:ss.SSS}</green> | "
    "<level>{level: <8}</level> | "
    "<level>{message}</level>"
)
logger.remove()  # drop the handlers registered so far; only the sink below stays
logger.configure(extra={"ip": "", "user": ""})  # Default values
# the trailing ';' keeps the handler id returned by add() out of the cell output
logger.add(sys.stderr, format=logger_format);

In [2]:
# FUNCTIONS

def download_one_file_of_raw_data_from_nyc_gov_website(dir_path:str, year:int, month:int) -> str:
    """
    Download one month of yellow-taxi trip data and save it as a parquet file.

    Args:
        dir_path: directory the file is written into (it is not created here).
        year: year of the requested file, used verbatim in the URL and file name.
        month: month number; zero-padded to two digits in the URL and file name.

    Returns:
        Path of the written file: `{dir_path}/rides_{year}-{month:02d}.parquet`.

    Raises:
        Exception: if the server answers with a status code other than 200.
        requests.exceptions.RequestException: propagated from `requests.get`
            on connection problems or when the timeout expires.
    """
    url_string = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02d}.parquet'
    # Without a timeout requests waits indefinitely on a stalled connection.
    # The value bounds connect time and each wait for data, not the whole download.
    response = requests.get(url_string, timeout=60)

    if response.status_code != 200:
        raise Exception(f'{url_string} is not available')

    logger.info(f"Writing rides_{year}-{month:02d}.parquet")
    path = f'{dir_path}/rides_{year}-{month:02d}.parquet'
    # context manager guarantees the handle is flushed and closed, even on error
    with open(path, 'wb') as parquet_file:
        parquet_file.write(response.content)
    return path

In [3]:
# WATERMARK
# Keyword options forwarded to watermark(); the printed stamp shows the author,
# the last-updated date, interpreter details and the listed package versions.
wmrk_dict = dict(
    author='Gustavo Morales',
    current_date=True,
    updated=True,
    python=True,
    packages='pandas,numpy',
)

print(watermark(**wmrk_dict))

Author: Gustavo Morales

Last updated: 2023-08-21

Python implementation: CPython
Python version       : 3.11.4
IPython version      : 8.14.0

pandas: 2.0.3
numpy : 1.25.0



In [4]:
# hardcoded notebook parameters
config_file_name = 'config.cfg'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; check it so a missing config fails with a clear
# message instead of a bare KeyError on the lookups below.
if not config.read(config_file_name):
    raise FileNotFoundError(
        f"Could not read '{config_file_name}' (working directory: {os.getcwd()})"
    )

years = [2022]
months = list(range(1, 13))  # 1..12 as plain Python ints
# both paths come from the [PATHS] section of the config file
raw_data_dir_path = config['PATHS']['raw_data_dir_path']
validated_raw_data_file_path = config['PATHS']['validated_raw_data_file_path']

## 1. Fetching the data

In [5]:
# fetch and write parquet data files; one per year-month
# Target paths follow the naming scheme the download helper writes
# ({dir}/rides_{year}-{month:02d}.parquet), so presence is checked per file
# rather than by counting whatever happens to be in the directory.
expected_file_paths = {
    (y, m): os.path.join(raw_data_dir_path, f"rides_{y}-{m:02d}.parquet")
    for y in years
    for m in months
}
total_expected_files = len(expected_file_paths)

# make sure the target directory exists before inspecting or writing to it
os.makedirs(raw_data_dir_path, exist_ok=True)

# check if files exist already, otherwise download only the missing ones to disk
missing_year_months = [
    year_month
    for year_month, file_path in expected_file_paths.items()
    if not os.path.isfile(file_path)
]
number_of_files = total_expected_files - len(missing_year_months)

if not missing_year_months:
    logger.success(f"{number_of_files} parquet files already present, fetch skipped.")
    # pure-Python listing: no shell, works with any path and on any platform
    print("\n".join(sorted(os.listdir(raw_data_dir_path))))
else:
    logger.info("Fetching files...")

    for y, m in missing_year_months:
        download_one_file_of_raw_data_from_nyc_gov_website(raw_data_dir_path, y, m)

    logger.success("✓")

[32mAug-21 01:12:45.554[0m | [32m[1mSUCCESS [0m | [32m[1m12 parquet files already present, fetch skipped.[0m


rides_2022-01.parquet
rides_2022-02.parquet
rides_2022-03.parquet
rides_2022-04.parquet
rides_2022-05.parquet
rides_2022-06.parquet
rides_2022-07.parquet
rides_2022-08.parquet
rides_2022-09.parquet
rides_2022-10.parquet
rides_2022-11.parquet
rides_2022-12.parquet


In [6]:
# concatenate parquet tables into a pandas-df
# this could be done faster; check https://jpweytjens.be/read-multiple-files-with-pandas-fast/
raw_data_dir = Path(raw_data_dir_path)
# Path.glob() yields entries in arbitrary, filesystem-dependent order; sorting makes
# the row order of the combined frame identical on every run and machine.
# Each file keeps its own row index, so index labels repeat across files.
full_df = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in sorted(raw_data_dir.glob('*.parquet'))
)

In [7]:
# check full df: row count plus the name and dtype of every column after concatenation
full_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 39656098 entries, 0 to 3183766
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int64         
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int64         
 8   DOLocationID           int64         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  airport_fee            flo

In [8]:
# slice relevant data and check
# Select the two needed columns first and only then materialise a new frame:
# copying the full 19-column table just to discard 17 columns wastes memory.
# rename() returns a new DataFrame, so full_df is left untouched.
rides_df = (
    full_df[['tpep_pickup_datetime', 'PULocationID']]
    .rename(columns={
        'tpep_pickup_datetime': 'pickup_datetime',
        'PULocationID': 'pickup_location_id',
    })
)

rides_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 39656098 entries, 0 to 3183766
Data columns (total 2 columns):
 #   Column              Dtype         
---  ------              -----         
 0   pickup_datetime     datetime64[ns]
 1   pickup_location_id  int64         
dtypes: datetime64[ns](1), int64(1)
memory usage: 907.7 MB


## 2. Data validation

In [9]:
# check if data spans the correct time window:
# min / max of the pickup timestamps reveal rows outside the requested period
rides_df['pickup_datetime'].describe()

count                         39656098
mean     2022-07-05 04:04:16.211664640
min                2001-01-01 00:03:14
25%                2022-04-07 21:59:21
50%         2022-07-01 06:36:25.500000
75%                2022-10-04 20:17:12
max                2023-04-18 14:30:05
Name: pickup_datetime, dtype: object

In [10]:
# fix the time window
# Keep only rides whose pickup falls in one of the requested year-months, so the
# filter follows the `years` / `months` notebook parameters instead of literal dates.
# Rows with a missing pickup timestamp match nothing and are dropped as well.
pickup_times = rides_df['pickup_datetime']
inside_requested_window = (
    pickup_times.dt.year.isin(years) & pickup_times.dt.month.isin(months)
)
rides_df = rides_df[inside_requested_window]
rides_df['pickup_datetime'].describe()

count                         39655537
mean     2022-07-05 06:17:00.079098624
min                2022-01-01 00:00:08
25%                2022-04-07 22:03:08
50%                2022-07-01 06:45:00
75%                2022-10-04 20:18:32
max                2022-12-31 23:59:59
Name: pickup_datetime, dtype: object

In [11]:
# write the transformed table to a parquet file for further processing
# (destination path is read from the [PATHS] section of config.cfg)
rides_df.to_parquet(validated_raw_data_file_path)

In [12]:
# final marker in the log: reaching this cell means every cell above ran without raising
logger.success("Notebook finished.")

[32mAug-21 01:13:09.186[0m | [32m[1mSUCCESS [0m | [32m[1mNotebook finished.[0m


<style>
/* Monospace styling for <tt> text. */
tt {
    font-family:
    "MonoLisa",
    "Courier Prime",
    "Courier New",
    "Inconsolata",
    "Lucida Console",
    "Menlo",
    "Monaco", 
    monospace;
    /* colour keywords must be unquoted; a quoted value is an invalid declaration and is ignored */
    color: black;
    /* the `font` shorthand needs a family, so a bare size is ignored; font-size sets only the size */
    font-size: 10px;
}

/* Class selector: the type selector `alert` matches no HTML element,
   while the alert <div> in this notebook carries class="alert". */
.alert {
    display: inline-block;
}
    
/* Box layout and typography shared by every alert block. */
.alert-block{
    margin: 20px;
    width: 92%;
    padding: 15px;
    border-radius: 4px;
    font: 16px Tahoma, Alegreya, Garamond, Times;
    font-weight: 400;
}

/* Colour variants: each one sets text, background and border colours. */
.alert-info {
    color: #0399A9;
    background: #B2EBF3;
    border: 2px solid #4FD1E1;
}

.alert-warning {
    color: #F67C00;
    background: #FFE0B2;
    border: 2px solid #FFC165;
}

.alert-danger {
    color: #C4453E;
    background: #F7CFD3;
    border: 2px solid #E29C9B;
}

.alert-success {
    color: #388E3C;
    background: #C8E6C9;
    border: 2px solid #81C784;
}
</style>

<div class="alert alert-block alert-success"> </div>