In [1]:
# Regularly fetch a new batch of features, compute predictions, and save them to the feature store
# 

In [2]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# project modules (e.g. under src/) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [3]:
import sys
import os

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
import src.config as config

In [7]:
from src.inference import get_feature_store, load_model_from_registry, get_model_predictions
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import src.config as config
import lightgbm as lgb

# Batch inference, variant 1: fetch recent hourly rides from the feature view,
# make every location's series continuous (missing hours -> 0 rides), build
# sliding-window features, predict with the registry model, and write the
# predictions to the prediction feature group.

# Minimum number of hourly timestamps the fetched history must span.
MIN_HISTORY_HOURS = 28 * 24  # 28 days * 24 hours = 672

# Reset the names inspected by the error handler / displayed at the end so a
# failed run can never report or display values left over from an earlier
# execution of this cell.
ts_data_processed = None
predictions = None

try:
    # Step 1: Setup and get feature store data
    current_date = pd.Timestamp.now(tz='Etc/UTC')
    feature_store = get_feature_store()

    fetch_data_to = current_date - timedelta(hours=1)
    fetch_data_from = fetch_data_to - timedelta(days=30)

    print(f"Fetching data from {fetch_data_from} to {fetch_data_to}")

    feature_view = feature_store.get_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION
    )

    # Get and prepare initial data
    ts_data = feature_view.get_batch_data(
        start_time=fetch_data_from,
        end_time=fetch_data_to
    )

    # Step 2: Prepare continuous time series data
    # Drop the timezone so pickup_hour can be merged with the naive range below.
    ts_data['pickup_hour'] = ts_data['pickup_hour'].dt.tz_localize(None)

    # Full hourly range spanned by the fetched data. 'h' is the non-deprecated
    # spelling of the hourly alias (the same alias this cell passes to .ceil()).
    date_range = pd.date_range(
        start=ts_data.pickup_hour.min(),
        end=ts_data.pickup_hour.max(),
        freq='h'
    )

    # Every location is re-indexed onto the same date_range, so "enough
    # history" depends only on the range length. Check it once, up front,
    # instead of re-evaluating the same condition for every location.
    if len(date_range) < MIN_HISTORY_HOURS:
        raise ValueError(
            "No locations with sufficient data found: "
            f"{len(date_range)} hourly timestamps available, "
            f"{MIN_HISTORY_HOURS} required"
        )

    # Process each location to ensure continuous data
    processed_locations = []

    for location_id in ts_data.pickup_location_id.unique():
        loc_rides = ts_data.loc[
            ts_data.pickup_location_id == location_id,
            ['pickup_hour', 'rides']
        ]

        # One row per hour of the full range; hours with no record get NaN
        # from the left merge and are then counted as 0 rides.
        location_series = pd.DataFrame({
            'pickup_hour': date_range,
            'pickup_location_id': location_id
        }).merge(loc_rides, on='pickup_hour', how='left')
        location_series['rides'] = location_series['rides'].fillna(0)

        processed_locations.append(location_series)

    # Combine processed locations
    ts_data_processed = pd.concat(processed_locations)
    print(f"\nProcessed {len(processed_locations)} locations with sufficient data")

    # Step 3: Generate features
    from src.data_utils import transform_ts_data_info_features
    features = transform_ts_data_info_features(
        ts_data_processed,
        window_size=504,  # 21 days
        step_size=24
    )

    # Add missing column
    # NOTE(review): only the single lag 'rides_t-672' is added here, and it is
    # all zeros. Together with disabling the shape check below this looks like
    # a workaround for a feature-count mismatch between a 504-hour window and
    # the model's expected inputs -- confirm against the training pipeline.
    features['rides_t-672'] = 0

    # Step 4: Load and prepare model
    model = load_model_from_registry()
    if isinstance(model, lgb.Booster):
        model.params['predict_disable_shape_check'] = True
    elif hasattr(model, 'steps') and isinstance(model.steps[-1][1], lgb.LGBMRegressor):
        # Pipeline-like object whose final step is the LightGBM regressor.
        model.steps[-1][1].set_params(predict_disable_shape_check=True)

    # Step 5: Generate predictions
    predictions = get_model_predictions(model, features)

    if predictions is not None and not predictions.empty:
        # Stamp every row with the current time rounded up to the hour.
        # NOTE(review): the recorded run wrote 1757 rows for 251 locations, so
        # several rows share the same (pickup_location_id, pickup_hour) even
        # though that pair is declared as the primary key below -- confirm
        # which row per location is meant to be stored.
        predictions['pickup_hour'] = current_date.ceil('h')

        # Save to feature store
        feature_group = feature_store.get_or_create_feature_group(
            name=config.FEATURE_GROUP_MODEL_PREDICTION,
            version=1,
            description="Predictions from LGBM Model",
            primary_key=['pickup_location_id', 'pickup_hour'],
            event_time='pickup_hour',
        )

        feature_group.insert(predictions, write_options={"wait_for_job": False})

        print(f"\nSaved {len(predictions)} predictions to feature store")
        print("\nTop 10 locations by predicted demand:")
        print(predictions.sort_values('predicted_demand', ascending=False)[
            ['pickup_location_id', 'predicted_demand']
        ].head(10))

