# Tutorial #9: Develop a feature set using DSL

In this tutorial you will:
- Create a new minimal feature store resource
- Develop and test a feature set locally with the DSL capability
- Develop a UDF feature set with the same transformation and compare its results with the DSL feature set
- Register a feature-store entity with the feature store
- Register the DSL feature set that you developed with the feature store
- Generate a sample training DataFrame using the features you created

#### Important

This feature is currently in private preview. This preview version is provided without a service-level agreement, and it's not recommended for production workloads. Certain features might not be supported or might have constrained capabilities. For more information, see [Supplemental Terms of Use for Microsoft Azure Previews](https://azure.microsoft.com/support/legal/preview-supplemental-terms/).

## Prerequisites
Before following the steps in this article, make sure you have the following prerequisites:

* An Azure Machine Learning workspace. If you don't have one, use the steps in the [Quickstart: Create workspace resources](https://learn.microsoft.com/en-us/azure/machine-learning/quickstart-create-resources?view=azureml-api-2) article to create one.
* To perform the steps in this article, your user account must be assigned the Owner or Contributor role on the resource group where the feature store will be created.

#### Prepare the notebook environment for development
Note: This tutorial uses an Azure Machine Learning Spark notebook for development.

1. Clone the [examples repository (azureml-examples)](https://github.com/azure/azureml-examples) to your local machine. Alternatively, download a zip file from the repository: click on the `code` dropdown and click `Download ZIP`. Then unzip the contents into a folder on your local machine.

2. Upload the feature store samples directory to project workspace: Open Azure ML studio UI of your Azure ML workspace -> click on "Notebooks" in left nav -> right click on your user name in the directory listing -> click "upload folder" -> select the feature store samples folder from the cloned directory path: `azureml-examples/sdk/python/featurestore-sample`

3. Either create a new notebook and run the instructions in this document step by step, or open the existing notebook titled `9.Dsl.ipynb` and run it step by step. Keep this document open and refer to it for detailed explanations of the steps. The notebooks are available in the folder `featurestore_sample/notebooks`. Select either the `sdk_only` folder or the `sdk_and_cli` folder. The latter mixes CLI commands with the Python SDK, which is useful in CI/CD scenarios.

4. In the "Compute" dropdown in the top nav, select "Serverless Spark Compute". It may take 1-2 minutes for this activity to complete. Wait for the status bar at the top to display `configure session`.

5. Click on "configure session" -> click on "Python packages" -> click on "upload conda file" -> select the file `azureml-examples/sdk/python/featurestore-sample/project/env/conda.yml` from your local machine. Also increase the session timeout (idle time) if you want to avoid repeated serverless Spark cluster startup delays.

__Important:__ Except for this step, you need to run all the other steps every time you start a new Spark session or the session times out.


In [None]:
# run this cell to start the spark session (any code block will start the session ). This can take around 10 mins. 
# While waiting for spark session to start, create a feature store
print("start spark session")

#### Creating a feature store through UI:

1. Navigate to the feature store UI [landing page](https://ml.azure.com/featureStores?tid=72f988bf-86f1-41af-91ab-2d7cd011db47)
1. On the `Basics` page: choose a name for your feature store, then select the subscription and resource group. Make sure the region is __East US 2 EUAP__.
1. On the `Materialization` page: toggle "enable materialization", and then follow the instructions to select a user-assigned managed identity (UAI) and an ADLS Gen2 storage account.
1. Review and click "Create"


#### Set up the root directory for the samples and enable the private preview feature

In [None]:
import os

# Please update your alias below (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left nav
root_dir = "./Users/<your-alias-here>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

# Set the private preview feature flag to true
os.environ["AZURE_ML_CLI_PRIVATE_FEATURES_ENABLED"]="True"

#### If you don't have a feature store, create one in the East US 2 EUAP region by uncommenting the code in the cell below

In [None]:
import os

featurestore_name = "my-dsl-featurestore-placeholder"
featurestore_resource_group_name = "my-resource-group-placeholder"
featurestore_subscription_id = "my-subscription-id-placeholder"

##### Create Feature Store #####
# from azure.ai.ml import MLClient
# from azure.ai.ml.entities import (
#     FeatureStore,
#     FeatureStoreEntity,
#     FeatureSet,
# )
# from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# ml_client = MLClient(
#     AzureMLOnBehalfOfCredential(),
#     subscription_id=featurestore_subscription_id,
#     resource_group_name=featurestore_resource_group_name,
# )
# featurestore_location = "eastus2euap"

# fs = FeatureStore(name=featurestore_name, location=featurestore_location)
# # wait for featurestore creation
# fs_poller = ml_client.feature_stores.begin_create(fs, update_dependent_resources=True)
# print(fs_poller.result())

Create a feature set spec using DSL expression. A list of supported aggregations can be found [here](https://github.com/Azure/azureml_run_specification/blob/master/specs/featurestore/feature-store-dsl.md#proposal)


In [None]:
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.contracts.feature import Feature
from azureml.featurestore.transformation import TransformationExpressionCollection, WindowAggregation
from azureml.featurestore.contracts import (
    DateTimeOffset,
    FeatureSource,
    TransformationCode,
    Column,
    ColumnType,
    SourceType,
    TimestampColumn,
)

dsl_feature_set_spec = create_feature_set_spec(
    source=FeatureSource(
        type=SourceType.parquet,
        path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet",
        timestamp_column=TimestampColumn(name="timestamp")
    ),
    temporal_join_lookback=DateTimeOffset(0),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    features=[
        Feature(name="f_transaction_3d_count", type=ColumnType.LONG),
        Feature(name="f_transaction_amount_3d_sum", type=ColumnType.DOUBLE),
        Feature(name="f_transaction_amount_3d_avg", type=ColumnType.DOUBLE),
        Feature(name="f_transaction_7d_count", type=ColumnType.LONG),
        Feature(name="f_transaction_amount_7d_sum", type=ColumnType.DOUBLE),
        Feature(name="f_transaction_amount_7d_avg", type=ColumnType.DOUBLE),
    ],
    feature_transformation=TransformationExpressionCollection(
        transformation_expressions=[
            WindowAggregation(
                feature_name="f_transaction_3d_count",
                source_column="transactionID",
                aggregation="count",
                window=DateTimeOffset(days=3)),
            WindowAggregation(
                feature_name="f_transaction_amount_3d_sum",
                source_column="transactionAmount",
                aggregation="sum",
                window=DateTimeOffset(days=3)),
            WindowAggregation(
                feature_name="f_transaction_amount_3d_avg",
                source_column="transactionAmount",
                aggregation="avg",
                window=DateTimeOffset(days=3)),
            WindowAggregation(
                feature_name="f_transaction_7d_count",
                source_column="transactionID",
                aggregation="count",
                window=DateTimeOffset(days=7)),
            WindowAggregation(
                feature_name="f_transaction_amount_7d_sum",
                source_column="transactionAmount",
                aggregation="sum",
                window=DateTimeOffset(days=7)),
            WindowAggregation(
                feature_name="f_transaction_amount_7d_avg",
                source_column="transactionAmount",
                aggregation="avg",
                window=DateTimeOffset(days=7)),
        ]
    )
)

dsl_feature_set_spec
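
The six `WindowAggregation` entries above follow a regular pattern: two window lengths crossed with three aggregations. As a minimal sketch, the plain-Python helper below enumerates the same combinations as dictionaries. The helper name `window_aggregation_params` and the dictionary layout are illustrative assumptions and not part of the `azureml.featurestore` API; the feature names, source columns, and aggregations are copied from the cell above.

```python
from itertools import product

# (aggregation, source column, feature-name pattern) as used in the spec above
AGGREGATIONS = [
    ("count", "transactionID", "f_transaction_{days}d_count"),
    ("sum", "transactionAmount", "f_transaction_amount_{days}d_sum"),
    ("avg", "transactionAmount", "f_transaction_amount_{days}d_avg"),
]


def window_aggregation_params(window_days=(3, 7)):
    """Enumerate the parameters of each window aggregation as plain dicts.

    Hypothetical helper for illustration only: it builds data and does not
    call azureml.featurestore.
    """
    params = []
    for days, (aggregation, source_column, name_pattern) in product(window_days, AGGREGATIONS):
        params.append(
            {
                "feature_name": name_pattern.format(days=days),
                "source_column": source_column,
                "aggregation": aggregation,
                "window_days": days,
            }
        )
    return params


for p in window_aggregation_params():
    print(p)
```

Each dictionary corresponds to one `WindowAggregation(...)` entry in the spec, with `window_days` standing in for the value passed as `DateTimeOffset(days=...)`.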

In [None]:
from datetime import datetime
st = datetime(2020,1,1)
et = datetime(2023,6,1)

In [None]:
dsl_df = dsl_feature_set_spec.to_spark_dataframe(feature_window_start_date_time=st, feature_window_end_date_time=et)

Now we create a feature set spec with the same transformation, defined using a UDF.

In [None]:
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.contracts import (
    DateTimeOffset,
    FeatureSource,
    TransformationCode,
    Column,
    ColumnType,
    SourceType,
    TimestampColumn,
)

transactions_featureset_code_path = (
    root_dir + "/featurestore/featuresets/transactions/transformation_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=FeatureSource(
        type=SourceType.parquet,
        path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet",
        timestamp_column=TimestampColumn(name="timestamp"),
    ),
    transformation_code=TransformationCode(
        path=transactions_featureset_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    infer_schema=True,
)

udf_featureset_spec

The transformation code is replicated below to show that the aggregation window is defined the same way.

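```python
from pyspark.ml import Transformer
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


class TransactionFeatureTransformer(Transformer):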
    def _transform(self, df: DataFrame) -> DataFrame:
        days = lambda i: i * 86400
        w_3d = (
            Window.partitionBy("accountID")
            .orderBy(F.col("timestamp").cast("long"))
            .rangeBetween(-days(3), 0)
        )
        w_7d = (
            Window.partitionBy("accountID")
            .orderBy(F.col("timestamp").cast("long"))
            .rangeBetween(-days(7), 0)
        )
        res = (
            df.withColumn("transaction_7d_count", F.count("transactionID").over(w_7d))
            .withColumn(
                "transaction_amount_7d_sum", F.sum("transactionAmount").over(w_7d)
            )
            .withColumn(
                "transaction_amount_7d_avg", F.avg("transactionAmount").over(w_7d)
            )
            .withColumn("transaction_3d_count", F.count("transactionID").over(w_3d))
            .withColumn(
                "transaction_amount_3d_sum", F.sum("transactionAmount").over(w_3d)
            )
            .withColumn(
                "transaction_amount_3d_avg", F.avg("transactionAmount").over(w_3d)
            )
            .select(
                "accountID",
                "timestamp",
                "transaction_3d_count",
                "transaction_amount_3d_sum",
                "transaction_amount_3d_avg",
                "transaction_7d_count",
                "transaction_amount_7d_sum",
                "transaction_amount_7d_avg",
            )
        )
        return res

```
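
To make the window semantics concrete, here is a small pure-Python sketch of what `rangeBetween(-days(3), 0)` computes for one account: for each row, it aggregates all rows whose timestamp lies between three days earlier and the row's own timestamp, with both bounds inclusive. The function name `trailing_window_stats` and the sample rows are made up for illustration; the sketch doesn't use Spark and isn't part of the sample code.

```python
from datetime import datetime, timedelta


def trailing_window_stats(rows, days):
    """For each (timestamp, amount) row, return count/sum/avg over the trailing window.

    Mirrors Spark's rangeBetween(-days * 86400, 0) on a timestamp cast to
    seconds: both window bounds are inclusive. All rows are assumed to
    belong to a single account.
    """
    window = timedelta(days=days)
    results = []
    for ts, _ in rows:
        # The current row always falls inside its own window, so in_window is never empty
        in_window = [amt for other_ts, amt in rows if ts - window <= other_ts <= ts]
        total = sum(in_window)
        results.append(
            {"timestamp": ts, "count": len(in_window), "sum": total, "avg": total / len(in_window)}
        )
    return results


# Made-up transactions for one account
rows = [
    (datetime(2023, 1, 1, 12, 0), 10.0),
    (datetime(2023, 1, 2, 12, 0), 20.0),
    (datetime(2023, 1, 4, 12, 0), 30.0),
    (datetime(2023, 1, 8, 12, 0), 40.0),
]

for stats in trailing_window_stats(rows, days=3):
    print(stats)
```

In this sample, the January 4 row sees all three earlier rows in its 3-day window, because the January 1 row sits exactly on the inclusive lower bound.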

In [None]:
udf_df = udf_featureset_spec.to_spark_dataframe(feature_window_start_date_time=st, feature_window_end_date_time=et)

We can compare the results and verify that the UDF and DSL feature sets produce the same values. Below, we look at the rows for a single account ID in each resulting DataFrame.

In [None]:
display(dsl_df.where(dsl_df.accountID == "A1899946977632390").sort("timestamp"))

In [None]:
display(udf_df.where(udf_df.accountID == "A1899946977632390").sort("timestamp"))
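
Eyeballing two tables works for one account, but a programmatic check scales better. The sketch below compares rows that have already been collected into Python dictionaries, for example with `[r.asDict() for r in df.collect()]` on a small filtered DataFrame. It assumes the DSL feature columns carry the `f_` prefix while the UDF columns do not, as in the two definitions above. The helper name `find_mismatches`, the tolerance, and the sample rows are illustrative choices.

```python
import math


def find_mismatches(dsl_rows, udf_rows, features, key=("accountID", "timestamp"), rel_tol=1e-9):
    """Return (row_key, feature, dsl_value, udf_value) tuples where the results differ.

    Assumes each key appears at most once per side and that feature values
    are non-null numbers. Rows present only in udf_rows are not reported.
    """
    udf_by_key = {tuple(row[k] for k in key): row for row in udf_rows}
    mismatches = []
    for dsl_row in dsl_rows:
        row_key = tuple(dsl_row[k] for k in key)
        udf_row = udf_by_key.get(row_key)
        if udf_row is None:
            # No UDF row with the same key: record it with empty details
            mismatches.append((row_key, None, None, None))
            continue
        for feature in features:
            dsl_value = dsl_row["f_" + feature]  # DSL columns use the f_ prefix
            udf_value = udf_row[feature]
            if not math.isclose(dsl_value, udf_value, rel_tol=rel_tol):
                mismatches.append((row_key, feature, dsl_value, udf_value))
    return mismatches


# Made-up rows standing in for collected Spark rows
feature_names = ["transaction_3d_count", "transaction_amount_3d_sum"]
dsl_rows = [
    {"accountID": "A1", "timestamp": "2023-01-01 00:00:00",
     "f_transaction_3d_count": 2, "f_transaction_amount_3d_sum": 30.0},
]
udf_rows = [
    {"accountID": "A1", "timestamp": "2023-01-01 00:00:00",
     "transaction_3d_count": 2, "transaction_amount_3d_sum": 30.0},
]

print(find_mismatches(dsl_rows, udf_rows, feature_names))
```

An empty list means every compared feature value agrees within the tolerance.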

#### Export as feature set spec
In order to register the feature set spec with the feature store, it needs to be saved in a specific format.
Action: After running the cell below, inspect the generated `transactions-dsl` FeaturesetSpec. Open this file from the file tree to see the spec: `featurestore/featuresets/transactions-dsl/spec/FeaturesetSpec.yaml`

Spec contains these important elements:

1. `source`: reference to a storage. In this case a parquet file in a blob storage.
1. `features`: list of features and their datatypes.
1. `index_columns`: the join keys required to access values from the feature set

Learn more about it in the [top level feature store entities document](fs-concepts-todo) and the [feature set spec yaml reference](reference-yaml-featureset-spec.md).

The additional benefit of persisting it is that it can be source controlled.

In [None]:
dsl_spec_folder = root_dir  + "/featurestore/featuresets/transactions-dsl/spec"

dsl_feature_set_spec.dump(dsl_spec_folder)
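
After the dump, it can be handy to confirm what was written before registering the spec. The following is a small sketch that lists every file under a folder using only the standard library. The helper name `list_spec_files` is hypothetical, and it makes no assumption about which files `dump` produces.

```python
from pathlib import Path


def list_spec_files(spec_folder):
    """Return the relative paths of all files under spec_folder, sorted."""
    folder = Path(spec_folder)
    if not folder.is_dir():
        raise FileNotFoundError(f"Spec folder not found: {spec_folder}")
    return sorted(p.relative_to(folder).as_posix() for p in folder.rglob("*") if p.is_file())


# Example use in the notebook, after the dump cell has run:
# print(list_spec_files(dsl_spec_folder))
```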

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    FeatureStore,
    FeatureStoreEntity,
    FeatureSet,
)
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
from azureml.featurestore import FeatureStoreClient

fs_client = MLClient(
    AzureMLOnBehalfOfCredential(), 
    featurestore_subscription_id, 
    featurestore_resource_group_name, 
    featurestore_name
)

featurestore = FeatureStoreClient(
    credential = AzureMLOnBehalfOfCredential(), 
    subscription_id = featurestore_subscription_id, 
    resource_group_name = featurestore_resource_group_name, 
    name = featurestore_name
)

#### Register `account` entity with the feature store
Create an `account` entity that has the join key `accountID` of type `string`.

In [None]:
from azure.ai.ml.entities import DataColumn

account_entity_config = FeatureStoreEntity(
    name = "account",
    version = "1",
    index_columns = [
        DataColumn(name="accountID", type = "string")
    ]
)

poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)
print(poller.result())

#### Create the feature set using the exported feature set spec

In [None]:
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification, MaterializationSettings, MaterializationComputeResource

materialization_settings = MaterializationSettings(
    offline_enabled=True,
    resource=MaterializationComputeResource(instance_type="standard_e8s_v3"),
    spark_configuration={
        "spark.driver.cores": 4,
        "spark.driver.memory": "36g",
        "spark.executor.cores": 4,
        "spark.executor.memory": "36g",
        "spark.executor.instances": 2,
    },
    schedule=None,
)

fset_config = FeatureSet(
    name = "transactions-dsl",
    version = "1",
    entities = ["azureml:account:1"],
    stage = "Development",
    specification = FeatureSetSpecification(path = dsl_spec_folder),
    materialization_settings = materialization_settings,
    tags = {"data_type": "nonPII"}
)

poller = fs_client.feature_sets.begin_create_or_update(fset_config)
print(poller.result())

Materialize the feature set to persist the transformed feature values to the offline store.

In [None]:
poller = fs_client.feature_sets.begin_backfill(
    name="transactions-dsl",
    version="1",
    feature_window_start_time=st,
    feature_window_end_time=et,
)
print(poller.result().job_id)

In [None]:
# get the job URL, and stream the job logs (the back fill job could take 10+ minutes to complete)
fs_client.jobs.stream(poller.result().job_id)

Let's print sample data from the feature set. The output information shows that the data was retrieved from the materialization store. The `get_offline_features()` method, which retrieves training and inference data, also uses the materialization store by default.

In [None]:
# look up the featureset by providing name and version
transactions_featureset = featurestore.feature_sets.get("transactions-dsl", "1")
display(transactions_featureset.to_spark_dataframe().head(5))

#### Generate a training dataframe using the registered featureset

Load observation data

We start by exploring the observation data. Observation data is typically the core data used in training and inference. It is joined with feature data to create the full training data. Observation data is captured at the time of the event: in this case, it contains core transaction data, including transaction ID, account ID, and transaction amount. Because this data is for training, it also has the target variable (`is_fraud`) appended.

To learn more about the core concepts, including observation data, refer to the docs.
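
Feature stores typically perform this join in a time-aware way: each observation row should only see feature values computed at or before its own timestamp, so that no future information leaks into training. The following is a conceptual sketch of such a point-in-time lookup in plain Python. It is not how `get_offline_features` is implemented, and the function name `point_in_time_join` and the sample values are invented for illustration.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime


def point_in_time_join(observations, feature_rows, key="accountID", ts="timestamp"):
    """Attach to each observation the latest feature row at or before its timestamp."""
    # Group feature rows by key, sorted by timestamp so we can binary-search them
    by_key = defaultdict(list)
    for row in sorted(feature_rows, key=lambda r: r[ts]):
        by_key[row[key]].append(row)

    joined = []
    for obs in observations:
        candidates = by_key.get(obs[key], [])
        timestamps = [r[ts] for r in candidates]
        idx = bisect_right(timestamps, obs[ts])  # count of feature rows with ts <= observation ts
        features = candidates[idx - 1] if idx > 0 else {}
        # Keep the observation's own key and timestamp; add only the feature columns
        feature_values = {k: v for k, v in features.items() if k not in (key, ts)}
        joined.append({**feature_values, **obs})
    return joined


# Made-up feature and observation rows
feature_rows = [
    {"accountID": "A1", "timestamp": datetime(2023, 1, 1), "f_transaction_amount_7d_sum": 10.0},
    {"accountID": "A1", "timestamp": datetime(2023, 1, 5), "f_transaction_amount_7d_sum": 50.0},
]
observations = [
    {"accountID": "A1", "timestamp": datetime(2023, 1, 3), "is_fraud": 0},
    {"accountID": "A2", "timestamp": datetime(2023, 1, 3), "is_fraud": 1},
]

for row in point_in_time_join(observations, feature_rows):
    print(row)
```

In this sample, the `A1` observation from January 3 picks up the January 1 feature value rather than the later January 5 one, and the `A2` observation gets no feature columns because no feature rows exist for that account.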

In [None]:
observation_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet"
observation_data_df = spark.read.parquet(observation_data_path)
obs_data_timestamp_column = "timestamp"

display(observation_data_df)
# Note: the timestamp column is displayed in a different format. Optionally, you can call observation_data_df.show() to see the correctly formatted values

In [None]:
featureset = featurestore.feature_sets.get("transactions-dsl", "1")

# you can select features in a Pythonic way
features = [
    featureset.get_feature("f_transaction_amount_7d_sum"),
    featureset.get_feature("f_transaction_amount_7d_avg"),
]

# you can also specify features in string form: featureset:version:feature
more_features = [
    "transactions-dsl:1:f_transaction_amount_3d_sum",
    "transactions-dsl:1:f_transaction_3d_count",
]

more_features = featurestore.resolve_feature_uri(more_features)
features.extend(more_features)
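
The string examples above each pack three colon-separated parts into one value: the feature set name, its version, and the feature name. As an illustration only, the hypothetical helper below splits such a string into those parts. It isn't part of the SDK, which resolves these strings through `featurestore.resolve_feature_uri` as shown above.

```python
from typing import NamedTuple


class FeatureRef(NamedTuple):
    feature_set: str
    version: str
    feature: str


def parse_feature_ref(uri: str) -> FeatureRef:
    """Split a 'featureset:version:feature' string into its three parts."""
    parts = uri.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected 'featureset:version:feature', got {uri!r}")
    return FeatureRef(*parts)


print(parse_feature_ref("transactions-dsl:1:f_transaction_amount_3d_sum"))
```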

In [None]:
from azureml.featurestore import get_offline_features

training_df = get_offline_features(
    features=features,
    observation_data=observation_data_df,
    timestamp_column=obs_data_timestamp_column,
    query_mode="dsl",
)

display(training_df)