# dataset
Generate the dataset for the recommendation system problem.

In [104]:
# Run the dataset generation script; it prints a sample of the users, items and interactions tables.
!python3 dataset_gen.py

Users sample:
   user_id  age gender signup_date          preferences
0        1   56      F  2023-09-28  Home,Books,Clothing
1        2   36      M  2023-03-29          Electronics
2        3   19      F  2023-10-21       Clothing,Books
3        4   39      M  2023-12-11          Electronics
4        5   45  Other  2023-09-28    Electronics,Books

Items sample:
   item_id  category  subcategory  ...  new_arrival  on_sale        arrival_date
0        1    Sports  Team Sports  ...        False    False 2023-03-13 04:48:00
1        2      Home      Kitchen  ...        False     True 2023-08-31 09:14:00
2        3     Books      Science  ...        False    False 2023-01-02 12:40:00
3        4  Clothing  Accessories  ...        False    False 2023-05-08 06:41:00
4        5    Sports  Team Sports  ...        False    False 2023-05-27 08:28:00

[5 rows x 10 columns]

Interactions sample:
   interaction_id  user_id  item_id  ... interaction_type rating  quantity
0               1      382   

# Setup Feature Store

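We first run `feast plan` to preview the pending changes, then `feast apply` to register the entities, feature views and feature services defined in `feature_repo/`.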

In [84]:
# Show the registry changes that are pending for the repository in feature_repo/.
!cd feature_repo/ ; feast plan

  DUMMY_ENTITY = Entity(
No project found in the repository. Using project name feast_edb_rec_sys defined in feature_store.yaml
  entity = cls(
  entity = cls(
  entity = cls(
Updated feature view [1m[33mitem_embedding[0m
	batch_source: [1m[33mname: "data/recommendation_items.parquet"
type: BATCH_FILE
timestamp_field: "arrival_date"
data_source_class_type: "feast.infra.offline_stores.file_source.FileSource"
file_options {
  file_format {
    parquet_format {
    }
  }
  uri: "data/recommendation_items.parquet"
}
[0m -> [1m[92mname: "data/dummy_item_embed.parquet"
type: BATCH_FILE
timestamp_field: "timestamp"
data_source_class_type: "feast.infra.offline_stores.file_source.FileSource"
file_options {
  file_format {
    parquet_format {
    }
  }
  uri: "data/dummy_item_embed.parquet"
}
[0m
	stream_source: [1m[33mname: "item_embed_push_source"
type: PUSH_SOURCE
data_source_class_type: "feast.data_source.PushSource"
batch_source {
  name: "data/recommendation_items.parquet"
  ty

In [89]:
# Register the project's entities, feature views and feature services and create the online-store tables.
!cd feature_repo/ ; feast apply 

  DUMMY_ENTITY = Entity(
No project found in the repository. Using project name feast_edb_rec_sys defined in feature_store.yaml
Applying changes for project feast_edb_rec_sys
Created project [1m[32mfeast_edb_rec_sys[0m
Created entity [1m[32muser[0m
Created entity [1m[32mitem[0m
Created feature view [1m[32muser_features[0m
Created feature view [1m[32mitem_embedding[0m
Created feature view [1m[32mitem_features[0m
Created feature view [1m[32muser_embedding[0m
Created feature view [1m[32minteractions_features[0m
Created feature service [1m[32minteraction_service[0m
Created feature service [1m[32mmodel_v1[0m
Created feature service [1m[32mitem_service[0m
Created feature service [1m[32mmodel_v2[0m
Created feature service [1m[32muser_service[0m

Created sqlite table [1m[32mfeast_edb_rec_sys_interactions_features[0m
Created sqlite table [1m[32mfeast_edb_rec_sys_item_features[0m
Created sqlite table [1m[32mfeast_edb_rec_sys_user_features[0m
Create

In [90]:
from feast import FeatureStore
from datetime import datetime, timedelta

# Open the feature store configured in feature_repo/ (the directory the
# `feast plan` / `feast apply` cells cd into); later cells reuse `store`.
store = FeatureStore(repo_path="feature_repo/")

# Generating datasets using Feast

In [91]:
from feast import FeatureService
import pandas as pd
from itertools import product

# Point in time attached to every entity row requested below.
AS_OF = datetime(2025, 1, 1)


def _historical_df(entity_df, service):
    """Fetch the features of `service` for the rows of `entity_df` as a DataFrame."""
    return store.get_historical_features(entity_df=entity_df, features=service).to_df()


# Feature services registered by `feast apply`.
item_service = store.get_feature_service("item_service")
user_service = store.get_feature_service("user_service")
interaction_service = store.get_feature_service("interaction_service")

# Entity keys to request.
# NOTE(review): range() excludes its upper bound, so this asks for users 1..999
# and items 1..4999 -- confirm that matches the generated dataset.
user_ids = list(range(1, 1_000))
item_ids = list(range(1, 5_000))

# Entity frames: one row per key, all stamped with AS_OF.
item_entity_df = pd.DataFrame({'item_id': item_ids, 'timestamp': [AS_OF] * len(item_ids)})
user_entity_df = pd.DataFrame({'user_id': user_ids, 'timestamp': [AS_OF] * len(user_ids)})
item_user_interactions_df = pd.read_parquet(
    './feature_repo/data/interactions_item_user_ids.parquet'
).assign(timestamp=AS_OF)

# Retrieve the datasets used for training.
item_df = _historical_df(item_entity_df, item_service)
user_df = _historical_df(user_entity_df, user_service)
interaction_df = _historical_df(item_user_interactions_df, interaction_service)

Using timestamp as the event timestamp. To specify a column explicitly, please name it event_timestamp.
Using timestamp as the event timestamp. To specify a column explicitly, please name it event_timestamp.
Using timestamp as the event timestamp. To specify a column explicitly, please name it event_timestamp.


# Training

In [None]:
from models import ItemEncoder, UserEncoder, TwoTowerModel, train_two_tower
# Value handed to both encoders; presumably the embedding width -- TODO confirm in models.py.
dim = 512

item_encoder = ItemEncoder(dim)
user_encoder = UserEncoder(dim)
# Wrap the two encoders in one model and train it on the frames retrieved from Feast.
two_tower_model = TwoTowerModel(item_encoder=item_encoder, user_encoder=user_encoder)
train_two_tower(two_tower_model, item_df, user_df, interaction_df)

# Batch scoring
Encode the items and users into their vector representations (embeddings).

In [None]:
# Encode every item and user with its trained encoder and store the result in
# an 'embedding' column next to the source features.
embeded_items = item_encoder(item_df)
item_df['embedding'] = embeded_items
embeded_users = user_encoder(user_df)
user_df['embedding'] = embeded_users

# Push the new embeddings to the feature store. Each frame must go to the push
# source of its own entity: items -> item_embed_push_source,
# users -> user_embed_push_source.
# NOTE(review): no `to=` is passed, so Feast's default push mode applies
# (online store only, per the Feast docs) -- confirm, or pass
# to=PushMode.ONLINE_AND_OFFLINE if the offline store must be updated too.
store.push('item_embed_push_source', item_df)
store.push('user_embed_push_source', user_df)
# Alternative: push only the key and embedding columns.
# store.push('item_embed_push_source', item_df[['item_id', 'embedding']])
# store.push('user_embed_push_source', user_df[['user_id', 'embedding']])

In [None]:
from feast.data_source import PushMode
import numpy as np

# Build placeholder embedding frames (key, embedding, timestamp) that stand in
# for the real encoder outputs:
# embeded_items = item_encoder(item_df)
# embeded_users = user_encoder(user_df)

# .copy() gives each frame its own data, so the column assignments below do not
# write into a slice of item_df / user_df (avoids pandas' SettingWithCopyWarning).
item_embed_df = item_df[['item_id']].copy()
user_embed_df = user_df[['user_id']].copy()

# Same fixed two-element vector for every row.
item_embed_df['embedding'] = [[1.1, 2.2]] * len(item_embed_df)
user_embed_df['embedding'] = [[1.1, 2.2]] * len(user_embed_df)

# Read the clock once so both frames carry exactly the same timestamp.
pushed_at = datetime.now()
item_embed_df['timestamp'] = pushed_at
user_embed_df['timestamp'] = pushed_at

In [102]:
# Inspect the columns, dtypes and non-null counts of the item frame.
item_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4999 entries, 0 to 4998
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   item_id       4999 non-null   int64         
 1   timestamp     4999 non-null   datetime64[us]
 2   category      4999 non-null   object        
 3   subcategory   4999 non-null   object        
 4   price         4999 non-null   float64       
 5   avg_rating    4999 non-null   float64       
 6   num_ratings   4999 non-null   int64         
 7   popular       4999 non-null   bool          
 8   new_arrival   4999 non-null   bool          
 9   on_sale       4999 non-null   bool          
 10  embedding     4999 non-null   object        
 11  arrival_date  4999 non-null   datetime64[us]
dtypes: bool(3), datetime64[us](2), float64(2), int64(2), object(3)
memory usage: 366.3+ KB


In [100]:
# Push the new embedding to the offline and online store
store.push('item_embed_push_source', item_embed_df, to=PushMode.ONLINE_AND_OFFLINE)
store.push('user_embed_push_source', user_embed_df, to=PushMode.ONLINE_AND_OFFLINE)
# store.push('user_embed_push_source', item_df[['item_id', 'embedding']])
# store.push('item_embed_push_source', user_df[['user_id', 'embedding']])

ArrowNotImplementedError: Unsupported cast from int64 to null using function cast_null

In [99]:
# Check the schema of the frame that is pushed to item_embed_push_source.
item_embed_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4999 entries, 0 to 4998
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   item_id    4999 non-null   int64         
 1   embedding  4999 non-null   object        
 2   timestamp  4999 non-null   datetime64[us]
dtypes: datetime64[us](1), int64(1), object(1)
memory usage: 117.3+ KB


In [53]:
# Peek at the embedding column of the item frame.
item_df['embedding']

0       [1.1, 2.2]
1       [1.1, 2.2]
2       [1.1, 2.2]
3       [1.1, 2.2]
4       [1.1, 2.2]
           ...    
4994    [1.1, 2.2]
4995    [1.1, 2.2]
4996    [1.1, 2.2]
4997    [1.1, 2.2]
4998    [1.1, 2.2]
Name: embedding, Length: 4999, dtype: object

# Materialize
Materialization generates the latest values for each entity key in the online store and creates a time-based index to enhance retrieval speed.
The `materialize-incremental` command materializes the offline store initially and, on subsequent runs, ingests only new data and updates the store.

In [None]:
# Load the embedding feature views into the online store.
# The first argument of materialize_incremental is the END of the window; Feast
# derives the start itself (end of the previous materialization, or now - ttl
# on the first run). Passing "now - 5 years" would therefore stop five years
# ago and skip the embeddings that were just pushed, so materialize up to now.
# For an explicit look-back window use
# store.materialize(start_date=..., end_date=...) instead.
store.materialize_incremental(
    end_date=datetime.now(),
    feature_views=['item_embedding', 'user_embedding'],
)

# Inferencing

## Existing User Case

## New User Case


In [None]:
from feast import FeatureStore

# Use the same repository as the rest of the notebook: feature_store.yaml lives
# in feature_repo/, not in the notebook's own directory.
store = FeatureStore(repo_path="feature_repo/")

import pandas as pd

# Get the latest feature values for unique entities
# NOTE(review): `driver_id` is not an entity of this project (`feast apply`
# registered only `user` and `item`); replace it with the join key(s) that the
# `model_v2` feature service actually needs -- TODO confirm.
entity_df = pd.DataFrame.from_dict({"driver_id": [1001, 1002, 1003, 1004, 1005],})
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df, features=store.get_feature_service("model_v2"),
).to_df()

# Make batch predictions
# predictions = model.predict(training_df)
print(training_df)
