In [1]:
from workflow import fetch_online_features, fetch_historical_features_entity_df
import subprocess
from datetime import datetime

import pandas as pd
from feast import FeatureStore
from feast.data_source import PushMode

store = FeatureStore(repo_path=".")
# list(meth for meth in dir(store) if not meth.startswith("_"))

In [2]:
subprocess.run(["feast", "apply"])



Deploying infrastructure for tree_data_stats_fresh
Deploying infrastructure for tree_data_stats


CompletedProcess(args=['feast', 'apply'], returncode=0)

In [3]:
print("\n--- Historical features for training ---")
fetch_historical_features_entity_df(store, for_batch_scoring=False)




--- Historical features for training ---
   tree_id           event_timestamp root_stone root_grate root_other  \
0   536325 2024-06-28 00:00:00+00:00         No         No         No   
1   247341 2024-06-28 00:00:00+00:00         No         No        Yes   

         curb_loc trunk_wire trnk_light trnk_other brch_light  ...  \
0  OffsetFromCurb         No         No         No         No  ...   
1          OnCurb         No         No        Yes         No  ...   

  trnk_other_num brch_light_num brch_shoe_num brch_other_num curb_loc_num  \
0              0              0             0              0            0   
1              1              0             0              1            1   

  sidewalk_num steward_num  guards_num  problems_num  health_num  
0            0           0           0             1           2  
1            0           0           0             3           1  

[2 rows x 32 columns]


In [4]:
print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=datetime.now())




