# inspect the raw data

In [1]:
from datetime import datetime
import pandas as pd

from feast import FeatureStore

In [2]:
pd.read_parquet("data/driver_stats.parquet")

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
0,2023-03-31 15:00:00+00:00,1005,0.115611,0.554137,318,2023-04-15 15:00:47.216
1,2023-03-31 16:00:00+00:00,1005,0.863201,0.755645,386,2023-04-15 15:00:47.216
2,2023-03-31 17:00:00+00:00,1005,0.310922,0.456694,924,2023-04-15 15:00:47.216
3,2023-03-31 18:00:00+00:00,1005,0.518537,0.884248,989,2023-04-15 15:00:47.216
4,2023-03-31 19:00:00+00:00,1005,0.745835,0.864634,272,2023-04-15 15:00:47.216
...,...,...,...,...,...,...
1802,2023-04-15 13:00:00+00:00,1001,0.575827,0.753413,358,2023-04-15 15:00:47.216
1803,2023-04-15 14:00:00+00:00,1001,0.755292,0.665238,164,2023-04-15 15:00:47.216
1804,2021-04-12 07:00:00+00:00,1001,0.511466,0.254260,233,2023-04-15 15:00:47.216
1805,2023-04-08 03:00:00+00:00,1003,0.189217,0.900712,881,2023-04-15 15:00:47.216


# register feature definitions and deploy feature store

In [4]:
! feast apply

[1m[94mNo changes to registry
[1m[94mNo changes to infrastructure


# generate training data or power batch scoring models

## generate training data

In [4]:
entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003,],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        "label_driver_reported_satisfaction": [1, 5, 3],
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("----- Feature schema -----", end="\n")
print(training_df.info())

print()
print("----- Example features -----", end="\n")
print(training_df.head())

----- Feature schema -----
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   driver_id                           3 non-null      int64              
 1   event_timestamp                     3 non-null      datetime64[ns, UTC]
 2   label_driver_reported_satisfaction  3 non-null      int64              
 3   val_to_add                          3 non-null      int64              
 4   val_to_add_2                        3 non-null      int64              
 5   conv_rate                           3 non-null      float32            
 6   acc_rate                            3 non-null      float32            
 7   avg_daily_trips                     3 non-null      int32              
 8   conv_rate_plus_val1                 3 non-null      float64            
 9   conv_rate_plus_val2 

## run offline inference (batch scoring)

In [9]:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print()
print("----- Example features -----", end="\n")
print(training_df.head())


----- Example features -----
   driver_id                  event_timestamp  \
0       1001 2023-04-15 06:48:48.884403+00:00   
1       1002 2023-04-15 06:48:48.884403+00:00   
2       1003 2023-04-15 06:48:48.884403+00:00   

   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \
0                                   1           1            10   0.805241   
1                                   5           2            20   0.093653   
2                                   3           3            30   0.896571   

   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  
0  0.683805               51             1.805241            10.805241  
1  0.008572              398             2.093653            20.093653  
2  0.487203              601             3.896571            30.896571  


# ingest batch features into online store

In [6]:
! CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") && \
  feast materialize-incremental $CURRENT_TIME

Materializing [1m[32m2[0m feature views to [1m[32m2023-04-15 17:11:13+09:00[0m into the [1m[32msqlite[0m online store.

[1m[32mdriver_hourly_stats_fresh[0m from [1m[32m2023-04-15 16:37:55+09:00[0m to [1m[32m2023-04-15 17:11:13+09:00[0m:
100%|███████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 1295.34it/s]
[1m[32mdriver_hourly_stats[0m from [1m[32m2023-04-15 16:37:55+09:00[0m to [1m[32m2023-04-15 17:11:13+09:00[0m:
100%|███████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 4324.92it/s]


# fetch feature vectors for inference

In [14]:
from pprint import pprint
from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {"driver_id": 1004},
        {"driver_id": 1005},
    ],
).to_dict()

pprint(feature_vector)

{'acc_rate': [0.9208038449287415, 0.05192827060818672],
 'avg_daily_trips': [115, 598],
 'conv_rate': [0.318071186542511, 0.8333941102027893],
 'driver_id': [1004, 1005]}


# use feature service to fetch online features instead

In [9]:
from feast import FeatureService
driver_stats_fs = FeatureService(
    name="driver_activity_v1",
    features=[driver_hourly_stats_view],
)

SyntaxError: invalid syntax (2733840029.py, line 1)

In [8]:
from pprint import pprint
from feast import FeatureStore


feature_store = FeatureStore('.')  # Initialize the feature store


feature_service = feature_store.get_feature_service("driver_activity_v1")
feature_vector = feature_store.get_online_features(
    features=feature_service,
    entity_rows=[
        # {join_key: entity_value}
        {"driver_id": 1004},
        {"driver_id": 1005},
    ],
).to_dict()

pprint(feature_vector)

RequestDataNotFoundInEntityRowsException: Required request data source features ['val_to_add', 'val_to_add_2'] not found in the entity rows, but required by feature views

In [12]:
feature_service

<FeatureService(name = driver_activity_v1, _features = [], feature_view_projections = [FeatureViewProjection(name='driver_hourly_stats', name_alias='', desired_features=[], features=[conv_rate-Float32], join_key_map={}), FeatureViewProjection(name='transformed_conv_rate', name_alias='', desired_features=[], features=[conv_rate_plus_val1-Float64, conv_rate_plus_val2-Float64], join_key_map={})], description = , tags = {}, owner = , created_timestamp = 2023-04-15 06:06:10.707176, last_updated_timestamp = 2023-04-15 06:06:10.707176, logging_config = None)>