# Feathr Feature Store on Azure Demo Notebook

This tutorial demonstrates the basic usage of Feathr Feature Store on Azure. It includes these steps:

1. Set Up Feathr Environment
2. Initialize Feathr Client
3. Define Features with Feathr
4. Register Features to the Feathr Feature Registry
5. Create Training Data with Point-in-Time Correct Feature Joins
6. Materialize Features to an Offline/Online Store

In this tutorial, we use Feathr Feature Store to create a model that predicts NYC Taxi fares. The dataset comes from the [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) page. The feature flow is shown below:

![Feature Flow](https://github.com/linkedin/feathr/blob/main/docs/images/feature_flow.png?raw=true)

# Set Up Feathr Environment

**REQUIRED STEP: Fill in the resource prefix you used when provisioning the resources**

***Prior to running the notebook, make sure you have deployed all required resources in your Azure subscription.*** 

In [1]:
# replace with your resource prefix, storage account name, and container name
resource_prefix = "rsfeathr"
storage_accountname = "rsfeathrdls"
storage_containername = "nyctaxi"

In [2]:
import glob
import os
import tempfile
from datetime import datetime, timedelta
from math import sqrt

import pandas as pd
import pandavro as pdx
from feathr import FeathrClient
from feathr import BOOLEAN, FLOAT, INT32, ValueType
from feathr import Feature, DerivedFeature, FeatureAnchor
from feathr import BackfillTime, MaterializationSettings
from feathr import FeatureQuery, ObservationSettings
from feathr import RedisSink, HdfsSink
from feathr import INPUT_CONTEXT, HdfsSource
from feathr import WindowAggTransformation
from feathr import TypedKey
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

In [3]:
# Get all the required credentials from Azure Key Vault
key_vault_name=resource_prefix+"kv"
synapse_workspace_url=resource_prefix+"syws"
adls_account=resource_prefix+"dls"
adls_fs_name=resource_prefix+"fs"
purview_name=resource_prefix+"purview"
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"
credential = DefaultAzureCredential(exclude_interactive_browser_credential=False)
client = SecretClient(vault_url=key_vault_uri, credential=credential)
secretName = "FEATHR-ONLINE-STORE-CONN"
retrieved_secret = client.get_secret(secretName).value

# Parse the Redis connection string, expected in the form host:port,password=...,ssl=...,...
redis_port=retrieved_secret.split(',')[0].split(":")[1]
redis_host=retrieved_secret.split(',')[0].split(":")[0]
redis_password=retrieved_secret.split(',')[1].split("password=",1)[1]
redis_ssl=retrieved_secret.split(',')[2].split("ssl=",1)[1]

# Set the resource link
os.environ['spark_config__azure_synapse__dev_url'] = f'https://{synapse_workspace_url}.dev.azuresynapse.net'
os.environ['spark_config__azure_synapse__pool_name'] = 'spark31'
os.environ['spark_config__azure_synapse__workspace_dir'] = f'abfss://{adls_fs_name}@{adls_account}.dfs.core.windows.net/feathr_project'
os.environ['feature_registry__purview__purview_name'] = f'{purview_name}'
os.environ['online_store__redis__host'] = redis_host
os.environ['online_store__redis__port'] = redis_port
os.environ['online_store__redis__ssl_enabled'] = redis_ssl
os.environ['REDIS_PASSWORD']=redis_password
feathr_output_path = f'abfss://{adls_fs_name}@{adls_account}.dfs.core.windows.net/feathr_output'
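
The index-based parsing above assumes the connection string lists its fields in a fixed order (`host:port,password=...,ssl=...`). As a minimal sketch, assuming the Azure Cache for Redis connection-string format implied by that code, the fields can be looked up by name instead. `parse_redis_conn_str` is a hypothetical helper, not part of Feathr or the Azure SDK, and the sample string is made up:

```python
def parse_redis_conn_str(conn_str: str) -> dict:
    """Split 'host:port,key=value,...' into a dict (hypothetical helper)."""
    endpoint, *options = conn_str.split(",")
    host, port = endpoint.rsplit(":", 1)
    parsed = {"host": host, "port": port}
    for option in options:
        if not option.strip():
            continue  # tolerate trailing commas
        # partition on the first '=' only, so base64 passwords ending in '=' survive
        key, _, value = option.partition("=")
        parsed[key.strip().lower()] = value.strip()
    return parsed


conn = parse_redis_conn_str(
    "example.redis.cache.windows.net:6380,password=abc123==,ssl=True,abortConnect=False"
)
print(conn["host"], conn["port"], conn["ssl"])
```

Keys are lower-cased, so `abortConnect` becomes `abortconnect`; the values stay strings, just like `redis_port` and `redis_ssl` in the cell above.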

# Initialize Feathr Client

In [4]:
config_file_name = "feathr_config.yaml" 
feathr_client = FeathrClient(config_path=config_file_name, credential=credential)

2022-11-29 10:53:20.781 | INFO     | feathr.utils._envvariableutil:get_environment_variable_with_default:51 - secrets__azure_key_vault__name not found in the config file.
2022-11-29 10:53:20.923 | INFO     | feathr.utils._envvariableutil:get_environment_variable_with_default:51 - spark_config__azure_synapse__feathr_runtime_location not found in the config file.
2022-11-29 10:53:24.896 | INFO     | feathr.utils._envvariableutil:get_environment_variable_with_default:51 - secrets__azure_key_vault__name not found in the config file.
2022-11-29 10:53:24.905 | INFO     | feathr.utils._envvariableutil:get_environment_variable_with_default:51 - feature_registry__api_endpoint not found in the config file.
2022-11-29 10:53:24.919 | INFO     | feathr.client:__init__:176 - Feathr Client 0.9.0 initialized successfully


# Defining Features with Feathr

In Feathr, a feature is viewed as a function that maps an entity id (key) and a timestamp to a feature value. For more details on feature definition, please refer to the [Feathr Feature Definition Guide](https://github.com/linkedin/feathr/blob/main/docs/concepts/feature-definition.md).


1. The typed key (a.k.a. entity id) identifies the subject of the feature, e.g. a user id, 123.
2. The feature name is the aspect of the entity that the feature describes, e.g. the age of the user.
3. The feature value is the actual value of that aspect at a particular time, e.g. the value is 30 in 2022.
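
To make the (key, timestamp) → value view concrete, here is a toy sketch in plain Python, independent of Feathr. It assumes a hypothetical `user_age` feature whose history is stored per entity key, and returns the latest value known at or before the requested time. The history data is made up for illustration:

```python
from bisect import bisect_right
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Hypothetical feature history: entity key -> [(effective timestamp, value)], sorted by time
AGE_HISTORY: Dict[int, List[Tuple[datetime, int]]] = {
    123: [(datetime(2021, 1, 1), 29), (datetime(2022, 1, 1), 30)],
}


def user_age(key: int, as_of: datetime) -> Optional[int]:
    """Feature as a function of (typed key, timestamp): latest value known at `as_of`."""
    history = AGE_HISTORY.get(key, [])
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # number of entries with ts <= as_of
    return history[idx - 1][1] if idx else None


print(user_age(123, datetime(2022, 6, 1)))  # 30
print(user_age(123, datetime(2020, 6, 1)))  # None: no value known yet
```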


Note that some features, such as those defined on top of request data, may have no entity key or timestamp.
Such a feature is simply a function/transformation executed against request data at runtime,
for example the day of week of the request, calculated by converting the request's UNIX timestamp.
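
A keyless request-time feature like the day-of-week example can be sketched as a plain function of the request data alone. This sketch assumes the timestamp is interpreted in UTC and follows Spark's `dayofweek` convention (1 = Sunday through 7 = Saturday), the function used by `f_day_of_week` later in this notebook. `request_day_of_week` is a hypothetical name:

```python
from datetime import datetime, timezone


def request_day_of_week(request_unix_ts: int) -> int:
    """Request-time feature with no entity key: derived purely from the request.

    Returns Spark's dayofweek convention: 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
    """
    dt = datetime.fromtimestamp(request_unix_ts, tz=timezone.utc)
    # isoweekday(): Monday = 1 ... Sunday = 7; shift so that Sunday becomes 1
    return dt.isoweekday() % 7 + 1


print(request_day_of_week(0))  # 1970-01-01 was a Thursday -> 5
```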


## Define Sources Section with UDFs
Anchored features need a feature source, which describes the raw data from which the feature values are computed. See the Python documentation for details on each input column.


In [5]:
# define a pre-processing function/UDF that is applied to the source data on Spark
from pyspark.sql import DataFrame
def feathr_udf_day_calc(df: DataFrame) -> DataFrame:
    # keep the import inside the UDF so the function is self-contained
    from pyspark.sql.functions import col
    # add the fare in cents; used by the f_location_total_fare_cents aggregation below
    df = df.withColumn("fare_amount_cents", col("fare_amount") * 100)
    return df

# define the data source
batch_source = HdfsSource(name="nycTaxiBatchSource",
                          path="abfss://{container_name}@{storage_account}.dfs.core.windows.net/nyc_taxi.parquet".format(storage_account=storage_accountname, container_name=storage_containername),
                          event_timestamp_column="lpep_dropoff_datetime",
                          preprocessing=feathr_udf_day_calc,
                          timestamp_format="yyyy-MM-dd HH:mm:ss")

### Define Anchors and Features
A feature is called an anchored feature when it is directly extracted from the source data, rather than computed on top of other features. The latter case is called a derived feature.

In [6]:
# define anchor features
f_trip_distance = Feature(name="f_trip_distance",
                          feature_type=FLOAT, transform="trip_distance")
f_trip_time_duration = Feature(name="f_trip_time_duration",
                               feature_type=INT32,
                               transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(name="f_is_long_trip_distance",
            feature_type=BOOLEAN,
            transform="cast_float(trip_distance)>30"),
    Feature(name="f_day_of_week",
            feature_type=INT32,
            transform="dayofweek(lpep_dropoff_datetime)"),
]

request_anchor = FeatureAnchor(name="request_features",
                               source=INPUT_CONTEXT,
                               features=features)

### Window aggregation features

Window aggregation features are defined with `WindowAggTransformation`, which takes an aggregation expression (`agg_expr`), an aggregation function (`agg_func`), and a time window (`window`).

The `agg_func` must be one of the following:

| Aggregation Type | Input Type | Description |
| --- | --- | --- |
| SUM, COUNT, MAX, MIN, AVG | Numeric | Applies the numerical operation to the numeric inputs. |
| MAX_POOLING, MIN_POOLING, AVG_POOLING | Numeric Vector | Applies the max/min/avg operation on a per-entry basis for a given collection of numbers. |
| LATEST | Any | Returns the latest non-null value from within the defined time window. |
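
To illustrate what a window aggregation computes, here is a plain-Python sketch that is not how Feathr evaluates it on Spark. For one observation (key, time), it aggregates the non-null values of that key whose event timestamps fall in `(obs_time - window, obs_time]`. The exact window boundary semantics in Feathr are an assumption here. Only the scalar functions from the table are covered; the `*_POOLING` variants, which work element-wise on vectors, are omitted. The events are toy data:

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Tuple

# (entity key, event timestamp, value)
Event = Tuple[int, datetime, Optional[float]]

AGG_FUNCS: Dict[str, Callable[[List[float]], float]] = {
    "SUM": sum,
    "COUNT": len,
    "MAX": max,
    "MIN": min,
    "AVG": lambda vals: sum(vals) / len(vals),
    "LATEST": lambda vals: vals[-1],  # values are sorted by event time
}


def window_agg(events: List[Event], key: int, obs_time: datetime,
               window: timedelta, agg_func: str) -> Optional[float]:
    """Aggregate non-null values for `key` with obs_time - window < ts <= obs_time."""
    in_window = sorted(
        (ts, value) for k, ts, value in events
        if k == key and value is not None and obs_time - window < ts <= obs_time
    )
    values = [value for _, value in in_window]
    return AGG_FUNCS[agg_func](values) if values else None


events: List[Event] = [
    (1, datetime(2020, 4, 1), 10.0),
    (1, datetime(2020, 4, 5), 30.0),
    (1, datetime(2020, 4, 6), None),  # nulls are skipped
    (2, datetime(2020, 4, 5), 99.0),
]
print(window_agg(events, 1, datetime(2020, 4, 10), timedelta(days=90), "AVG"))  # 20.0
```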


After you have defined features and sources, bring them together to build an anchor:


Note that if the features are computed from the observation data itself, the anchor's `source` should be `INPUT_CONTEXT` (as in `request_anchor` above) to indicate where those features come from.

In [7]:
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC",
                       full_name="nyc_taxi.location_id")

# calculate the average fare, maximum fare and total fare (in cents) per drop-off location over a 90-day window
agg_features = [Feature(name="f_location_avg_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="AVG",
                                                          window="90d")),
                Feature(name="f_location_max_fare",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                          agg_func="MAX",
                                                          window="90d")),
                Feature(name="f_location_total_fare_cents",
                        key=location_id,
                        feature_type=FLOAT,
                        transform=WindowAggTransformation(agg_expr="fare_amount_cents",
                                                          agg_func="SUM",
                                                          window="90d")),
                ]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
                           source=batch_source,
                           features=agg_features)

### Derived Features 
Derived features are the features that are computed from other features. They could be computed from anchored features, or other derived features.

In [8]:
f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
                                      feature_type=FLOAT,
                                      input_features=[
                                          f_trip_distance, f_trip_time_duration],
                                      transform="f_trip_distance * f_trip_time_duration")

# note: "% 10" keeps the remainder after dividing the duration by 10; it does not round it
f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
                                     feature_type=INT32,
                                     input_features=[f_trip_time_duration],
                                     transform="f_trip_time_duration % 10")
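
The two derived features above are plain expressions over their input features. Written as ordinary Python functions (an illustration only, not how Feathr executes them), they behave as follows. Note that `%` is the modulo operator, so `f_trip_time_rounded` is the remainder of the duration divided by 10:

```python
def f_trip_time_distance(f_trip_distance: float, f_trip_time_duration: int) -> float:
    """Mirrors transform="f_trip_distance * f_trip_time_duration"."""
    return f_trip_distance * f_trip_time_duration


def f_trip_time_rounded(f_trip_time_duration: int) -> int:
    """Mirrors transform="f_trip_time_duration % 10": the remainder modulo 10."""
    return f_trip_time_duration % 10


# A 28-minute trip (07:58 -> 08:26) gives 8, matching the first row of the join output below.
print(f_trip_time_rounded(28))  # 8
```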


Next, build the features so that they can be consumed later. Note that both the anchored features and the derived features (which are not anchored to a source) must be built.

In [9]:
feathr_client.build_features(anchor_list=[agg_anchor, request_anchor],
                             derived_feature_list=[f_trip_time_distance, f_trip_time_rounded])
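
Derived features are resolved through their input features, so every input must itself be defined as an anchored or derived feature. As a rough illustration outside Feathr (not how Feathr resolves dependencies internally), Python's standard `graphlib` (Python 3.9+) can order a subset of the features defined above so that each input comes before the features derived from it. It raises `graphlib.CycleError` if the definitions contain a cycle:

```python
from graphlib import TopologicalSorter

# feature name -> names of its input features (anchored features have none)
dependencies = {
    "f_trip_distance": set(),
    "f_trip_time_duration": set(),
    "f_trip_time_distance": {"f_trip_distance", "f_trip_time_duration"},
    "f_trip_time_rounded": {"f_trip_time_duration"},
}

# every input must itself be defined, i.e. anchored or derived
missing = {dep for deps in dependencies.values() for dep in deps} - set(dependencies)
assert not missing, f"undefined input features: {missing}"

# inputs are listed before the features that depend on them
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)
```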

## Registering Features

We can also register the features with an Apache Atlas-compatible service, such as Azure Purview, and share the registered features across teams:

In [10]:
feathr_client.register_features()

2022-11-29 10:53:46.554 | INFO     | feathr.registry._feature_registry_purview:_register_feathr_feature_types:193 - Feathr Feature Type System Initialized.


Found existing entity  947706cf-8ab6-4e27-8935-68fdcbb556de, feathr_workspace_v1 -- nyctaxifs
Found existing entity  a9c14bc8-74d5-41b9-9243-8446ded63581, feathr_source_v1 -- nyctaxifs__nycTaxiBatchSource
Found existing entity  64cfc76e-4c76-4aa4-a603-02e5d5f4aec9, Process -- CONTAINS__947706cf-8ab6-4e27-8935-68fdcbb556de__a9c14bc8-74d5-41b9-9243-8446ded63581
Found existing entity  748dc0cf-b760-4734-92c0-59f5e35b727b, Process -- BELONGSTO__a9c14bc8-74d5-41b9-9243-8446ded63581__947706cf-8ab6-4e27-8935-68fdcbb556de
Found existing entity  72de4e85-e8ae-4267-a312-37b0008218db, feathr_anchor_v1 -- nyctaxifs__aggregationFeatures
Found existing entity  1fd5d2b1-968c-4602-bb5b-280ecc4d1ba2, Process -- CONTAINS__947706cf-8ab6-4e27-8935-68fdcbb556de__72de4e85-e8ae-4267-a312-37b0008218db
Found existing entity  e2e9c7fa-e44e-4b67-a4f2-fb396ac27973, Process -- BELONGSTO__72de4e85-e8ae-4267-a312-37b0008218db__947706cf-8ab6-4e27-8935-68fdcbb556de
Found existing entity  e2da6f24-c831-4267-a23d-78efae

2022-11-29 10:54:54.280 | INFO     | feathr.registry._feature_registry_purview:register_features:980 - Finished registering features.


Found existing entity  7e3efcff-8c7f-4a66-9a35-809ad5e033ad, Process -- PRODUCES__042243d7-130d-4b9f-aebf-dc1039b6f0d2__bd382c8f-c2d4-4c35-a899-5fce980f22da


We can now list the registered features:

In [11]:
feathr_client.list_registered_features(project_name="nyctaxifs")

[{'name': 'f_trip_time_distance',
  'id': 'bd382c8f-c2d4-4c35-a899-5fce980f22da',
  'qualifiedName': 'nyctaxifs__f_trip_time_distance'},
 {'name': 'f_trip_time_rounded',
  'id': '67454f48-c0cb-4d2d-af22-40bbac6fea8e',
  'qualifiedName': 'nyctaxifs__f_trip_time_rounded'},
 {'name': 'f_is_long_trip_distance',
  'id': 'bb477734-fe36-4df8-9771-c8e79b39c19c',
  'qualifiedName': 'nyctaxifs__request_features__f_is_long_trip_distance'},
 {'name': 'f_day_of_week',
  'id': '8d99c1a9-2319-4745-afc3-c3d91d108af1',
  'qualifiedName': 'nyctaxifs__request_features__f_day_of_week'},
 {'name': 'f_trip_distance',
  'id': '1377824b-0ea5-4a8f-90ae-8e01a218d1d9',
  'qualifiedName': 'nyctaxifs__request_features__f_trip_distance'},
 {'name': 'f_trip_time_duration',
  'id': '042243d7-130d-4b9f-aebf-dc1039b6f0d2',
  'qualifiedName': 'nyctaxifs__request_features__f_trip_time_duration'},
 {'name': 'f_location_total_fare_cents',
  'id': '8568689c-586e-4348-9c16-3186bf53b1e0',
  'qualifiedName': 'nyctaxifs__aggreg

## Create training data using point-in-time correct feature joins (e.g., for ML model training and scoring)

A training dataset usually contains entity id columns, multiple feature columns, an event timestamp column, and a label/target column.

To create a training dataset using Feathr, you specify which features to retrieve and how they should be joined to the observation data.
In this notebook this is done in Python with `FeatureQuery` (which features to retrieve and the key to join on) and `ObservationSettings` (where the observation data lives and which column holds its event timestamp).

To learn more on this topic, please refer to [Point-in-time Correctness](https://github.com/linkedin/feathr/blob/main/docs/concepts/point-in-time-join.md).
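
The core idea of a point-in-time correct join is that each observation row only sees feature values whose timestamps are at or before the observation's own event timestamp, so no future information leaks into training. Here is a minimal plain-Python sketch of that idea (not Feathr's Spark implementation). `point_in_time_join` is a hypothetical helper. The column names mirror this notebook, but the feature timeline and rows are toy data:

```python
from bisect import bisect_right
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Hypothetical feature timeline: key -> [(feature timestamp, value)], sorted by time
FeatureTimeline = Dict[int, List[Tuple[datetime, float]]]


def point_in_time_join(observations: List[dict], timeline: FeatureTimeline,
                       key_col: str, ts_col: str, feature_name: str) -> List[dict]:
    """Attach to each observation the latest feature value whose timestamp is
    <= the observation's event timestamp (None if there is no such value)."""
    joined = []
    for obs in observations:
        history = timeline.get(obs[key_col], [])
        idx = bisect_right([ts for ts, _ in history], obs[ts_col])
        value: Optional[float] = history[idx - 1][1] if idx else None
        joined.append({**obs, feature_name: value})
    return joined


timeline: FeatureTimeline = {125: [(datetime(2020, 4, 1), 20.0), (datetime(2020, 4, 8), 25.0)]}
observations = [
    {"DOLocationID": 125, "lpep_dropoff_datetime": datetime(2020, 4, 5), "fare_amount": 18.0},
    {"DOLocationID": 125, "lpep_dropoff_datetime": datetime(2020, 4, 9), "fare_amount": 22.0},
    {"DOLocationID": 7, "lpep_dropoff_datetime": datetime(2020, 4, 9), "fare_amount": 9.0},
]
rows = point_in_time_join(observations, timeline, "DOLocationID",
                          "lpep_dropoff_datetime", "f_location_avg_fare")
print([row["f_location_avg_fare"] for row in rows])  # [20.0, 25.0, None]
```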


In [13]:

output_path = feathr_output_path

# retrieve a list of features with help of the common key
feature_query = FeatureQuery(
    feature_list=["f_location_avg_fare", "f_trip_time_rounded", "f_is_long_trip_distance", "f_location_total_fare_cents"], key=location_id)

# Time settings of the observation data. The feature join combines the observation data
# at observation_path with the features listed in feature_query above.
settings = ObservationSettings(
    observation_path="abfss://{container_name}@{storage_account}.dfs.core.windows.net/nyc_taxi.parquet".format(storage_account=storage_accountname, container_name=storage_containername),
    event_timestamp_column="lpep_dropoff_datetime",
    timestamp_format="yyyy-MM-dd HH:mm:ss")
feathr_client.get_offline_features(observation_settings=settings,
                                   feature_query=feature_query,
                                   output_path=output_path)
feathr_client.wait_job_to_finish(timeout_sec=500)

2022-11-29 11:02:40.900 | INFO     | feathr.spark_provider._synapse_submission:upload_or_get_cloud_path:67 - Uploading C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py to cloud..
2022-11-29 11:02:40.902 | INFO     | feathr.spark_provider._synapse_submission:upload_file:412 - Uploading file feathr_pyspark_driver.py
2022-11-29 11:02:41.475 | INFO     | feathr.spark_provider._synapse_submission:upload_file:418 - C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py is uploaded to location: abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_project/feathr_pyspark_driver.py
2022-11-29 11:02:41.477 | INFO     | feathr.spark_provider._synapse_submission:upload_or_get_cloud_path:71 - C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py is uploaded to location: abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_project/feathr_pyspark_driver.py
2022-11-29 11:02:41.514 | INFO     | feathr.spark_provider._synapse_sub

### Download and display the result

Let's use the helper function `get_result_df` to download the result and view it:

In [14]:
def get_result_df(client: FeathrClient) -> pd.DataFrame:
    """Download the job result dataset from cloud as a Pandas dataframe."""
    res_url = client.get_job_result_uri(block=True, timeout_sec=600)
    # the temporary directory is deleted automatically when the block exits
    with tempfile.TemporaryDirectory() as tmp_dir:
        client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir)
        # assuming the results are in Avro format
        dataframe_list = [pdx.read_avro(file)
                          for file in glob.glob(os.path.join(tmp_dir, '*.avro'))]
        vertical_concat_df = pd.concat(dataframe_list, axis=0)
    return vertical_concat_df

# pass the Feathr client (not the Key Vault SecretClient named `client`)
df_res = get_result_df(feathr_client)
df_res

2022-11-29 11:08:46.241 | INFO     | feathr.spark_provider._synapse_submission:wait_for_completion:177 - Current Spark job status: success
2022-11-29 11:08:46.309 | INFO     | feathr.spark_provider._synapse_submission:download_file:430 - Beginning reading of results from abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_output
Downloading result files: 100%|██████████| 146/146 [00:10<00:00, 13.70it/s]
2022-11-29 11:08:57.444 | INFO     | feathr.spark_provider._synapse_submission:download_file:459 - Finish downloading files from abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_output to C:\Users\RISENG~1\AppData\Local\Temp\tmp5c3n34i9.


| | trip_id | VendorID | lpep_pickup_datetime | lpep_dropoff_datetime | store_and_fwd_flag | RatecodeID | PULocationID | DOLocationID | passenger_count | trip_distance | ... | ehail_fee | improvement_surcharge | total_amount | payment_type | trip_type | congestion_surcharge | f_is_long_trip_distance | f_location_total_fare_cents | f_location_avg_fare | f_trip_time_rounded |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 25281 | | 2020-04-03 07:58:00 | 2020-04-03 08:26:00 | | | 47 | 125 | | 12.98 | ... | | 0.3 | 32.00 | | | | False | 3170.0 | 31.700001 | 8 |
| 1 | 26724 | | 2020-04-07 22:45:00 | 2020-04-07 23:08:00 | | | 228 | 125 | | 7.13 | ... | | 0.3 | 31.58 | | | | False | 6023.0 | 30.115002 | 3 |
| 2 | 6453 | 2.0 | 2020-04-08 04:09:53 | 2020-04-08 04:27:48 | N | 1.0 | 75 | 125 | 1.0 | 5.93 | ... | | 0.3 | 25.05 | 1.0 | 1.0 | 2.75 | False | 7923.0 | 26.410002 | 7 |
| 3 | 8885 | 1.0 | 2020-04-10 21:33:57 | 2020-04-10 22:00:11 | N | 1.0 | 89 | 125 | 1.0 | 0.00 | ... | | 0.3 | 29.90 | 1.0 | 1.0 | 2.5 | False | 12363.0 | 24.726002 | 6 |
| 4 | 27764 | | 2020-04-10 21:33:57 | 2020-04-10 22:00:11 | | | 89 | 125 | | 0.00 | ... | | 0.3 | 31.90 | | | | False | 12363.0 | 24.726002 | 6 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 219 | 35356 | | 2020-04-30 16:12:00 | 2020-04-30 17:16:00 | | | 55 | 119 | | 25.74 | ... | | 0.3 | 68.85 | | | | False | 337156.0 | 15.325272 | 4 |
| 220 | 35424 | | 2020-04-30 17:05:00 | 2020-04-30 17:25:00 | | | 242 | 119 | | 6.89 | ... | | 0.3 | 21.27 | | | | False | 338978.0 | 15.338370 | 0 |
| 221 | 24371 | 2.0 | 2020-04-30 18:09:34 | 2020-04-30 18:20:12 | N | 5.0 | 74 | 119 | 1.0 | 2.98 | ... | | 0.3 | 13.30 | 2.0 | 2.0 | 0.0 | False | 340278.0 | 15.327837 | 0 |
| 222 | 35458 | | 2020-04-30 18:13:00 | 2020-04-30 18:25:00 | | | 127 | 119 | | 2.39 | ... | | 0.3 | 13.54 | | | | False | 341327.0 | 15.306143 | 2 |


## Materialize feature values into offline/online storage (e.g., for the ML model inference path)

While Feathr can compute the feature value from the feature definition on-the-fly at request time, it can also pre-compute
and materialize the feature value to offline and/or online storage. 

The following cell pushes the generated features to the online store (Redis):


In [15]:
# start and end are the same day, so a single day of feature values is materialized
backfill_time = BackfillTime(start=datetime(2020, 5, 20),
                             end=datetime(2020, 5, 20),
                             step=timedelta(days=1))

redisSink = RedisSink(table_name="nyctaxi")

settings = MaterializationSettings(name="nycTaxiFeatures",
                                   backfill_time=backfill_time,
                                   sinks=[redisSink],
                                   feature_names=["f_location_avg_fare", "f_location_max_fare"])

feathr_client.materialize_features(settings)
feathr_client.wait_job_to_finish(timeout_sec=500)


2022-11-29 11:10:13.047 | INFO     | feathr.spark_provider._synapse_submission:upload_or_get_cloud_path:67 - Uploading C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py to cloud..
2022-11-29 11:10:13.050 | INFO     | feathr.spark_provider._synapse_submission:upload_file:412 - Uploading file feathr_pyspark_driver.py
2022-11-29 11:10:13.417 | INFO     | feathr.spark_provider._synapse_submission:upload_file:418 - C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py is uploaded to location: abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_project/feathr_pyspark_driver.py
2022-11-29 11:10:13.420 | INFO     | feathr.spark_provider._synapse_submission:upload_or_get_cloud_path:71 - C:\Users\RISENG~1\AppData\Local\Temp\tmpu5bgb285\feathr_pyspark_driver.py is uploaded to location: abfss://rsfeathrfs@rsfeathrdls.dfs.core.windows.net/feathr_project/feathr_pyspark_driver.py
2022-11-29 11:10:13.422 | INFO     | feathr.utils._envvariableutil:get_
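
`BackfillTime` describes which dates to materialize. Assuming the range includes both `start` and `end` and advances in increments of `step` (check the Feathr documentation for the exact semantics), the settings above cover a single day. A stdlib sketch of that enumeration, where `backfill_dates` is a hypothetical helper and not part of Feathr:

```python
from datetime import datetime, timedelta
from typing import List


def backfill_dates(start: datetime, end: datetime, step: timedelta) -> List[datetime]:
    """Enumerate start, start + step, ... up to and including end (assumed inclusive)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += step
    return dates


print(backfill_dates(datetime(2020, 5, 20), datetime(2020, 5, 20), timedelta(days=1)))
# [datetime.datetime(2020, 5, 20, 0, 0)]
```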

We can then get the features from the online store (Redis in our case):


In [16]:
res = feathr_client.multi_get_online_features("nycTaxiFeatures", ["128", "243"], ["f_location_avg_fare", "f_location_max_fare"])
res

{'128': [25.209131240844727, 72.0], '243': [16.404672622680664, 80.0]}
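
Conceptually, the online store is a key-value lookup. Each key maps to its latest materialized feature values, which is why `multi_get_online_features` returns a dict from key to a list of values in the requested feature order. Here is a toy in-memory stand-in to illustrate that shape. It is not how Feathr lays out data in Redis; `InMemoryOnlineStore`, its methods, and the stored values are hypothetical:

```python
from typing import Dict, List, Optional


class InMemoryOnlineStore:
    """Toy stand-in for an online store: table -> key -> {feature name: value}."""

    def __init__(self) -> None:
        self._tables: Dict[str, Dict[str, Dict[str, float]]] = {}

    def put(self, table: str, key: str, features: Dict[str, float]) -> None:
        # a new materialization overwrites older values for the same key
        self._tables.setdefault(table, {}).setdefault(key, {}).update(features)

    def multi_get(self, table: str, keys: List[str],
                  feature_names: List[str]) -> Dict[str, List[Optional[float]]]:
        rows = self._tables.get(table, {})
        # missing keys or features come back as None, in the requested order
        return {key: [rows.get(key, {}).get(name) for name in feature_names]
                for key in keys}


store = InMemoryOnlineStore()
store.put("demo_table", "128", {"f_location_avg_fare": 25.0, "f_location_max_fare": 72.0})
print(store.multi_get("demo_table", ["128", "999"],
                      ["f_location_avg_fare", "f_location_max_fare"]))
# {'128': [25.0, 72.0], '999': [None, None]}
```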