# Backfill the feature store with hourly NYC taxi demand (January 2022 through the latest available month — November 2024 in the saved run)

In [65]:
from dotenv import load_dotenv
import os
from src.paths import PARENT_DIR

# Read secrets from the project-root .env file into the process environment.
env_file = PARENT_DIR / ".env"
load_dotenv(env_file)

# Hopsworks credentials and target project used by the cells below.
# A missing HOPSWORKS_API_KEY raises KeyError here, before any login attempt.
HOPSWORKS_API_KEY = os.environ["HOPSWORKS_API_KEY"]
HOPSWORKS_PROJECT_NAME = "NYC_taxi_demand_project"

In [66]:
from datetime import datetime

# Current calendar year as a plain int (despite the name, `dt` is a year
# number, not a datetime). The download cell uses it as the last year to fetch.
dt = datetime.today().year
print(dt)

2025


In [46]:
# Training data

# The training data covers the following date range:
# from 2022-01-29 00:00:00
# to   2022-12-31 00:00:00

In [47]:
from datetime import datetime
import pandas as pd
from src.data import load_raw_data

# Download raw rides one calendar year at a time, from `from_year` through
# `to_year` inclusive. `dt` is set in an earlier cell to the current year.
from_year = 2022
to_year = dt
print(f'Downloading raw data from {from_year} to {to_year}')

# Load every year first and concatenate once: calling pd.concat inside the
# loop re-copies all previously accumulated rows on each iteration, which is
# expensive at ~100M rows. An empty year range still yields an empty frame.
yearly_rides = [load_raw_data(year) for year in range(from_year, to_year + 1)]
rides = pd.concat(yearly_rides) if yearly_rides else pd.DataFrame()

# Release the per-year frames so only the combined frame stays in memory.
del yearly_rides

Downloading raw data from 2022 to 2024
File 2022-01 was already in local storage
File 2022-02 was already in local storage
File 2022-03 was already in local storage
File 2022-04 was already in local storage
File 2022-05 was already in local storage
File 2022-06 was already in local storage
File 2022-07 was already in local storage
File 2022-08 was already in local storage
File 2022-09 was already in local storage
File 2022-10 was already in local storage
File 2022-11 was already in local storage
File 2022-12 was already in local storage
File 2023-01 was already in local storage
File 2023-02 was already in local storage
File 2023-03 was already in local storage
File 2023-04 was already in local storage
File 2023-05 was already in local storage
File 2023-06 was already in local storage
File 2023-07 was already in local storage
File 2023-08 was already in local storage
File 2023-09 was already in local storage
File 2023-10 was already in local storage
File 2023-11 was already in local sto

In [48]:
print("len(rides)={:,}".format(len(rides)))  # total raw rides loaded

len(rides)=115,465,093


## Data spans 2022-01-01 to 2024-11-30

**Note:** it may be worth simulating about 18 months of history here, because feature pipeline notebook 12 simulates 52 weeks back.

In [49]:
rides.iloc[-5:]  # last five raw rides

Unnamed: 0,pickup_datetime,pickup_location_id
3646364,2024-11-30 23:11:15,162
3646365,2024-11-30 23:49:30,132
3646366,2024-11-30 23:31:46,100
3646367,2024-11-30 23:41:21,42
3646368,2024-11-30 23:21:52,116


In [50]:
print(rides.pickup_datetime.dtype)  # resolution of the raw pickup timestamps

datetime64[us]


In [51]:
rides.iloc[:5]  # first five raw rides

Unnamed: 0,pickup_datetime,pickup_location_id
0,2022-01-01 00:35:40,142
1,2022-01-01 00:33:43,236
2,2022-01-01 00:53:21,166
3,2022-01-01 00:25:21,114
4,2022-01-01 00:36:48,68


In [9]:
# # After kernel restart, load it back:
# import pandas as pd
# rides_2023_to_2024 = pd.read_pickle('rides_2023_to_2024.pkl')
# rides_2023_to_2024

In [10]:
# Disabled: optional filter restricting the raw rides to 2022-2023.
# NOTE(review): '<= 2023-12-31' compares against 2023-12-31 00:00:00, so it
# would drop almost all of Dec 31; use an exclusive bound (< '2024-01-01')
# if this filter is re-enabled.
#rides_2022_to_2023 = rides[(rides['pickup_datetime'] >= '2022-01-01') & (rides['pickup_datetime'] <= '2023-12-31')]
#rides_2022_to_2023

In [52]:
print(f"len(rides)={len(rides):,}")  # re-check row count; the filter cells above are disabled

len(rides)=115,465,093


In [53]:
pickup_times = rides['pickup_datetime']

# 1. Earliest and latest pickup timestamps
print("Earliest date:", pickup_times.min())
print("Latest date:", pickup_times.max())

# 2. Total number of raw rides
print("Number of rows:", len(rides))

# 3. Peek at both ends of the frame
for heading, preview in (("\nFirst few rows:", rides.head()), ("\nLast few rows:", rides.tail())):
    print(heading)
    print(preview)

