In [7]:
# Base name of the MLRun project; also interpolated into the CSV target paths used later on
project_name = "fraud-demo"

In [8]:
import mlrun

# Initialize the MLRun project object, using the current directory as the project context.
# NOTE(review): with user_project=True the effective project name appears to get the
# user name appended (the run table further down shows "fraud-demo-eduardo") — confirm.
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)

> 2023-10-11 15:38:18,970 [info] Project loaded successfully: {'project_name': 'fraud-demo'}


# Transactions

In [9]:
# Helper functions to adjust the timestamps of our data
# while keeping the order of the selected events and
# the relative distance from one event to the other

import pandas as pd

def date_adjustment(sample, data_max, new_max, old_data_period, new_data_period):
    """
    Map a single sample's date from the original time span onto the new one.

    The sample's distance from ``data_max`` is expressed as a fraction of
    ``old_data_period``; that same fraction of ``new_data_period`` is then
    subtracted from ``new_max``.

    :param sample:          date of the sample to move
    :param data_max:        latest date of the original data
    :param new_max:         latest date of the new time span
    :param old_data_period: length of the original time span
    :param new_data_period: length of the new time span
    :returns: the sample's date inside the new time span
    """
    fraction_before_max = (data_max - sample) / old_data_period
    return new_max - new_data_period * fraction_before_max


def adjust_data_timespan(
    dataframe, timestamp_col="timestamp", new_period="2d", new_max_date_str="now"
):
    """
    Rescale the dataframe timestamps so they span ``new_period`` and end at
    ``new_max_date_str``, keeping the order of the events and their relative
    distance from one another.

    :param dataframe:        source dataframe; it is not modified
    :param timestamp_col:    name of the datetime column to rescale
    :param new_period:       length of the new time span (anything ``pd.Timedelta`` accepts)
    :param new_max_date_str: end of the new time span (anything ``pd.Timestamp`` accepts)
    :returns: a copy of ``dataframe`` whose ``timestamp_col`` lies in the new time span
    """
    df = dataframe.copy()
    timestamps = df[timestamp_col]

    # Old time period, measured on the requested column rather than on a
    # hard-coded "timestamp" attribute
    data_max = timestamps.max()
    old_data_period = data_max - timestamps.min()

    # New time period
    new_data_period = pd.Timedelta(new_period)
    new_max = pd.Timestamp(new_max_date_str)

    if pd.isna(old_data_period):
        # Empty column (or only missing values): there is nothing to rescale
        return df

    if old_data_period == pd.Timedelta(0):
        # Every event shares the same instant, so there is no span to scale;
        # put the events on new_max instead of dividing by zero
        df[timestamp_col] = timestamps.where(timestamps.isna(), new_max)
        return df

    # Same arithmetic as date_adjustment, applied to the whole column at once
    share_before_max = (data_max - timestamps) / old_data_period
    df[timestamp_col] = new_max - new_data_period * share_before_max
    return df

In [10]:
import pandas as pd

# Fetch the transactions dataset from the server
transactions_data = pd.read_csv(
    "https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/data.csv",
    parse_dates=["timestamp"],
)

# Keep only the first 10,000 rows after sorting by the "source" column
transactions_data = transactions_data.sort_values(by="source", axis=0)[:10000]

# Rescale the sample timestamps so they span the 2 days ending now
transactions_data = adjust_data_timespan(transactions_data, new_period="2d")

# Re-sort chronologically, since the rows are still ordered by "source" at this point
transactions_data = transactions_data.sort_values(by="timestamp", axis=0)

# Preview
transactions_data.head(3)

Unnamed: 0,step,age,gender,zipcodeOri,zipMerchant,category,amount,fraud,timestamp,source,target,device
274633,91,5,F,28007,28007,es_transportation,26.92,0,2023-10-09 15:38:36.015493000,C1022153336,M1823072687,33832bb8607545df97632a7ab02d69c4
286902,94,2,M,28007,28007,es_transportation,48.22,0,2023-10-09 15:38:54.309118913,C1006176917,M348934600,fadd829c49e74ffa86c8da3be75ada53
416998,131,3,M,28007,28007,es_transportation,17.56,0,2023-10-09 15:38:59.416288939,C1010936270,M348934600,58d0422a50bc40c89d2b4977b2f1beea


In [11]:
# Import MLRun's Feature Store
import mlrun.feature_store as fstore
from mlrun.feature_store.steps import OneHotEncoder, MapValues, DateExtractor
from mlrun.datastore import ParquetTarget, NoSqlTarget, CSVTarget


In [12]:
# Declare the "transactions" FeatureSet: rows are keyed by the "source" entity
# and ordered in time by the "timestamp" column
transaction_set = fstore.FeatureSet(
    name="transactions",
    description="transactions feature set",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
)

