In [10]:
import pandas as pd
import requests
from pathlib import Path

In [None]:
# Example of a monthly trip-data file URL:
# "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

In [13]:
def download_data_file(year: int, month: int)->Path:
    """
    Download one monthly yellow trip-data parquet file and store it locally.

    Args:
        year: Year of the file to fetch (used verbatim in the URL and file name).
        month: Month of the file to fetch; zero-padded to two digits in the
            URL and in the local file name.

    Returns:
        Path of the file written, ``../data/raw/trips_<year>_<month>.parquet``.

    Raises:
        Exception: if the server answers with any status other than HTTP 200.
        requests.exceptions.RequestException: if the connection fails or the
            server stops responding within the timeout.
    """
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02d}.parquet'
    # requests has no default timeout; without one a stalled connection would
    # block the notebook forever. The value bounds the connect time and each
    # wait for data, not the total download time.
    response = requests.get(url=url, timeout=60)

    # Fail early and report the actual status: a non-200 answer does not
    # necessarily mean the file is missing (it may be a server or access error).
    if response.status_code != 200:
        raise Exception(f"{url} is not available (HTTP status {response.status_code}).")

    destination_path = Path(f"../data/raw/trips_{year}_{month:02d}.parquet")
    # Create ../data/raw (and any missing parents) on first use.
    destination_path.parent.mkdir(parents=True, exist_ok=True)

    with open(destination_path, 'wb') as f:
        f.write(response.content)
    return destination_path



In [14]:
# Months and years of trip data to fetch.
# range() excludes its end value, so the bound must be 13 to include December.
months_to_download = range(1, 13)
years_to_download = [2023]

# Loop over the configured years rather than a hard-coded one, so editing
# years_to_download actually changes what gets downloaded.
for year in years_to_download:
    for month in months_to_download:
        download_data_file(year=year, month=month)


WindowsPath('../data/raw/trips_2023_01.parquet')

In [17]:
# Load the January 2023 raw parquet file into a pandas DataFrame.
# The file name must match what download_data_file() writes
# (trips_<year>_<month>.parquet), otherwise a fresh run cannot find it.
rides = pd.read_parquet('../data/raw/trips_2023_01.parquet')
rides.head(5)


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


In [20]:
# Subset columns for time pickup and location and rename
keep_columns = ["tpep_pickup_datetime", "PULocationID"]
rides = rides[keep_columns]
rides.rename(columns = {
    "tpep_pickup_datetime":"pickup_datetime", 
    "PULocationID":"location_id"}, inplace=True)
rides.head(3)

Unnamed: 0,pickup_datetime,location_id
0,2023-01-01 00:32:10,161
1,2023-01-01 00:55:08,43
2,2023-01-01 00:25:04,48


In [None]:
# Inspect the dtype of each remaining column.
rides.dtypes

In [22]:
# Validate the data: summary statistics make out-of-range values visible,
# e.g. pickup timestamps that fall outside the month being processed.
rides.describe()

Unnamed: 0,pickup_datetime,location_id
count,3066766,3066766.0
mean,2023-01-17 00:22:26.288164,166.398
min,2008-12-31 23:01:42,1.0
25%,2023-01-09 16:21:57.250000,132.0
50%,2023-01-17 08:42:29.500000,162.0
75%,2023-01-24 16:26:27,234.0
max,2023-02-01 00:56:53,265.0
std,,64.24413


In [26]:
# Keep only pickups inside January 2023 (start inclusive, end exclusive).
pickup_times = rides.pickup_datetime
in_january = (pickup_times >= "2023-01-01") & (pickup_times < "2023-02-01")
rides = rides[in_january]
rides.pickup_datetime.describe()

count                       3066718
mean     2023-01-17 00:27:48.391113
min             2023-01-01 00:00:00
25%      2023-01-09 16:22:12.250000
50%      2023-01-17 08:42:40.500000
75%             2023-01-24 16:26:28
max             2023-01-31 23:59:59
Name: pickup_datetime, dtype: object

In [28]:
# Write the filtered rides to the validated-data folder as a parquet file.
year, month = 2023, 1
VALIDATED_PATH = Path(f"../data/validated/val_rides_{year}_{month:02d}.parquet")
output_dir = VALIDATED_PATH.parent
output_dir.mkdir(parents=True, exist_ok=True)  # create the folder tree if absent
rides.to_parquet(VALIDATED_PATH)

In [32]:
def validate_raw_data(data:pd.DataFrame, year:int, month:int)->pd.DataFrame:
    """
    Return the rows of ``data`` whose pickup falls inside the given month.

    Args:
        data: Frame with a ``pickup_datetime`` column.
        year: Year of the month to keep.
        month: Month to keep; December rolls the upper bound over to
            January of the following year.

    Returns:
        The rows with ``pickup_datetime`` on or after the first day of the
        month and strictly before the first day of the next month.
    """
    if month < 12:
        upper_bound = f"{year}-{1+month:02d}-01"
    else:
        upper_bound = f"{year+1}-01-01"
    lower_bound = f"{year}-{month:02d}-01"

    pickups = data.pickup_datetime
    # Half-open interval: lower bound inclusive, upper bound exclusive.
    in_month = (pickups >= lower_bound) & (pickups < upper_bound)
    return data[in_month]

In [35]:
# Run validate_raw_data on the January 2023 rides and summarise the result.
validate_raw_data(rides, year=2023, month=1).describe()

Unnamed: 0,pickup_datetime,location_id
count,3066718,3066718.0
mean,2023-01-17 00:27:48.391113,166.3983
min,2023-01-01 00:00:00,1.0
25%,2023-01-09 16:22:12.250000,132.0
50%,2023-01-17 08:42:40.500000,162.0
75%,2023-01-24 16:26:28,234.0
max,2023-01-31 23:59:59,265.0
std,,64.24394
