# Install our dependencies

In [1]:
!pip install featureform mlflow scikit-learn --upgrade


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


# Create a connection to Featureform

Featureform is running locally via our docker container. We need to create an insecure connection to it, as our container isn't setup with SSL enabled by default. The API server runs on port 7878, and the dashboard server runs at port 80.

In [2]:
import featureform as ff

client = ff.Client(host="localhost:7878", insecure=True)



# Retrieve our relevent resources

We created our Postgres and Redis configurations earlier via our definitions.py file. This retrieves those resources from Featureform so we can use them and build off of them.

In [6]:
postgres = ff.get_postgres("postgres-quickstart")
redis = ff.get_redis("redis-quickstart")
transactions = ff.get_source("transactions", "raw")

# Everything in Featureform can be retrieved as a Dataframe

In this case, we're getting our transactions table (variant raw) as a Pandas dataframe. We also support Spark dataframes when using Spark. Every data resource in Featureform can be retrieved as a Dataframe.

In [7]:
client.dataframe(transactions, limit=1000)

Output()

Applying Run: 2024-02-06t08-40-14



Unnamed: 0,"""transactionid""","""customerid""","""customerdob""","""custlocation""","""custaccountbalance""","""transactionamount""","""timestamp""","""isfraud"""
0,T1,C5841053,10/1/94,JAMSHEDPUR,17819.05,25.00,2022-04-09T11:33:09Z,False
1,T2,C2142763,4/4/57,JHAJJAR,2270.69,27999.00,2022-03-27T01:04:21Z,False
2,T3,C4417068,26/11/96,MUMBAI,17874.44,459.00,2022-04-07T00:48:14Z,False
3,T4,C5342380,14/9/73,MUMBAI,866503.21,2060.00,2022-04-14T07:56:59Z,True
4,T5,C9031234,24/3/88,NAVI MUMBAI,6714.43,1762.50,2022-04-13T07:39:19Z,False
...,...,...,...,...,...,...,...,...
995,T998,C1025072,25/6/94,DELHI,4356.19,400.00,2022-04-11T05:27:40Z,True
996,T999,C7439786,7/11/95,KASHIPUR,1.92,23.00,2022-04-03T08:04:13Z,True
997,T1000,C5734042,2/2/88,UDAIPUR,57477.29,1269.00,2022-04-10T23:13:14Z,False
998,T1001,C1225952,15/8/80,AMBALA,161289.86,395.00,2022-04-15T08:45:30Z,False


# Creating our first transformations!

In [8]:
@postgres.sql_transformation(inputs=[transactions])
def perc_balance(tbl):
  """ The transaction amount divided by the customer account balance.
  It handles NaNs and Inf by defaulting to 100% if the denominator is 0.
  """
  return """SELECT *,
       CASE
           WHEN custaccountbalance != 0 THEN transactionamount / custaccountbalance
           ELSE 0.0
       END AS balancepercent
FROM {{tbl}};"""
client.dataframe(perc_balance, limit=1000)

Output()

Applying Run: 2024-02-06t08-40-31
Creating source perc_balance 2024-02-06t08-40-31



Unnamed: 0,"""transactionid""","""customerid""","""customerdob""","""custlocation""","""custaccountbalance""","""transactionamount""","""timestamp""","""isfraud""","""balancepercent"""
0,T1,C5841053,10/1/94,JAMSHEDPUR,17819.05,25.00,2022-04-09T11:33:09Z,False,0.001403
1,T2,C2142763,4/4/57,JHAJJAR,2270.69,27999.00,2022-03-27T01:04:21Z,False,12.330613
2,T3,C4417068,26/11/96,MUMBAI,17874.44,459.00,2022-04-07T00:48:14Z,False,0.025679
3,T4,C5342380,14/9/73,MUMBAI,866503.21,2060.00,2022-04-14T07:56:59Z,True,0.002377
4,T5,C9031234,24/3/88,NAVI MUMBAI,6714.43,1762.50,2022-04-13T07:39:19Z,False,0.262494
...,...,...,...,...,...,...,...,...,...
995,T998,C1025072,25/6/94,DELHI,4356.19,400.00,2022-04-11T05:27:40Z,True,0.091823
996,T999,C7439786,7/11/95,KASHIPUR,1.92,23.00,2022-04-03T08:04:13Z,True,11.979167
997,T1000,C5734042,2/2/88,UDAIPUR,57477.29,1269.00,2022-04-10T23:13:14Z,False,0.022078
998,T1001,C1225952,15/8/80,AMBALA,161289.86,395.00,2022-04-15T08:45:30Z,False,0.002449