In [13]:
# Transaction categories; used both as the one-hot encoding values for "category"
# and as the names of the per-category aggregations below
main_categories = [
    "es_transportation",
    "es_health",
    "es_otherservices",
    "es_food",
    "es_hotelservices",
    "es_barsandrestaurants",
    "es_tech",
    "es_sportsandtoys",
    "es_wellnessandbeauty",
    "es_hyper",
    "es_fashion",
    "es_home",
    "es_contents",
    "es_travel",
    "es_leisure",
]

# Columns to one-hot encode and the values each may take; the gender values are
# taken from the loaded data
one_hot_encoder_mapping = {
    "category": main_categories,
    "gender": list(transactions_data.gender.unique()),
}

# Define the graph steps, chained in this order:
#   1. DateExtractor - derives hour / day_of_week from "timestamp"
#      (they show up as timestamp_hour / timestamp_day_of_week in the ingestion output)
#   2. MapValues     - maps the age value "U" to "0"
#      NOTE(review): with_original_features=True presumably keeps the unmapped
#      columns alongside the mapped one — confirm
#   3. OneHotEncoder - encodes "category" and "gender" using the mapping above
transaction_set.graph.to(
    DateExtractor(parts=["hour", "day_of_week"], timestamp_col="timestamp")
).to(MapValues(mapping={"age": {"U": "0"}}, with_original_features=True)).to(
    OneHotEncoder(mapping=one_hot_encoder_mapping)
)


# Add avg/sum/count/max aggregations of "amount" for 2, 12, and 24 hour time windows
# (the ingestion output names them amount_<operation>_<window>, e.g. amount_sum_2h)
transaction_set.add_aggregation(
    name="amount",
    column="amount",
    operations=["avg", "sum", "count", "max"],
    windows=["2h", "12h", "24h"],
    period="1h",
)


# Add the category aggregations over a 14 day window: one "sum" aggregation per
# category, computed on that category's one-hot column (category_<name>)
for category in main_categories:
    transaction_set.add_aggregation(
        name=category,
        column=f"category_{category}",
        operations=["sum"],
        windows=["14d"],
        period="1d",
    )

# Add only an offline store.
# Note: despite the variable name `parquet`, the target created here is a CSVTarget.
# NOTE(review): with_defaults=False presumably keeps the default targets from
# being added next to it — confirm

parquet = CSVTarget(
    name="transactions", path=f"./transactions-csv-{project_name}.csv"
)

transaction_set.set_targets([parquet], with_defaults=False)


In [14]:
# Ingest your transactions dataset through your defined pipeline; the returned
# dataframe holds the transformed and aggregated features
transactions_df = fstore.ingest(
    transaction_set, transactions_data, infer_options=fstore.InferOptions.default()
)

# Preview
transactions_df.head(3)

Unnamed: 0_level_0,amount_sum_2h,amount_sum_12h,amount_sum_24h,amount_count_2h,amount_count_12h,amount_count_24h,amount_max_2h,amount_max_12h,amount_max_24h,amount_avg_2h,...,category_es_contents,category_es_travel,category_es_leisure,amount,fraud,timestamp,target,device,timestamp_hour,timestamp_day_of_week
source,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
C1022153336,26.92,26.92,26.92,1.0,1.0,1.0,26.92,26.92,26.92,26.92,...,0,0,0,26.92,0,2023-10-09 15:38:36.015493000,M1823072687,33832bb8607545df97632a7ab02d69c4,15,0
C1006176917,48.22,48.22,48.22,1.0,1.0,1.0,48.22,48.22,48.22,48.22,...,0,0,0,48.22,0,2023-10-09 15:38:54.309118913,M348934600,fadd829c49e74ffa86c8da3be75ada53,15,0
C1010936270,17.56,17.56,17.56,1.0,1.0,1.0,17.56,17.56,17.56,17.56,...,0,0,0,17.56,0,2023-10-09 15:38:59.416288939,M348934600,58d0422a50bc40c89d2b4977b2f1beea,15,0


# User events

In [15]:
# Fetch the user_events dataset from the server; the first CSV column is used as
# the index and fields are quoted with single quotes
user_events_data = pd.read_csv(
    "https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/events.csv",
    index_col=0,
    quotechar="'",
    parse_dates=["timestamp"],
)

# Adjust to the last 2 days to see the latest aggregations in the online feature vectors
user_events_data = adjust_data_timespan(user_events_data, new_period="2d")

# Preview
user_events_data.head(3)

Unnamed: 0,source,event,timestamp
0,C1974668487,details_change,2023-10-11 00:45:59.316315086
1,C1973547259,login,2023-10-11 03:47:33.573455508
2,C515668508,login,2023-10-11 00:20:39.854632302


