# Experiment

## Install Python dependencies

In [1]:
!pip install -q onnx onnxruntime tf2onnx

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
mypy-protobuf 3.6.0 requires protobuf>=4.25.3, but you have protobuf 3.20.3 which is incompatible.
feast 0.40.1 requires protobuf<5.0.0,>=4.24.0, but you have protobuf 3.20.3 which is incompatible.
codeflare-sdk 0.19.1 requires pydantic<2, but you have pydantic 2.9.2 which is incompatible.[0m[31m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Import the dependencies for the model training code:

In [2]:
import datetime
import numpy as np
import onnx
import pandas as pd
import pickle
import tf2onnx

from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization, Activation
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import class_weight
from pathlib import Path

2024-09-26 07:31:01.311457: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-26 07:31:01.311524: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-26 07:31:01.312922: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-26 07:31:01.320266: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


The output might show TensorFlow messages, such as a "Could not find TensorRT" warning. You can ignore these messages.


## Load the CSV data

The CSV data that you use to train the model contains the following fields:

* **distance_from_home** - The distance from home where the transaction happened.
* **distance_from_last_transaction** - The distance from the last transaction that happened.
* **ratio_to_median_purchase_price** - The ratio of purchased price compared to median purchase price.
* **repeat_retailer** - If it's from a retailer that already has been purchased from before.
* **used_chip** - If the credit card chip was used.
* **used_pin_number** - If the PIN number was used.
* **online_order** - If it was an online order.
* **fraud** - If the transaction is fraudulent.

## Install Feast project

In [11]:
!pip install -q --upgrade pip
!pip install -q feast
!pip install -q psycopg==3.2.2
!pip install -q psycopg-pool==3.2.3
!pip list | grep psyco
!feast version

psycopg                      3.2.2
psycopg-pool                 3.2.3
Feast SDK Version: "0.40.1"


In [12]:
# Forward Feast logs to the notebook output
import logging
import sys
from io import StringIO
# Configure the root logger: INFO level, "<timestamp> - <message>" layout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# Root logger handle.
# NOTE(review): `sys`, `StringIO` and `logger` are not referenced by the visible cells — confirm they are still needed.
logger = logging.getLogger()

In [13]:
# FEAST_REPO: feature repository directory, used by the `feast -c $FEAST_REPO` CLI calls
# and by `FeatureStore(os.environ['FEAST_REPO'])` later in this notebook.
%env FEAST_REPO=feast_fraud/feature_repo
# NOTE(review): ROOT_DIR is an absolute path and is not referenced by the visible cells — confirm it is still needed.
%env ROOT_DIR=/opt/app-root/src/fraud-detection
# Show the content of the feature repository.
!ls $FEAST_REPO

env: FEAST_REPO=feast_fraud/feature_repo
env: ROOT_DIR=/opt/app-root/src/fraud-detection
data  feature_store.yaml


In [14]:
# Apply the feature repository, then list the entities, data sources and
# feature views currently known to the registry.
!feast -c $FEAST_REPO apply
!feast -c $FEAST_REPO entities list
!feast -c $FEAST_REPO data-sources list
!feast -c $FEAST_REPO feature-views list

[1m[94mNo changes to registry
[1m[94mNo changes to infrastructure
NAME    DESCRIPTION    TYPE
NAME    CLASS
NAME    ENTITIES    TYPE


In [15]:
# common imports
from datetime import timedelta

import pandas as pd
import os

from feast import (
    # Entity,
    FeatureView,
    Field,
)
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)
from feast.feature_store import FeatureStore
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

## Data preparation
~~1. Add entity key to the tables~~
1. Add the `ts` (event timestamp) and `created` fields
1. Convert to parquet files

~~Since the original dataset was not considering the `customer` concept, we'll replicate the same setup: add a new field customer_id and apply the same value to each record.~~

In [17]:
# def add_customer_id(df):
#     df['customer_id'] = 1

def add_timestamps(df):
    """Prepend synthetic ``ts`` and ``created`` timestamp columns to ``df``.

    One timestamp is generated per row, spaced one hour apart and ending at
    the current time truncated to whole seconds. ``created`` is a copy of
    ``ts``.

    Args:
        df: Input DataFrame; it is not modified.

    Returns:
        A new DataFrame whose columns are ``ts``, ``created`` and then the
        columns of ``df`` in their original order.
    """
    # One entry every hour, up to now. The frequency is given as a Timedelta
    # because the 'H' string alias is deprecated in recent pandas releases and
    # raises a FutureWarning.
    ts = pd.date_range(
        end=pd.Timestamp.now().replace(microsecond=0),
        periods=len(df),
        freq=pd.Timedelta(hours=1),
    )
    # Values are assigned positionally (row i gets timestamp i) on df's own index.
    timestamps = pd.DataFrame({"ts": ts, "created": ts}, index=df.index)

    # Timestamp columns go first whatever the width of df; the previous
    # implementation hard-coded column position 9, i.e. exactly 8 input columns.
    return pd.concat(objs=[timestamps, df], axis=1)

In [18]:
# Load the three dataset splits from their CSV files.
xtrain = pd.read_csv('data/train.csv')
xval = pd.read_csv('data/validate.csv')
xtest = pd.read_csv('data/test.csv')

# add_customer_id(xtrain)
# add_customer_id(xval)
# add_customer_id(xtest)

# Prepend the synthetic `ts` / `created` columns to every split.
xtrain = add_timestamps(xtrain)
xval = add_timestamps(xval)
xtest = add_timestamps(xtest)

# Remove parquet files left over from a previous run. Done with pathlib rather
# than `!rm data/*.parquet`, which prints an error on a first run when no
# parquet file exists yet.
for stale_parquet in Path('data').glob('*.parquet'):
    stale_parquet.unlink()

xtrain.to_parquet('data/train.parquet')
xval.to_parquet('data/validate.parquet')
xtest.to_parquet('data/test.parquet')

# Summarise the result: schema of the training split and size of every split.
print("-----xtrain-----")
xtrain.info()
print("-----len(xtrain)-----")
print(len(xtrain))
print("-----len(xval)-----")
print(len(xval))
print("-----len(xtest)-----")
print(len(xtest))


  timestamps = pd.date_range(
  timestamps = pd.date_range(
  timestamps = pd.date_range(


-----xtrain-----
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 600000 entries, 0 to 599999
Data columns (total 10 columns):
 #   Column                          Non-Null Count   Dtype         
---  ------                          --------------   -----         
 0   ts                              600000 non-null  datetime64[ns]
 1   created                         600000 non-null  datetime64[ns]
 2   distance_from_home              600000 non-null  float64       
 3   distance_from_last_transaction  600000 non-null  float64       
 4   ratio_to_median_purchase_price  600000 non-null  float64       
 5   repeat_retailer                 600000 non-null  float64       
 6   used_chip                       600000 non-null  float64       
 7   used_pin_number                 600000 non-null  float64       
 8   online_order                    600000 non-null  float64       
 9   fraud                           600000 non-null  float64       
dtypes: datetime64[ns](2), float64(8)
memory

In [19]:
!ls -lh data/*parquet

-rw-r--r--. 1 1001210000 1001210000 8.8M Sep 26 07:36 data/test.parquet
-rw-r--r--. 1 1001210000 1001210000  24M Sep 26 07:36 data/train.parquet
-rw-r--r--. 1 1001210000 1001210000 8.8M Sep 26 07:36 data/validate.parquet


## Define a SQL store
A Postgres service is deployed on the current namespace and DB tables are created and populated with data from the `xtrain`, `xval` and `xtest` data frames

In [21]:
namespace_path='/var/run/secrets/kubernetes.io/serviceaccount/namespace'
with open(namespace_path, "r") as file:
    current_namespace = file.read().strip()
print(f"Current namespace is {current_namespace}")
os.environ['CURRENT_NS'] = current_namespace
!echo $CURRENT_NS

Current namespace is dmartino-fraud-detection
dmartino-fraud-detection


### Deploy PostgreSQL from template
From the OpenShift console, create an instance of PostgreSQL database with the following options in the current namespace:
* DATABASE_SERVICE_NAME=postgresql 
* POSTGRESQL_USER=feast 
* POSTGRESQL_PASSWORD=feast
* POSTGRESQL_DATABASE=feast 
* VOLUME_CAPACITY=2Gi 
* MEMORY_LIMIT=1Gi

In [22]:
# Setup DB connection attributes.
# User, password and database default to the values used when deploying the
# PostgreSQL template; set the POSTGRESQL_USER / POSTGRESQL_PASSWORD /
# POSTGRESQL_DATABASE environment variables to override them instead of
# editing credentials into the notebook.
psqlHost = f'postgresql.{current_namespace}.svc.cluster.local'
psqlPort = 5432
psqlUsername = os.environ.get('POSTGRESQL_USER', 'feast')
psqlPassword = os.environ.get('POSTGRESQL_PASSWORD', 'feast')
psqlDb = os.environ.get('POSTGRESQL_DATABASE', 'feast')
psqlSchema = 'public'

In [40]:
%%time

# Load DataFrame to DB using `to_sql` method of pandas DataFrame
import psycopg
from sqlalchemy import create_engine, text, Table, MetaData, select, func
from sqlalchemy.exc import ProgrammingError

engine = create_engine(f'postgresql+psycopg://{psqlUsername}:{psqlPassword}@{psqlHost}:{str(psqlPort)}/{psqlDb}')

metadata = MetaData()
metadata.reflect(bind=engine)

for t in ['fraud_train', 'fraud_validate', 'fraud_test']:
    if t in metadata.tables:
        table = metadata.tables[t]
        with engine.connect() as connection:
            row_count = connection.execute(select(func.count()).select_from(text(t))).scalar()
            print(f"Deleting from {t}")
            print(f"Rows before deletion: {row_count}")
            connection.execute(table.delete())
            connection.commit()

print("Persisting xtrain...")
xtrain.to_sql('fraud_train', engine, if_exists='append', index=True, schema=psqlSchema)
print("Persisting xval...")
xval.to_sql('fraud_validate', engine, if_exists='append', index=True, schema=psqlSchema)
print("Persisting xtest...")
xtest.to_sql('fraud_test', engine, if_exists='append', index=True, schema=psqlSchema)

Deleting from fraud_train
Rows before deletion: 0
Deleting from fraud_validate
Rows before deletion: 0
Deleting from fraud_test
Rows before deletion: 0
Persisting xtrain...
Persisting xval...
Persisting xtest...


-1

In [41]:
# Validate row count.
# Reflect again first: `metadata` was populated before `to_sql` ran, so on a
# first run (tables created by `to_sql`) it does not know the tables yet and
# this check would silently print nothing.
metadata.reflect(bind=engine)
for t in ['fraud_train', 'fraud_validate', 'fraud_test']:
    if t not in metadata.tables:
        # Make a missing table visible instead of skipping it silently.
        print(f"Table {t} not found")
        continue
    with engine.connect() as connection:
        row_count = connection.execute(select(func.count()).select_from(metadata.tables[t])).scalar()
        print(f"Rows in {t}: {row_count}")

Rows in fraud_train: 600000
Rows in fraud_validate: 200000
Rows in fraud_test: 200000


## Define the Feature Store
* Map the PostgreSQL tables to `PostgreSQLSource`s
* Define FeatureViews for training purposes
....

**Note**: we cannot apply feature store definitions from the remote servers because of GH issue [4529: Remote apply](https://github.com/feast-dev/feast/issues/4529), so we use a direct connection to the DB.

In [55]:
!cat $FEAST_REPO/feature_store.yaml

project: feast_fraud
registry:
    registry_type: sql
    path: postgresql+psycopg://feast:feast@postgresql:5432/feast
    cache_ttl_seconds: 60
    sqlalchemy_config_kwargs:
        echo: false
        pool_pre_ping: true
online_store:
    type: postgres
    host: postgresql
    port: 5432
    database: feast
    db_schema: public
    user: feast
    password: feast
offline_store:
    type: postgres
    host: postgresql
    port: 5432
    database: feast
    db_schema: public
    user: feast
    password: feast
entity_key_serialization_version: 2


In [51]:
# Initialize the store
# The FeatureStore is created from the repository directory named by FEAST_REPO.
store = FeatureStore(os.environ['FEAST_REPO'])
# Print the entities currently registered (the recorded output is an empty list).
print(store.list_entities())

[]


In [52]:
%%time
# Create the PostgreSQLSource
train_source = PostgreSQLSource(
    name="train_source",
    query="SELECT * FROM fraud_train",
    timestamp_field="ts",
    created_timestamp_column="created",
)
validate_source = PostgreSQLSource(
    name="validate_source",
    query="SELECT * FROM fraud_validate",
    timestamp_field="ts",
    created_timestamp_column="created",
)
test_source = PostgreSQLSource(
    name="test_source",
    query="SELECT * FROM fraud_test",
    timestamp_field="ts",
    created_timestamp_column="created",
)
store.registry.apply_data_source(train_source, store.project)
store.registry.apply_data_source(validate_source, store.project)
store.registry.apply_data_source(test_source, store.project)
!feast -c $FEAST_REPO data-sources list

NAME             CLASS
train_source     <class 'feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source.PostgreSQLSource'>
validate_source  <class 'feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source.PostgreSQLSource'>
test_source      <class 'feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source.PostgreSQLSource'>
CPU times: user 38 ms, sys: 27.9 ms, total: 65.9 ms
Wall time: 3.17 s


In [53]:
# Customer entity
# customer = Entity(name="customer", join_keys=["customer_id"])
# store.registry.apply_entity(customer, store.project)
!feast -c $FEAST_REPO entities list

NAME    DESCRIPTION    TYPE


In [54]:
%%time
training_fv = FeatureView(
    name="training_fv",
    # entities=[customer],
    entities=[],
    ttl=timedelta(days=1),
    schema=[
        # Field(name="customer_id", dtype=Int64),
        Field(name="distance_from_last_transaction", dtype=Float64),
        Field(name="ratio_to_median_purchase_price", dtype=Float64),
        Field(name="used_chip", dtype=Float64),
        Field(name="used_pin_number", dtype=Float64),
        Field(name="online_order", dtype=Float64),
        Field(name="fraud", dtype=Float64),
    ],
    online=True,
    source=train_source,
    tags={"team": "training"},
)
validation_fv = FeatureView(
    name="validation_fv",
    # entities=[customer],
    entities=[],
    ttl=timedelta(days=1),
    schema=[
        # Field(name="customer_id", dtype=Int64),
        Field(name="distance_from_last_transaction", dtype=Float64),
        Field(name="ratio_to_median_purchase_price", dtype=Float64),
        Field(name="used_chip", dtype=Float64),
        Field(name="used_pin_number", dtype=Float64),
        Field(name="online_order", dtype=Float64),
        Field(name="fraud", dtype=Float64),
    ],
    online=True,
    source=validate_source,
    tags={"team": "training"},
)
test_fv = FeatureView(
    name="test_fv",
    # entities=[customer],
    entities=[],
    ttl=timedelta(days=1),
    schema=[
        # Field(name="customer_id", dtype=Int64),
        Field(name="distance_from_last_transaction", dtype=Float64),
        Field(name="ratio_to_median_purchase_price", dtype=Float64),
        Field(name="used_chip", dtype=Float64),
        Field(name="used_pin_number", dtype=Float64),
        Field(name="online_order", dtype=Float64),
        Field(name="fraud", dtype=Float64),
    ],
    online=True,
    source=test_source,
    tags={"team": "training"},
)
store.registry.apply_feature_view(training_fv, store.project)
store.registry.apply_feature_view(validation_fv, store.project)
store.registry.apply_feature_view(test_fv, store.project)
!feast -c $FEAST_REPO feature-views list

NAME           ENTITIES    TYPE
training_fv    n/a         FeatureView
validation_fv  n/a         FeatureView
test_fv        n/a         FeatureView
CPU times: user 29.5 ms, sys: 37 ms, total: 66.5 ms
Wall time: 3.16 s


## Start Feast services
A fully distributed Feast environment is deployed using Helm:
* Registry
* Online Store
* Offline Store

Run the following commands from a local clone of this git repo.

Generate base64 encoded feature configurations
```console
REGISTRY_CONFIG_BASE64=$(cat feast_fraud/feature_repo/registry_store.yaml | base64 -w0)
ONLINE_CONFIG_BASE64=$(cat feast_fraud/feature_repo/online_store.yaml | base64 -w0)
OFFLINE_CONFIG_BASE64=$(cat feast_fraud/feature_repo/offline_store.yaml | base64 -w0)
```

Initialize the image settings:
```console
FEAST_IMAGE_REPO=feastdev/feature-server
FEAST_IMAGE_VERSION=latest
```

Setup the Helm repository:
```console
helm repo add feast-charts https://feast-helm-charts.storage.googleapis.com
helm repo update
```

Login to the cluster and set the current project as the default.

Then run the following command to install the Registry server:
```console
helm upgrade --install feast-registry feast-charts/feast-feature-server \
--set fullnameOverride=registry-server --set feast_mode=registry \
--set image.repository=${FEAST_IMAGE_REPO} --set image.tag=${FEAST_IMAGE_VERSION} \
--set feature_store_yaml_base64=$REGISTRY_CONFIG_BASE64

oc wait --for=condition=available deployment/registry-server --timeout=2m
```

Run the following command to install the Offline server:
```console
helm upgrade --install feast-offline feast-charts/feast-feature-server \
--set fullnameOverride=offline-server --set feast_mode=offline \
--set image.repository=${FEAST_IMAGE_REPO} --set image.tag=${FEAST_IMAGE_VERSION} \
--set feature_store_yaml_base64=$OFFLINE_CONFIG_BASE64

oc wait --for=condition=available deployment/offline-server --timeout=2m
```

Run the following command to install the Online server:
```console
helm upgrade --install feast-online feast-charts/feast-feature-server \
--set fullnameOverride=online-server --set feast_mode=online \
--set image.repository=${FEAST_IMAGE_REPO} --set image.tag=${FEAST_IMAGE_VERSION} \
--set feature_store_yaml_base64=$ONLINE_CONFIG_BASE64

oc wait --for=condition=available deployment/online-server --timeout=2m
```


## Model training

In [None]:
# import subprocess

# # Run feast serve in the background
# feast_offline_server_process = subprocess.Popen(["feast", "-c", os.environ['FEAST_REPO'], "serve_offline"])

In [None]:
# feast_offline_server_process.terminate()

In [None]:
!ps -ef | grep 'serve_offline'
!kill -9 788

In [None]:
# Rebind `store` to the repository in feast_fraud/client.
# NOTE(review): presumably this configuration targets the remote Feast servers deployed above — confirm.
store = FeatureStore("feast_fraud/client")
# Display the entities visible through this store.
store.list_entities()

In [None]:
# Number of event timestamps used to build the entity DataFrame.
N_SAMPLE_TIMESTAMPS = 10

# Slice first, then convert: only the first N timestamps are needed, so there is
# no reason to convert the whole column. Converting each Timestamp individually
# also avoids `Series.dt.to_pydatetime()`, which is deprecated in recent pandas.
datetimes = [ts.to_pydatetime() for ts in xtrain['ts'].iloc[:N_SAMPLE_TIMESTAMPS]]
len(datetimes)
# del xtrain
# del xval
# del xtest

In [None]:
# datetimes=datetimes[:5]
# len(datetimes)

In [None]:
# Fetch historical data
# Entity DataFrame: a single `event_timestamp` column, one row per sampled timestamp.
entity_columns = {
    # "customer_id": [1] * len(datetimes),
    "event_timestamp": datetimes,
}
entity_df = pd.DataFrame(entity_columns)

In [None]:
# fv = store.registry.get_feature_view('training', store.project)
# fv

In [None]:
# %%timeit -r 1 -n 1
# Feature references have the form "<feature view name>:<feature name>". The
# view registered earlier in this notebook is named "training_fv"; the previous
# "training:" prefix does not match any registered feature view.
# NOTE(review): assumes the client store resolves to the same registry — confirm.
training_feature_names = [
    "distance_from_last_transaction",
    "ratio_to_median_purchase_price",
    "used_chip",
    "used_pin_number",
    "online_order",
    "fraud",
]
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[f"training_fv:{feature_name}" for feature_name in training_feature_names],
).to_df()
training_df.head()

In [None]:
training_df

In [None]:
!find . -name train.parquet
!pwd

In [None]:
# Set the input (X) and output (Y) data.
# The only output data is whether it's fraudulent. All other fields are inputs to the model.

feature_indexes = [
    1,  # distance_from_last_transaction
    2,  # ratio_to_median_purchase_price
    4,  # used_chip
    5,  # used_pin_number
    6,  # online_order
]

label_indexes = [
    7  # fraud
]


def _load_split(csv_path):
    """Read one CSV split and return its ``(features, labels)`` DataFrames.

    Columns are selected by position using ``feature_indexes`` and ``label_indexes``.
    """
    split = pd.read_csv(csv_path)
    return split.iloc[:, feature_indexes], split.iloc[:, label_indexes]


# Fixed: the training split used to be read through the undefined name `apd`,
# which raised a NameError.
X_train, y_train = _load_split('data/train.csv')
X_val, y_val = _load_split('data/validate.csv')
X_test, y_test = _load_split('data/test.csv')


# Standardize the features to zero mean and unit variance. The result is centred and on a
# comparable scale (it is not bounded to a fixed range), which makes it a lot easier for the
# model to learn than raw (and potentially large) values.
# It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set.

scaler = StandardScaler()

X_train = scaler.fit_transform(X_train.values)

# Persist the (unscaled) test split and the fitted scaler for the "Test the model" section.
Path("artifact").mkdir(parents=True, exist_ok=True)
with open("artifact/test_data.pkl", "wb") as handle:
    pickle.dump((X_test, y_test), handle)
with open("artifact/scaler.pkl", "wb") as handle:
    pickle.dump(scaler, handle)

# Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions.
class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.values.ravel())
# Map class position -> weight, the dictionary form passed to `model.fit(class_weight=...)`.
class_weights = {i : class_weights[i] for i in range(len(class_weights))}

## Build the model

The model is a simple, fully-connected, deep neural network, containing three hidden layers and one output layer.

In [None]:
# Fully-connected binary classifier: three 32-unit hidden layers and a sigmoid output.
# The second and third hidden layers apply batch normalization before their ReLU.
model = Sequential([
    Dense(32, activation='relu', input_dim=len(feature_indexes)),
    Dropout(0.2),
    Dense(32),
    BatchNormalization(),
    Activation('relu'),
    Dropout(0.2),
    Dense(32),
    BatchNormalization(),
    Activation('relu'),
    Dropout(0.2),
    Dense(1, activation='sigmoid'),
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
model.summary()

## Train the model

Training a model is often the most time-consuming part of the machine learning process.  Large models can take multiple GPUs for days.  Expect the training on CPU for this very simple model to take a minute or more.

In [None]:
# Train the model and get performance
import os
import time

epochs = 2

start = time.time()
# The validation features are scaled with the scaler fitted on the training split.
validation_data = (scaler.transform(X_val.values), y_val)
history = model.fit(
    X_train,
    y_train,
    epochs=epochs,
    validation_data=validation_data,
    verbose=True,
    class_weight=class_weights,
)
end = time.time()
print(f"Training of model is complete. Took {end-start} seconds")

## Save the model file

In [None]:
# Save the model as ONNX for easy use of ModelMesh
os.makedirs("models/fraud/1", exist_ok=True)
# from_keras returns a pair; only its first element (the model proto) is saved.
conversion_result = tf2onnx.convert.from_keras(model)
model_proto = conversion_result[0]
onnx.save(model_proto, "models/fraud/1/model.onnx")

The output might include TensorFlow messages related to GPUs. You can ignore these messages.

## Confirm the model file was created successfully

The output should include the model name, size, and date. 

In [None]:
! ls -alRh ./models/

## Test the model

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np
import pickle
import onnxruntime as rt

Load the test data and scaler:

In [None]:
# Restore the scaler and the test split pickled by the training section.
# NOTE: pickle.load can execute arbitrary code — only load artifacts you produced yourself.
with open('artifact/scaler.pkl', 'rb') as scaler_file:
    scaler = pickle.load(scaler_file)
with open('artifact/test_data.pkl', 'rb') as test_data_file:
    X_test, y_test = pickle.load(test_data_file)

Create an ONNX inference runtime session and predict values for all test inputs:

In [None]:
# Scores above this value are classified as fraud.
threshold = 0.95

sess = rt.InferenceSession("models/fraud/1/model.onnx", providers=rt.get_available_providers())
input_name = sess.get_inputs()[0].name
output_name = sess.get_outputs()[0].name

# Scale the test features with the training scaler and feed them as float32.
scaled_test_inputs = scaler.transform(X_test.values).astype(np.float32)
y_pred_temp = sess.run([output_name], {input_name: scaled_test_inputs})
y_pred_temp = np.asarray(np.squeeze(y_pred_temp[0]))

# Binarize the scores: 1 above the threshold, 0 otherwise.
y_pred = np.where(y_pred_temp > threshold, 1, 0)

Show the results:

In [None]:
from sklearn.metrics import precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay
import numpy as np

y_test_arr = y_test.to_numpy().squeeze()
y_pred_rounded = np.round(y_pred)

# Accuracy as the percentage of predictions equal to the label.
correct = int(np.count_nonzero(y_pred == y_test_arr))
acc = (correct / len(y_pred)) * 100
precision = precision_score(y_test_arr, y_pred_rounded)
recall = recall_score(y_test_arr, y_pred_rounded)

print(f"Eval Metrics: \n Accuracy: {acc:>0.1f}%, "
      f"Precision: {precision:.4f}, Recall: {recall:.4f} \n")

# Confusion matrix of true labels against thresholded predictions.
c_matrix = confusion_matrix(y_test_arr, y_pred)
ConfusionMatrixDisplay(c_matrix).plot()

## Example: Is Sally's transaction likely to be fraudulent?

Here is the order of the fields from Sally's transaction details:
* distance_from_last_transaction
* ratio_to_median_purchase_price
* used_chip 
* used_pin_number
* online_order 

In [None]:
# One transaction, features in the order listed above.
sally_transaction_details = [
    [0.3111400080477545,
    1.9459399775518593, 
    1.0, 
    0.0, 
    0.0]
    ]
# Scale with the training scaler and run the ONNX session, exactly as for the test set.
prediction = sess.run([output_name], {input_name: scaler.transform(sally_transaction_details).astype(np.float32)})

print("Is Sally's transaction predicted to be fraudulent? (true = YES, false = NO) ")
print(np.squeeze(prediction) > threshold)

print("How likely was Sally's transaction to be fraudulent? ")
# The model outputs a probability in [0, 1]; multiply by 100 so that the value
# printed next to the "%" sign really is a percentage.
print("{:.5f}".format(100 * float(np.squeeze(prediction))) + "%")