In [94]:
import sys
from pathlib import Path
from datetime import datetime

# Make the project's `src/` package importable. Assumes the kernel's working directory
# is a sibling folder of `src/` (e.g. `notebooks/`) — TODO confirm if the layout changes.
SRC_DIR = str(Path().resolve().parent / "src")
if SRC_DIR not in sys.path:  # guard so re-running this cell doesn't append duplicates
    sys.path.append(SRC_DIR)

import config

In [95]:
# Pick up edits to project modules (e.g. `config`) without restarting the kernel.
# Re-running this cell only prints an "already loaded" notice, which is harmless.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## (Opcional) Explorar qué datos hay en el Feature Store de Hopsworks

In [96]:
# Inspect what the Feature Store currently holds (optional, exploratory cell).
import hopsworks
import pandas as pd

FG_NAME = "time_series_hourly_feature_group"
FV_NAME = "time_series_hourly_feature_view"
FV_VERSION = 1
TS_COL = "pickup_hour"  # timestamp column; change it if your feature group/view uses another name

# 1) Connect to Hopsworks and its Feature Store
project = hopsworks.login(project=config.HOPSWORKS_PROJECT_NAME, api_key_value=config.HOPSWORKS_API_KEY)
fs = project.get_feature_store()

# 2) List every version of the feature group
fgs = fs.get_feature_groups(name=FG_NAME)
print("=== Feature Groups ===")
for fg in fgs:
    print(f"- {fg.name} v{fg.version}")


# 3) For each version: shape, timestamp range and a sample of rows.
#    No row limit is passed to fg.read(), so the whole feature group is loaded
#    (about 2.85M rows in the saved output) — this step can be slow.
for fg in fgs:
    print(f"\n> FG: {fg.name} v{fg.version}")
    df = fg.read()
    print("  • Shape:", df.shape)
    if TS_COL in df.columns:
        print(f"  • Min {TS_COL}:", df[TS_COL].min())
        print(f"  • Max {TS_COL}:", df[TS_COL].max())
    print("  • Primeras filas:")
    print(df.head(5))

# 4) List every version of the feature view
fvs = fs.get_feature_views(name=FV_NAME)
print("\n=== Feature Views ===")
for fv in fvs:
    print(f"- {fv.name} v{fv.version}")

# 5) Batch-read the feature view used in this notebook over a wide window (last year up to now).
#    Bounds are naive timestamps expressed in UTC, the same convention the prediction cells use.
fv = fs.get_feature_view(name=FV_NAME, version=FV_VERSION)
print(f"\n> Feature View: {fv.name} v{fv.version}")

end = pd.Timestamp.now(tz="UTC").tz_localize(None)
start = end - pd.Timedelta(days=365)
df_fv = fv.get_batch_data(start_time=start, end_time=end)
print("  • Shape batch_data:", df_fv.shape)
if TS_COL in df_fv.columns:
    print("  • Min timestamp:", df_fv[TS_COL].min())
    print("  • Max timestamp:", df_fv[TS_COL].max())
print("  • Primeras filas:")
print(df_fv.head(5))


2025-05-23 17:55:33,759 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-23 17:55:33,771 INFO: Initializing external client
2025-05-23 17:55:33,772 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-23 17:55:34,898 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1224869
=== Feature Groups ===
- time_series_hourly_feature_group v1

> FG: time_series_hourly_feature_group v1
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (12.53s) 
  • Shape: (2854812, 3)
  • Min pickup_hour: 2024-01-01 00:00:00+00:00
  • Max pickup_hour: 2025-05-08 16:00:00+00:00
  • Primeras filas:
                pickup_hour  pickup_location_id  rides
0 2024-10-16 16:00:00+00:00                 118      0
1 2024-06-30 06:00:00+00:00                 226      9
2 2024-07-25 09:00:00+00:00                 132    163
3 2024-06-15 16:00:00+00:00                 138    163
4 2024-09-19 03:00:00+00:0

## Conexión al proyecto, feature store y model registry en Hopsworks

In [None]:
import hopsworks

# Log in to the Hopsworks project and open handles to the services used below.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY,
)
fs = project.get_feature_store()   # Feature Store: time-series features in, predictions out
mr = project.get_model_registry()  # Model Registry: trained model artifacts

# Confirm what we are connected to
print("Conectado a proyecto:", project.name)
print("Conectado a Feature Store:", fs.name)
print("Conectado a Model Registry:", mr)