# 4. Distribution summary of the pickup timestamps
print("\nDate range summary:")
print(pickup_times.describe())

Earliest date: 2022-01-01 00:00:08
Latest date: 2024-11-30 23:59:59
Number of rows: 115465093

First few rows:
      pickup_datetime  pickup_location_id
0 2022-01-01 00:35:40                 142
1 2022-01-01 00:33:43                 236
2 2022-01-01 00:53:21                 166
3 2022-01-01 00:25:21                 114
4 2022-01-01 00:36:48                  68

Last few rows:
            pickup_datetime  pickup_location_id
3646364 2024-11-30 23:11:15                 162
3646365 2024-11-30 23:49:30                 132
3646366 2024-11-30 23:31:46                 100
3646367 2024-11-30 23:41:21                  42
3646368 2024-11-30 23:21:52                 116

Date range summary:
count                     115465093
mean     2023-06-23 05:59:07.530483
min             2022-01-01 00:00:08
25%             2022-09-27 10:54:44
50%             2023-06-17 15:02:36
75%             2024-03-23 20:02:31
max             2024-11-30 23:59:59
Name: pickup_datetime, dtype: object


In [54]:
from src.data import transform_raw_data_into_ts_data

# Aggregate the raw ride records into an hourly time series per pickup location.
# Resulting columns (see the next cell's output): pickup_hour, rides, pickup_location_id.
# NOTE(review): appears to emit rides == 0 for hours without pickups — confirm in src.data.
ts_data = transform_raw_data_into_ts_data(rides)

100%|██████████| 263/263 [00:04<00:00, 53.05it/s]


In [55]:
# string to datetime
ts_data['pickup_hour'] = pd.to_datetime(ts_data['pickup_hour'], utc=True)

# add column with Unix epoch milliseconds
ts_data['pickup_ts'] = ts_data['pickup_hour'].astype(int) // 10**6

ts_data

Unnamed: 0,pickup_hour,rides,pickup_location_id,pickup_ts
0,2022-01-01 00:00:00+00:00,11,4,1640995200000
1,2022-01-01 01:00:00+00:00,15,4,1640998800000
2,2022-01-01 02:00:00+00:00,26,4,1641002400000
3,2022-01-01 03:00:00+00:00,8,4,1641006000000
4,2022-01-01 04:00:00+00:00,9,4,1641009600000
...,...,...,...,...
6722275,2024-11-30 19:00:00+00:00,0,110,1732993200000
6722276,2024-11-30 20:00:00+00:00,0,110,1732996800000
6722277,2024-11-30 21:00:00+00:00,0,110,1733000400000
6722278,2024-11-30 22:00:00+00:00,0,110,1733004000000


In [56]:
pickup_hours = ts_data['pickup_hour']
print(pickup_hours.min())  # first hour in the series
pickup_hours.max()         # last hour in the series

2022-01-01 00:00:00+00:00


Timestamp('2024-11-30 23:00:00+0000', tz='UTC')

In [57]:
import hopsworks

In [16]:
# hopsworks.logout()

# # Clear any existing connection
# project = None
# feature_store = None
# feature_group = None

In [58]:
# Authenticate against Hopsworks and open the project whose feature store is backfilled.
project = hopsworks.login(
    api_key_value=HOPSWORKS_API_KEY,
    project=HOPSWORKS_PROJECT_NAME,
)

2025-01-27 21:56:05,693 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-01-27 21:56:05,720 INFO: Initializing external client
2025-01-27 21:56:05,720 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-27 21:56:06,795 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1192104


In [59]:
# Handle to the project's feature store; used below to get/create the feature group.
feature_store = project.get_feature_store()

In [60]:
# Identity (name + version) of the feature group receiving the hourly series.
FEATURE_GROUP_NAME = "time_series_hourly_feature_group"
FEATURE_GROUP_VERSION = 1

In [61]:
# Fetch the hourly time-series feature group, creating it if it does not exist.
# Rows are keyed by (pickup_location_id, pickup_ts); pickup_ts (epoch ms) also
# serves as the event-time column.
feature_group = feature_store.get_or_create_feature_group(
    name=FEATURE_GROUP_NAME,
    version=FEATURE_GROUP_VERSION,
    primary_key=['pickup_location_id', 'pickup_ts'],
    event_time='pickup_ts',
    description="Time-series data at hourly frequency",
)

In [63]:
print("Total rows in ts_data:", len(ts_data))  # rows about to be inserted

Total rows in ts_data: 6722280


In [64]:
# Upload every row. With wait_for_job=False the call returns after the upload
# once the offline materialization job has been launched, without waiting for
# that job to finish — follow the job link in the output to confirm it succeeded.
feature_group.insert(
    ts_data,
    write_options={"wait_for_job": False},
)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1192104/fs/1181777/fg/1398262


Uploading Dataframe: 75.56% |███████▌  | Rows 5079274/6722280 | Elapsed Time: 14:11 | Remaining Time: 05:10%4|1738015835.304|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 855399ms in state UP)
Uploading Dataframe: 100.00% |██████████| Rows 6722280/6722280 | Elapsed Time: 18:43 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1192104/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)