In [9]:
@postgres.sql_transformation(inputs=[transactions])
def window_aggs(tbl):
  """ Adds two columns: mean and count.
  mean is the average transaction amount of the last 30 transactions at the time of the transaction in the row.
  count is the number of transactions if that number is less than 30.
  """
  return """SELECT *,
       AVG(transactionamount) OVER (PARTITION BY customerid ORDER BY timestamp ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS rollingmean,
       COUNT(transactionamount) OVER (PARTITION BY customerid ORDER BY timestamp ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS rollingcount
FROM {{tbl}}
ORDER BY timestamp;"""

client.dataframe(window_aggs, limit=1000)

Output()

Applying Run: 2024-02-06t08-40-36
Creating source window_aggs 2024-02-06t08-40-36



Unnamed: 0,"""transactionid""","""customerid""","""customerdob""","""custlocation""","""custaccountbalance""","""transactionamount""","""timestamp""","""isfraud""","""rollingmean""","""rollingcount"""
0,T3852,C5124051,1/1/1800,PATIALA,131981.39,500.0,2022-03-25T22:22:52Z,True,500.0,1
1,T2174,C1173037,5/3/84,BANGALORE,19357.86,592.0,2022-03-25T22:25:59Z,True,592.0,1
2,T5306,C2623913,18/1/78,GURGAON,204863.48,2376.0,2022-03-25T22:30:38Z,False,2376.0,1
3,T9897,C8718541,19/6/87,VADODARA,199291.43,2234.0,2022-03-25T22:32:44Z,True,2234.0,1
4,T5765,C2132853,30/11/92,RANGA REDDY,6354.44,298.0,2022-03-25T22:40:31Z,True,298.0,1
...,...,...,...,...,...,...,...,...,...,...
995,T756,C3027447,30/6/87,DELHI,20247.07,550.0,2022-03-28T23:52:17Z,True,550.0,1
996,T6111,C6326571,21/3/85,KOTHANUR BANGALORE,21368.44,392.0,2022-03-28T23:55:29Z,False,392.0,1
997,T33,C1889073,25/2/85,BANGALORE,57791.69,760.0,2022-03-29T00:05:17Z,False,760.0,1
998,T2834,C3031174,13/1/93,MUMBAI,11852.45,831.0,2022-03-29T00:05:56Z,True,831.0,1


# Creating our Features and Training Set

In [13]:
@ff.entity
class User:
    is_fraud = ff.Label(transactions[["customerid", "isfraud", "timestamp"]], type=ff.Bool)
    avg_trans = ff.Feature(window_aggs[["customerid", "rollingmean", "timestamp"]], type=ff.Float32, inference_store=redis)
    balance = ff.Feature(transactions[["customerid", "custaccountbalance", "timestamp"]], variant="simple", type=ff.Float32, inference_store=redis)
    perc = ff.Feature(perc_balance[["customerid", "balancepercent", "timestamp"]], type=ff.Float32, inference_store=redis)

ts = ff.register_training_set("fraud", variant="basic", label=User.is_fraud, features=[User.avg_trans, User.perc])

client.apply()

