# IEEE Fraud detection adversarial validation [sklearn, lgbm]
* IEEE Fraud detection train/test data binary classification task.
* Reference notebook: <https://www.kaggle.com/code/jtrotman/ieee-fraud-adversarial-lgb-split-points/notebook>
* Dataset: <https://www.kaggle.com/code/jtrotman/ieee-fraud-adversarial-lgb-split-points/input>

# Quickstart

By running this notebook, you’ll create a whole test suite in a few lines of code. The model used here is a LGBM classification model with the IEEE fraud detection dataset. Feel free to use your own model (tabular, text, or LLM).

You’ll learn how to:
* Detect vulnerabilities by scanning the model
* Generate a test suite with domain-specific tests
* Customize your test suite by loading a test from the Giskard catalog
* Upload your model to the Giskard server to:
* Compare models to decide which one to promote
* Debug your tests to diagnose issues
* Share your results and collect business feedback from your team

## Install Giskard

In [None]:
!pip install giskard

## Import libraries

In [None]:
import os
from pathlib import Path
from urllib.request import urlretrieve

import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.metrics import roc_auc_score
from pandas.api.types import union_categoricals
from sklearn.model_selection import train_test_split

import giskard
from giskard import GiskardClient, testing
from giskard import Dataset, Model

## Define constants

In [None]:
# Constants.
TARGET_COLUMN = 'isTest'
IDX_LABEL = 'TransactionID'

# Paths.
DATA_URL = os.path.join("ftp://sys.giskard.ai", "pub", "unit_test_resources", "fraud_detection_classification_dataset", "{}")
DATA_PATH = Path.home() / ".giskard" / "fraud_detection_classification_dataset"

## Data loading and preprocessing

In [None]:
def fetch_from_ftp(url: str, file: Path) -> None:
    """Helper to fetch data from the FTP server."""
    if not file.parent.exists():
        file.parent.mkdir(parents=True, exist_ok=True)

    if not file.exists():
        print(f"Downloading data from {url}")
        urlretrieve(url, file)

    print(f"Data was loaded!")

In [None]:
def fetch_dataset():
    files_to_fetch = ["train_transaction.csv", "train_identity.csv", "test_transaction.csv", "test_identity.csv"]
    for file_name in files_to_fetch:
        fetch_from_ftp(DATA_URL.format(file_name), DATA_PATH / file_name)

In [None]:
# Define data-types of transactions features.
DATA_TYPES_TRANSACTION = {
    'TransactionID': 'int32',
    'isFraud': 'int8',
    'TransactionDT': 'int32',
    'TransactionAmt': 'float32',
    'ProductCD': 'category',
    'card1': 'int16',
    'card2': 'float32',
    'card3': 'float32',
    'card4': 'category',
    'card5': 'float32',
    'card6': 'category',
    'addr1': 'float32',
    'addr2': 'float32',
    'dist1': 'float32',
    'dist2': 'float32',
    'P_emaildomain': 'category',
    'R_emaildomain': 'category',
}

C_COLS = [f'C{i}' for i in range(1, 15)]
D_COLS = [f'D{i}' for i in range(1, 16)]
M_COLS = [f'M{i}' for i in range(1, 10)]
V_COLS = [f'V{i}' for i in range(1, 340)]

DATA_TYPES_TRANSACTION.update((c, 'float32') for c in C_COLS)
DATA_TYPES_TRANSACTION.update((c, 'float32') for c in D_COLS)
DATA_TYPES_TRANSACTION.update((c, 'float32') for c in V_COLS)
DATA_TYPES_TRANSACTION.update((c, 'category') for c in M_COLS)

In [None]:
# Define datatypes of identity features.
DATA_TYPES_ID = {
    'TransactionID': 'int32',
    'DeviceType': 'category',
    'DeviceInfo': 'category',
}

ID_COLS = [f'id_{i:02d}' for i in range(1, 39)]
ID_CATS = [
    'id_12', 'id_15', 'id_16', 'id_23', 'id_27', 'id_28', 'id_29', 'id_30',
    'id_31', 'id_33', 'id_34', 'id_35', 'id_36', 'id_37', 'id_38'
]

DATA_TYPES_ID.update(((c, 'float32') for c in ID_COLS))
DATA_TYPES_ID.update(((c, 'category') for c in ID_CATS))

In [None]:
# Define list of all categorical features.
CATEGORICALS = [f_name for (f_name, f_type) in dict(DATA_TYPES_TRANSACTION, **DATA_TYPES_ID).items() if f_type == "category"]

In [None]:
def read_set(_type):
    """Read both transactions and identity data."""
    print(f"Reading transactions data...")
    _df = pd.read_csv(os.path.join(DATA_PATH, f'{_type}_transaction.csv'),
                      index_col=IDX_LABEL, dtype=DATA_TYPES_TRANSACTION, nrows=250)

    print(f"Reading identity data...")
    _df = _df.join(pd.read_csv(os.path.join(DATA_PATH, f'{_type}_identity.csv'),
                               index_col=IDX_LABEL, dtype=DATA_TYPES_ID))
    return _df

def read_dataset():
    """Read whole data."""

    fetch_dataset()

    print(f"Reading train data...")
    train_set = read_set('train')

    print(f"Reading test data...")
    test_set = read_set('test')

    return train_set, test_set