except Exception as e:
    print(f"Error: {str(e)}")
    print("\nDebug Info:")
    if ts_data_processed is not None:
        print(f"Processed data shape: {ts_data_processed.shape}")
        print(f"Locations processed: {ts_data_processed.pickup_location_id.nunique()}")
    # Re-raise so the failure is visible and "Run All" stops here instead of
    # continuing into later cells with missing or stale state.
    raise

# Display predictions (None if the model returned nothing)
predictions

2025-03-04 10:10:09,028 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 10:10:09,034 INFO: Initializing external client
2025-03-04 10:10:09,034 INFO: Base URL: https://c.app.hopsworks.ai:443


2025-03-04 10:10:09,680 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690
Fetching data from 2025-02-02 14:10:09.028157+00:00 to 2025-03-04 14:10:09.028157+00:00
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (4.11s) 

Processed 251 locations with sufficient data
2025-03-04 10:10:18,065 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 10:10:18,068 INFO: Initializing external client
2025-03-04 10:10:18,068 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 10:10:18,635 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690
Feature Group created successfully, explore it at NE
https://c.app.hopsworks.ai:443/p/1214690/fs/1202325/fg/1403471


Uploading Dataframe: 100.00% |██████████| Rows 1757/1757 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: taxi_hourly_model_prediction_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1214690/jobs/named/taxi_hourly_model_prediction_1_offline_fg_materialization/executions

Saved 1757 predictions to feature store

Top 10 locations by predicted demand:
      pickup_location_id  predicted_demand
1192                 132               6.0
1195                 132               3.0
615                  170               2.0
1191                 132               2.0
1194                 132               2.0
1358                  45               1.0
1363                  45               1.0
1362                  45               1.0
1361                  45               1.0
907                  236               1.0


Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,216,0.0,2025-03-04 16:00:00+00:00
1,216,0.0,2025-03-04 16:00:00+00:00
2,216,0.0,2025-03-04 16:00:00+00:00
3,216,0.0,2025-03-04 16:00:00+00:00
4,216,0.0,2025-03-04 16:00:00+00:00
...,...,...,...
1752,202,0.0,2025-03-04 16:00:00+00:00
1753,202,0.0,2025-03-04 16:00:00+00:00
1754,202,0.0,2025-03-04 16:00:00+00:00
1755,202,0.0,2025-03-04 16:00:00+00:00


In [8]:
from src.inference import load_model_from_registry

# Load the model from the registry on its own. The batch-inference cells call
# load_model_from_registry() themselves and rebind `model` when they run.
model = load_model_from_registry()

2025-03-04 10:10:34,720 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 10:10:34,725 INFO: Initializing external client
2025-03-04 10:10:34,725 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 10:10:35,415 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690
Downloading model artifact (0 dirs, 1 files)... DONE

In [10]:
from src.inference import get_feature_store, load_model_from_registry, get_model_predictions
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import src.config as config
import lightgbm as lgb

# Batch inference, variant 2: fetch recent hourly rides, build sliding-window
# features, pad the feature frame with zero-filled lag columns up to the
# largest lag, predict with the registry model, and write the predictions to
# the prediction feature group.

