---
title: Feature set specification transformation concepts
titleSuffix: Azure Machine Learning
description: The feature set specification, transformations, and best practices.
ms.service: machine-learning
ms.subservice: mldata
ms.topic: how-to
ms.author: franksolomon
author: fbsolo-ms1
ms.reviewer: yogipandey
ms.date: 12/06/2023
ms.custom: template-concept
---
This article describes feature set specifications, the kinds of transformations that you can use with them, and related best practices.
A feature set is a collection of features generated by source data transformations. A feature set specification is a self-contained definition for feature set development and local testing. After its development and local testing, you can register that feature set as a feature set asset with the feature store. You then have versioning and materialization available as managed capabilities.
A `FeatureSetSpec` defines a feature set. This sample shows a feature set specification file:
```yaml
$schema: http://azureml/sdk-2-0/FeatureSetSpec.json

source:
  type: parquet
  path: abfs://file_system@account_name.dfs.core.windows.net/datasources/transactions-source/*.parquet
  timestamp_column: # name of the column representing the timestamp.
    name: timestamp
  source_delay:
    days: 0
    hours: 3
    minutes: 0
feature_transformation:
  transformation_code:
    path: ./transformation_code
    transformer_class: transaction_transform.TransactionFeatureTransformer
features:
  - name: transaction_7d_count
    type: long
  - name: transaction_amount_7d_sum
    type: double
  - name: transaction_amount_7d_avg
    type: double
  - name: transaction_3d_count
    type: long
  - name: transaction_amount_3d_sum
    type: double
  - name: transaction_amount_3d_avg
    type: double
index_columns:
  - name: accountID
    type: string
source_lookback:
  days: 7
  hours: 0
  minutes: 0
temporal_join_lookback:
  days: 1
  hours: 0
  minutes: 0
```
> [!NOTE]
> The `featurestore` core SDK autogenerates the feature set specification YAML. This tutorial has an example.
In the `FeatureSetSpec` definition, these properties are relevant to feature transformation:

- `source`: defines the source data and relevant metadata - for example, the timestamp column in the data. Currently, only time-series source data and features are supported. The `source.timestamp_column` property is mandatory.
- `feature_transformation.transformation_code`: defines the code folder location of the feature transformer (a minimal sketch of such a transformer class follows this list).
- `features`: defines the feature schema that the feature transformer generates.
- `index_columns`: defines the index column(s) schema that the feature transformer generates.
- `source_lookback`: this property is used when the feature handles aggregation on time-series data (for example, window aggregation). The value of this property indicates the required time range of source data in the past, for a feature value at time T. The Best Practices section has details.
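For illustration, this is a minimal sketch of what the `transaction_transform.TransactionFeatureTransformer` class referenced by `transformation_code` might look like: a sliding window aggregation over the `accountID` index column that produces the 7-day and 3-day features declared in the specification. The `transactionAmount` source column name is an assumption; the actual source schema isn't shown here.

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer

class TransactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        days = lambda i: i * 86400  # convert days to seconds
        # Sliding windows that look back 3 days and 7 days from each row, never into the future
        w_3d = (Window.partitionBy("accountID").orderBy(F.col("timestamp").cast("long"))
                .rangeBetween(-days(3), 0))
        w_7d = (Window.partitionBy("accountID").orderBy(F.col("timestamp").cast("long"))
                .rangeBetween(-days(7), 0))
        # "transactionAmount" is an assumed source column name, used for illustration only
        return (
            df.withColumn("transaction_7d_count", F.count("transactionAmount").over(w_7d))
            .withColumn("transaction_amount_7d_sum", F.sum("transactionAmount").over(w_7d))
            .withColumn("transaction_amount_7d_avg", F.avg("transactionAmount").over(w_7d))
            .withColumn("transaction_3d_count", F.count("transactionAmount").over(w_3d))
            .withColumn("transaction_amount_3d_sum", F.sum("transactionAmount").over(w_3d))
            .withColumn("transaction_amount_3d_avg", F.avg("transactionAmount").over(w_3d))
            .select("accountID", "timestamp",
                    "transaction_7d_count", "transaction_amount_7d_sum", "transaction_amount_7d_avg",
                    "transaction_3d_count", "transaction_amount_3d_sum", "transaction_amount_3d_avg")
        )
```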
After you define a `FeatureSetSpec`, invoke `featureSetSpec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts)` to calculate features on a given feature window.
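For example, this minimal sketch computes the features for a one-month feature window. The `feature_set_spec` variable is assumed to be a `FeatureSetSpec` object created or loaded earlier, and the `datetime` window bounds are illustrative:

```python
from datetime import datetime

# Illustrative feature window bounds; feature_set_spec is assumed to exist already
feature_window_start_ts = datetime(2023, 1, 1)
feature_window_end_ts = datetime(2023, 2, 1)

# Compute the features for the given feature window as a Spark dataframe
features_df = feature_set_spec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts)
features_df.show()
```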
The calculation happens in these steps:
- Read data from the data source defined by `source`. Filter the data by the time range `[feature_window_start_ts - source_lookback, feature_window_end_ts)`. The time range includes the start of the window and excludes the end of the window.
- Apply the feature transformer, defined by `feature_transformation.transformation_code`, on the data, and get the calculated features.
- Filter the feature values to return only those feature records within the feature window `[feature_window_start_ts, feature_window_end_ts)`.
In this code sample, the feature store API computes the features:
# define the source data time window according to feature window
source_window_start_ts = feature_window_start_ts - source_lookback
source_window_end_ts = feature_window_end_ts
# read source table into a dataframe
df1 = spark.read.parquet(source.path).filter(df1["timestamp"] >= source_window_start_ts && df1["timestamp"] < source_window_end_ts)
# apply the feature transformer
df2 = FeatureTransformer._transform(df1)
## filter the feature(set) to include only feature records within the feature window
feature_set_df = df2.filter(df2["timestamp"] >= feature_window_start_ts && df2["timestamp"] < feature_window_end_ts)
:::image type="content" source="./media/feature-set-specification-transformation-concepts/feature-set-specification-example.png" lightbox="./media/feature-set-specification-transformation-concepts/feature-set-specification-example.png" alt-text="Illustration showing feature set specification and corresponding transformations applied on source data to produce feature dataframe.":::
The transform function outputs a dataframe that includes these values in its schema:

- Index columns that match the `FeatureSetSpec` definition, both in name and type
- The timestamp column whose name matches the timestamp definition in `source`, which is found in the `FeatureSetSpec`
- All other columns, with names and types that match the `features` defined in the `FeatureSetSpec`
In a row-level transformation, a feature value calculation on a specific row only uses column values of that row. Start with this source data:
| user_id | timestamp | total_spend |
|---|---|---|
| 1 | 2022-12-19 06:00:00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-25 13:00:00 | 112.00 |
Define a new feature set named `user_total_spend_profile`:
```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.ml import Transformer

class UserTotalSpendProfileTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        # Row-level transformation: each feature value uses only columns from the same row
        return df.withColumn("is_high_spend_user", col("total_spend") > 100.0) \
            .withColumn("is_low_spend_user", col("total_spend") < 20.0)
```
This feature set has three features, with data types as shown:

- `total_spend`: double
- `is_high_spend_user`: bool
- `is_low_spend_user`: bool
This shows the calculated feature values:
| user_id | timestamp | total_spend | is_high_spend_user | is_low_spend_user |
|---|---|---|---|---|
| 1 | 2022-12-19 06:00:00 | 12.00 | false | true |
| 2 | 2022-12-10 03:00:00 | 56.00 | false | false |
| 1 | 2022-12-25 13:00:00 | 112.00 | true | false |
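To check the transformer locally, you can apply it to a small in-memory dataframe. This is a minimal sketch that assumes a local Spark session; the sample rows mirror the source data table shown earlier:

```python
from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows that mirror the source data table above
df = spark.createDataFrame(
    [
        (1, datetime(2022, 12, 19, 6, 0, 0), 12.00),
        (2, datetime(2022, 12, 10, 3, 0, 0), 56.00),
        (1, datetime(2022, 12, 25, 13, 0, 0), 112.00),
    ],
    ["user_id", "timestamp", "total_spend"],
)

# Apply the row-level transformer defined above
feature_df = UserTotalSpendProfileTransformer()._transform(df)
feature_df.show()
```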
Sliding window aggregation can help handle feature values that represent statistics (for example, sum or average) that accumulate over time. The SparkSQL `Window` function, which defines a sliding window around each row in the data, is useful in these cases.

For each row, the `Window` object can look into both the future and the past. In the context of machine learning features, you should define the `Window` object to look only into the past for each row, as shown in the sketch after this paragraph. Visit the Best Practices section for more details.
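For example, this minimal sketch defines a window that looks back seven days from each row and never ahead of it. The seven-day range and the `user_id` partition key are illustrative:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda i: i * 86400  # convert days to seconds

# Look back 7 days from each row (inclusive of the current row), never into the future
w_past_only = (Window.partitionBy("user_id")
               .orderBy(F.col("timestamp").cast("long"))
               .rangeBetween(-days(7), 0))
```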
Start with this source data:
| user_id | timestamp | spend |
|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 10.00 |
| 2 | 2022-12-11 20:00:00 | 10.00 |
| 2 | 2022-12-12 02:00:00 | 100.00 |
Define a new feature set named `user_rolling_spend`. This feature set includes rolling 1-day and 3-day total spending, by user:
```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer

class UserRollingSpend(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        days = lambda i: i * 86400  # convert days to seconds
        # Sliding windows that look back 1 day and 3 days from each row
        w_1d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast("long"))
                .rangeBetween(-days(1), 0))
        w_3d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast("long"))
                .rangeBetween(-days(3), 0))
        res = df.withColumn("spend_1d_sum", F.sum("spend").over(w_1d)) \
            .withColumn("spend_3d_sum", F.sum("spend").over(w_3d)) \
            .select("user_id", "timestamp", "spend_1d_sum", "spend_3d_sum")
        return res
```
The `user_rolling_spend` feature set has two features:

- `spend_1d_sum`: double
- `spend_3d_sum`: double
This shows its calculated feature values:
| user_id | timestamp | spend_1d_sum | spend_3d_sum |
|---|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 | 12.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 22.00 | 22.00 |
| 2 | 2022-12-11 20:00:00 | 10.00 | 66.00 |
| 2 | 2022-12-12 02:00:00 | 110.00 | 166.00 |
The feature value calculations use columns from the current row, combined with columns from preceding rows within the window range.
Tumbling window aggregation groups time-series data into fixed-size, nonoverlapping, contiguous time windows, and then aggregates it. For example, you can define features based on daily or hourly aggregation. Use the `pyspark.sql.functions.window` function to define a tumbling window, for consistent results. The output feature `timestamp` should align with the end of each tumbling window.
Start with this source data:
| user_id | timestamp | spend |
|---|---|---|
| 1 | 2022-12-10 06:00:00 | 12.00 |
| 1 | 2022-12-10 16:00:00 | 10.00 |
| 2 | 2022-12-10 03:00:00 | 56.00 |
| 1 | 2022-12-11 01:00:00 | 10.00 |
| 2 | 2022-12-12 04:00:00 | 23.00 |
| 2 | 2022-12-12 12:00:00 | 10.00 |
Define a new feature set named `user_daily_spend`:
```python
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TransactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        # Group into 1-day tumbling windows and sum the spend per user
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day", slideDuration="1 day"))\
            .agg(F.sum("spend").alias("daily_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"), "daily_spend")
        # Shift the timestamp to just before the window end so it falls inside the window
        df3 = df2.withColumn("timestamp", F.expr("end - INTERVAL 1 milliseconds")) \
            .select("user_id", "timestamp", "daily_spend")
        return df3
```
The `user_daily_spend` feature set has this feature:

- `daily_spend`: double
This shows its calculated feature values:
| user_id | timestamp | daily_spend |
|---|---|---|
| 1 | 2022-12-10 23:59:59 | 22.00 |
| 2 | 2022-12-10 23:59:59 | 56.00 |
| 1 | 2022-12-11 23:59:59 | 10.00 |
| 2 | 2022-12-12 23:59:59 | 33.00 |
Stagger window aggregation is a minor variant of tumbling window aggregation. It also groups the data into fixed-size windows, but the windows can overlap each other. For this, use `pyspark.sql.functions.window` with a `slideDuration` smaller than the `windowDuration`.
Start with this example data:
| user_id | timestamp | spend |
|---|---|---|
| 1 | 2022-12-10 03:00:00 | 12.00 |
| 1 | 2022-12-10 09:00:00 | 10.00 |
| 1 | 2022-12-11 05:00:00 | 8.00 |
| 2 | 2022-12-12 14:00:00 | 56.00 |
Define a new feature set named `user_sliding_24hr_spend`:
```python
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TransactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        # Group into 1-day windows that slide every 6 hours, so windows overlap
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day", slideDuration="6 hours"))\
            .agg(F.sum("spend").alias("sliding_24hr_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"), "sliding_24hr_spend")
        # Shift the timestamp to just before the window end so it falls inside the window
        df3 = df2.withColumn("timestamp", F.expr("end - INTERVAL 1 milliseconds")) \
            .select("user_id", "timestamp", "sliding_24hr_spend")
        return df3
```
The `user_sliding_24hr_spend` feature set has one feature:

- `sliding_24hr_spend`: double
This shows its calculated feature values:
| user_id | timestamp | sliding_24hr_spend |
|---|---|---|
| 1 | 2022-12-10 05:59:59 | 12.00 |
| 1 | 2022-12-10 11:59:59 | 22.00 |
| 1 | 2022-12-10 17:59:59 | 22.00 |
| 1 | 2022-12-10 23:59:59 | 22.00 |
| 1 | 2022-12-11 05:59:59 | 18.00 |
| 1 | 2022-12-11 11:59:59 | 8.00 |
| 1 | 2022-12-11 17:59:59 | 8.00 |
| 1 | 2022-12-11 23:59:59 | 8.00 |
| 1 | 2022-12-12 05:59:59 | 18.00 |
| 2 | 2022-12-12 17:59:59 | 56.00 |
| 2 | 2022-12-12 23:59:59 | 56.00 |
| 2 | 2022-12-13 05:59:59 | 56.00 |
| 2 | 2022-12-13 11:59:59 | 56.00 |
If the timestamp value of each calculated feature value is `ts_0`, calculate the feature values based only on source data with timestamp values on or before `ts_0`. This avoids feature calculation based on data from after the feature event time, otherwise known as data leakage.
Data leakage usually happens with sliding/tumbling/stagger window aggregation. These best practices can help avoid leakage:

- Sliding window aggregation: define the window to look only back in time from each row
- Tumbling/stagger window aggregation: define the feature timestamp based on the end of each window
This table shows good examples, and bad examples that leak data, for each aggregation type:

| Aggregation | Good example | Bad example with data leakage |
|---|---|---|
| Sliding window | `Window.partitionBy("user_id")`<br>`.orderBy(F.col("timestamp").cast('long'))`<br>`.rangeBetween(-days(4), 0)` | `Window.partitionBy("user_id")`<br>`.orderBy(F.col("timestamp").cast('long'))`<br>`.rangeBetween(-days(2), days(2))` |
| Tumbling/stagger window | `df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day", slideDuration="1 day"))`<br>`.agg(F.sum("spend").alias("daily_spend"))`<br>`df2 = df1.select("user_id", df1.window.`**`end`**`.cast("timestamp").alias("timestamp"), "daily_spend")` | `df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day", slideDuration="1 day"))`<br>`.agg(F.sum("spend").alias("daily_spend"))`<br>`df2 = df1.select("user_id", df1.window.`**`start`**`.cast("timestamp").alias("timestamp"), "daily_spend")` |
Data leakage in the feature transformation definition can lead to these problems:

- Errors in the calculated/materialized feature values
- Inconsistencies in `get_offline_feature`, when using the materialized feature value instead of values calculated on the fly
For time-series data aggregations (sliding/tumbling/stagger window aggregation), properly set the `source_lookback` property. This diagram shows the relationship between the source data window and the feature window in the feature set calculation:
:::image type="content" source="./media/feature-set-specification-transformation-concepts/illustration-source-lookback.png" lightbox="./media/feature-set-specification-transformation-concepts/illustration-source-lookback.png" alt-text="Illustration showing the concept of source_lookback.":::
Define `source_lookback` as a time delta value that represents the range of source data needed for a feature value at a given timestamp. This table shows the recommended `source_lookback` values for the common transformation types:
| Transformation type | source_lookback |
|---|---|
| Row-level transformation | 0 (default) |
| Sliding window | Size of the largest window range in the transformer. For example, `source_lookback = 3 days` when the feature set defines 3-day rolling features; `source_lookback = 7 days` when the feature set defines both 3-day and 7-day rolling features. |
| Tumbling/stagger window | Value of `windowDuration` in the window definition. For example, `source_lookback = 1 day` when using `window("timestamp", windowDuration="1 day", slideDuration="6 hours")`. |
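For example, this small sketch shows how a `source_lookback` of seven days widens the source data read window relative to the feature window. The specific dates are illustrative:

```python
from datetime import datetime, timedelta

# Feature window requested by the caller (illustrative dates)
feature_window_start_ts = datetime(2023, 1, 8)
feature_window_end_ts = datetime(2023, 1, 15)

# source_lookback of 7 days, as declared in the feature set specification
source_lookback = timedelta(days=7)

# Source data is read over [feature_window_start_ts - source_lookback, feature_window_end_ts)
source_window_start_ts = feature_window_start_ts - source_lookback  # 2023-01-01 00:00:00
source_window_end_ts = feature_window_end_ts                        # 2023-01-15 00:00:00
```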
Incorrect `source_lookback` settings can lead to incorrect calculated/materialized feature values.