# Batch Ingestion
This notebook reads the raw data from an S3 bucket, transforms it for ingestion into SageMaker Feature Store and then ingests it into an offline+online Feature Store. Refer to the [official SageMaker Feature Store documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html) and the [Python SDK documentation](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html).

We create two feature groups in this notebook:
1. An offline+online feature group for customer inputs that is used for ML model training.
2. An offline+online feature group for the destination features, which is used both for ML model training and real-time inference.

**Note:** Please set the kernel to `conda_python3` for this notebook and select the `ml.t3.2xlarge` instance type as part of the user inputs to the CloudFormation template.

## Imports

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
from sagemaker.session import Session
from datetime import datetime
from pathlib import Path
import pandas as pd
import sagemaker
import logging
import boto3
import time
import sys
import os

In [None]:
# import from a different path
sys.path.insert(0, '../utils')
path = Path(os.path.abspath(os.getcwd()))
package_dir = f'{str(path.parent)}/utils'
print(package_dir)
import utils

## Setup Logging

In [None]:
# use the module name (the __name__ variable, not the literal string '__name__') as the logger name
logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s,%(filename)s,%(funcName)s,%(lineno)s,%(levelname)s,p%(process)s,%(message)s", level=logging.INFO)
logger.info(f'Using SageMaker version: {sagemaker.__version__}')
logger.info(f'Using Pandas version: {pd.__version__}')

## Global Constants

In [None]:
# global constants
STACK_NAME = "expedia-feature-store-demo-v2"

# number of worker processes to use for batch ingesting data into feature store
MAX_WORKERS = 8

# number of principal components to keep for the destinations dataset
PC_TO_KEEP = 3

# this is a sagemaker limit
MAX_ALLOWED_FEATURE_GROUPS = 100

## Setup Config Variables
Read the config variables used by this notebook from the CloudFormation stack outputs and parameters.
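
The `utils` helpers used below are not shown in this notebook. As a rough sketch of the kind of lookup they might perform, the snippet below pulls one output and one parameter out of a dictionary shaped like a CloudFormation `describe_stacks` response. The `lookup` function and the sample values are hypothetical and only illustrate the idea; the real helpers may work differently.

```python
# Hypothetical sample shaped like a CloudFormation describe_stacks response.
sample_response = {
    "Stacks": [
        {
            "StackName": "expedia-feature-store-demo-v2",
            "Outputs": [
                {"OutputKey": "DataBucketName", "OutputValue": "example-data-bucket"},
            ],
            "Parameters": [
                {"ParameterKey": "AlwaysRecreateFeatureGroup", "ParameterValue": "true"},
            ],
        }
    ]
}


def lookup(response, section, key):
    """Return the value for `key` from the 'Outputs' or 'Parameters' list of the first stack."""
    key_field = "OutputKey" if section == "Outputs" else "ParameterKey"
    value_field = "OutputValue" if section == "Outputs" else "ParameterValue"
    for item in response["Stacks"][0].get(section, []):
        if item[key_field] == key:
            return item[value_field]
    raise KeyError(f"{key} not found in stack {section}")


data_bucket = lookup(sample_response, "Outputs", "DataBucketName")
# stack parameters are strings, so a flag has to be compared against "true"
always_recreate = lookup(sample_response, "Parameters", "AlwaysRecreateFeatureGroup") == "true"
print(data_bucket, always_recreate)
```

Raising `KeyError` for a missing key makes a misconfigured stack fail early instead of passing `None` into later cells.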

In [None]:
# read output variables from cloud formation stack, these will be used as parameters throughout
# the code
data_bucket_name = utils.get_cfn_stack_outputs(STACK_NAME, 'DataBucketName')
athena_query_results_bucket_name = utils.get_cfn_stack_outputs(STACK_NAME, 'AthenaQueryResultsBucketName')
feature_store_bucket_name = utils.get_cfn_stack_outputs(STACK_NAME, 'FeatureStoreBucketName')
logger.info(f"data_bucket_name={data_bucket_name},\nathena_query_results_bucket_name={athena_query_results_bucket_name},\nfeature_store_bucket_name={feature_store_bucket_name}")

In [None]:
# read params from the CloudFormation stack. The stack provides a convenient
# way to supply configuration parameters for a notebook workflow without having to use
# Parameter Store or other services for providing config.
customer_inputs_fg_name = utils.get_cfn_stack_parameters(STACK_NAME, 'CustomerInputFeatureGroupName')
destinations_fg_name = utils.get_cfn_stack_parameters(STACK_NAME, 'DestinationsFeatureGroupName')
app_name = utils.get_cfn_stack_parameters(STACK_NAME, 'AppName')

always_recreate_fg = utils.get_cfn_stack_parameters(STACK_NAME, 'AlwaysRecreateFeatureGroup')
always_recreate_fg = always_recreate_fg == "true"

raw_data_dir = utils.get_cfn_stack_parameters(STACK_NAME, 'RawDataDir')
training_dataset_fname = utils.get_cfn_stack_parameters(STACK_NAME, 'TrainingDatasetFileName')
test_dataset_fname = utils.get_cfn_stack_parameters(STACK_NAME, 'TestDatasetFileName')
destination_features_fname = utils.get_cfn_stack_parameters(STACK_NAME, 'DestinationFeaturesFileName')

# If an existing feature group by the same name is not going to be deleted then
# append a unique suffix to the feature group name to create a new unique feature group name
if always_recreate_fg is False:
    dttm = datetime.now()
    suffix = f"{dttm.year}-{dttm.month}-{dttm.day}-{dttm.hour}-{dttm.minute}"
    customer_inputs_fg_name = f"{customer_inputs_fg_name}-{suffix}"
    destinations_fg_name = f"{destinations_fg_name}-{suffix}"

# log all params to help with debugging
logger.info(f"customer_inputs_fg_name={customer_inputs_fg_name},\ndestinations_fg_name={destinations_fg_name}\ndestination_features_fname={destination_features_fname}\n"
            f"always_recreate_fg={always_recreate_fg},\n"
            f"raw_data_dir={raw_data_dir},\ntraining_dataset_fname={training_dataset_fname},\n"
            f"test_dataset_fname={test_dataset_fname}, app_name={app_name}")

## Read raw data from S3 bucket
The raw data exists in an S3 bucket. Note that the data must be uploaded manually to the raw data directory of the S3 bucket (typically `raw_data`) before running this step. The data is read directly using the Pandas `read_csv` method. In another version of this code, Pandas will be replaced with PySpark.

We read two datasets here:
1. The customer inputs datasets from the train.csv file that represents customers looking up hotels via the Expedia website.
2. The destination features dataset from destinations.csv that represents embeddings for each destination. This will be joined with the customer inputs dataset at the time of model training.

In [None]:
# read data from the bucket in a pandas dataframe, this will be ingested in the feature store
s3a_uri = f"s3a://{data_bucket_name}/{raw_data_dir}/{training_dataset_fname}"
df = pd.read_csv(s3a_uri)
logger.info(f"shape of the dataframe read from {s3a_uri} is {df.shape}")

# drop rows with NA
df_customer_inputs = df.dropna()
logger.info(f"shape of the dataframe after dropna is {df_customer_inputs.shape}")
display(df_customer_inputs.head())

In [None]:
# read data from the bucket in a pandas dataframe, this will be ingested in the feature store
s3a_uri = f"s3a://{data_bucket_name}/{raw_data_dir}/{destination_features_fname}"
df_destinations = pd.read_csv(s3a_uri)
logger.info(f"shape of the dataframe read from {s3a_uri} is {df_destinations.shape}")

# drop rows with NA
df_destinations = df_destinations.dropna()
logger.info(f"shape of the dataframe after dropna is {df_destinations.shape}")
display(df_destinations.head())

## Data Transformation for Ingesting Into Feature Store
Before this data can be ingested into the SageMaker FeatureStore, certain transformations need to be done.

1. The date_time field, which will be used as the "Event Time", needs to be converted to the ISO-8601 format, i.e. YYYY-MM-DDTHH:MM:SSZ.
2. The user_id field which will be used for "Record Identifier" needs to be converted to string.
3. All "object" type fields need to be converted to string.
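
The first two transformations can be sketched on a single record with the standard library alone. This is a minimal illustration rather than the notebook's implementation (the cells below use Pandas); `to_event_time` and `prepare_record` are hypothetical names, and the timestamp is assumed to be in UTC.

```python
from datetime import datetime


def to_event_time(raw):
    """Convert 'YYYY-MM-DD HH:MM:SS' to 'YYYY-MM-DDTHH:MM:SSZ' (timestamp assumed to be UTC)."""
    parsed = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    return parsed.strftime("%Y-%m-%dT%H:%M:%S") + "Z"


def prepare_record(record):
    """Return a copy of the record with the event time and record identifier converted."""
    prepared = dict(record)
    prepared["date_time"] = to_event_time(record["date_time"])
    prepared["user_id"] = str(record["user_id"])  # record identifier must be a string
    return prepared


example = prepare_record({"date_time": "2015-09-03 17:09:54", "user_id": 12})
print(example)
```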

In [None]:
# convert to datetime first
df_customer_inputs.date_time = pd.to_datetime(df_customer_inputs.date_time)

# the above returns (for example) 2015-09-03 17:09:54, change this to 2015-09-03T17:09:54Z
# The dataset documentation does not mention the timezone of the date_time so will just assume it to be UTC.
df_customer_inputs.date_time = df_customer_inputs.date_time.map(lambda x: x.isoformat() + 'Z')

In [None]:
# Convert user_id to string
df_customer_inputs.user_id = df_customer_inputs.user_id.astype("string")

# destination id as well since this is going to be used as a key in the feature group for the destinations data
# and the feature group record identifier can only be a string, BUT this is not the destinations table this is
# the customer inputs table...so what gives..well, the customer inputs and destinations would be joined at the
# time of model training and instead of doing a cast there, let's just do it here.
df_customer_inputs.srch_destination_id = df_customer_inputs.srch_destination_id.astype("string")

In [None]:
# only keep rows where is_booking == 1 because we are only concerned with events when the user actually booked a hotel, and that is also what the test data contains.
if "is_booking" in df_customer_inputs.columns:
    # .copy() avoids a pandas SettingWithCopyWarning when derived columns are added later
    df_customer_inputs = df_customer_inputs[df_customer_inputs.is_booking == 1].copy()
    logger.info(f"after removing all is_booking != 1 rows, shape of dataframe {df_customer_inputs.shape}")

### Create derived features
These features can then be stored in the Feature Store and be used for training the model. This is the advantage of having a feature store, these derived features would now be available ready to use when we want to train an ML model, any model whether it is the one being created in this repo or for a new future use-case.
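
To make the derived features concrete, here is a small standard-library sketch that computes them for one record. It assumes check-in and check-out dates are formatted as YYYY-MM-DD and that the search time is already in the ISO-8601 form described earlier; `derive_trip_features` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime


def derive_trip_features(search_time, check_in, check_out):
    """Compute trip duration, lead time and weekend flags for a single search event."""
    searched = datetime.strptime(search_time, "%Y-%m-%dT%H:%M:%SZ")
    ci = datetime.strptime(check_in, "%Y-%m-%d")
    co = datetime.strptime(check_out, "%Y-%m-%d")
    return {
        "duration": (co - ci).days,             # whole days between check-in and check-out
        "days_to_trip": (ci - searched).days,   # whole days from the search to check-in
        "start_of_trip_weekend": int(ci.weekday() >= 5),  # Saturday=5, Sunday=6
        "end_of_trip_weekend": int(co.weekday() >= 5),
    }


features = derive_trip_features("2015-09-03T17:09:54Z", "2015-09-05", "2015-09-07")
print(features)
```

In this example the trip starts on a Saturday and ends on a Monday, so only the start-of-trip flag is set.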

In [None]:
# create derived features

# duration of the trip for which the hotel booking is needed seems to be intuitively important
# (.dt.days is used because astype('timedelta64[D]') may raise an error in recent pandas versions)
df_customer_inputs['duration'] = (pd.to_datetime(df_customer_inputs.srch_co, errors='coerce') - pd.to_datetime(df_customer_inputs.srch_ci, errors='coerce')).dt.days

# how far is the trip from the time when the user was looking up the Expedia website
df_customer_inputs['days_to_trip'] = (pd.to_datetime(df_customer_inputs.srch_ci, errors='coerce') - pd.to_datetime(df_customer_inputs.date_time, errors='coerce').dt.tz_localize(None)).dt.days

# is the start or end of the trip on a weekend?
df_customer_inputs['start_of_trip_weekend'] = (pd.to_datetime(df_customer_inputs.srch_ci, errors='coerce').dt.weekday >= 5).astype(int)
df_customer_inputs['end_of_trip_weekend'] = (pd.to_datetime(df_customer_inputs.srch_co, errors='coerce').dt.weekday >= 5).astype(int)


In [None]:
# convert any "object" type columns to string
utils.cast_object_to_string(df_customer_inputs)

In [None]:
df_customer_inputs.head()

In [None]:
# reduce the size of the dataset to make it more manageable for this demo
unique_user_id = list(df_customer_inputs.user_id.unique())
num_unique_user_ids = len(unique_user_id)
logger.info(f"there are {len(unique_user_id)} user_ids in the dataset")

# select 1% of the unique users
import random
FRACTION_OF_USER_IDS_TO_KEEP = 0.01
if FRACTION_OF_USER_IDS_TO_KEEP != 1:
    fraction_of_unique_user_ids = random.sample(unique_user_id, int(num_unique_user_ids*FRACTION_OF_USER_IDS_TO_KEEP))
    df_customer_inputs = df_customer_inputs[df_customer_inputs.user_id.isin(fraction_of_unique_user_ids)]
    logger.info(f"after filtering dataframe to keep {100*FRACTION_OF_USER_IDS_TO_KEEP}% of all user_ids, dataframe shape is {df_customer_inputs.shape}")

## Initialize SageMaker and FeatureStore Runtime

In [None]:
role = get_execution_role()
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)

featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

account_id = boto3.client('sts').get_caller_identity()["Account"]
logger.info(f"role={role}, region={region}, account_id={account_id}")



In [None]:
# Create a feature store session object
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

## Cleanup Existing FeatureGroup (if needed)
To allow running this notebook multiple times without creating a new feature group on every run, a config parameter controls whether an existing feature group with the same name is deleted. If the always recreate feature group parameter is set to false, a new feature group is created by suffixing the current datetime to the configured feature group name.
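
The naming rule can be sketched as a small function. `resolve_feature_group_name` is a hypothetical name, and this sketch zero-pads the timestamp with `strftime`, which differs slightly from the unpadded suffix built in the config cell earlier.

```python
from datetime import datetime


def resolve_feature_group_name(base_name, always_recreate, now):
    """Reuse the configured name when recreating, otherwise append a timestamp suffix."""
    if always_recreate:
        return base_name
    return f"{base_name}-{now.strftime('%Y-%m-%d-%H-%M')}"


run_time = datetime(2022, 6, 7, 22, 8)
print(resolve_feature_group_name("customer-inputs", True, run_time))
print(resolve_feature_group_name("customer-inputs", False, run_time))
```

Passing the current time in as an argument keeps the function deterministic and easy to test.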

In [None]:
# get a list of feature groups
fg_list = sagemaker_client.list_feature_groups()
num_feature_groups = len(fg_list['FeatureGroupSummaries'])
if num_feature_groups >= MAX_ALLOWED_FEATURE_GROUPS:
    logger.error(f"number of already existing feature groups is {num_feature_groups}, no more feature groups can be created, delete some feature groups and try again")
logger.info(f"there are {num_feature_groups} feature groups")
logger.info(fg_list)
# if the feature group list is not empty and always recreate feature groups is set to True then delete existing feature group
if always_recreate_fg and num_feature_groups > 0:
    logger.warning("always_recreate_fg is True, going to delete feature groups")
    for fg in fg_list['FeatureGroupSummaries']:
        if fg['FeatureGroupName'] in (customer_inputs_fg_name, destinations_fg_name):
            sagemaker_client.delete_feature_group(FeatureGroupName=fg['FeatureGroupName'])
    time.sleep(5)

## Create Feature Group
Create a Feature Group and then load its feature definitions (the schema) from the existing dataframe that contains the transformed data, which is already in a form suitable for ingestion into the Feature Store.
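
Feature Store feature definitions pair each feature name with one of three types: `Integral`, `Fractional` or `String`. The SDK derives these from the dataframe's column dtypes; the sketch below only illustrates the idea on plain Python values. `infer_feature_type` and `build_feature_definitions` are hypothetical helpers, not SDK functions.

```python
def infer_feature_type(value):
    """Map a Python value to a Feature Store feature type name (illustrative rule only)."""
    if isinstance(value, int):
        return "Integral"
    if isinstance(value, float):
        return "Fractional"
    return "String"


def build_feature_definitions(record):
    """Build a list of name/type pairs from one example record."""
    return [
        {"FeatureName": name, "FeatureType": infer_feature_type(value)}
        for name, value in record.items()
    ]


definitions = build_feature_definitions(
    {"user_id": "12", "duration": 2, "orig_destination_distance": 2234.26}
)
print(definitions)
```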

In [None]:
feature_group = FeatureGroup(name=customer_inputs_fg_name, sagemaker_session=feature_store_session)
feature_group.load_feature_definitions(data_frame=df_customer_inputs)

This is the actual feature group creation step. We usually want to create an **online + offline feature store**: online because we want to use it for real-time predictions, and offline because we want to use it for model training. In this particular use case a separate test dataset is provided, so an online store is more relevant for the test dataset than for the training dataset; nevertheless, an offline+online store here does not hurt.

In [None]:
feature_group.create(
    s3_uri=f"s3://{feature_store_bucket_name}/{customer_inputs_fg_name}",
    record_identifier_name="user_id",
    event_time_feature_name="date_time",
    role_arn=role,
    enable_online_store=True,
    tags=[{'Key':'project','Value':'expedia-feature-store-demo'}]
)

In [None]:
utils.check_feature_group_status(feature_group)

In [None]:
#Ingest features into the feature group
# actually batch ingest the data into the feature store now
logger.info(f"about to begin ingestion of data into feature store, max_workers={MAX_WORKERS}")
feature_group.ingest(
    data_frame=df_customer_inputs, max_workers=MAX_WORKERS, wait=True
)

## Query ingested data from the "Online" feature store
This should immediately return the results.
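
The `batch_get_record` response nests each record as a list of name/value pairs, which is awkward to read. A minimal sketch for flattening it into plain dictionaries follows; the sample response is made up for illustration, and `records_to_dicts` is a hypothetical helper.

```python
# Made-up sample in the shape returned by batch_get_record.
sample_response = {
    "Records": [
        {
            "FeatureGroupName": "customer-inputs",
            "RecordIdentifierValueAsString": "12",
            "Record": [
                {"FeatureName": "user_id", "ValueAsString": "12"},
                {"FeatureName": "duration", "ValueAsString": "2"},
            ],
        }
    ],
    "Errors": [],
    "UnprocessedIdentifiers": [],
}


def records_to_dicts(response):
    """Flatten each returned record into a {feature_name: value_as_string} dictionary."""
    return [
        {feature["FeatureName"]: feature["ValueAsString"] for feature in record["Record"]}
        for record in response.get("Records", [])
    ]


flat_records = records_to_dicts(sample_response)
print(flat_records)
```

All values come back as strings, so numeric features need to be cast before use.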

In [None]:
# use batch_get_record to fetch the records for the first two user_ids from the online store
record_identifier_values = list((df_customer_inputs.user_id.unique()))[:2]
response=featurestore_runtime.batch_get_record(
    Identifiers=[
        {"FeatureGroupName": customer_inputs_fg_name, "RecordIdentifiersValueAsString": record_identifier_values}
    ]
)
response

## Query ingested data from the "Offline" feature store
The offline feature store is queried using Athena. The feature group object has an Athena query method that is used to construct a query.

**Note:** It could take several minutes (up to 15) until the data is ingested and available for querying.
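
Instead of a fixed sleep, one option is to poll until the data shows up or a timeout expires. The sketch below is a generic polling helper under that assumption; `wait_until` is a hypothetical name, and the `check` callable would wrap whatever test decides that rows have arrived (for example, a row-count query).

```python
import time


def wait_until(check, timeout_s=900, interval_s=30, sleep=time.sleep, clock=time.monotonic):
    """Call check() repeatedly until it returns True or timeout_s seconds have elapsed."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Demonstration with a stub that succeeds on the third attempt and a no-op sleep.
attempts = []


def data_available():
    attempts.append(1)
    return len(attempts) >= 3


ready = wait_until(data_available, timeout_s=60, interval_s=1, sleep=lambda seconds: None)
print(ready, len(attempts))
```

The injectable `sleep` and `clock` arguments exist only to make the helper testable without real waiting.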

In [None]:
# add a 1 minute sleep to wait for at least some data to show up in the offline feature store
# time.sleep(60)

# the feature group provides a convenient Athena object to query the offline feature store data
query = feature_group.athena_query()
customers_fg_table = query.table_name
logger.info(f"Athena table -> fg_table={customers_fg_table}")

In [None]:
query_string = f'SELECT * FROM "{customers_fg_table}" limit 10'
output_location=f's3://{athena_query_results_bucket_name}/{customer_inputs_fg_name}/query_results/'
logger.info(f"going to run this query -> {query_string} and store the results in {output_location}")

# run the query
query.run(query_string=query_string, output_location=output_location)

# wait for the results
query.wait()
df_fg = query.as_dataframe()

# results
df_fg.head()

## Save variables for next stage

In [None]:
# write feature group names and query_string to a file, used when generating lineage
utils.write_param("customer_inputs_fg_name", customer_inputs_fg_name)
utils.write_param("customer_inputs_fg_table", customers_fg_table)
utils.write_param("customer_inputs_fg_query_string", query_string)


## Create feature group for the destination features
We first do PCA on the destinations features to reduce it to 3 features and then store the principal components in a separate feature group of their own.
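
The number of components is fixed in this demo, but one common way to choose it is a cumulative explained-variance threshold. The sketch below assumes a list of per-component variance ratios, such as the `explained_variance_ratio_` attribute of a fitted scikit-learn `PCA`; the ratios shown are made up, and `components_for_variance` is a hypothetical helper.

```python
def components_for_variance(explained_variance_ratio, threshold):
    """Return the smallest number of leading components whose cumulative ratio reaches threshold."""
    cumulative = 0.0
    for count, ratio in enumerate(explained_variance_ratio, start=1):
        cumulative += ratio
        if cumulative >= threshold:
            return count
    # threshold never reached: keep every component
    return len(explained_variance_ratio)


ratios = [0.5, 0.3, 0.1, 0.1]  # made-up ratios, sorted from largest to smallest
print(components_for_variance(ratios, 0.75))
print(components_for_variance(ratios, 0.85))
```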


In [None]:
# reduce dimensions of the destination features
from sklearn.decomposition import PCA

# the number of principal components to keep is just set to 3 here since this is a demo
# but in an actual production model this would be determined by examining a scree plot/variance explained rule/other criteria
pca = PCA(n_components=PC_TO_KEEP)

In [None]:
# all columns except the srch_destination_id
cols_to_use = [c for c in df_destinations.columns if c != 'srch_destination_id']
destinations_pca = pca.fit_transform(df_destinations[cols_to_use])
# reuse the index of df_destinations so that the id column assigned below aligns row by row
# even if dropna() removed rows and left gaps in the index
df_destinations_pca = pd.DataFrame(destinations_pca, columns=[f'pc{x}' for x in range(1, (PC_TO_KEEP+1))], index=df_destinations.index)

# typecasting the destination id to string since this is going to be used as the record identifier in the feature store
# which has to be a string
df_destinations_pca["srch_destination_id"] = df_destinations["srch_destination_id"].astype('string')

# there is no date time associated with these features in the input dataset, so just use the current datetime
# datetime.utcnow().isoformat() will return something like '2022-06-07T22:08:19.399890', need to
# truncate it to yyyy-MM-dd'T'HH:mm:ss format to make it work with sagemaker feature store
datetime_iso8601_now = f"{datetime.utcnow().isoformat().split('.')[0]}Z"
df_destinations_pca["date_time"] = datetime_iso8601_now

In [None]:
df_destinations_pca.head()

In [None]:
df_destinations_pca['date_time'] = df_destinations_pca['date_time'].astype('string')

In [None]:
df_destinations_pca.dtypes

In [None]:
feature_group = FeatureGroup(name=destinations_fg_name, sagemaker_session=feature_store_session)
feature_group.load_feature_definitions(data_frame=df_destinations_pca)

In [None]:
feature_group.create(
    s3_uri=f"s3://{feature_store_bucket_name}/{destinations_fg_name}",
    record_identifier_name="srch_destination_id",
    event_time_feature_name="date_time",
    role_arn=role,
    enable_online_store=True,
    tags=[{'Key':'AppName','Value':app_name}]
)

In [None]:
utils.check_feature_group_status(feature_group)

In [None]:
#Ingest features into the feature group
# actually batch ingest the data into the feature store now
logger.info(f"about to begin ingestion of data into feature store, max_workers={MAX_WORKERS}")
feature_group.ingest(
    data_frame=df_destinations_pca, max_workers=MAX_WORKERS, wait=True
)

In [None]:
# use batch_get_record to fetch the records for the first two destination ids from the online store
record_identifier_values = list((df_destinations_pca.srch_destination_id.unique()))[:2]
response=featurestore_runtime.batch_get_record(
    Identifiers=[
        {"FeatureGroupName": destinations_fg_name, "RecordIdentifiersValueAsString": record_identifier_values}
    ]
)
response

In [None]:
# add a 1 minute sleep to wait for at least some data to show up in the offline feature store
# time.sleep(60)

# the feature group provides a convenient Athena object to query the offline feature store data
query = feature_group.athena_query()
destinations_fg_table = query.table_name
logger.info(f"Athena table -> fg_table={destinations_fg_table}")

In [None]:
query_string = f'SELECT * FROM "{destinations_fg_table}" limit 10'
utils.write_param("destinations_fg_table", destinations_fg_table)
output_location=f's3://{athena_query_results_bucket_name}/{destinations_fg_name}/query_results/'
logger.info(f"going to run this query -> {query_string} and store the results in {output_location}")

# run the query
query.run(query_string=query_string, output_location=output_location)

# wait for the results
query.wait()
df_fg = query.as_dataframe()

# results
df_fg.head()

In [None]:
# write feature group name to a file
utils.write_param("destinations_fg_name", destinations_fg_name)