# Reset the names inspected by the error handler / displayed at the end so a
# failed run can never report or display values left over from an earlier
# execution of this cell.
features = None
predictions = None

try:
    # Step 1: Setup time window
    current_date = pd.Timestamp.now(tz='Etc/UTC')
    fetch_data_to = current_date - timedelta(hours=1)
    fetch_data_from = fetch_data_to - timedelta(days=30)

    print(f"Fetching data from {fetch_data_from} to {fetch_data_to}")

    # Step 2: Get feature store data
    feature_store = get_feature_store()
    feature_view = feature_store.get_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION
    )

    ts_data = feature_view.get_batch_data(
        start_time=fetch_data_from,
        end_time=fetch_data_to
    )

    # Drop the timezone and order rows by location, then chronologically.
    ts_data['pickup_hour'] = ts_data['pickup_hour'].dt.tz_localize(None)
    ts_data = ts_data.sort_values(['pickup_location_id', 'pickup_hour'])

    print(f"Data loaded: {len(ts_data)} records")

    # Step 3: Generate initial features
    from src.data_utils import transform_ts_data_info_features
    features = transform_ts_data_info_features(
        ts_data,
        window_size=504,  # 21 days
        step_size=24
    )

    # Step 4: Add all required time lag columns
    max_lag = 672  # Maximum required lag
    current_lags = set(col for col in features.columns if col.startswith('rides_t-'))
    missing_lags = [
        f'rides_t-{lag}'
        for lag in range(1, max_lag + 1)
        if f'rides_t-{lag}' not in current_lags
    ]

    # Append all missing lag columns (filled with zeros) in one concat rather
    # than inserting them one at a time, which fragments the frame and makes
    # pandas emit a PerformanceWarning. Column order and dtype match what
    # repeated `features[col] = 0` assignments would produce.
    # NOTE(review): zero-filled lags are placeholders, not real history --
    # confirm this matches how the model was trained.
    if missing_lags:
        zero_lags = pd.DataFrame(0, index=features.index, columns=missing_lags)
        features = pd.concat([features, zero_lags], axis=1)

    # Step 5: Load and prepare model
    model = load_model_from_registry()

    # Modify model parameters to handle shape mismatch
    if isinstance(model, lgb.Booster):
        model.params['predict_disable_shape_check'] = True
    elif hasattr(model, 'steps') and isinstance(model.steps[-1][1], lgb.LGBMRegressor):
        # Pipeline-like object whose final step is the LightGBM regressor.
        model.steps[-1][1].set_params(predict_disable_shape_check=True)

    # Step 6: Generate predictions
    predictions = get_model_predictions(model, features)

    if predictions is not None and not predictions.empty:
        # Add timestamp and save predictions
        # NOTE(review): the recorded run wrote 1757 rows, all stamped with the
        # same hour, while (pickup_location_id, pickup_hour) is declared as the
        # primary key below -- confirm which row per location should be stored.
        predictions['pickup_hour'] = current_date.ceil('h')

        feature_group = feature_store.get_or_create_feature_group(
            name=config.FEATURE_GROUP_MODEL_PREDICTION,
            version=1,
            description="Predictions from LGBM Model",
            primary_key=['pickup_location_id', 'pickup_hour'],
            event_time='pickup_hour',
        )

        feature_group.insert(predictions, write_options={"wait_for_job": False})

        print(f"\nSaved {len(predictions)} predictions to feature store")
        print("\nTop 10 locations by predicted demand:")
        print(predictions.sort_values('predicted_demand', ascending=False)[
            ['pickup_location_id', 'predicted_demand']
        ].head(10))

except Exception as e:
    print(f"Error: {str(e)}")
    print("\nDebug Info:")
    if features is not None:
        print(f"Features shape: {features.shape}")
        print(f"Number of time lag columns: {len([c for c in features.columns if c.startswith('rides_t-')])}")
    # Re-raise so the failure is visible and "Run All" stops here instead of
    # letting later cells insert missing or stale predictions.
    raise

# Display predictions (None if the model returned nothing)
predictions