2025-05-23 15:50:05,294 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-23 15:50:05,298 INFO: Initializing external client
2025-05-23 15:50:05,298 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-23 15:50:06,373 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1224869
Conectado a proyecto: taxo_demand
Conectado a Feature Store: taxo_demand_featurestore
Conectado a Model Registry: ModelRegistry(project: 'taxo_demand')


## Leemos los datos en batch de la Feature View

In [99]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd  # also imported in the optional exploration cell; repeated so that cell can be skipped

# Feature view coordinates (also set in the optional exploration cell; repeated so this cell stands alone)
FV_NAME = "time_series_hourly_feature_view"
FV_VERSION = 1

# NOTE(review): the prediction hour is shifted back in time, presumably because the feature group
# has no recent data (its max pickup_hour in the exploration output is 2025-05-08). Set to 0 once
# the feature pipeline is live — TODO confirm.
BACKFILL_DAYS = 100

# Hour we predict for, as a naive timestamp expressed in UTC (later cells localize it to UTC).
# pd.Timestamp.now(tz="UTC") replaces the deprecated datetime.utcnow(); 'h' replaces the deprecated 'H'.
current_date = pd.Timestamp.now(tz="UTC").floor("h").tz_localize(None) - timedelta(days=BACKFILL_DAYS)
print(f'{current_date=}')

n_features = 24  # number of previous hourly ride counts fed to the model

# Inclusive window of the n_features hours right before current_date
fetch_data_from = current_date - timedelta(hours=n_features)
fetch_data_to = current_date - timedelta(hours=1)


# 3) Raw batch read from the Feature View, over a window wider than needed (filtered in the next cells)
fv = fs.get_feature_view(name=FV_NAME, version=FV_VERSION)

ts_data = fv.get_batch_data(
    start_time=fetch_data_from,
    end_time=current_date + timedelta(days=1)
)
print("Registros en ±1 día:", ts_data.shape[0])
print("Timestamps únicos:", ts_data["pickup_hour"].drop_duplicates().sort_values().tolist()[:5])

ts_data.head()

current_date=Timestamp('2025-02-12 16:00:00')
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (6.25s) 
Registros en ±1 día: 12624
Timestamps únicos: [Timestamp('2025-02-11 16:00:00+0000', tz='Etc/UTC'), Timestamp('2025-02-11 17:00:00+0000', tz='Etc/UTC'), Timestamp('2025-02-11 18:00:00+0000', tz='Etc/UTC'), Timestamp('2025-02-11 19:00:00+0000', tz='Etc/UTC'), Timestamp('2025-02-11 20:00:00+0000', tz='Etc/UTC')]


Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2025-02-13 00:00:00+00:00,235,0
1,2025-02-12 08:00:00+00:00,38,0
2,2025-02-12 13:00:00+00:00,235,0
3,2025-02-12 02:00:00+00:00,78,0
4,2025-02-12 00:00:00+00:00,53,2


In [100]:
# Summary of the raw batch (every column, including the pickup_hour range) before filtering
ts_data.describe(include="all")

Unnamed: 0,pickup_hour,pickup_location_id,rides
count,12624,12624.0,12624.0
mean,2025-02-12 15:30:00+00:00,133.224335,20.553866
min,2025-02-11 16:00:00+00:00,1.0,0.0
25%,2025-02-12 03:45:00+00:00,66.0,0.0
50%,2025-02-12 15:30:00+00:00,134.0,1.0
75%,2025-02-13 03:15:00+00:00,200.0,5.0
max,2025-02-13 15:00:00+00:00,265.0,810.0
std,,76.748279,62.537802


## Filtramos datos por el período de interés

In [102]:
# Keep only the hourly points that feed the model: [fetch_data_from, fetch_data_to], both ends inclusive.

def _to_utc(ts) -> pd.Timestamp:
    """Return `ts` as a tz-aware UTC Timestamp; naive values are assumed to already be in UTC."""
    ts = pd.Timestamp(ts)
    return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")

pickup_ts_from = _to_utc(fetch_data_from)
pickup_ts_to   = _to_utc(fetch_data_to)

# Filter by range and sort by location and time. This builds a new frame instead of sorting a slice
# in place, so re-running the cell is safe.
ts_data = (
    ts_data[ts_data["pickup_hour"].between(pickup_ts_from, pickup_ts_to)]
    .sort_values(by=["pickup_location_id", "pickup_hour"])
)