In [16]:
# Declare the "events" FeatureSet: rows are keyed by the "source" entity
# and ordered in time by the "timestamp" column
user_events_set = fstore.FeatureSet(
    name="events",
    description="user events feature set",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
)

In [17]:
# One-hot encode the "event" column, using every event type present in the data
events_mapping = {"event": list(user_events_data.event.unique())}
user_events_set.graph.to(OneHotEncoder(mapping=events_mapping))

# Add only an offline (CSV) store. The target is named "events" so that it matches
# this feature set and its file path (it was previously named "labels", a leftover
# from the labels feature set).
target = CSVTarget(name="events", path=f"./events-{project_name}.csv")
user_events_set.set_targets([target], with_defaults=False)

# To plot the pipeline and see the different steps, run:
#   user_events_set.plot(rankdir="LR", with_targets=True)

In [18]:
# Ingest the user events through the newly created events feature set
events_df = fstore.ingest(user_events_set, user_events_data)

# Preview
events_df.head(3)

Unnamed: 0_level_0,event_details_change,event_login,event_password_change,timestamp
source,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
C1974668487,1,0,0,2023-10-11 00:45:59.316315086
C1973547259,0,1,0,2023-10-11 03:47:33.573455508
C515668508,0,1,0,2023-10-11 00:20:39.854632302


# Transform

In [19]:
def create_labels(df):
    """
    Build the training labels from a transactions dataframe.

    Keeps only the "fraud" and "timestamp" columns, renames "fraud" to "label",
    casts the timestamps to "datetime64[ms]" and the labels to int.
    The input dataframe is left untouched.

    :param df: dataframe holding at least the "fraud" and "timestamp" columns
    :returns: a new dataframe with the columns "label" and "timestamp"
    """
    return (
        df[["fraud", "timestamp"]]
        .rename(columns={"fraud": "label"})
        .assign(
            timestamp=lambda frame: frame["timestamp"].astype("datetime64[ms]"),
            label=lambda frame: frame["label"].astype(int),
        )
    )

In [20]:
from mlrun.datastore import ParquetTarget
import os

# Define the "labels" feature set; it runs on the pandas engine, so the graph
# handler below receives a dataframe
labels_set = fstore.FeatureSet(
    "labels",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
    description="training labels",
    engine="pandas",
)

# Single graph step: derive the label columns with create_labels
labels_set.graph.to(name="create_labels", handler=create_labels)


# Specify only a CSV (offline) target since the labels are not used for real-time
target = CSVTarget(name="labels", path="./labels-csv.csv")
labels_set.set_targets([target], with_defaults=False)

In [21]:
# Ingest the labels feature set from the (timestamp-adjusted) transactions data
labels_df = fstore.ingest(labels_set, transactions_data)

# Preview
labels_df.head(3)

Unnamed: 0_level_0,label,timestamp
source,Unnamed: 1_level_1,Unnamed: 2_level_1
C1022153336,0,2023-10-09 15:38:36.015
C1006176917,0,2023-10-09 15:38:54.309
C1010936270,0,2023-10-09 15:38:59.416


## Create feature vector

In [22]:
# Define the list of features to use
features = [
    # Every feature of the "events" feature set
    "events.*",
    # "amount" aggregations: max / sum / count / avg for each time window
    *(
        f"transactions.amount_{operation}_{window}"
        for window in ("2h", "12h", "24h")
        for operation in ("max", "sum", "count", "avg")
    ),
    # 14-day sums per category (this selection does not include "es_contents")
    *(
        f"transactions.{category}_sum_14d"
        for category in (
            "es_transportation",
            "es_health",
            "es_otherservices",
            "es_food",
            "es_hotelservices",
            "es_barsandrestaurants",
            "es_tech",
            "es_sportsandtoys",
            "es_wellnessandbeauty",
            "es_hyper",
            "es_fashion",
            "es_home",
            "es_travel",
            "es_leisure",
        )
    ),
    # One-hot encoded gender, raw columns and extracted date parts
    "transactions.gender_F",
    "transactions.gender_M",
    "transactions.step",
    "transactions.amount",
    "transactions.timestamp_hour",
    "transactions.timestamp_day_of_week",
]

In [23]:
# Import MLRun's Feature Store
import mlrun.feature_store as fstore

# Name under which the feature vector is registered, kept for future reference
fv_name = "transactions-fraud"

# Build the feature vector from the selected features, with "labels.label"
# as its label feature
transactions_fv = fstore.FeatureVector(
    name=fv_name,
    features=features,
    label_feature="labels.label",
    description="Predicting a fraudulent transaction",
)

# Persist the feature vector in the feature store
transactions_fv.save()

In [24]:
# Obtain the values of the features in the feature vector, to ensure the data appears as expected.

from mlrun.datastore.targets import CSVTarget