def preprocess_dataset(train_set, test_set):
    """Unite train and test into common dataframe."""
    # Create a new target column and remove a former one from the train data.
    print("Start data preprocessing...")
    train_set.pop('isFraud')
    train_set['isTest'] = 0
    test_set['isTest'] = 1

    # Preprocess categorical features.
    n_train = train_set.shape[0]
    for c in train_set.columns:
        s = train_set[c]
        if hasattr(s, 'cat'):
            u = union_categoricals([train_set[c], test_set[c]], sort_categories=True)
            train_set[c] = u[:n_train]
            test_set[c] = u[n_train:]

    # Unite train and test data.
    united = pd.concat([train_set, test_set])

    # Add additional features.
    united['TimeInDay'] = united.TransactionDT % 86400
    united['Cents'] = united.TransactionAmt % 1

    # Remove useless columns.
    united.drop("TransactionDT", axis=1, inplace=True)

    print(f"Dataset merged and preprocessed! Resulted shape: {united.shape}")

    return united

In [None]:
united_dataset = preprocess_dataset(*read_dataset())

## Train-test split

In [None]:
X_train, X_test, y_train, y_test = train_test_split(united_dataset.drop(TARGET_COLUMN, axis=1), united_dataset[TARGET_COLUMN], test_size=0.25)

## Wrap test dataset

In [None]:
raw_dataset = pd.concat([X_test, y_test], axis=1)
wrapped_dataset = Dataset(raw_dataset,
                          name="fraud_detection_adversarial_dataset",
                          target=TARGET_COLUMN,
                          cat_columns=CATEGORICALS)

## Prepare estimator

In [None]:
# Define parameters of an estimator.
ESTIMATOR_PARAMS = {
    'num_leaves': 64,
    'objective': 'binary',
    'min_data_in_leaf': 10,
    'learning_rate': 0.1,
    'feature_fraction': 0.5,
    'bagging_fraction': 0.9,
    'bagging_freq': 1,
    'max_cat_to_onehot': 128,
    'metric': 'auc',
    'n_jobs': -1,
    'seed': 42,
    'subsample_for_bin': united_dataset.shape[0]
}

In [None]:
estimator = LGBMClassifier(**ESTIMATOR_PARAMS)
estimator.fit(X_train, y_train)

In [None]:
train_metric = roc_auc_score(y_train, estimator.predict_proba(X_train)[:, 1].T)
test_metric = roc_auc_score(y_test, estimator.predict_proba(X_test)[:, 1].T)

print(f"Train ROC-AUC score: {train_metric:.2f}")
print(f"Test ROC-AUC score: {test_metric:.2f}")

## Wrap estimator

In [None]:
def prediction_function(df: pd.DataFrame) -> np.ndarray:
    return estimator.predict_proba(df)

In [None]:
wrapped_model = Model(prediction_function,
                      model_type="classification",
                      name="train_test_data_classifier",
                      feature_names=X_train.columns,
                      classification_threshold=0.5,
                      classification_labels=[0, 1])

In [None]:
# Validate wrapped model.
wrapped_test_metric = roc_auc_score(y_test, wrapped_model.predict(wrapped_dataset).raw[:, 1].T)
print(f"Wrapped Test ROC-AUC score: {wrapped_test_metric:.2f}")

## Scan your model to find vulnerabilities
With the Giskard scan feature, you can detect vulnerabilities in your model, including performance biases, unrobustness, data leakage, stochasticity, underconfidence, ethical issues, and more. For detailed information about the scan feature, please refer to our scan documentation.

In [None]:
results = giskard.scan(wrapped_model, wrapped_dataset)

In [None]:
display(results)

## Generate a test suite from the Scan
The objects produced by the scan can be used as fixtures to generate a test suite that integrate domain-specific issues. To create custom tests, refer to the Test your ML Model page.

In [None]:
test_suite = results.generate_test_suite("My first test suite")
test_suite.run()

## Customize your suite by loading objects from the Giskard catalog

The Giskard open source catalog will enable to load:
* Tests such as metamorphic, performance, prediction & data drift, statistical tests, etc
* Slicing functions such as detectors of toxicity, hate, emotion, etc
* Transformation functions such as generators of typos, paraphrase, style tune, etc

For demo purposes, we will load a simple unit test (test_f1) that checks if the test F1 score is above the given threshold. For more examples of tests and functions, refer to the Giskard catalog.

In [None]:
test_suite.add_test(testing.test_f1(model=wrapped_model, dataset=wrapped_dataset, threshold=0.7)).run()

## Upload your suite to the Giskard server

Upload your suite to the Giskard server to:
* Compare models to decide which model to promote
* Debug your tests to diagnose the issues
* Create more domain-specific tests that are integrating business feedback
* Share your results

In [None]:
# Uploading the test suite will automatically save the model, dataset, tests, slicing & transformation functions inside the Giskard UI server
# Create a Giskard client after having install the Giskard server (see documentation)
token = "API_TOKEN"  # Find it in Settings in the Giskard server

client = GiskardClient(
    url="http://localhost:19000",  # URL of your Giskard instance
    token=token
)

my_project = client.create_project("my_project", "PROJECT_NAME", "DESCRIPTION")

# Upload to the current project ✉️
test_suite.upload(client, "my_project")