Fetching data from 2025-02-02 14:11:47.864426+00:00 to 2025-03-04 14:11:47.864426+00:00
2025-03-04 10:11:47,864 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 10:11:47,868 INFO: Initializing external client
2025-03-04 10:11:47,869 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 10:11:48,454 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (5.46s) 
Data loaded: 168672 records
2025-03-04 10:11:57,533 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 10:11:57,537 INFO: Initializing external client
2025-03-04 10:11:57,537 INFO: Base URL: https://c.app.hopsworks.ai:443




2025-03-04 10:11:58,088 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690
Downloading model artifact (0 dirs, 1 files)... DONE

Uploading Dataframe: 100.00% |██████████| Rows 1757/1757 | Elapsed Time: 00:00 | Remaining Time: 00:00



Saved 1757 predictions to feature store

Top 10 locations by predicted demand:
      pickup_location_id  predicted_demand
889                  137              20.0
1390                 211              20.0
1391                 211              19.0
527                   79              19.0
499                   75              19.0
721                  113              18.0
941                  144              18.0
1637                 246              18.0
528                   79              18.0
1526                 231              18.0


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/lohithvattikuti/Resources/jobs/taxi_hourly_model_prediction_1_offline_fg_materialization/config_1741101021072) to trigger the materialization job again.


Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,2,0.0,2025-03-04 16:00:00+00:00
1,2,0.0,2025-03-04 16:00:00+00:00
2,2,0.0,2025-03-04 16:00:00+00:00
3,2,0.0,2025-03-04 16:00:00+00:00
4,2,0.0,2025-03-04 16:00:00+00:00
...,...,...,...
1752,263,4.0,2025-03-04 16:00:00+00:00
1753,263,-0.0,2025-03-04 16:00:00+00:00
1754,263,-0.0,2025-03-04 16:00:00+00:00
1755,263,2.0,2025-03-04 16:00:00+00:00


In [11]:
# Stamp every prediction row with the current time rounded up to the hour and
# display the frame. Relies on `predictions` and `current_date` left in the
# kernel by the preceding inference cell, which already performs this same
# assignment, and modifies `predictions` in place.
predictions["pickup_hour"] = current_date.ceil('h')
predictions

Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,2,0.0,2025-03-04 16:00:00+00:00
1,2,0.0,2025-03-04 16:00:00+00:00
2,2,0.0,2025-03-04 16:00:00+00:00
3,2,0.0,2025-03-04 16:00:00+00:00
4,2,0.0,2025-03-04 16:00:00+00:00
...,...,...,...
1752,263,4.0,2025-03-04 16:00:00+00:00
1753,263,-0.0,2025-03-04 16:00:00+00:00
1754,263,-0.0,2025-03-04 16:00:00+00:00
1755,263,2.0,2025-03-04 16:00:00+00:00


In [12]:
from src.inference import get_feature_store

# Fetch (or create, if absent) version 1 of the prediction feature group, keyed
# by (pickup_location_id, pickup_hour) with pickup_hour as the event time.
# Uses `config` imported in an earlier cell; the arguments match the call made
# inside the inference cells.
feature_group = get_feature_store().get_or_create_feature_group(
    name=config.FEATURE_GROUP_MODEL_PREDICTION,
    version=1,
    description="Predictions from LGBM Model",
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
)

2025-03-04 10:12:28,664 INFO: Closing external client and cleaning up certificates.


Connection closed.
2025-03-04 10:12:28,668 INFO: Initializing external client
2025-03-04 10:12:28,668 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 10:12:29,340 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214690


In [13]:
# Write the `predictions` frame left in the kernel by earlier cells to the
# feature group. NOTE(review): the inference cells already insert their
# predictions, so this repeats that write -- confirm it is intentional.
# "wait_for_job": False presumably returns without waiting for the
# materialization job to finish -- verify against the Hopsworks docs.
feature_group.insert(predictions, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 1757/1757 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: taxi_hourly_model_prediction_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1214690/jobs/named/taxi_hourly_model_prediction_1_offline_fg_materialization/executions


(Job('taxi_hourly_model_prediction_1_offline_fg_materialization', 'SPARK'),
 None)