# Data Processing and Feature Engineering

In [0]:
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import pandas_udf, col, when, lit

from datetime import datetime

import pyspark.sql.functions as F
from databricks.sdk import WorkspaceClient
from databricks.feature_engineering import FeatureEngineeringClient

**Defining the catalog and schema to use**

In [0]:
%%sql
-- Select the catalog and schema for this session so that unqualified table
-- names used by later cells resolve inside main.dbdemos_mlops.
-- NOTE(review): the other SQL cells in this notebook use `%sql`; confirm `%%sql` is intended here.

USE CATALOG main;

USE SCHEMA dbdemos_mlops;

In [0]:
# Load the 'advanced_churn_bronze_customers' table and preview its first 10 rows
telcoDF = spark.table('advanced_churn_bronze_customers')
display(telcoDF.limit(10))

## Define Featurization Logic(s) for BATCH feature computation

In [0]:
def compute_service_features(inputDF: SparkDataFrame) -> SparkDataFrame:
  """
    Append a 'num_optional_services' column to the input DataFrame.

    For every row the new column holds, as a double, how many of the optional
    service columns (online_security, online_backup, device_protection,
    tech_support, streaming_tv, streaming_movies) are exactly equal to "Yes".

    :param inputDF: Spark DataFrame containing the six service columns above
    :return: the DataFrame returned by withColumn, with 'num_optional_services' added
  """
  optional_service_cols = [
    "online_security",
    "online_backup",
    "device_protection",
    "tech_support",
    "streaming_tv",
    "streaming_movies",
  ]

  @pandas_udf('double')
  def num_optional_services(*cols):
    # Each argument is one service column; turn every "Yes" into 1.0
    # (anything else into 0.0) and add the columns up element-wise.
    total = 0
    for service in cols:
      total = total + (service == "Yes").astype('double')
    return total

  return inputDF.withColumn("num_optional_services", num_optional_services(*optional_service_cols))

In [0]:
def clean_churn_features(dataDF: SparkDataFrame) -> SparkDataFrame:
  """
    Clean the churn dataset and annotate column metadata.

    Steps, using the pandas API on Spark:
      * cast 'senior_citizen' to string and map '1'/'0' to 'Yes'/'No'
      * parse 'total_charges' to float; missing or blank values become 0.0
      * fill remaining nulls in 'tenure', 'monthly_charges' and 'total_charges' with 0.0
      * set the semantic-type metadata on 'customer_id' and 'num_optional_services'

    :param dataDF: Spark DataFrame to clean. Metadata is set on a
      'num_optional_services' column, so this is presumably meant to run on the
      output of compute_service_features -- TODO confirm for other callers.
    :return: cleaned Spark DataFrame
  """

  pandas_data_df = dataDF.pandas_api()

  pandas_data_df = pandas_data_df.astype({'senior_citizen': 'string'})
  pandas_data_df['senior_citizen'] = pandas_data_df['senior_citizen'].map({'1':'Yes', '0':'No'})

  # Guard against None before calling .strip(): a null would otherwise raise
  # AttributeError here, before the fillna below gets a chance to handle it.
  # The fallback is 0.0 (not int 0) so every branch returns a float and the
  # inferred result type does not depend on which rows are sampled.
  pandas_data_df['total_charges'] = pandas_data_df['total_charges'].apply(
    lambda x: float(x) if x is not None and x.strip() else 0.0
  )

  pandas_data_df = pandas_data_df.fillna({
    'tenure': 0.0,
    'monthly_charges': 0.0,
    'total_charges': 0.0
  })

  clean_df = pandas_data_df.to_spark()
  clean_df = clean_df.withMetadata('customer_id', {'spark.contentAnnotation.semanticType': 'native'})
  clean_df = clean_df.withMetadata("num_optional_services", {"spark.contentAnnotation.semanticType":"numeric"})

  return clean_df

### Write to Feature Store


In [0]:
# Capture the wall-clock time once, as epoch seconds, so that every row gets
# the same literal 'transaction_ts' value each time the DataFrame is evaluated.
current_time = datetime.now().timestamp()
churn_features_n_predsDF = (
  clean_churn_features(compute_service_features(telcoDF))
  .withColumn('transaction_ts', lit(current_time).cast('timestamp'))
)

### Create train, val, and test categorical labels for each instance

These split labels will be used by the model validation jobs.


