In [1]:
import sys
import os

# Make the parent of the current working directory importable so that the
# `src` package can be imported. The membership check keeps this cell
# idempotent: re-running it does not pile up duplicate sys.path entries.
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.append(project_root)
import src.config as config

In [2]:
from datetime import datetime, timedelta

import pandas as pd

# Current UTC time as a timezone-naive pandas Timestamp.
# `datetime.utcnow()` is deprecated since Python 3.12, so take an aware UTC
# "now" from pandas and drop the tz marker, which yields the same naive-UTC value.
current_date = pd.Timestamp.now(tz="UTC").tz_localize(None)
print(f"{current_date}")

2025-03-02 23:54:20.057454


In [3]:
# Check the Python type of `current_date` (expected: a pandas Timestamp).
type(current_date)

pandas._libs.tslibs.timestamps.Timestamp

In [4]:
from datetime import datetime, timedelta, timezone
# Re-derive "now" as a timezone-aware UTC timestamp, truncated to the top of the hour.
utc_now = datetime.now(timezone.utc)
current_date = pd.to_datetime(utc_now).floor("h")

In [5]:
# View the same timestamp as a numpy datetime64 value.
current_date.to_datetime64()

numpy.datetime64('2025-03-02T23:00:00.000000000')

In [6]:
# Fetch window: ends at `current_date` and starts 29 days before that end point.
fetch_data_to = current_date
fetch_data_from = fetch_data_to - timedelta(days=29)

In [7]:
# Inspect the upper bound of the fetch window.
fetch_data_to

Timestamp('2025-03-02 23:00:00+0000', tz='UTC')

In [8]:
from datetime import datetime, timedelta
import pandas as pd
from src.data_utils import load_and_process_taxi_data

def _iter_year_months(start, end):
    """Yield (year, month) for every calendar month from start's month through end's month."""
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        yield year, month
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