--- Load features into online store ---
Materializing [1m[32m2[0m feature views to [1m[32m2024-06-28 17:37:39+01:00[0m into the [1m[32mcassandra[0m online store.

[1m[32mtree_data_stats[0m from [1m[32m2024-06-28 18:33:12+01:00[0m to [1m[32m2024-06-28 17:37:39+01:00[0m:


0it [00:00, ?it/s]

[1m[32mtree_data_stats_fresh[0m from [1m[32m2024-06-28 18:33:12+01:00[0m to [1m[32m2024-06-28 18:37:39+01:00[0m:



0it [00:00, ?it/s]


In [5]:
print("\n--- Online features ---")
fetch_online_features(store)




--- Online features ---
brch_light  :  ['No', 'No']
brch_light_num  :  [0, 0]
brch_other  :  ['No', 'Yes']
brch_other_num  :  [0, 1]
brch_shoe  :  ['No', 'No']
brch_shoe_num  :  [0, 0]
curb_loc  :  ['OffsetFromCurb', 'OnCurb']
curb_loc_num  :  [0, 1]
guards  :  [None, None]
health  :  ['Good', 'Fair']
problems  :  [None, 'RootOther,TrunkOther,BranchOther']
root_grate  :  ['No', 'No']
root_other  :  ['No', 'Yes']
root_stone  :  ['No', 'No']
sidewalk  :  ['NoDamage', 'NoDamage']
steward  :  [None, None]
tree_id  :  [536325, 247341]
trnk_light  :  ['No', 'No']
trnk_other  :  ['No', 'Yes']
trunk_wire  :  ['No', 'No']


In [6]:
print("\n--- Online features retrieved (instead) through a feature service---")
fetch_online_features(store, source="feature_service")


--- Online features retrieved (instead) through a feature service---




brch_light_num  :  [0, 0]
brch_other_num  :  [0, 1]
brch_shoe_num  :  [0, 0]
curb_loc_num  :  [0, 1]
guards_num  :  [0, 0]
health_num  :  [2, 1]
problems_num  :  [1, 3]
root_grate_num  :  [0, 0]
root_other_num  :  [0, 1]
root_stone_num  :  [0, 0]
sidewalk_num  :  [0, 0]
steward_num  :  [0, 0]
tree_dbh  :  [56, 16]
tree_id  :  [536325, 247341]
trnk_light_num  :  [0, 0]
trnk_other_num  :  [0, 1]
trunk_wire_num  :  [0, 0]


In [7]:
print(
    "--- Online features retrieved (using feature service v2, which uses a feature view with a push source---"
)
fetch_online_features(store, source="push")



--- Online features retrieved (using feature service v3, which uses a feature view with a push source---
borough  :  ['Staten Island', 'Queens']
brch_light  :  ['No', 'No']
brch_light_num  :  [0, 0]
brch_other  :  ['No', 'Yes']
brch_other_num  :  [0, 1]
brch_shoe  :  ['No', 'No']
brch_shoe_num  :  [0, 0]
curb_loc  :  ['OffsetFromCurb', 'OnCurb']
curb_loc_num  :  [0, 1]
guards  :  [None, None]
guards_num  :  [0, 0]
health  :  ['Good', 'Fair']
health_num  :  [2, 1]
problems  :  [None, 'RootOther,TrunkOther,BranchOther']
problems_num  :  [1, 3]
root_grate  :  ['No', 'No']
root_grate_num  :  [0, 0]
root_other  :  ['No', 'Yes']
root_other_num  :  [0, 1]
root_stone  :  ['No', 'No']
root_stone_num  :  [0, 0]
sidewalk  :  ['NoDamage', 'NoDamage']
sidewalk_num  :  [0, 0]
spc_common  :  ['ash', 'Norway maple']
steward  :  [None, None]
steward_num  :  [0, 0]
tree_dbh  :  [56, 16]
tree_id  :  [536325, 247341]
trnk_light  :  ['No', 'No']
trnk_light_num  :  [0, 0]
trnk_other  :  ['No', 'Yes']
trnk_o

In [8]:
print("\n--- Simulate a stream event ingestion of the stats df ---")
event_df = pd.DataFrame.from_dict(
    {
        "tree_id": [-1],
        "timestamp": [
            datetime.now(),
        ],
        "borough": ['Queens'],
        "brch_light": ['No'],
        "brch_other": ['No'],
        "brch_shoe": ['No'],
        "curb_loc": ['OffsetFromCurb'],
        "guards": [None],
        "problems": [None],
        "root_grate": ['No'],
        "root_other": ['No'],
        "root_stone": ['No'],
        "sidewalk": ['NoDamage'],
        "spc_common": ['ash'],
        "steward": [None],
        "tree_dbh": [56],
        "trnk_light": ['No'],
        "trnk_other": ['No'],
        "trunk_wire": ['No'],
        "user_type": ['NYC Parks Staff'],
        "zip_city": ['Staten Island'],
        "health": [None]
    }
)
print(event_df)
store.push("tree_data_stats_push_source", event_df, to=PushMode.ONLINE)




--- Simulate a stream event ingestion of the hourly stats df ---
   tree_id                  timestamp borough brch_light brch_other brch_shoe  \
0       -1 2024-06-28 17:37:49.030020  Queens         No         No        No   

         curb_loc guards problems root_grate  ...  sidewalk spc_common  \
0  OffsetFromCurb   None     None         No  ...  NoDamage        ash   

  steward tree_dbh trnk_light  trnk_other trunk_wire        user_type  \
0    None       56         No          No         No  NYC Parks Staff   

        zip_city health  
0  Staten Island   None  

[1 rows x 22 columns]


In [9]:
print("\n--- Online features again with updated values from a stream push---")
fetch_online_features(store, source="push")




--- Online features again with updated values from a stream push---
borough  :  ['Staten Island', 'Queens']
brch_light  :  ['No', 'No']
brch_light_num  :  [0, 0]
brch_other  :  ['No', 'Yes']
brch_other_num  :  [0, 1]
brch_shoe  :  ['No', 'No']
brch_shoe_num  :  [0, 0]
curb_loc  :  ['OffsetFromCurb', 'OnCurb']
curb_loc_num  :  [0, 1]
guards  :  [None, None]
guards_num  :  [0, 0]
health  :  ['Good', 'Fair']
health_num  :  [2, 1]
problems  :  [None, 'RootOther,TrunkOther,BranchOther']
problems_num  :  [1, 3]
root_grate  :  ['No', 'No']
root_grate_num  :  [0, 0]
root_other  :  ['No', 'Yes']
root_other_num  :  [0, 1]
root_stone  :  ['No', 'No']
root_stone_num  :  [0, 0]
sidewalk  :  ['NoDamage', 'NoDamage']
sidewalk_num  :  [0, 0]
spc_common  :  ['ash', 'Norway maple']
steward  :  [None, None]
steward_num  :  [0, 0]
tree_dbh  :  [56, 16]
tree_id  :  [536325, 247341]
trnk_light  :  ['No', 'No']
trnk_light_num  :  [0, 0]
trnk_other  :  ['No', 'Yes']
trnk_other_num  :  [0, 1]
trunk_wire  :  [

In [10]:
fetch_online_features(store, source="feature_service")



brch_light_num  :  [0, 0]
brch_other_num  :  [0, 1]
brch_shoe_num  :  [0, 0]
curb_loc_num  :  [0, 1]
guards_num  :  [0, 0]
health_num  :  [2, 1]
problems_num  :  [1, 3]
root_grate_num  :  [0, 0]
root_other_num  :  [0, 1]
root_stone_num  :  [0, 0]
sidewalk_num  :  [0, 0]
steward_num  :  [0, 0]
tree_dbh  :  [56, 16]
tree_id  :  [536325, 247341]
trnk_light_num  :  [0, 0]
trnk_other_num  :  [0, 1]
trunk_wire_num  :  [0, 0]


In [11]:
print("\n--- Run feast teardown ---")
subprocess.run(["feast", "teardown"])


--- Run feast teardown ---




CompletedProcess(args=['feast', 'teardown'], returncode=0)