In [18]:
%load_ext autoreload
%autoreload 2
import os
from datetime import datetime, timedelta
import taxi_demand_predictor.config as cfg
import logging
import pandas as pd
from typing import List
from taxi_demand_predictor.paths import RAW_DATA_DIR, BRONZE_DATA_DIR, SILVER_DATA_DIR, GOLD_DATA_DIR, PARENT_DIR
from taxi_demand_predictor.data import save_data
from taxi_demand_predictor.pipeline import retrieve_data_in_range, validate_data, transform_to_ts_data
import taxi_demand_predictor.config as cfg
import hopsworks


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [36]:
# Extraction window. Both bounds are read from the project config
# (cfg.START_DATE / cfg.END_DATE); they are not a fixed look-back period.
start = cfg.START_DATE
end = cfg.END_DATE

# Last fully elapsed hour as a timezone-naive UTC timestamp.
# pd.Timestamp.now(tz="UTC") is used instead of datetime.utcnow(), which is
# deprecated since Python 3.12; tz_localize(None) drops the tz info so the
# value stays naive, exactly as before.
current_date = pd.Timestamp.now(tz="UTC").tz_localize(None).floor('h') - pd.Timedelta(hours=1)

# Base URL for the trip-data API.
base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"

# Show the window that the following cells will process.
start, end

(Timestamp('2024-03-29 02:00:00'), Timestamp('2024-05-24 02:00:00'))

In [20]:
# Every data layer stores this notebook's artifacts under the same sub-folder,
# so the folder name is defined once instead of being repeated per layer.
STREAM_SUBDIR = "2023-2024"

STREAM_DATA_DIR_RAW = RAW_DATA_DIR / STREAM_SUBDIR
STREAM_DATA_DIR_BRONZE = BRONZE_DATA_DIR / STREAM_SUBDIR
STREAM_DATA_DIR_SILVER = SILVER_DATA_DIR / STREAM_SUBDIR
STREAM_DATA_DIR_GOLD = GOLD_DATA_DIR / STREAM_SUBDIR

In [34]:
# Create the directories if they don't exist
base_path = "/Users/borja/Documents/Somniumrema/projects/ml/taxi_demand_predictor/data/raw"
# 
df = retrieve_data_in_range(start, end, RAW_DATA_DIR)
df.head()

Start Date: 2024-03-29 02:00:00
End Date: 2024-05-24 02:00:00


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2024-04-01 00:02:40,2024-04-01 00:30:42,0.0,5.2,1.0,N,161,7,1,29.6,3.5,0.5,8.65,0.0,1.0,43.25,2.5,0.0
1,2,2024-04-01 00:41:12,2024-04-01 00:55:29,1.0,5.6,1.0,N,264,264,1,25.4,1.0,0.5,10.0,0.0,1.0,37.9,0.0,0.0
2,2,2024-04-01 00:48:42,2024-04-01 01:05:30,1.0,3.55,1.0,N,186,236,1,20.5,1.0,0.5,5.1,0.0,1.0,30.6,2.5,0.0
3,2,2024-04-01 00:56:02,2024-04-01 01:05:09,1.0,1.06,1.0,N,137,164,2,10.0,1.0,0.5,0.0,0.0,1.0,15.0,2.5,0.0
4,1,2024-04-01 00:08:32,2024-04-01 00:10:24,1.0,0.7,1.0,N,236,263,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0


In [22]:
# Store the raw extract; the file name encodes the date window (YYYY-MM-DD).
raw_file_name = f'yellow_tripdata_{start:%Y-%m-%d}_to_{end:%Y-%m-%d}.parquet'
save_data(df, STREAM_DATA_DIR_RAW, raw_file_name)

