In [1]:
import mlrun
import pandas as pd
import os
from datetime import datetime, timedelta

# 1. Initialize Project
project = mlrun.get_or_create_project(
    name="fraud-demo",
    context="./",
    user_project=True
)

# 2.
my_redis_url = "redis://host.docker.internal:6379"

print(f"Using Redis URL: {my_redis_url}")

# (Helper Function: Adjust Timestamps)
def adjust_data_timespan(df, timestamp_col="timestamp", new_period="2d"):
    if new_period.endswith("d"):
        delta = timedelta(days=int(new_period[:-1]))
    elif new_period.endswith("h"):
        delta = timedelta(hours=int(new_period[:-1]))
    else:
        delta = timedelta(days=2)
    max_time = df[timestamp_col].max()
    now = datetime.now()
    shift = now - max_time - timedelta(minutes=5) 
    df[timestamp_col] = df[timestamp_col] + shift
    start_time = now - delta
    df = df[df[timestamp_col] >= start_time]
    return df

Project Source: git://github.com/mlrun/demo-fraud.git
> 2025-12-12 19:59:58,425 [info] Project loaded successfully: {"project_name":"fraud-demo-jovyan"}
Using Redis URL: redis://host.docker.internal:6379


In [2]:
import mlrun.feature_store as fstore
from mlrun.feature_store.steps import OneHotEncoder, MapValues, DateExtractor
from mlrun.datastore.targets import RedisNoSqlTarget, ParquetTarget

# 1. Fetch Data
print("Fetching transactions data...")
transactions_data = pd.read_csv(
    "https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/data.csv",
    parse_dates=["timestamp"]
)

# 2. Preprocess Data (Sort and Adjust Time)
transactions_data = transactions_data.sort_values(by="source", axis=0)[:10000]
transactions_data = adjust_data_timespan(transactions_data, new_period="2d")
transactions_data = transactions_data.sort_values(by="timestamp", axis=0)

# 3. Define FeatureSet
transaction_set = fstore.FeatureSet(
    "transactions",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
    description="transactions feature set"
)

# 4. Define Transformation Graph
# Define categories for mapping
main_categories = ["es_transportation", "es_health", "es_otherservices",
       "es_food", "es_hotelservices", "es_barsandrestaurants",
       "es_tech", "es_sportsandtoys", "es_wellnessandbeauty",
       "es_hyper", "es_fashion", "es_home", "es_contents",
       "es_travel", "es_leisure"]

one_hot_encoder_mapping = {
    "category": main_categories,
    "gender": list(transactions_data.gender.unique())
}

transaction_set.graph\
    .to(DateExtractor(parts=["hour", "day_of_week"], timestamp_col="timestamp"))\
    .to(MapValues(mapping={"age": {"U": "0"}}, with_original_features=True))\
    .to(OneHotEncoder(mapping=one_hot_encoder_mapping))

# 5. Add Aggregations (Crucial for fraud detection)
transaction_set.add_aggregation(
    name="amount",
    column="amount",
    operations=["avg", "sum", "count", "max"],
    windows=["2h", "12h", "24h"],
    period="1h"
)

for category in main_categories:
    transaction_set.add_aggregation(
        name=category,
        column=f"category_{category}",
        operations=["sum"],
        windows=["14d"],
        period="1d"
    )

# 6. Ingest with Explicit Targets
print("Ingesting transactions to Redis and Parquet...")
transactions_df = transaction_set.ingest(
    transactions_data,
    infer_options=fstore.InferOptions.default(),
    targets=[
        ParquetTarget(name="parquet", path="./store/transactions"),
        RedisNoSqlTarget(path=my_redis_url)
    ]
)

print("Transactions ingestion done.")
transactions_df.head(3)

Fetching transactions data...
Ingesting transactions to Redis and Parquet...
Transactions ingestion done.


