In [135]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [136]:
import logging
import os
import sys
from datetime import datetime, timedelta, timezone
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
import hopsworks
import pandas as pd

import config.config as config
from src.data_fetching_and_processing.fetch_batch_raw_data import fetch_batch_raw_data
from src.data_fetching_and_processing.transform_raw_to_timeseries_data import transform_raw_to_timeseries_data

# Configure logging for this notebook.
# `force=True` (Python 3.8+) removes any handlers already attached to the root
# logger before applying this configuration. Without it, `basicConfig` silently
# does nothing when the root logger is already configured (presumably by a
# library imported above or by an earlier run of this cell). The captured log
# lines in this notebook read "<time> INFO: <msg>" instead of the format
# requested here, which shows the configuration was being ignored.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",  # Log format
    handlers=[
        logging.StreamHandler(sys.stdout),
    ],
    force=True,
)
logger = logging.getLogger(__name__)

**Step 1: Get the current date and time (timezone-aware UTC, rounded up to the next full hour)**

In [137]:
# IANA timezone used only for the human-readable log line; the fetch window in
# the next step is computed from the UTC value.
LOCAL_TIMEZONE = "America/New_York"

# Round *up* to the next full hour so the fetch window built from this value
# ends on an hour boundary (e.g. 05:26 UTC -> 06:00 UTC).
current_date = pd.to_datetime(datetime.now(timezone.utc)).ceil("h")
current_date_ny = current_date.tz_convert(LOCAL_TIMEZONE)
logger.info(f"Current date and time (UTC): {current_date}")
logger.info(f"Current date and time ({LOCAL_TIMEZONE}): {current_date_ny}")

2025-03-01 00:26:42,385 INFO: Current date and time (UTC): 2025-03-01 06:00:00+00:00
2025-03-01 00:26:42,385 INFO: Current date and time (America/NewYork): 2025-03-01 01:00:00-05:00


**Step 2: Define the data fetching range (the 28 days ending at the current hour)**

In [138]:
# Length of the history window to fetch, ending at the (rounded-up) current hour.
FETCH_WINDOW_DAYS = 28

fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=FETCH_WINDOW_DAYS)
logger.info(f"Fetching data from {fetch_data_from} to {fetch_data_to}")

2025-03-01 00:26:44,408 INFO: Fetching data from 2025-02-01 06:00:00+00:00 to 2025-03-01 06:00:00+00:00


**Step 3: Fetch raw data**

In [139]:
# Pull the raw ride records for the window defined in Step 2.
logger.info("Fetching raw data...")
rides = fetch_batch_raw_data(
    fetch_data_from,
    fetch_data_to,
)
num_raw_records = len(rides)
logger.info(f"Raw data fetched. Number of records: {num_raw_records}")

2025-03-01 00:26:46,158 INFO: Fetching raw data...
File already exists for 2023-02.
Loading data for 2023-02...
Total records: 2,913,955
Valid records: 2,845,058
Records dropped: 68,897 (2.36%)
Successfully processed data for 2023-02.
Successfully saved as Parquet: ../data/raw/rides_zones.parquet
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-03.
Loading data for 2024-03...
Total records: 3,582,628
Valid records: 3,518,066
Records dropped: 64,562 (1.80%)
Successfully processed data for 2024-03.
Successfully saved as Parquet: ../data/raw/rides_zones.parquet
Combining all monthly data...
Data loading and processing complete!
2025-03-01 00:26:49,873 INFO: Raw data fetched. Number of records: 2637884


In [140]:
# Preview the first five raw records.
rides.head(n=5)

Unnamed: 0,pickup_datetime,pickup_location_id,zone
185605,2024-02-05 11:45:03,2,"Jamaica Bay, Queens"
874172,2024-02-12 07:04:57,2,"Jamaica Bay, Queens"
174666,2024-02-05 09:33:19,3,"Allerton/Pelham Gardens, Bronx"
189355,2024-02-05 12:36:31,3,"Allerton/Pelham Gardens, Bronx"
298663,2024-02-06 15:47:16,3,"Allerton/Pelham Gardens, Bronx"


**Step 4: Transform raw data into time-series data**