Saving data: 100%|[32m██████████[0m| 7238122/7238122 [00:01<00:00, 3881547.10rows/s]

Data saved to "/Users/borja/Documents/Somniumrema/projects/ml/taxi_demand_predictor/data/raw/2023-2024/yellow_tripdata_2024-03-29_to_2024-05-24.parquet"





In [23]:
# Validate the raw extract for the configured window.
validated = validate_data(STREAM_DATA_DIR_RAW, start, end)

# Store the validated data in the bronze layer; the file name encodes the date window.
bronze_file_name = f'validated_yellow_tripdata_{start:%Y-%m-%d}_to_{end:%Y-%m-%d}.parquet'
save_data(validated, STREAM_DATA_DIR_BRONZE, bronze_file_name)

Saving data: 100%|[32m██████████[0m| 6482804/6482804 [00:00<00:00, 12382255.33rows/s]

Data saved to "/Users/borja/Documents/Somniumrema/projects/ml/taxi_demand_predictor/data/bronze/2023-2024/validated_yellow_tripdata_2024-03-29_to_2024-05-24.parquet"





In [24]:
# Reload the validated data from the bronze layer file written for this window.
bronze_file = f'{STREAM_DATA_DIR_BRONZE}/validated_yellow_tripdata_{start:%Y-%m-%d}_to_{end:%Y-%m-%d}.parquet'
validated = pd.read_parquet(bronze_file)

# Earliest and latest pickup timestamps, to confirm the data covers the window.
pickup_times = validated['pickup_datetime']
pickup_times.min(), pickup_times.max()

(Timestamp('2024-03-29 02:00:02'), Timestamp('2024-05-24 01:59:54'))

In [25]:
# Transform the data
transformed = transform_to_ts_data(validated, freq = 'h')

# Save the transformed data
save_data(transformed, STREAM_DATA_DIR_SILVER, f'ts_data_{start.year}-{start.month:02d}-{start.day:02d}_to_{end.year}-{end.month:02d}-{end.day:02d}.parquet')

Saving data: 100%|[32m██████████[0m| 350523/350523 [00:00<00:00, 20444716.68rows/s]

Data saved to "/Users/borja/Documents/Somniumrema/projects/ml/taxi_demand_predictor/data/silver/2023-2024/ts_data_2024-03-29_to_2024-05-24.parquet"





In [26]:
# Path of the silver-layer time-series file for the current date window.
INPUT_SILVER_DIR = STREAM_DATA_DIR_SILVER / f'ts_data_{start:%Y-%m-%d}_to_{end:%Y-%m-%d}.parquet'

# Load the time series and prepare the columns for the feature store upload.
# Reading and transforming in one chain keeps the cell idempotent on re-run.
ts_data = pd.read_parquet(INPUT_SILVER_DIR).assign(
    # Make sure pickup_time is a datetime column.
    pickup_time=lambda frame: pd.to_datetime(frame['pickup_time']),
    # Milliseconds since the Unix epoch. The epoch-delta form does not depend
    # on the platform's default `int` width or on the column's datetime
    # resolution, unlike `astype(int) // 10**6`.
    # NOTE(review): assumes pickup_time is timezone-naive — confirm.
    pickup_ts=lambda frame: (frame['pickup_time'] - pd.Timestamp("1970-01-01")) // pd.Timedelta(milliseconds=1),
    # Store the location id with an explicit 64-bit integer dtype.
    pickup_location_id=lambda frame: frame['pickup_location_id'].astype('int64'),
)

In [27]:
# Print a summary of ts_data (columns, non-null counts, dtypes, memory usage).
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 350523 entries, 0 to 350522
Data columns (total 4 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   pickup_time         350523 non-null  datetime64[ns]
 1   pickup_location_id  350523 non-null  int64         
 2   ride_count          350523 non-null  float64       
 3   pickup_ts           350523 non-null  int64         
dtypes: datetime64[ns](1), float64(1), int64(2)
memory usage: 10.7 MB


In [28]:
# Connect to the project
project = hopsworks.login(
    project=cfg.HOPSWORKS_PROJECT_NAME,
    api_key_value=cfg.HOPSWORKS_API_KEY
)

# Connect to the feature store
feature_store = project.get_feature_store()

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/933012
Connected. Call `.close()` to terminate connection gracefully.


In [32]:
# Connect to the feature group
feature_group = feature_store.get_or_create_feature_group(
    **cfg.FEATURE_GROUP_METADATA
)

In [33]:
# Insert features into the feature group
feature_group.insert(ts_data, write_options={"wait_for_job": False},
                        overwrite=True)

Uploading Dataframe: 0.00% |          | Rows 0/350523 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: group_features_hourly_2023_03_29_to_2024_05_24_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/933012/jobs/named/group_features_hourly_2023_03_29_to_2024_05_24_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x3571f4b50>, None)