# Resolve the feature vector through its own URI rather than a hard-coded,
# user-specific project path, so the cell works in whichever project the
# vector was saved
train_dataset = fstore.get_offline_features(transactions_fv.uri)
train_dataset.to_dataframe().tail(5)

Unnamed: 0,event_details_change,event_login,event_password_change,amount_avg_2h,amount_avg_12h,amount_avg_24h,amount_sum_2h,amount_sum_12h,amount_sum_24h,amount_count_2h,...,es_home_sum_14d,es_travel_sum_14d,es_leisure_sum_14d,step,gender_F,gender_M,amount,timestamp_hour,timestamp_day_of_week,label
1763,0,0,1,28.912,30.842424,30.236667,144.56,1017.8,1904.91,5.0,...,0.0,1.0,0.0,96.0,1.0,0.0,24.02,15.0,2.0,0.0
1764,1,0,0,23.875,27.209167,24.745102,47.75,653.02,1212.51,2.0,...,0.0,0.0,0.0,134.0,0.0,1.0,26.81,15.0,2.0,0.0
1765,0,1,0,22.7775,31.29125,37.702031,91.11,1001.32,2412.93,4.0,...,2.0,0.0,0.0,141.0,1.0,0.0,14.95,15.0,2.0,0.0
1766,0,0,1,12.56,28.263889,28.415526,37.68,1017.5,2159.58,3.0,...,0.0,0.0,0.0,101.0,0.0,1.0,13.62,15.0,2.0,0.0
1767,0,0,1,19.2175,31.195789,28.273418,76.87,1185.44,2233.6,4.0,...,0.0,0.0,0.0,40.0,0.0,1.0,12.82,15.0,2.0,0.0


## Model training

In [25]:
# Import the "auto_trainer" function from the functions hub; it is used below to
# train the sklearn classifiers
classifier_fn = mlrun.import_function("hub://auto_trainer")

In [26]:
# Prepare the hyper-parameter lists for the training function: three models are
# trained, the i-th model name going with the i-th model class.
# Note: the model named "transaction_fraud_xgboost" is trained with
# sklearn.ensemble.GradientBoostingClassifier, not with the xgboost library.
training_params = {
    "model_name": [
        "transaction_fraud_rf",
        "transaction_fraud_xgboost",
        "transaction_fraud_adaboost",
    ],
    "model_class": [
        "sklearn.ensemble.RandomForestClassifier",
        "sklearn.ensemble.GradientBoostingClassifier",
        "sklearn.ensemble.AdaBoostClassifier",
    ],
}

# Define the training task, including your feature vector, label and hyperparams definitions.
# The dataset input is the saved feature vector, referenced by its URI.
train_task = mlrun.new_task(
    "training",
    inputs={"dataset": transactions_fv.uri},
    params={"label_columns": "label"},
)

# NOTE(review): strategy="list" presumably runs one iteration per list position and
# selector="max.accuracy" presumably picks the iteration with the highest accuracy — confirm
train_task.with_hyper_params(training_params, strategy="list", selector="max.accuracy")

# Specify your cluster image
classifier_fn.spec.image = "mlrun/mlrun"

# Run training locally with verbose logging
classifier_fn.run(train_task, local=True, verbose=True)

> 2023-10-11 15:39:30,556 [info] Storing function: {'name': 'training', 'uid': 'e8c605fdcc544498a08a782ed1528d56', 'db': 'http://localhost:8080'}
> 2023-10-11 15:39:31,756 [error] run error, Traceback (most recent call last):
  File "/home/eduardo/.asdf/installs/python/3.9.0/lib/python3.9/site-packages/mlrun/runtimes/local.py", line 278, in _run
    sout, serr = exec_from_params(fn, runobj, context)
  File "/home/eduardo/.asdf/installs/python/3.9.0/lib/python3.9/site-packages/mlrun/runtimes/local.py", line 426, in exec_from_params
    kwargs = get_func_arg(handler, runobj, context)
  File "/home/eduardo/.asdf/installs/python/3.9.0/lib/python3.9/site-packages/mlrun/runtimes/local.py", line 501, in get_func_arg
    kwargs[key] = _get_input_value(key)
  File "/home/eduardo/.asdf/installs/python/3.9.0/lib/python3.9/site-packages/mlrun/runtimes/local.py", line 482, in _get_input_value
    input_obj = context.get_input(input_key, inputs[input_key])
  File "/home/eduardo/.asdf/installs/python

project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
fraud-demo-eduardo,...d1528d56,0,Oct 11 18:39:30,error,training,kind=owner=eduardo,dataset,label_columns=label,,iteration_resultsparallel_coordinates





> 2023-10-11 15:39:32,253 [info] Run execution finished: {'status': 'error', 'name': 'training'}


RunError: 3 of 3 tasks failed, check logs in db for details