## How-to guide: Streaming Feature Store on the Abacus.AI platform
This notebook provides a hands-on environment for building and deploying a feature store using Abacus.AI.

We'll be using the [Retail Interaction Logs](https://s3.amazonaws.com/abacusai.exampledatasets/pers_promotion/events.csv) and [Item Categories](https://s3.amazonaws.com/abacusai.exampledatasets/pers_promotion/item_categories.csv) datasets, which contain information about user interactions and item attributes.

1. Install the Abacus.AI library.

In [None]:
!pip install abacusai
!pip install fsspec
!pip install s3fs

2. Add your Abacus.AI [API Key](https://abacus.ai/app/profile/apikey) generated using the API dashboard as follows:

In [None]:
#@title Abacus.AI API Key

api_key = ''  #@param {type: "string"}
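
Hardcoding the key in a notebook cell makes it easy to leak when the notebook is shared. As a minimal sketch, the key could instead be read from an environment variable; the variable name `ABACUS_API_KEY` and the helper `load_api_key` are hypothetical names chosen for this example, not part of the Abacus.AI library.

```python
import os


def load_api_key(env_var="ABACUS_API_KEY", fallback=""):
    """Return the API key from the environment, or the fallback if unset."""
    # ABACUS_API_KEY is a hypothetical variable name used only in this sketch.
    key = os.environ.get(env_var, fallback).strip()
    if not key:
        raise ValueError(f"No API key found; set {env_var} or pass a fallback.")
    return key
```

With the variable set in the environment, `api_key = load_api_key()` would replace the literal assignment above.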

3. Import the Abacus.AI library and instantiate a client.

In [None]:
from abacusai import ApiClient, ApiException
client = ApiClient(api_key)

## 1. Create a Project

In this notebook, we're going to create and deploy a feature store that automatically featurizes input data using the Item Categories dataset and a streamed retail interactions log.

In [None]:
project = client.create_project(name='Demo Feature Store Streaming Project', use_case='FEATURE_STORE')

## 2. Creating Datasets

Using the Create Dataset API, we can point Abacus.AI at the public S3 URI of our batch dataset:

- [Items Dataset](https://s3.amazonaws.com/abacusai.exampledatasets/pers_promotion/item_categories.csv): contains information about the item categories.



### Data Preview


In [None]:
import pandas as pd
pd.read_csv('s3://abacusai.exampledatasets/pers_promotion/item_categories.csv')

### Add the datasets to Abacus.AI


Using the Create Dataset API, we register the batch datasets by their public S3 URIs and create an empty streaming dataset for the live interaction events.



In [None]:
# if the datasets already exist, skip creation
try: 
  items_dataset = client.describe_dataset(client.describe_feature_group_by_table_name('items_categories').dataset_id)
  streaming_dataset_items = client.describe_dataset(client.describe_feature_group_by_table_name('streaming_item_interactions').dataset_id)
  batch_events_dataset = client.describe_dataset(client.describe_feature_group_by_table_name('events_batch_data').dataset_id)
except ApiException: # datasets not found
  items_dataset = client.create_dataset_from_file_connector(name='Items Dataset', table_name='items_categories', location='s3://abacusai.exampledatasets/pers_promotion/item_categories.csv')
  streaming_dataset_items = client.create_streaming_dataset(name='Item Interactions', 
                                                            table_name='streaming_item_interactions')
  batch_events_dataset = client.create_dataset_from_file_connector(name='Item Interactions Batch Data', 
                                                                 table_name='events_batch_data', 
                                                                 location='s3://abacusai.exampledatasets/pers_promotion/events.csv')
  items_dataset.wait_for_inspection()
  batch_events_dataset.wait_for_inspection()

streaming_feature_group = client.describe_feature_group_by_table_name(table_name=streaming_dataset_items.feature_group_table_name)
items_feature_group = client.describe_feature_group_by_table_name(table_name=items_dataset.feature_group_table_name)
batch_feature_group = client.describe_feature_group_by_table_name(table_name=batch_events_dataset.feature_group_table_name)

items_feature_group.add_to_project(project.project_id)
streaming_feature_group.add_to_project(project.project_id)
batch_feature_group.add_to_project(project.project_id)
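
The cell above treats the three datasets as a unit: if any single lookup fails, all three are created again. A per-resource "describe first, create on a miss" helper is one way to make that logic more granular. The sketch below shows the pattern against an in-memory stand-in; `NotFoundError`, `get_or_create`, `describe_table`, and `create_table` are hypothetical names for illustration and do not call the Abacus.AI API.

```python
class NotFoundError(Exception):
    """Stand-in for the client's not-found error (ApiException in the notebook)."""


def get_or_create(describe, create, not_found=NotFoundError):
    """Return (resource, created): try describe() first, call create() on a miss."""
    try:
        return describe(), False
    except not_found:
        return create(), True


# Tiny in-memory stand-in for the service, used only to exercise the helper.
registry = {"items_categories": {"table_name": "items_categories"}}


def describe_table(name):
    if name not in registry:
        raise NotFoundError(name)
    return registry[name]


def create_table(name):
    registry[name] = {"table_name": name}
    return registry[name]


existing, created_existing = get_or_create(
    lambda: describe_table("items_categories"),
    lambda: create_table("items_categories"),
)
fresh, created_fresh = get_or_create(
    lambda: describe_table("events_batch_data"),
    lambda: create_table("events_batch_data"),
)
print(created_existing, created_fresh)  # False True
```

In the notebook, the `describe` and `create` callables would wrap the corresponding client calls, with `ApiException` passed as `not_found`.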



In [None]:
items_feature_group.set_indexing_config(primary_key='itemid')
batch_feature_group.set_indexing_config(update_timestamp_key='timestamp')
streaming_dataset_items.set_streaming_retention_policy(retention_hours=48, retention_row_count=2_000_000_000)
streaming_feature_group.set_schema([{'name': 'timestamp', 'dataType': 'DATETIME'}, 
                                    {'name': 'itemid', 'dataType': 'STRING'}, 
                                    {'name': 'event', 'dataType': 'STRING'}, 
                                    {'name': 'visitorid', 'dataType': 'STRING'}])
streaming_feature_group.set_indexing_config(lookup_keys=['visitorid'], update_timestamp_key='timestamp')
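
Once a schema is set, it can help to check records locally before streaming them. The sketch below compares a record's keys against the schema list used above. The helper `check_record` is a hypothetical name, and how the service treats missing or extra columns is not stated in this notebook, so this is only a local sanity check.

```python
SCHEMA = [
    {'name': 'timestamp', 'dataType': 'DATETIME'},
    {'name': 'itemid', 'dataType': 'STRING'},
    {'name': 'event', 'dataType': 'STRING'},
    {'name': 'visitorid', 'dataType': 'STRING'},
]


def check_record(record, schema=SCHEMA):
    """Return (missing, extra) column names for a record relative to the schema."""
    expected = {col['name'] for col in schema}
    present = set(record)
    return sorted(expected - present), sorted(present - expected)


ok = {'visitorid': '123', 'itemid': '1', 'event': 'click', 'timestamp': 0.0}
print(check_record(ok))  # ([], [])

# A record without a timestamp and with a column the schema does not declare.
odd = {'visitorid': '123', 'itemid': '1', 'event': 'click', 'transactionid': None}
print(check_record(odd))  # (['timestamp'], ['transactionid'])
```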


### Stream data to the streaming dataset

We'll first stream some mock data to test the setup, invalidate it, and then upload real data.

In [None]:
import time
streaming_tokens = client.list_streaming_tokens()
if streaming_tokens:
  streaming_token = streaming_tokens[0].streaming_token
else:
  streaming_token = client.create_streaming_token().streaming_token

In [None]:
streaming_feature_group.append_data(streaming_token=streaming_token, 
                                    data={'visitorid': '123', 'itemid': '1', 'event': 'click', 'timestamp': time.time()})

In [None]:
streaming_feature_group.invalidate_streaming_data(invalid_before_timestamp=time.time())

To get some real data, we're going to load the batch dataset locally and stream its first 10 rows into the new streaming dataset.

In [None]:
import pandas as pd
raw_event_data = pd.read_csv('s3://abacusai.exampledatasets/pers_promotion/events.csv')
raw_event_data['visitorid'] = raw_event_data.index
raw_event_data

In [None]:
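from numpy import isnan
# Stream the first 10 rows one record at a time
for row in raw_event_data.head(10).to_dict(orient="records"):
    row['event_timestamp'] = time.time()  # stamp each record with the current time
    row['visitorid'] = str(row['visitorid'])  # the schema declares visitorid as STRING
    if isnan(row['transactionid']):
        row['transactionid'] = None  # replace NaN (no transaction) with None
    streaming_feature_group.append_data(streaming_token=streaming_token, data=row)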
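
The per-row cleanup in the loop above can be pulled into a small function that is easy to test on its own. The sketch below uses only the standard library; `clean_row` is a hypothetical helper name. One likely reason for the NaN replacement is that `NaN` is not a valid JSON value, though the notebook does not say so explicitly.

```python
import math


def clean_row(row):
    """Return a copy of row with NaN floats replaced by None and visitorid as str."""
    cleaned = {}
    for key, value in row.items():
        if isinstance(value, float) and math.isnan(value):
            cleaned[key] = None  # missing values become None
        else:
            cleaned[key] = value
    if cleaned.get('visitorid') is not None:
        cleaned['visitorid'] = str(cleaned['visitorid'])
    return cleaned


sample = {'visitorid': 7, 'itemid': 355908, 'event': 'view',
          'transactionid': float('nan')}
print(clean_row(sample))
# {'visitorid': '7', 'itemid': 355908, 'event': 'view', 'transactionid': None}
```

Unlike the loop above, this version checks every column for NaN, which is a design choice of the sketch rather than something the notebook requires.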

Verify the recently streamed data:

In [None]:
streaming_feature_group.get_recent_streamed_data()

### Concatenate Batch with streaming

If you have a batch dataset like the one above, it's more efficient to load it into Abacus.AI first and then concatenate the streaming feature group data into the batch dataset (or vice versa).

In [None]:
from datetime import datetime
batch_feature_group.concatenate_data(streaming_feature_group.feature_group_id,
                                     merge_type='UNION',
                                     replace_until_timestamp=datetime(2021, 9, 1).timestamp())
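
Note that `datetime(2021, 9, 1)` is a naive datetime, so `.timestamp()` interprets it in the local time zone of the machine running the notebook. If a fixed cutoff is wanted regardless of where the code runs, a timezone-aware datetime is one option. The sketch below assumes midnight UTC is the intended cutoff, which the notebook does not specify; `utc_epoch_seconds` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def utc_epoch_seconds(year, month, day):
    """Return seconds since the Unix epoch for midnight UTC on the given date."""
    return datetime(year, month, day, tzinfo=timezone.utc).timestamp()


cutoff = utc_epoch_seconds(2021, 9, 1)
print(cutoff)  # 1630454400.0
```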


## Join Metadata with Feature Group Data

We can join the event data with the item metadata using either a Python feature group or SQL.

### Python

In [None]:
function_code = '''
import pandas as pd

def join_tables(events_df, items_df):
    return pd.merge(events_df, items_df, on='itemid')
'''
python_feature_group = client.create_feature_group_from_function(table_name='python_interactions_joined_items', 
                                                                 function_source_code=function_code, 
                                                                 function_name='join_tables', 
                                                                 input_feature_groups=['events_batch_data', 'items_categories'])
python_feature_group.add_to_project(project.project_id)
python_feature_group.set_schema([{'name': 'timestamp', 'dataType': 'DATETIME'},
                                 {'name': 'itemid', 'dataType': 'STRING'},
                                 {'name': 'categoryid', 'dataType': 'STRING'},
                                 {'name': 'visitorid', 'dataType': 'STRING'},
                                 {'name': 'event', 'dataType': 'STRING'},
                                 {'name': 'transactionid', 'dataType': 'STRING'}])
python_feature_group.set_indexing_config(lookup_keys=['visitorid'], update_timestamp_key='timestamp')


### SQL

In [None]:
feature_group = client.create_feature_group(table_name='interactions_joined_items', sql='SELECT * FROM events_batch_data JOIN items_categories USING (itemid)')
feature_group.set_indexing_config(lookup_keys=['visitorid'])
feature_group.add_to_project(project.project_id)
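
Both `pd.merge(..., on='itemid')` with its default `how='inner'` and a plain SQL `JOIN ... USING (itemid)` are inner joins, so events whose `itemid` has no entry in the items table are dropped from the result. The sketch below illustrates that behavior in plain Python on made-up rows; `inner_join` is a hypothetical helper and the sample values are invented for illustration.

```python
def inner_join(events, items, key='itemid'):
    """Join two lists of dicts on key, keeping only rows matched on both sides."""
    items_by_key = {}
    for item in items:
        items_by_key.setdefault(item[key], []).append(item)
    joined = []
    for event in events:
        # Events without a matching item produce no output rows.
        for item in items_by_key.get(event[key], []):
            joined.append({**event, **item})
    return joined


# Made-up sample rows, not taken from the real datasets.
events = [
    {'visitorid': '1', 'itemid': 'A', 'event': 'view'},
    {'visitorid': '2', 'itemid': 'B', 'event': 'view'},
    {'visitorid': '3', 'itemid': 'Z', 'event': 'addtocart'},  # no matching item
]
items = [
    {'itemid': 'A', 'categoryid': '10'},
    {'itemid': 'B', 'categoryid': '20'},
]
rows = inner_join(events, items)
print(len(rows))  # 2
```

If unmatched events should be kept, a left join (`how='left'` in pandas, `LEFT JOIN` in SQL) would be the usual alternative.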

### Materialize Feature Group Data

In [None]:
feature_group_version = feature_group.create_version()
feature_group_version.wait_for_results()

In [None]:
feature_group_version.load_as_pandas()

### Deploy feature group for online featurization

In [None]:
deployment_token = client.create_deployment_token(project_id=project.project_id).deployment_token
deployment = client.create_deployment(feature_group_id=feature_group.feature_group_id, project_id=project.project_id) 
deployment.wait_for_deployment()

In [None]:
client.lookup_features(deployment_id=deployment.deployment_id, deployment_token=deployment_token, query_data={'visitorid': ['466806', '273888']})
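
Because `visitorid` is handled as a string in this notebook, IDs that arrive as integers need converting before a lookup. The sketch below builds a payload in the same shape as the `query_data` argument above; `build_query_data` is a hypothetical helper, and de-duplicating the IDs is a choice made for this example rather than a documented requirement.

```python
def build_query_data(visitor_ids, key='visitorid'):
    """Return a lookup payload with IDs as de-duplicated strings in input order."""
    seen = []
    for vid in visitor_ids:
        vid = str(vid)
        if vid not in seen:
            seen.append(vid)
    return {key: seen}


print(build_query_data([466806, '273888', 466806]))
# {'visitorid': ['466806', '273888']}
```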