In [0]:
# Split proportions. Only train_ratio and val_ration are referenced below;
# rows that fall in neither bucket are labelled 'test'.
train_ratio, val_ration, test_ratio = 0.7, 0.2, 0.1

# Bucket the per-row random draw by cumulative threshold:
#   random < 0.7 -> 'train', random < 0.9 -> 'val', otherwise 'test'.
split_label = (
  F.when(F.col('random') < train_ratio, 'train')
  .when(F.col('random') < train_ratio + val_ration, 'val')
  .otherwise('test')
)

# Keep only the keys and the label, attach a seeded random number per row,
# derive the split from it, then discard the helper column.
label_tableDF = (
  churn_features_n_predsDF
  .select('customer_id', 'transaction_ts', 'churn')
  .withColumn('random', F.rand(seed=42))
  .withColumn('split', split_label)
  .drop('random')
)

label_tableDF.write.format('delta').mode('overwrite').saveAsTable('advanced_churn_label_table')

# The feature set is everything except the label column.
churn_featuresDF = churn_features_n_predsDF.drop('churn')

In [0]:
%sql
-- Add a primary key constraint to the label table for feature lookup.
-- Any previous constraint is dropped first so the cell can be re-run, and both
-- key columns are marked NOT NULL before the constraint is added.

ALTER TABLE advanced_churn_label_table DROP CONSTRAINT IF EXISTS advanced_churn_label_table_pk;
ALTER TABLE advanced_churn_label_table ALTER COLUMN customer_id SET NOT NULL;
ALTER TABLE advanced_churn_label_table ALTER COLUMN transaction_ts SET NOT NULL;
ALTER TABLE advanced_churn_label_table ADD CONSTRAINT advanced_churn_label_table_pk PRIMARY KEY(customer_id, transaction_ts);

### Write the feature table to Unity Catalog

In [0]:
# Delete the online feature table if it exists
w = WorkspaceClient()
online_table_name = 'main.dbdemos_mlops.advanced_churn_feature_table_online_table'

try:
  # get() is only an existence probe: it raises when the table cannot be
  # fetched, which skips the delete below.
  w.online_tables.get(online_table_name)
  # Drop the existing online feature table
  w.online_tables.delete(name=online_table_name)
  print(f"Deleted existing online feature table {online_table_name}")

except Exception as e:
  # Deliberately best-effort so the notebook keeps running, but include the
  # underlying error: a permission or connectivity failure would otherwise be
  # indistinguishable from "table does not exist".
  print(f"No existing online feature table found ({type(e).__name__}: {e})")

In [0]:
%sql

-- Remove any previous version of the feature table; a later cell recreates it
-- with fe.create_table.
DROP TABLE IF EXISTS advanced_churn_feature_table;

In [0]:
# Create the feature table from the DataFrame schema, then load the rows into it
fe = FeatureEngineeringClient()
feature_table_name = 'advanced_churn_feature_table'

# The table is keyed on customer and timestamp, with 'transaction_ts' declared
# as the time series column.
churn_feature_table = fe.create_table(
    name=feature_table_name,
    schema=churn_featuresDF.schema,
    primary_keys=['customer_id', 'transaction_ts'],
    timeseries_column='transaction_ts',
)

# Write the computed features into the table in 'merge' mode.
fe.write_table(name=feature_table_name, df=churn_featuresDF, mode='merge')

### Define Featurization Logic for on-demand feature functions
On-demand function to calculate a new feature during inference

In [0]:
%sql
-- On-demand feature function.
-- For tenure_in > 0 it returns monthly_charges_in minus the average charge per
-- unit of tenure (total_charges_in / tenure_in); for tenure_in <= 0 it returns 0.
-- Returns NULL when tenure_in is NULL, or when tenure_in > 0 and either charge is NULL.

CREATE OR REPLACE FUNCTION avg_price(monthly_charges_in DOUBLE, tenure_in DOUBLE, total_charges_in DOUBLE)
RETURNS FLOAT
LANGUAGE PYTHON
AS $$
# NULL arguments are expected to arrive as None; comparing or subtracting None
# raises TypeError, so return None (SQL NULL) on exactly those paths.
if tenure_in is None:
  return None
if tenure_in > 0:
  if monthly_charges_in is None or total_charges_in is None:
    return None
  return monthly_charges_in - total_charges_in / tenure_in
else:
  # 0.0 rather than 0 to match the declared FLOAT return type
  return 0.0
$$