In [141]:
logger.info("Transforming raw data into time-series data...")
ts_data = transform_raw_to_timeseries_data(rides)
logger.info(
    f"Transformation complete. Number of records in time-series data: {len(ts_data)}"
)

# Sanity checks before the (multi-minute) upload in the next step: the feature
# group there is keyed on these columns, so fail fast here on an empty result
# or on rows that share the same key.
TS_KEY_COLUMNS = ["pickup_location_id", "pickup_hour", "zone"]
assert not ts_data.empty, "Transformation produced no time-series rows"
assert not ts_data.duplicated(subset=TS_KEY_COLUMNS).any(), (
    "Duplicate primary-key rows in ts_data"
)

ts_data.head(20)

2025-03-01 00:26:57,369 INFO: Transforming raw data into time-series data...
2025-03-01 00:30:13,418 INFO: Transformation complete. Number of records in time-series data: 2389632


Unnamed: 0,pickup_hour,pickup_location_id,zone,rides
0,2024-02-03 06:00:00,2,"Jamaica Bay, Queens",0
1,2024-02-03 07:00:00,2,"Jamaica Bay, Queens",0
2,2024-02-03 08:00:00,2,"Jamaica Bay, Queens",0
3,2024-02-03 09:00:00,2,"Jamaica Bay, Queens",0
4,2024-02-03 10:00:00,2,"Jamaica Bay, Queens",0
5,2024-02-03 11:00:00,2,"Jamaica Bay, Queens",0
6,2024-02-03 12:00:00,2,"Jamaica Bay, Queens",0
7,2024-02-03 13:00:00,2,"Jamaica Bay, Queens",0
8,2024-02-03 14:00:00,2,"Jamaica Bay, Queens",0
9,2024-02-03 15:00:00,2,"Jamaica Bay, Queens",0


**Steps 5–8: Connect to Hopsworks and insert the time-series data into the feature group**

In [142]:
# Step 5: Connect to the Hopsworks project
logger.info("Connecting to Hopsworks project...")
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME, api_key_value=config.HOPSWORKS_API_KEY
)
logger.info("Connected to Hopsworks project.")

# Step 6: Connect to the feature store
logger.info("Connecting to the feature store...")
feature_store = project.get_feature_store()
logger.info("Connected to the feature store.")

# Step 7: Connect to or create the feature group
logger.info(
    f"Connecting to the feature group: {config.FEATURE_GROUP_NAME} (version {config.FEATURE_GROUP_VERSION})..."
)
feature_group = feature_store.get_or_create_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
    description="Time series data at hourly frequency v2",
    primary_key=["pickup_location_id", "pickup_hour", "zone"],
    # The API documents `event_time` as a single feature-name string, not a list.
    event_time="pickup_hour",
)
logger.info("Feature group ready.")

# Step 8: Insert data into the feature group
# With wait_for_job=False the call returns after the dataframe is uploaded and
# the offline materialization job is launched; it does not wait for that job to
# finish (follow it via the job URL printed in the cell output).
logger.info("Inserting data into the feature group...")
feature_group.insert(ts_data, write_options={"wait_for_job": False})
logger.info("Data upload completed; materialization job launched (not awaited).")

2025-03-01 00:30:39,176 INFO: Connecting to Hopsworks project...
2025-03-01 00:30:39,177 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-01 00:30:39,241 INFO: Initializing external client
2025-03-01 00:30:39,242 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-01 00:30:39,984 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214648
2025-03-01 00:30:40,664 INFO: Connected to Hopsworks project.
2025-03-01 00:30:40,665 INFO: Connecting to the feature store...
2025-03-01 00:30:40,789 INFO: Connected to the feature store.
2025-03-01 00:30:40,791 INFO: Connecting to the feature group: time_series_hourly_feature_group_v2 (version 1)...
2025-03-01 00:30:41,047 INFO: Feature group ready.
2025-03-01 00:30:41,048 INFO: Inserting data into the feature group...


Uploading Dataframe: 100.00% |███████████████████████████████| Rows 2389632/2389632 | Elapsed Time: 02:10 | Remaining Time: 00:00


Launching job: time_series_hourly_feature_group_v2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1214648/jobs/named/time_series_hourly_feature_group_v2_1_offline_fg_materialization/executions
2025-03-01 00:32:59,478 INFO: Data insertion completed.