Unnamed: 0_level_0,amount_sum_2h,amount_sum_12h,amount_sum_24h,amount_count_2h,amount_count_12h,amount_count_24h,amount_max_2h,amount_max_12h,amount_max_24h,amount_avg_2h,...,category_es_contents,category_es_travel,category_es_leisure,amount,fraud,timestamp,target,device,timestamp_hour,timestamp_day_of_week
source,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
C1004109477,23.41,23.41,23.41,1.0,1.0,1.0,23.41,23.41,23.41,23.41,...,0,0,0,23.41,0,2025-12-10 20:35:06.170739,M1823072687,06e72f0e821545ce8bb796a89aacab96,20,2
C1038329920,21.66,21.66,21.66,1.0,1.0,1.0,21.66,21.66,21.66,21.66,...,0,0,0,21.66,0,2025-12-10 20:47:51.207130,M1823072687,49d6227a6dbd4cc48d5c33f0bd33ceb4,20,2
C1033736586,61.7,61.7,61.7,1.0,1.0,1.0,61.7,61.7,61.7,61.7,...,0,0,0,61.7,0,2025-12-10 20:48:21.818050,M1823072687,c0176fe7f71b437d8e51bd7fc865ab4e,20,2


In [3]:
# 1. Fetch Data
print("Fetching user events data...")
user_events_data = pd.read_csv(
    "https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/events.csv",
    index_col=0,
    quotechar="'",
    parse_dates=["timestamp"]
)

# 2. Adjust Time
user_events_data = adjust_data_timespan(user_events_data, new_period="2d")

# 3. Define FeatureSet
user_events_set = fstore.FeatureSet(
    "events",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
    description="user events feature set"
)

# 4. Define Graph (Simple OneHot)
events_mapping = {"event": list(user_events_data.event.unique())}
user_events_set.graph.to(OneHotEncoder(mapping=events_mapping))

# 5. Ingest with Explicit Targets
print("Ingesting user events to Redis and Parquet...")
events_df = user_events_set.ingest(
    user_events_data,
    targets=[
        ParquetTarget(name="parquet", path="./store/events"),
        RedisNoSqlTarget(path=my_redis_url)
    ]
)

print("User events ingestion done.")
events_df.head(3)

Fetching user events data...
Ingesting user events to Redis and Parquet...
User events ingestion done.


Unnamed: 0_level_0,event_details_change,event_login,event_password_change,timestamp
source,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
C517412999,1,0,0,2025-12-10 20:33:57.829454
C1457388461,1,0,0,2025-12-10 21:08:13.752237
C34104097,0,1,0,2025-12-12 15:11:14.455855


In [4]:
# 1. Define Label Creation Logic
def create_labels(df):
    labels = df[["fraud", "timestamp"]].copy()
    labels = labels.rename(columns={"fraud": "label"})
    labels["timestamp"] = labels["timestamp"].astype("datetime64[ns]")
    labels["label"] = labels["label"].astype(int)
    return labels

# 2. Define FeatureSet
labels_set = fstore.FeatureSet(
    "labels",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
    description="training labels",
    engine="pandas"
)

labels_set.graph.to(name="create_labels", handler=create_labels)

# 3. Ingest with Explicit Targets (Parquet only)
print("Ingesting labels to Parquet...")
labels_df = labels_set.ingest(
    transactions_data,
    targets=[
        ParquetTarget(name="parquet", path="./store/labels")
    ]
)

print("Labels ingestion done.")
labels_df.head(3)

Ingesting labels to Parquet...
Labels ingestion done.


Unnamed: 0_level_0,label,timestamp
source,Unnamed: 1_level_1,Unnamed: 2_level_1
C1004109477,0,2025-12-10 20:35:06.170739
C1038329920,0,2025-12-10 20:47:51.207130
C1033736586,0,2025-12-10 20:48:21.818050


In [6]:
print("Part 1: Data Ingestion Complete!")
print("-" * 30)
print(f"Transactions stored in: ./store/transactions")
print(f"Events stored in:       ./store/events")
print(f"Labels stored in:       ./store/labels")

Part 1: Data Ingestion Complete!
------------------------------
Transactions stored in: ./store/transactions
Events stored in:       ./store/events
Labels stored in:       ./store/labels