# Validate that the feature store has no gaps: every location must have exactly
# n_features distinct hours, and no (location, hour) pair may appear twice.
location_ids = ts_data['pickup_location_id'].unique()
print("Ubicaciones únicas:", len(location_ids))
assert len(location_ids) > 0, "No time-series data found in the requested window."
hours_per_location = ts_data.groupby("pickup_location_id")["pickup_hour"].nunique()
assert (hours_per_location == n_features).all() and len(ts_data) == n_features * len(location_ids), \
    "Time-series data is not complete. Make sure your feature pipeline is up and running."


ts_data.head()

Ubicaciones únicas: 263


Unnamed: 0,pickup_hour,pickup_location_id,rides
4668,2025-02-11 16:00:00+00:00,1,1
6321,2025-02-11 17:00:00+00:00,1,0
11569,2025-02-11 18:00:00+00:00,1,1
3371,2025-02-11 19:00:00+00:00,1,1
4404,2025-02-11 20:00:00+00:00,1,0


In [103]:
# Reshape the long table (one row per location and hour) into one feature vector per
# `pickup_location_id`: its n_features hourly ride counts, oldest hour first.
# A single pivot replaces the per-location boolean-mask loop (O(n) instead of O(n * locations)).
rides_wide = (
    ts_data
    .pivot(index="pickup_location_id", columns="pickup_hour", values="rides")  # raises on duplicate (location, hour)
    .sort_index(axis=1)     # chronological column order: oldest -> newest
    .reindex(location_ids)  # rows in the same order as `location_ids`
)
assert rides_wide.shape == (len(location_ids), n_features), \
    f"Expected {len(location_ids)} locations x {n_features} hours, got {rides_wide.shape}."
x = rides_wide.to_numpy(dtype=np.float32)
assert not np.isnan(x).any(), "Some locations are missing hourly observations."

# numpy array -> pandas DataFrame. The first column is the oldest hour (rides_previous_{n_features}_hour)
# and the last is the most recent one (rides_previous_1_hour).
features = pd.DataFrame(
    x,
    columns=[f'rides_previous_{i+1}_hour' for i in reversed(range(n_features))]
)
features['pickup_hour'] = current_date
features['pickup_location_id'] = location_ids
features = features.sort_values(by=['pickup_location_id'])
features.head()


Unnamed: 0,rides_previous_24_hour,rides_previous_23_hour,rides_previous_22_hour,rides_previous_21_hour,rides_previous_20_hour,rides_previous_19_hour,rides_previous_18_hour,rides_previous_17_hour,rides_previous_16_hour,rides_previous_15_hour,...,rides_previous_8_hour,rides_previous_7_hour,rides_previous_6_hour,rides_previous_5_hour,rides_previous_4_hour,rides_previous_3_hour,rides_previous_2_hour,rides_previous_1_hour,pickup_hour,pickup_location_id
0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,2.0,1.0,2025-02-12 16:00:00,1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2025-02-12 16:00:00,2
2,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2025-02-12 16:00:00,3
3,4.0,6.0,4.0,7.0,9.0,12.0,12.0,14.0,13.0,8.0,...,18.0,17.0,10.0,8.0,3.0,6.0,4.0,4.0,2025-02-12 16:00:00,4
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2025-02-12 16:00:00,5


## Cargar modelo para predecir

In [104]:
import joblib

# 3. Load the model from the Model Registry.
# NOTE(review): these coordinates are hard-coded here, not read from `config` — TODO move them there.
MODEL_NAME = 'taxi_demand_predictor_next_hour'
MODEL_VERSION = 4
MODEL_FILE = 'rf_model_v2.pkl'

mr = project.get_model_registry()  # re-fetched so this cell only needs `project`

# `model_entry` is the registry metadata; `model` (below) is the unpickled estimator.
model_entry = mr.get_model(name=MODEL_NAME, version=MODEL_VERSION)
model_dir = Path(model_entry.download())  # large artifact (~478 MB per the saved progress output)
model_path = model_dir / MODEL_FILE
if not model_path.is_file():
    available = sorted(p.name for p in model_dir.iterdir())
    raise FileNotFoundError(f"'{MODEL_FILE}' not found in {model_dir}; available files: {available}")

# joblib.load unpickles arbitrary objects: only load artifacts from a trusted registry.
model = joblib.load(model_path)

print(f"Modelo '{MODEL_NAME}' v{MODEL_VERSION} cargado correctamente.")

Downloading: 0.000%|          | 0/478321217 elapsed<00:00 remaining<?