def fetch_batch_raw_data(from_date, to_date):
    """
    Simulate a batch of recent taxi rides from data recorded 52 weeks earlier.

    The requested window [from_date, to_date) is moved back by 52 weeks, every
    calendar month touched by that historical window is loaded, the rides are
    filtered to the window, and their pickup timestamps are then moved forward
    by 52 weeks so that they fall inside the requested window.

    Args:
        from_date (datetime or str): Inclusive start of the data batch.
            Strings are parsed with ``datetime.fromisoformat``.
        to_date (datetime or str): Exclusive end of the data batch.
            Strings are parsed with ``datetime.fromisoformat``.

    Returns:
        pd.DataFrame: The loaded rides restricted to the window, with
        ``pickup_datetime`` shifted forward by 52 weeks (timezone-naive),
        sorted by ``pickup_location_id`` and ``pickup_datetime``.

    Raises:
        ValueError: If ``from_date`` is later than ``to_date``, or if a loaded
            month has no ``pickup_datetime`` column.
    """
    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Work with timezone-naive values throughout: tzinfo is dropped (the clock
    # reading is kept as-is, not converted) so the bounds can be compared with
    # the timezone-naive pickup timestamps below.
    from_date = from_date.replace(tzinfo=None)
    to_date = to_date.replace(tzinfo=None)

    if from_date > to_date:
        raise ValueError(
            f"from_date ({from_date}) must not be later than to_date ({to_date})."
        )

    # Shift the window back by 52 weeks (a whole number of weeks, so weekdays line up)
    shift = timedelta(weeks=52)
    historical_from_date = from_date - shift
    historical_to_date = to_date - shift

    print("Historical From Date:", historical_from_date)
    print("Historical To Date:", historical_to_date)

    # Load every month the historical window touches (not just the first and
    # last one) and apply BOTH bounds to each month, so a window that sits
    # inside a single month is still cut off at historical_to_date.
    monthly_batches = []
    for year, month in _iter_year_months(historical_from_date, historical_to_date):
        month_rides = load_and_process_taxi_data(year=year, months=[month])

        if "pickup_datetime" not in month_rides.columns:
            raise ValueError("Error: 'pickup_datetime' column missing from dataset.")

        # Timezone-naive pickup times; assign() leaves the loader's frame untouched
        pickup_times = pd.to_datetime(month_rides["pickup_datetime"]).dt.tz_localize(None)
        month_rides = month_rides.assign(pickup_datetime=pickup_times)

        in_window = (pickup_times >= historical_from_date) & (pickup_times < historical_to_date)
        monthly_batches.append(month_rides[in_window])

    rides = pd.concat(monthly_batches, ignore_index=True)

    # Shift the data forward by 52 weeks to simulate recent data
    rides["pickup_datetime"] = rides["pickup_datetime"] + shift

    # Sort the data for consistency
    return rides.sort_values(by=["pickup_location_id", "pickup_datetime"])

In [9]:
# Pull the simulated batch of rides for the fetch window.
rides = fetch_batch_raw_data(from_date=fetch_data_from, to_date=fetch_data_to)

Historical From Date: 2024-02-03 23:00:00
Historical To Date: 2024-03-03 23:00:00
File already exists for 2024-02.
Loading data for 2024-02...
Total records: 3,007,526
Valid records: 2,954,709
Records dropped: 52,817 (1.76%)
Successfully processed data for 2024-02.
Combining all monthly data...
Data loading and processing complete!
File already exists for 2024-03.
Loading data for 2024-03...
Total records: 3,582,628
Valid records: 3,518,066
Records dropped: 64,562 (1.80%)
Successfully processed data for 2024-03.
Combining all monthly data...
Data loading and processing complete!


In [10]:
# Preview the fetched rides; pickup timestamps have already been shifted forward by 52 weeks.
rides

Unnamed: 0,pickup_datetime,pickup_location_id
984378,2025-02-12 16:25:44,2
1291079,2025-02-15 16:56:40,2
26445,2025-02-02 08:19:12,3
104533,2025-02-03 09:11:09,3
118668,2025-02-03 12:14:43,3
...,...,...
2935417,2025-03-02 22:51:16,263
2934064,2025-03-02 22:51:22,263
2933744,2025-03-02 22:52:48,263
2933286,2025-03-02 22:55:30,263


In [11]:
from src.data_utils import transform_raw_data_into_ts_data
# Convert the raw rides into time-series form.
# NOTE(review): judging by the preview output, this yields one row per
# (pickup_hour, pickup_location_id) with a `rides` count — confirm in src.data_utils.
ts_data = transform_raw_data_into_ts_data(rides)

In [12]:
# Preview the time-series frame produced from the raw rides.
ts_data

Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2025-02-01 23:00:00,2,0
1,2025-02-02 00:00:00,2,0
2,2025-02-02 01:00:00,2,0
3,2025-02-02 02:00:00,2,0
4,2025-02-02 03:00:00,2,0
...,...,...,...
174691,2025-03-02 18:00:00,263,114
174692,2025-03-02 19:00:00,263,93
174693,2025-03-02 20:00:00,263,85
174694,2025-03-02 21:00:00,263,67


In [13]:
# Check column dtypes, non-null counts and memory footprint before uploading.
ts_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 174696 entries, 0 to 174695
Data columns (total 3 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   pickup_hour         174696 non-null  datetime64[ns]
 1   pickup_location_id  174696 non-null  int16         
 2   rides               174696 non-null  int16         
dtypes: datetime64[ns](1), int16(2)
memory usage: 2.0 MB


In [14]:
import hopsworks

# Log in to the Hopsworks project; project name and API key are read from src.config
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# Get a handle to the project's feature store
feature_store = project.get_feature_store()

# Look up the feature group (name and version come from src.config) that the
# time-series rows are inserted into further down
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)



  from .autonotebook import tqdm as notebook_tqdm


2025-03-03 05:24:34,219 INFO: Initializing external client
2025-03-03 05:24:34,220 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-03 05:24:39,167 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215645


In [15]:
from confluent_kafka import Producer

In [16]:
# Upload the time-series rows to the feature group.
# NOTE(review): "wait_for_job": False is expected to return as soon as the
# materialization job has been launched rather than blocking until it
# finishes — confirm against the Hopsworks client version in use.
feature_group.insert(ts_data, write_options={"wait_for_job": False})


Uploading Dataframe: 100.00% |███████████████████████| Rows 174696/174696 | Elapsed Time: 00:06 | Remaining Time: 00:00

Launching job: time_series_hourly_feature_group_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1215645/jobs/named/time_series_hourly_feature_group_1_offline_fg_materialization/executions


(Job('time_series_hourly_feature_group_1_offline_fg_materialization', 'SPARK'),
 None)