In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS workspace.ml_training

# Feature Engineering

Before we save features to a feature table, we need to create the features we are interested in. Feature
selection criteria depend on your project goals and business problem. In this section we will pick some
features for demonstration; this does not mean that they are significant for our purpose.

**One important point is that you need to exclude the target field from the feature table and you need to
define a primary key for the table.**
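
As a minimal sketch of that rule, the helper below takes a list of column names, checks that the primary key columns exist, and returns the columns to store with the target removed. The function name and the column names are hypothetical and used for illustration only.

```python
def feature_columns(columns, primary_keys, target=None):
    """Return the columns to store in a feature table: keys first, target removed."""
    missing = [k for k in primary_keys if k not in columns]
    if missing:
        raise ValueError(f"primary key column(s) not found: {missing}")
    if target is not None and target in primary_keys:
        raise ValueError("the target column cannot also be a primary key")
    features = [c for c in columns if c not in primary_keys and c != target]
    return list(primary_keys) + features


# Hypothetical column names, for illustration only.
cols = ["customer_id", "monthly_charges", "contract_type", "churned"]
selected = feature_columns(cols, primary_keys=["customer_id"], target="churned")
print(selected)
```

With a Spark DataFrame, the same helper could be applied as `df.select(*feature_columns(df.columns, ...))`.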

## Load Dataset

Typically, you first need to pre-process the data and select features. As we have already covered data
pre-processing and feature preparation, we will load a clean dataset, which you would typically load from a
`silver` table.

Let's load our dataset, the TPC-H `customer` table from the `samples` catalog, using Apache Spark.
**In this dataset the primary key will be `c_custkey`. The table has no label column, so there is no target
field to exclude here.**


In [0]:
df = spark.sql('select * from samples.tpch.customer')

In [0]:
display(df)

## Save Features to Feature Table

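Let's start by creating a Feature Engineering client so we can populate our feature store. Note that
restarting Python after the install clears notebook variables, so re-run the cell that defines `df` afterwards.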

In [0]:
%pip install databricks-feature-engineering
dbutils.library.restartPython()

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

help(fe.create_table)

## Create Feature Table

Next, we can create the feature table using the `create_table` method.

This method takes a few parameters as inputs (the output of `help(fe.create_table)` above lists the exact
names for your installed version):

- `name` - A feature table name of the form `<catalog>.<schema_name>.<table_name>`.

- `primary_keys` - The primary key(s). If multiple columns are required, specify a list of column names.

- `timestamp_col` - [OPTIONAL] Any timestamp column which can be used for point-in-time lookup.

- `df` - Data to insert into this feature table. The schema of `df` will be used as the feature table
schema.

- `schema` - Feature table schema. Note that either `schema` or `df` must be provided.

- `description` - Description of the feature table.

- `partition_columns` - Column(s) used to partition the feature table.

- `tags` - Tag(s) to attach to the feature table.
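
The rules in this list can be checked before calling the client. The sketch below is a hypothetical pre-flight helper, not part of the Feature Engineering library: it verifies the three-level name, normalizes `primary_keys` to a list, and enforces that `df` or `schema` is given. It assumes plain identifiers without backtick quoting.

```python
def check_create_table_args(name, primary_keys, df=None, schema=None):
    """Pre-flight check mirroring the parameter rules listed above (hypothetical helper)."""
    parts = name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"name must look like <catalog>.<schema>.<table>, got {name!r}")
    # Accept a single column name or a list of names, and always return a list.
    keys = [primary_keys] if isinstance(primary_keys, str) else list(primary_keys)
    if not keys:
        raise ValueError("at least one primary key column is required")
    if df is None and schema is None:
        raise ValueError("provide either df or schema")
    return keys


# Any non-None object stands in for a DataFrame or schema in this sketch.
keys = check_create_table_args(
    "workspace.ml_training.features_customer_tpch", "c_custkey", schema=object()
)
print(keys)
```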

In [0]:
table_name = 'workspace.ml_training.features_customer_tpch'

fe.create_table(
    name = table_name,
    primary_keys=['c_custkey'],
    df = df,
    # partition_columns=
    description = 'TPCH Customers Features',
    tags = {'source':'samples', 'format':'delta'}
)

Alternatively, you can call `create_table` with `schema` only (without `df`) and populate the feature table
afterwards with `fe.write_table`. `fe.write_table` supports merge mode only; to overwrite, drop the table and
re-create it.

Example:



In [0]:
#  One-time creation (schema only, no data)
# fe.create_table(
#     name=table_name,
#     primary_keys=["c_custkey"],
#     schema=df.schema,
#     description="TPCH Customers Features"
# )

#  Repeated/scheduled writes (upsert on the primary key)
# fe.write_table(
#     name=table_name,
#     df=df,
#     mode="merge"
# )

## Load Feature Table

We can also look at the metadata of the feature table through the Feature Engineering client by using
`get_table()`. As a feature table is a Delta table, we can load it with Spark just as we do for other tables.

In [0]:
ft = fe.get_table(name=table_name)
print(f'Feature Table description: {ft.description}')
print(ft.features)

In [0]:
display(fe.read_table(name=table_name))

## Update Feature Table

In some cases we might need to update an existing feature table by adding new features or deleting existing
features. In this section, we will show how to make these types of changes.

### Add a New Feature

To illustrate adding a new feature, let's derive one from an existing column. In this case, we'll transform the
`c_acctbal` column by categorizing it into two groups: `long` for balances of at least 5000 and `short` for
everything else.

Then we will write the dataset back to the feature table. The important parameter is `mode`, which we should
set to `"merge"`.

In [0]:
import pyspark.sql.functions as f

df_updated = df.withColumn('c_acctbal_group', f.when(
    f.col('c_acctbal') >= 5000 , "long"
).otherwise(
    "short"
))

We select only the primary key and the new feature column, then write them with `mode='merge'`. To validate
the result, read the table back with `fe.read_table`.

In [0]:
fe.write_table(
  name=table_name,
  df=df_updated.select('c_custkey', 'c_acctbal_group'),
  mode='merge'
)
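
To make the merge behaviour concrete, here is a plain-Python sketch of an upsert keyed on the primary key. It is an illustration of the idea, not the library's implementation, and it assumes that columns absent from the update are left as they are for matching rows. The rows are made up.

```python
def merge_rows(table, updates, key):
    """Upsert `updates` into `table` (both lists of dicts) using `key` as the primary key."""
    merged = {row[key]: dict(row) for row in table}
    for row in updates:
        # Matching key: overwrite only the columns present in the update.
        # New key: insert the row as-is.
        merged.setdefault(row[key], {}).update(row)
    return [merged[k] for k in sorted(merged)]


# Made-up rows for illustration.
existing = [
    {"c_custkey": 1, "c_mktsegment": "BUILDING"},
    {"c_custkey": 2, "c_mktsegment": "MACHINERY"},
]
new_feature = [
    {"c_custkey": 1, "c_acctbal_group": "long"},
    {"c_custkey": 3, "c_acctbal_group": "short"},
]
result = merge_rows(existing, new_feature, key="c_custkey")
for row in result:
    print(row)
```

In a real Delta table the missing cells would hold `NULL` rather than being absent, as they are in these dictionaries.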

### Delete Existing Feature

To remove a feature column from the table, you can simply drop the column. Let's drop the original `c_acctbal`
column.

**Note:** We need to set the Delta reader and writer protocol versions manually to support column mapping. To
learn more, see the Delta Lake column mapping documentation.

In [0]:
%sql 
ALTER TABLE workspace.ml_training.features_customer_tpch SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name', 'delta.minReaderVersion'='2', 'delta.minWriterVersion'='5');
ALTER TABLE workspace.ml_training.features_customer_tpch DROP COLUMNS (c_acctbal)

## Read Feature Table by Version

As feature tables are based on Delta tables, we get all the benefits of Delta, including versioning. To
demonstrate this, let's read from a snapshot of the feature table.

In [0]:
timestamp_v3 = spark.sql(f'DESCRIBE HISTORY {table_name}').orderBy('version').collect()[2].timestamp
print(timestamp_v3)
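
The cell above picks the timestamp by position in the sorted history. A lookup by version number is a little more explicit; the sketch below shows one way to do it. The helper name is hypothetical and the history rows are made up. It only assumes each row exposes `version` and `timestamp` by key, which both dictionaries and Spark `Row` objects do.

```python
from datetime import datetime


def timestamp_for_version(history, version):
    """Return the commit timestamp of `version` from DESCRIBE HISTORY-style rows."""
    for row in history:
        if row["version"] == version:
            return row["timestamp"]
    raise LookupError(f"version {version} not found in table history")


# Made-up history, newest first, for illustration only.
history = [
    {"version": 2, "timestamp": datetime(2024, 1, 1, 12, 10)},
    {"version": 1, "timestamp": datetime(2024, 1, 1, 12, 5)},
    {"version": 0, "timestamp": datetime(2024, 1, 1, 12, 0)},
]
ts = timestamp_for_version(history, 2)
print(ts)
```

In the notebook this could be called as `timestamp_for_version(spark.sql(f'DESCRIBE HISTORY {table_name}').collect(), 2)`.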

In [0]:
df_v3 = (
  spark
  .read
  .option('timestampAsOf', timestamp_v3)
  .table(table_name)
)

display(df_v3)

In [0]:
feature_df_v3 = fe.read_table(name=table_name, as_of_delta_timestamp=timestamp_v3)
feature_df_v3.printSchema()

## Create a Feature Table from Existing UC Table

You can also alter an existing Unity Catalog (UC) table so that it becomes a feature table: add a primary key
(PK) constraint on non-null column(s), plus a timestamp column if applicable. See the Databricks
documentation for details.

In this example, we first copy the TPC-H `part` table from the `samples` catalog into our schema as a plain
Delta table. Then we convert this Delta table to a feature table.

For this, we need to make two changes:

1. Set the primary key columns to `NOT NULL`.

2. Alter the table to add the primary key constraint.

In [0]:
df_parts = spark.sql('select * from samples.tpch.part')
df_parts.write.format('delta').mode('overwrite').saveAsTable('workspace.ml_training.features_parts')
display(df_parts)

In [0]:
%sql
ALTER TABLE workspace.ml_training.features_parts ALTER COLUMN p_partkey SET NOT NULL;
ALTER TABLE workspace.ml_training.features_parts ADD CONSTRAINT parts_features_pk_constraint PRIMARY KEY (p_partkey);
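
The same two steps can be generated for any table, including tables with a composite key. The sketch below builds the statements as strings; the helper name is hypothetical, and identifiers are interpolated without quoting, so it should only be used with trusted names.

```python
def primary_key_statements(table, key_columns, constraint_name):
    """Build the ALTER TABLE statements that add a primary key to an existing table."""
    # Step 1: every key column must be NOT NULL.
    statements = [
        f"ALTER TABLE {table} ALTER COLUMN {col} SET NOT NULL" for col in key_columns
    ]
    # Step 2: add the primary key constraint over all key columns.
    statements.append(
        f"ALTER TABLE {table} ADD CONSTRAINT {constraint_name} "
        f"PRIMARY KEY ({', '.join(key_columns)})"
    )
    return statements


stmts = primary_key_statements(
    "workspace.ml_training.features_parts",
    ["p_partkey"],
    "parts_features_pk_constraint",
)
for stmt in stmts:
    print(stmt)
```

In a notebook, each statement could then be executed with `spark.sql(stmt)`, one statement per call.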