In [1]:
from datetime import datetime
import pandas as pd

from feast import FeatureStore

In [2]:
# Entity rows whose feature values are fetched point-in-time below. To
# retrieve features for *all* entities in the offline store instead, see
# https://docs.feast.dev/getting-started/concepts/feature-retrieval
entity_df = pd.DataFrame(
    data={
        # Entity join key -> one value per row.
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" is a reserved key: the timestamp of each row.
        "event_timestamp": [
            datetime(year=2021, month=4, day=12, hour=10, minute=59, second=42),
            datetime(year=2021, month=4, day=12, hour=8, minute=12, second=10),
            datetime(year=2021, month=4, day=12, hour=16, minute=40, second=26),
        ],
        # Optional label column; Feast passes it through without processing it.
        "label_driver_reported_satisfaction": [1, 5, 3],
        # Request-time inputs consumed by the on-demand transformation.
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

In [7]:
# Show the entity dataframe that will be joined against the offline store.
entity_df

Unnamed: 0,driver_id,event_timestamp,label_driver_reported_satisfaction,val_to_add,val_to_add_2
0,1001,2021-04-12 10:59:42,1,1,10
1,1002,2021-04-12 08:12:10,5,2,20
2,1003,2021-04-12 16:40:26,3,3,30


In [3]:

# Feature store rooted at the current working directory.
# NOTE(review): assumes the notebook is launched from inside the Feast feature
# repo directory — confirm if the notebook is moved.
store = FeatureStore(repo_path=".")

In [4]:

# Feature references ("<feature view>:<feature>") to join onto each entity row
# as of that row's event_timestamp; the transformed_conv_rate features are
# computed on demand from the val_to_add / val_to_add_2 request columns.
historical_feature_refs = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
    "transformed_conv_rate:conv_rate_plus_val1",
    "transformed_conv_rate:conv_rate_plus_val2",
]
training_df = store.get_historical_features(
    entity_df=entity_df, features=historical_feature_refs
).to_df()

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [8]:
# Show the training frame: the entity columns plus the requested feature values.
training_df

Unnamed: 0,driver_id,event_timestamp,label_driver_reported_satisfaction,val_to_add,val_to_add_2,conv_rate,acc_rate,avg_daily_trips,conv_rate_plus_val1,conv_rate_plus_val2
0,1001,2021-04-12 10:59:42+00:00,1,1,10,0.668698,0.160657,608,1.668698,10.668698
1,1002,2021-04-12 08:12:10+00:00,5,2,20,0.182542,0.462864,948,2.182542,20.182542
2,1003,2021-04-12 16:40:26+00:00,3,3,30,0.170621,0.616805,276,3.170621,30.170621


In [5]:

# Print the column names, non-null counts and dtypes of the training frame.
# DataFrame.info() writes its report to stdout itself and returns None, so it is
# called directly; wrapping it in print() would append a stray "None" line.
print("----- Feature schema -----\n")
training_df.info()

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   driver_id                           3 non-null      int64              
 1   event_timestamp                     3 non-null      datetime64[ns, UTC]
 2   label_driver_reported_satisfaction  3 non-null      int64              
 3   val_to_add                          3 non-null      int64              
 4   val_to_add_2                        3 non-null      int64              
 5   conv_rate                           3 non-null      float32            
 6   acc_rate                            3 non-null      float32            
 7   avg_daily_trips                     3 non-null      int32              
 8   conv_rate_plus_val1                 3 non-null      float64            
 9   conv_rate_plus_val2

In [6]:
# Plain-text preview of the first rows of the training frame, preceded by a
# blank line and a banner.
preview_rows = training_df.head()
print()
print("----- Example features -----\n")
print(preview_rows)


----- Example features -----

   driver_id           event_timestamp  label_driver_reported_satisfaction  \
0       1001 2021-04-12 10:59:42+00:00                                   1   
1       1002 2021-04-12 08:12:10+00:00                                   5   
2       1003 2021-04-12 16:40:26+00:00                                   3   

   val_to_add  val_to_add_2  conv_rate  acc_rate  avg_daily_trips  \
0           1            10   0.668698  0.160657              608   
1           2            20   0.182542  0.462864              948   
2           3            30   0.170621  0.616805              276   

   conv_rate_plus_val1  conv_rate_plus_val2  
0             1.668698            10.668698  
1             2.182542            20.182542  
2             3.170621            30.170621  


In [9]:
# Retrieve the same features "as of now" instead of at the historical
# timestamps. A new entity frame is built with .assign() rather than
# overwriting entity_df["event_timestamp"] in place. Otherwise re-running the
# earlier historical-retrieval cell after this one would silently use the
# "now" timestamps.
entity_df_now = entity_df.assign(event_timestamp=pd.to_datetime("now", utc=True))
training_df = store.get_historical_features(
    entity_df=entity_df_now,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("\n----- Example features -----\n")
print(training_df.head())


----- Example features -----

   driver_id                  event_timestamp  \
0       1001 2023-07-29 00:46:41.798888+00:00   
1       1002 2023-07-29 00:46:41.798888+00:00   
2       1003 2023-07-29 00:46:41.798888+00:00   

   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \
0                                   1           1            10   0.103490   
1                                   5           2            20   0.806228   
2                                   3           3            30   0.421226   

   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  
0  0.757674              493             1.103490            10.103490  
1  0.433632              841             2.806228            20.806228  
2  0.068001              240             3.421226            30.421226  


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [10]:
# Materialize feature view data into the online store, up to the current time.
# NOTE(review): datetime.now() is tz-naive local time — confirm how Feast
# interprets naive timestamps when the machine is not running in UTC.
materialize_end = datetime.now()
print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=materialize_end)


--- Load features into online store ---
Materializing [1m[32m2[0m feature views to [1m[32m2023-07-28 21:49:27-03:00[0m into the [1m[32msqlite[0m online store.

[1m[32mdriver_hourly_stats[0m from [1m[32m2023-07-28 00:49:27-03:00[0m to [1m[32m2023-07-28 21:49:27-03:00[0m:


100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 333.32it/s]


[1m[32mdriver_hourly_stats_fresh[0m from [1m[32m2023-07-28 00:49:27-03:00[0m to [1m[32m2023-07-28 18:49:27-03:00[0m:


100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 555.30it/s]


In [11]:
from pprint import pprint

# Online lookup of the raw hourly stats for two drivers.
online_feature_refs = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
]
# One {join_key: entity_value} dict per entity to look up.
driver_rows = [{"driver_id": driver_id} for driver_id in (1004, 1005)]
online_response = store.get_online_features(
    features=online_feature_refs,
    entity_rows=driver_rows,
)
feature_vector = online_response.to_dict()

pprint(feature_vector)

{'acc_rate': [0.7958470582962036, 0.9991276860237122],
 'avg_daily_trips': [850, 85],
 'conv_rate': [0.4983433485031128, 0.5325374007225037],
 'driver_id': [1004, 1005]}


In [12]:
def fetch_online_features(store, source: str = "", entity_rows=None):
    """Fetch online features for a set of drivers and print them, one column per line.

    Args:
        store: The Feast ``FeatureStore`` to query.
        source: Selects which features are requested.
            ``"feature_service"`` -> the ``driver_activity_v1`` feature service;
            ``"push"`` -> the ``driver_activity_v3`` feature service;
            anything else (default ``""``) -> an explicit list of feature
            references (``driver_hourly_stats:acc_rate`` and the two
            ``transformed_conv_rate`` features).
        entity_rows: Optional list of ``{join_key: entity_value, ...}`` dicts.
            Defaults to drivers 1001 and 1002, together with the ``val_to_add`` /
            ``val_to_add_2`` request values used by the ``transformed_conv_rate``
            features.

    Returns:
        None. Each returned column is printed as ``<name>  :  <values>``,
        sorted by column name.
    """
    if entity_rows is None:
        # Built fresh on every call rather than as a shared default argument.
        entity_rows = [
            {"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000},
            {"driver_id": 1002, "val_to_add": 1001, "val_to_add_2": 2002},
        ]

    # Sources that resolve to a registered feature service instead of a plain
    # list of feature references.
    feature_service_names = {
        "feature_service": "driver_activity_v1",
        "push": "driver_activity_v3",
    }
    if source in feature_service_names:
        features_to_fetch = store.get_feature_service(feature_service_names[source])
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ]

    returned_features = store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()
    for key, value in sorted(returned_features.items()):
        print(key, " : ", value)


In [13]:
# Default source: an explicit list of feature references.
print("\n--- Online features ---")
fetch_online_features(store=store, source="")


--- Online features ---
acc_rate  :  [0.757673978805542, 0.43363234400749207]
conv_rate_plus_val1  :  [1000.1034897714853, 1001.8062283396721]
conv_rate_plus_val2  :  [2000.1034897714853, 2002.806228339672]
driver_id  :  [1001, 1002]


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [14]:
# Same drivers, but the feature set comes from the "driver_activity_v1" feature service.
print("\n--- Online features retrieved (instead) through a feature service---")
fetch_online_features(store=store, source="feature_service")



--- Online features retrieved (instead) through a feature service---
conv_rate  :  [0.10348977148532867, 0.8062283396720886]
conv_rate_plus_val1  :  [1000.1034897714853, 1001.8062283396721]
conv_rate_plus_val2  :  [2000.1034897714853, 2002.806228339672]
driver_id  :  [1001, 1002]


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [15]:
# Feature service v3 ("driver_activity_v3") reads from a feature view backed
# by a push source. The banner's parenthesis was previously left unclosed.
print(
    "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source) ---"
)
fetch_online_features(store, source="push")



--- Online features retrieved (using feature service v3, which uses a feature view with a push source---
acc_rate  :  [0.757673978805542, 0.43363234400749207]
avg_daily_trips  :  [493, 841]
conv_rate  :  [0.10348977148532867, 0.8062283396720886]
conv_rate_plus_val1  :  [1000.1034897714853, 1001.8062283396721]
conv_rate_plus_val2  :  [2000.1034897714853, 2002.806228339672]
driver_id  :  [1001, 1002]


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])


In [16]:
from feast.data_source import PushMode

In [17]:
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
# Take one timestamp for the row so its event time and creation time are
# identical; two separate datetime.now() calls can differ by microseconds.
# NOTE(review): this is tz-naive local time — confirm how Feast interprets
# naive timestamps when the machine is not running in UTC.
ingestion_time = datetime.now()
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [ingestion_time],
        "created": [ingestion_time],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)


--- Simulate a stream event ingestion of the hourly stats df ---


In [18]:
# Plain-text preview of the simulated event row that gets pushed next.
print(event_df)

   driver_id            event_timestamp                    created  conv_rate  \
0       1001 2023-07-28 21:56:01.622576 2023-07-28 21:56:01.622576        1.0   

   acc_rate  avg_daily_trips  
0       1.0             1000  


In [19]:
# Push the simulated event through the "driver_stats_push_source" push source,
# writing it to both the online and the offline store.
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)

In [20]:
# Re-read feature service v3 after the push; driver 1001 now reflects the pushed row.
print("\n--- Online features again with updated values from a stream push---")
fetch_online_features(store=store, source="push")



--- Online features again with updated values from a stream push---
acc_rate  :  [1.0, 0.43363234400749207]
avg_daily_trips  :  [1000, 841]
conv_rate  :  [1.0, 0.8062283396720886]
conv_rate_plus_val1  :  [1001.0, 1001.8062283396721]
conv_rate_plus_val2  :  [2001.0, 2002.806228339672]
driver_id  :  [1001, 1002]


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  return np.find_common_type(types, [])
