In [45]:
from datetime import datetime
import pandas as pd

from feast import FeatureStore

# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for 
# more details on how to retrieve for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Example features -----\n")
print(training_df.head())

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   driver_id                           3 non-null      int64              
 1   event_timestamp                     3 non-null      datetime64[ns, UTC]
 2   label_driver_reported_satisfaction  3 non-null      int64              
 3   val_to_add                          3 non-null      int64              
 4   val_to_add_2                        3 non-null      int64              
 5   conv_rate                           3 non-null      float32            
 6   acc_rate                            3 non-null      float32            
 7   avg_daily_trips                     3 non-null      int32              
 8   conv_rate_plus_val1                 3 non-null      float64            
 9   conv_rate_plus_val2

In [46]:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("\n----- Example features -----\n")
print(training_df.head())


----- Example features -----

   driver_id                  event_timestamp  \
0       1001 2025-04-13 14:57:26.040294+00:00   
1       1002 2025-04-13 14:57:26.040294+00:00   
2       1003 2025-04-13 14:57:26.040294+00:00   

   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \
0                                   1           1            10   0.099013   
1                                   5           2            20   0.220330   
2                                   3           3            30   0.377322   

   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  
0  0.885061              807             1.099013            10.099013  
1  0.098932              532             2.220330            20.220330  
2  0.241187              323             3.377322            30.377322  


In [47]:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()

print("\n----- Example features -----\n")
print(training_df.head())


----- Example features -----

   driver_id                  event_timestamp  \
0       1001 2025-04-13 14:57:26.354207+00:00   
1       1002 2025-04-13 14:57:26.354207+00:00   
2       1003 2025-04-13 14:57:26.354207+00:00   

   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \
0                                   1           1            10   0.099013   
1                                   5           2            20   0.220330   
2                                   3           3            30   0.377322   

   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  
0  0.885061              807             1.099013            10.099013  
1  0.098932              532             2.220330            20.220330  
2  0.241187              323             3.377322            30.377322  


In [48]:
from pprint import pprint
from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {"driver_id": 1004},
        {"driver_id": 1005},
    ],
).to_dict()

pprint(feature_vector)

{'acc_rate': [0.3307599127292633, 0.6691997647285461],
 'avg_daily_trips': [871, 526],
 'conv_rate': [0.15946581959724426, 0.6906761527061462],
 'driver_id': [1004, 1005]}


In [52]:
from feast import FeatureView, Field, FileSource, Entity
from feast.types import Float32, Int64
from datetime import timedelta
from feast import FeatureService

driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path="data/driver_stats.parquet",  # Replace with your actual file path
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_stats_fv = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_source,
)
driver_stats_fs = FeatureService(
    name="driver_activity_v1", features=[driver_stats_fv]
)


  driver = Entity(name="driver", join_keys=["driver_id"])


In [53]:
from pprint import pprint
from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ],
).to_dict()

pprint(feature_vector)

{'acc_rate': [0.8471629023551941, 0.9603970050811768],
 'avg_daily_trips': [77, 550],
 'conv_rate_plus_val1': [1000.9774943590164, 1001.5112236142159],
 'conv_rate_plus_val2': [2000.9774943590164, 2002.5112236142159],
 'driver_id': [1001, 1002]}


In [None]:
from feast import FeatureService

driver_stats_fs = FeatureService(
    name="driver_activity_v1",
    features=[driver_hourly_stats_view]  # Make sure this variable is defined too!
)


NameError: name 'driver_hourly_stats_view' is not defined

{'acc_rate': [1.0, 0.08180499821901321],
 'avg_daily_trips': [1000, 568],
 'conv_rate': [1.0, 0.39903518557548523],
 'conv_rate_plus_val1': [1001.0, 1001.3990351855755],
 'conv_rate_plus_val2': [2001.0, 2002.3990351855755],
 'driver_id': [1001, 1002]}


In [8]:
from feast.data_source import PushMode

print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [
            datetime(2021, 5, 13, 10, 59, 42),
        ],
        "created": [
            datetime(2021, 5, 13, 10, 59, 42),
        ],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)
print(event_df)
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)


--- Simulate a stream event ingestion of the hourly stats df ---
   driver_id     event_timestamp             created  conv_rate  acc_rate  \
0       1001 2021-05-13 10:59:42 2021-05-13 10:59:42        1.0       1.0   

   avg_daily_trips  
0             1000  


PushSourceNotFoundException: Unable to find push source 'driver_stats_push_source'.