Applying Run: 2024-02-06t08-42-24
Creating entity user
Creating feature avg_trans 2024-02-06t08-42-24
Looks like an equivalent feature variant already exists, going to use its variant:  2024-02-06t08-40-42
Creating feature balance simple
Looks like an equivalent feature variant already exists, going to use its variant:  simple
Creating feature perc 2024-02-06t08-42-24
Looks like an equivalent feature variant already exists, going to use its variant:  2024-02-06t08-42-16
Creating label is_fraud 2024-02-06t08-42-24
Looks like an equivalent label variant already exists, going to use its variant:  2024-02-06t08-40-42


Output()

Creating training-set fraud basic



# Training with MLFlow

In [14]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from datetime import datetime
import mlflow

current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
model_name = "ff-mlflow-workshop-xgboost-" + current_time
ts_name = "fraud"
ts_variant = "basic"
random_state = 42
test_size=0.2
ff_url = f"internal-sandbox.featureform.com/training-sets/{ts_name}?variant={ts_variant}"
ts = client.training_set(ts_name, ts_variant, model=model_name).dataframe()
print(ts)

mlflow.set_tracking_uri("http://localhost:8000")

X = ts.drop('label', axis=1)
y = ts['label']
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
with mlflow.start_run() as run:
    # Create and train RandomForest model
    rf = RandomForestClassifier()
    rf.fit(X_train, y_train)

    # Make predictions
    y_pred = rf.predict(X_test)

    # Calculate accuracy
    accuracy = accuracy_score(y_test, y_pred)
    print("Accuracy:", accuracy)

    # Log parameters, metrics, and model to MLflow
    mlflow.log_param("training_set_name", ts_name)
    mlflow.log_param("training_set_variant", ts_variant)
    mlflow.log_param("featureform_url", ff_url)
    mlflow.log_param("random_state", random_state)
    mlflow.log_param("test_size", test_size)
    mlflow.log_param("model_type", "sklearn.ensemble.RandomForestClassifier")
    mlflow.log_metric("accuracy", accuracy)
    mlflow.sklearn.log_model(rf, "model")
    run_id = run.info.run_id
    model_uri = f"runs:/{run_id}/model"
    mlflow.register_model(model_uri, model_name)

      feature__avg_trans__2024-02-06t08-40-42  \
0                                       25.00   
1                                    27999.00   
2                                      459.00   
3                                     2060.00   
4                                     1762.50   
...                                       ...   
9976                                    50.00   
9977                                   444.00   
9978                                   480.00   
9979                                   271.37   
9980                                  1878.20   

      feature__perc__2024-02-06t08-42-16  label  
0                               0.001403  False  
1                              12.330613  False  
2                               0.025679  False  
3                               0.002377   True  
4                               0.262494  False  
...                                  ...    ...  
9976                            0.003979  False  
9977       

Successfully registered model 'ff-mlflow-workshop-xgboost-2024-02-06-08-43-04'.
2024/02/06 08:43:08 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ff-mlflow-workshop-xgboost-2024-02-06-08-43-04, version 1
Created version '1' of model 'ff-mlflow-workshop-xgboost-2024-02-06-08-43-04'.


# Serving in Production with MLFlow and OnDemandFeatures

In [15]:
@ff.ondemand_feature(variant="simple")
def ondemand_percent(client, params, entities):
  import featureform as ff
  return params["TransactionAmount"] / client.features([("balance", "simple")], entities=entities)[0]

client.apply()

Output()

Applying Run: 2024-02-06t08-42-52
Creating ondemand_feature ondemand_percent simple



In [16]:
import mlflow
f = client.features([User.avg_trans, ("ondemand_percent", "simple")],
                params={"TransactionAmount": 1000.0},
                entities={"user": "C5124051"})
print(f)
loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}/latest")
loaded_model.predict([f])

[500.0, 0.007576825757513873]




Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]



array([False])

E0206 08:46:23.989275000 6214283264 chttp2_transport.cc:1290]          ipv6:%5B::1%5D:7878: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings". Current keepalive time (before throttling): 60000ms
