# Feathr Feature Store on Azure Demo Notebook

This tutorial demonstrates the basic usage of Feathr Feature Store on Azure. It includes these steps:

1. Set up the Feathr environment
2. Initialize the Feathr client
3. Define features with Feathr
4. Register features with the Feathr feature registry
5. Create training data with point-in-time correct feature joins
6. Materialize features to an offline/online store

In this tutorial, we use Feathr Feature Store to create a model that predicts NYC taxi fares. The dataset comes from the [NYC TLC trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). The feature flow is shown below:

![Feature Flow](https://github.com/linkedin/feathr/blob/main/docs/images/feature_flow.png?raw=true)

# Setup Feathr Environment

**REQUIRED STEP: Fill in the resource prefix that you used when provisioning the resources**

***Prior to running the notebook, make sure you have deployed all required resources in your Azure subscription.*** 

In [None]:
# replace with your own values
resource_prefix = ""
resource_postfix = ""
resource_env = ""
storage_accountname = "" # name of the feature store's storage account, where the nyctaxi container was created and the sample data uploaded
storage_containername = "nyctaxi"

In [None]:
import glob
import os
import tempfile
from datetime import datetime, timedelta
from math import sqrt

import pandas as pd
import pandavro as pdx
from feathr import FeathrClient
from feathr import BOOLEAN, FLOAT, INT32, ValueType
from feathr import Feature, DerivedFeature, FeatureAnchor
from feathr import BackfillTime, MaterializationSettings
from feathr import FeatureQuery, ObservationSettings
from feathr import RedisSink, HdfsSink
from feathr import INPUT_CONTEXT, HdfsSource
from feathr import WindowAggTransformation
from feathr import TypedKey
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

In [None]:
# Get all the required credentials from Azure Key Vault
key_vault_name="kv-"+resource_prefix+"-"+resource_postfix+resource_env
synapse_workspace_url="sy"+resource_prefix+"-"+resource_postfix+resource_env
adls_account="st"+resource_prefix+resource_postfix+resource_env
adls_fs_name="dl"+resource_prefix+resource_postfix+resource_env
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"
credential = DefaultAzureCredential(exclude_interactive_browser_credential=False)
client = SecretClient(vault_url=key_vault_uri, credential=credential)
secretName = "FEATHR-ONLINE-STORE-CONN"
retrieved_secret = client.get_secret(secretName).value

# Parse the Redis connection string to get the host, port, password, and SSL flag
redis_port=retrieved_secret.split(',')[0].split(":")[1]
redis_host=retrieved_secret.split(',')[0].split(":")[0]
redis_password=retrieved_secret.split(',')[1].split("password=",1)[1]
redis_ssl=retrieved_secret.split(',')[2].split("ssl=",1)[1]

# Set the resource link
os.environ['spark_config__azure_synapse__dev_url'] = f'https://{synapse_workspace_url}.dev.azuresynapse.net'
os.environ['spark_config__azure_synapse__pool_name'] = 'spdev'
os.environ['spark_config__azure_synapse__workspace_dir'] = f'abfss://{adls_fs_name}@{adls_account}.dfs.core.windows.net/feathr_project'
os.environ['online_store__redis__host'] = redis_host
os.environ['online_store__redis__port'] = redis_port
os.environ['online_store__redis__ssl_enabled'] = redis_ssl
os.environ['REDIS_PASSWORD']=redis_password
feathr_output_path = f'abfss://{adls_fs_name}@{adls_account}.dfs.core.windows.net/feathr_output'
os.environ['FEATURE_REGISTRY__API_ENDPOINT']= f'https://app{resource_prefix+resource_postfix+resource_env}.azurewebsites.net/api/v1'
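
The connection string parsing in the cell above can also be written as a small helper. This is a minimal sketch, assuming the secret has the layout `host:port,password=<password>,ssl=<True|False>` that the `split` calls above imply; `RedisSettings` and `parse_redis_connection_string` are hypothetical names, and the sample string is made up.

```python
from typing import NamedTuple


class RedisSettings(NamedTuple):
    host: str
    port: str
    password: str
    ssl: str


def parse_redis_connection_string(conn: str) -> RedisSettings:
    """Parse 'host:port,password=<pw>,ssl=<flag>' (assumed layout)."""
    endpoint, *options = [part.strip() for part in conn.split(",")]
    # The port is whatever follows the last colon of the endpoint.
    host, _, port = endpoint.rpartition(":")
    # Split on the first '=' only, because passwords may themselves contain '='.
    settings = dict(option.split("=", 1) for option in options)
    return RedisSettings(host, port, settings["password"], settings["ssl"])


# Made-up connection string, for illustration only.
example = parse_redis_connection_string(
    "myredis.example.net:6380,password=abc123==,ssl=True"
)
print(example)
```

Looking options up by name rather than by position keeps the parse working if the options appear in a different order. A password that contains a comma would still break this sketch, as it would the original cell.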


# Initialize Feathr Client

In [None]:
config_file_name = "feathr_config.yaml" 
feathr_client = FeathrClient(config_path=config_file_name, credential=credential)

# Defining Features with Feathr

In Feathr, a feature is viewed as a function that maps an entity id (key) and a timestamp to a feature value. For more details on feature definition, please refer to the [Feathr Feature Definition Guide](https://github.com/linkedin/feathr/blob/main/docs/concepts/feature-definition.md). A feature is described by three parts:

1. The typed key (a.k.a. entity id) identifies the subject of the feature, e.g. the user id 123.
2. The feature name is the aspect of the entity that the feature describes, e.g. the age of the user.
3. The feature value is the actual value of that aspect at a particular time, e.g. 30 in the year 2022.


Note that some features, such as those defined on top of request data, may have no entity key or timestamp.
Such a feature is merely a function/transformation executed against the request data at runtime.
An example is the day of the week of the request, which is calculated from the request's UNIX timestamp.
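
To make the "feature as a function" view concrete, here is a minimal plain-Python sketch that does not use Feathr. The lookup table and the function names `user_age` and `request_day_of_week` are hypothetical. The day-of-week numbering follows Spark SQL's `dayofweek` convention (1 = Sunday, 7 = Saturday), and evaluating the timestamp in UTC is an assumption.

```python
from datetime import datetime, timezone

# Hypothetical feature data: (entity key, year) -> feature value.
USER_AGE = {("123", 2021): 29, ("123", 2022): 30}


def user_age(user_id: str, year: int) -> int:
    """An entity feature: a function of (key, time) that returns a value."""
    return USER_AGE[(user_id, year)]


def request_day_of_week(unix_ts: int) -> int:
    """A request feature: no entity key, computed from the request alone."""
    iso = datetime.fromtimestamp(unix_ts, tz=timezone.utc).isoweekday()  # 1 = Monday ... 7 = Sunday
    return iso % 7 + 1  # shift to 1 = Sunday ... 7 = Saturday


print(user_age("123", 2022))
print(request_day_of_week(0))  # 1970-01-01 was a Thursday
```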


## Define Sources Section with UDFs
Anchored features need a feature source that describes the raw data from which the feature values are computed. See the Python documentation for details on each input column.


In [None]:
# define a pre-processing function (UDF) that runs on Spark before features are computed
from pyspark.sql import DataFrame
def feathr_udf_day_calc(df: DataFrame) -> DataFrame:
    # import inside the function so that the UDF is self-contained
    from pyspark.sql.functions import col
    # add a column holding the fare in cents; it is used by f_location_total_fare_cents
    df = df.withColumn("fare_amount_cents", col("fare_amount")*100)
    return df

# define the data source
batch_source = HdfsSource(name="nycTaxiBatchSource",
                          path="abfss://{container_name}@{storage_account}.dfs.core.windows.net/nyc_taxi.parquet".format(storage_account=storage_accountname, container_name=storage_containername),
                          event_timestamp_column="lpep_dropoff_datetime",
                          preprocessing=feathr_udf_day_calc,
                          timestamp_format="yyyy-MM-dd HH:mm:ss")

### Define Anchors and Features
A feature is called an anchored feature when it is extracted directly from the source data rather than computed on top of other features. A feature computed from other features is called a derived feature.

In [None]:
# define anchor features
f_trip_distance = Feature(name="f_trip_distance",
                          feature_type=FLOAT, transform="trip_distance")
f_trip_time_duration = Feature(name="f_trip_time_duration",
                               feature_type=INT32,
                               transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(name="f_is_long_trip_distance",
            feature_type=BOOLEAN,
            transform="cast_float(trip_distance)>30"),
    Feature(name="f_day_of_week",
            feature_type=INT32,
            transform="dayofweek(lpep_dropoff_datetime)"),
]

request_anchor = FeatureAnchor(name="request_features",
                               source=INPUT_CONTEXT,
                               features=features)

### Window aggregation features

Window aggregation features are defined with `WindowAggTransformation`. The `agg_func` must be one of the following:

| Aggregation Type | Input Type | Description |
| --- | --- | --- |
| SUM, COUNT, MAX, MIN, AVG | Numeric | Applies the numerical operation to the numeric inputs. |
| MAX_POOLING, MIN_POOLING, AVG_POOLING | Numeric Vector | Applies the max/min/avg operation on a per-entry basis to a given collection of numeric vectors. |
| LATEST | Any | Returns the latest non-null value within the defined time window. |
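
The aggregation types in the table can be illustrated with a minimal plain-Python sketch that does not use Feathr. The events, the `window_values` and `pool` helpers, and the treatment of the window boundaries (start exclusive, end inclusive) are assumptions made for illustration; Feathr's actual boundary handling may differ.

```python
from datetime import datetime, timedelta
from statistics import mean

# Hypothetical (timestamp, fare) events for one location; None models a missing value.
events = [
    (datetime(2020, 1, 1), 10.0),   # older than 90 days, so excluded below
    (datetime(2020, 4, 1), 20.0),
    (datetime(2020, 5, 1), 30.0),
    (datetime(2020, 5, 10), None),
]


def window_values(events, as_of, window):
    """Non-null values with as_of - window < timestamp <= as_of, oldest first."""
    in_window = [(ts, v) for ts, v in events if as_of - window < ts <= as_of]
    return [v for _, v in sorted(in_window, key=lambda e: e[0]) if v is not None]


vals = window_values(events, datetime(2020, 5, 20), timedelta(days=90))
aggregates = {
    "SUM": sum(vals),
    "COUNT": len(vals),
    "MAX": max(vals),
    "MIN": min(vals),
    "AVG": mean(vals),
    "LATEST": vals[-1],  # most recent non-null value in the window
}


def pool(vectors, func):
    """Apply func entry by entry across equally sized numeric vectors."""
    return [func(column) for column in zip(*vectors)]


vectors = [[1.0, 5.0], [3.0, 2.0]]
pooled = {
    "MAX_POOLING": pool(vectors, max),
    "MIN_POOLING": pool(vectors, min),
    "AVG_POOLING": pool(vectors, mean),
}
print(aggregates)
print(pooled)
```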


Note that if the features are computed from the observation data itself, the anchor's `source` should be `INPUT_CONTEXT`, as in the `request_features` anchor above.

After you have defined features and sources, bring them together to build an anchor:

In [None]:
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC",
                       full_name="nyc_taxi.location_id")

# calculate the average trip fare, maximum fare and total fare per location for 90 days
agg_features = [Feature(name="f_location_avg_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="AVG",
                                                          window="90d")),
                Feature(name="f_location_max_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="MAX",
                                                          window="90d")),
                Feature(name="f_location_total_fare_cents",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="fare_amount_cents",
                                                          agg_func="SUM",
                                                          window="90d")),
                ]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
                           source=batch_source,
                           features=agg_features)

### Derived Features 
Derived features are features computed from other features. They can be computed from anchored features or from other derived features.

In [None]:
f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
                                      feature_type=FLOAT,
                                      input_features=[
                                          f_trip_distance, f_trip_time_duration],
                                      transform="f_trip_distance * f_trip_time_duration")

f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
                                     feature_type=INT32,
                                     input_features=[f_trip_time_duration],
                                     transform="f_trip_time_duration % 10")
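
As a rough illustration of what the two transform expressions above compute for a single row, here is a plain-Python sketch that does not use Feathr. The row values are made up, and the function names are hypothetical. Note that `%` is the remainder operator, so a duration of 17 maps to 7.

```python
def trip_time_distance(f_trip_distance: float, f_trip_time_duration: int) -> float:
    """Mirrors the transform 'f_trip_distance * f_trip_time_duration'."""
    return f_trip_distance * f_trip_time_duration


def trip_time_rounded(f_trip_time_duration: int) -> int:
    """Mirrors the transform 'f_trip_time_duration % 10' for non-negative durations."""
    return f_trip_time_duration % 10


# Made-up anchored feature values for one trip.
row = {"f_trip_distance": 2.5, "f_trip_time_duration": 17}
derived = {
    "f_trip_time_distance": trip_time_distance(row["f_trip_distance"], row["f_trip_time_duration"]),
    "f_trip_time_rounded": trip_time_rounded(row["f_trip_time_duration"]),
}
print(derived)
```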


Next, we need to build these features so that they can be consumed later. Note that we have to build both the anchored features and the derived features (which are not anchored to a source).

In [None]:
feathr_client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[
                      f_trip_time_distance, f_trip_time_rounded])

### Registering features

We can also register the features with an Apache Atlas compatible service, such as Azure Purview, and share the registered features across teams:

In [None]:
feathr_client.register_features()

We can now list the registered features:

In [None]:
feathr_client.list_registered_features(project_name="nyctaxifs")

## Create training data using point-in-time correct feature join (e.g. usage path: ML model training and scoring)

A training dataset usually contains entity id columns, multiple feature columns, an event timestamp column, and a label/target column.

To create a training dataset using Feathr, one needs to provide a feature join configuration that specifies
which features to join and how they should be joined to the observation data. In this notebook, that configuration is expressed with `FeatureQuery` and `ObservationSettings`.

To learn more on this topic, please refer to [Point-in-time Correctness](https://github.com/linkedin/feathr/blob/main/docs/concepts/point-in-time-join.md)
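
The idea behind a point-in-time correct join can be sketched in plain Python without Feathr: for each observation row, use only the feature value that was known at or before the observation's timestamp, never a later one. The feature history, the keys, and the helper `point_in_time_lookup` are hypothetical. For window aggregation features, Feathr computes an aggregate over a window rather than a single lookup, so this is a simplification.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history per location id: (timestamp, value), sorted by timestamp.
feature_history = {
    128: [
        (datetime(2020, 5, 1), 11.0),
        (datetime(2020, 5, 10), 12.5),
        (datetime(2020, 5, 20), 14.0),
    ],
}


def point_in_time_lookup(history, key, event_time):
    """Return the latest value with timestamp <= event_time, or None if there is none."""
    rows = history.get(key, [])
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, event_time)  # number of rows at or before event_time
    return rows[idx - 1][1] if idx else None


# Observation rows: (location id, event timestamp).
observations = [
    (128, datetime(2020, 5, 12)),  # sees the May 10 value, not the later May 20 one
    (128, datetime(2020, 4, 30)),  # no feature value existed yet
    (243, datetime(2020, 5, 12)),  # unknown key
]
training_rows = [
    (key, ts, point_in_time_lookup(feature_history, key, ts)) for key, ts in observations
]
for training_row in training_rows:
    print(training_row)
```

Joining on the latest value regardless of time would leak the May 20 value into the May 12 observation, which is the kind of label leakage a point-in-time join avoids.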


In [None]:

output_path = feathr_output_path

# retrieve a list of features with help of the common key
feature_query = FeatureQuery(
    feature_list=["f_location_avg_fare", "f_trip_time_rounded", "f_is_long_trip_distance", "f_location_total_fare_cents"], key=location_id)

# Settings for the observation data. The feature join combines the data at the observation path with the features listed above.
settings = ObservationSettings(
    observation_path="abfss://{container_name}@{storage_account}.dfs.core.windows.net/nyc_taxi.parquet".format(storage_account=storage_accountname, container_name=storage_containername),
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")
feathr_client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path)
feathr_client.wait_job_to_finish(timeout_sec=500)

Let's use the helper function `get_result_df` to download the result and view it:

In [None]:
def get_result_df(client: FeathrClient) -> pd.DataFrame:
    """Download the job result dataset from cloud as a Pandas dataframe."""
    res_url = client.get_job_result_uri(block=True, timeout_sec=600)
    # the temporary directory is removed automatically when the block exits
    with tempfile.TemporaryDirectory() as tmp_dir:
        client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir)
        # assuming the results are in avro format
        dataframe_list = [pdx.read_avro(file)
                          for file in glob.glob(os.path.join(tmp_dir, '*.avro'))]
    return pd.concat(dataframe_list, axis=0)

# pass the Feathr client here, not the Key Vault `client` created earlier
df_res = get_result_df(feathr_client)
df_res

## Materialize feature values into offline/online storage (e.g. usage path: ML model inference)

While Feathr can compute the feature value from the feature definition on-the-fly at request time, it can also pre-compute
and materialize the feature value to offline and/or online storage. 

We can push the generated features to the online store as shown below:


In [None]:
backfill_time = BackfillTime(start=datetime(
    2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

redisSink = RedisSink(table_name="nyctaxi")

settings = MaterializationSettings(name="nycTaxiFeatures",
                                   backfill_time=backfill_time,
                                   sinks=[redisSink],
                                   feature_names=["f_location_avg_fare", "f_location_max_fare"])

feathr_client.materialize_features(settings)
feathr_client.wait_job_to_finish(timeout_sec=500)
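
The `BackfillTime(start, end, step)` setting above describes a sequence of cutoff times. The sketch below enumerates them in plain Python, assuming that both `start` and `end` are included and that one materialization run happens per cutoff. That reading is an assumption about Feathr's behavior, and `backfill_cutoffs` is a hypothetical helper, not part of the Feathr API.

```python
from datetime import datetime, timedelta


def backfill_cutoffs(start: datetime, end: datetime, step: timedelta) -> list:
    """List cutoff times from start to end (both inclusive), spaced by step."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cutoffs = []
    current = start
    while current <= end:
        cutoffs.append(current)
        current += step
    return cutoffs


# Same values as in the cell above: a single cutoff on 2020-05-20.
single = backfill_cutoffs(datetime(2020, 5, 20), datetime(2020, 5, 20), timedelta(days=1))
# A three-day range yields three cutoffs.
three_days = backfill_cutoffs(datetime(2020, 5, 18), datetime(2020, 5, 20), timedelta(days=1))
print(single)
print(three_days)
```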


We can then get the features from the online store (Redis in our case):


In [None]:
res = feathr_client.multi_get_online_features("nyctaxi", ["128", "243"], ["f_location_avg_fare", "f_location_max_fare"])
res