Modelo 'taxi_demand_predictor_next_hour' v4 cargado correctamente.


In [106]:
# `pickup_hour` is not passed to the model (presumably it was not a training feature — confirm).
# errors="ignore" keeps this cell idempotent: re-running it no longer raises KeyError.
features = features.drop(columns=['pickup_hour'], errors='ignore')
features

Unnamed: 0,rides_previous_24_hour,rides_previous_23_hour,rides_previous_22_hour,rides_previous_21_hour,rides_previous_20_hour,rides_previous_19_hour,rides_previous_18_hour,rides_previous_17_hour,rides_previous_16_hour,rides_previous_15_hour,...,rides_previous_9_hour,rides_previous_8_hour,rides_previous_7_hour,rides_previous_6_hour,rides_previous_5_hour,rides_previous_4_hour,rides_previous_3_hour,rides_previous_2_hour,rides_previous_1_hour,pickup_location_id
0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,1.0,1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2
2,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3
3,4.0,6.0,4.0,7.0,9.0,12.0,12.0,14.0,13.0,8.0,...,8.0,18.0,17.0,10.0,8.0,3.0,6.0,4.0,4.0,4
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
258,37.0,52.0,61.0,65.0,52.0,27.0,35.0,14.0,17.0,5.0,...,6.0,4.0,10.0,11.0,25.0,30.0,36.0,45.0,45.0,261
259,103.0,148.0,105.0,60.0,48.0,39.0,21.0,6.0,2.0,0.0,...,171.0,200.0,203.0,140.0,117.0,98.0,98.0,85.0,141.0,262
260,123.0,132.0,144.0,107.0,106.0,123.0,82.0,27.0,9.0,10.0,...,158.0,172.0,164.0,110.0,109.0,117.0,121.0,122.0,122.0,263
261,7.0,7.0,15.0,5.0,10.0,7.0,7.0,4.0,3.0,0.0,...,11.0,15.0,15.0,14.0,10.0,12.0,11.0,14.0,10.0,264


## Hacer predicciones

In [108]:
# 4. Predict demand: one rounded prediction per row of `features`, in the same order
predictions = model.predict(features)

results = pd.DataFrame({
    'pickup_location_id': features['pickup_location_id'].values,
    'predicted_demand': predictions.round(0),
})
results['pickup_hour'] = current_date  # every row is a prediction for the same hour
results

Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,1,1.0,2025-02-12 16:00:00
1,2,0.0,2025-02-12 16:00:00
2,3,0.0,2025-02-12 16:00:00
3,4,5.0,2025-02-12 16:00:00
4,5,0.0,2025-02-12 16:00:00
...,...,...,...
258,261,47.0,2025-02-12 16:00:00
259,262,130.0,2025-02-12 16:00:00
260,263,126.0,2025-02-12 16:00:00
261,264,11.0,2025-02-12 16:00:00


## Guardar predicciones

Guardar estas predicciones en el feature store, para que puedan ser consumidas posteriormente por nuestra aplicación Streamlit.

In [93]:
# 1) Feature Store handle (re-fetched so this cell only needs `project`)
fs = project.get_feature_store()

# 2) Name and version of the predictions feature group
FG_NAME    = "taxi_demand_predictions"
FG_VERSION = 1
PRIMARY_KEY = ['pickup_location_id', 'pickup_hour']

# Refuse to upload an empty batch or one with duplicate primary keys.
assert not results.empty, "No predictions to insert."
assert not results.duplicated(subset=PRIMARY_KEY).any(), \
    "Duplicate (pickup_location_id, pickup_hour) rows in predictions."

# 3) Fetch the feature group, creating it on first use:
#    - primary_key: columns that uniquely identify each row (one prediction per location and hour)
#    - event_time: the timestamp column of each row (here also part of the primary key)
#    - description: descriptive text
fg_pred = fs.get_or_create_feature_group(
    name=FG_NAME,
    version=FG_VERSION,
    description="Predictions generated by our production model",
    primary_key=PRIMARY_KEY,
    event_time='pickup_hour',
)

# Return without waiting for the offline materialization job that the insert launches.
fg_pred.insert(results, write_options={"wait_for_job": False})

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1224869/fs/1212495/fg/1479041


Uploading Dataframe: 100.00% |██████████| Rows 263/263 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: taxi_demand_predictions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1224869/jobs/named/taxi_demand_predictions_1_offline_fg_materialization/executions


(Job('taxi_demand_predictions_1_offline_fg_materialization', 'SPARK'), None)