##Hopsworks Installation

In [1]:
!pip install -U hopsworks --quiet

[31mERROR: Could not find a version that satisfies the requirement hopsworks (from versions: none)[0m
[31mERROR: No matching distribution found for hopsworks[0m


##Loading datasets

In [2]:
import pandas as pd
credit_cards_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/credit_cards.csv")
credit_cards_df.head(3)

URLError: <urlopen error [Errno 110] Connection timed out>

In [None]:
profiles_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/profiles.csv", parse_dates=["birthdate"])
profiles_df.head(3)

Unnamed: 0,name,sex,mail,birthdate,City,Country,cc_num
0,Catherine Zimmerman,F,valenciajason@hotmail.com,1988-09-20,Bryn Mawr-Skyway,US,4796807885357879
1,Michael Williams,M,brettkennedy@yahoo.com,1977-03-01,Gates-North Gates,US,4529266636192966
2,Jessica Krueger,F,marthacruz@hotmail.com,1947-09-10,Greenfield,US,4922690008243953


In [None]:
trans_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/transactions.csv", parse_dates=["datetime"])
trans_df.head(3)

Unnamed: 0,tid,datetime,cc_num,category,amount,latitude,longitude,city,country,fraud_label
0,11df919988c134d97bbff2678eb68e22,2022-01-01 00:00:24,4473593503484549,Health/Beauty,62.95,42.30865,-83.48216,Canton,US,0
1,dd0b2d6d4266ccd3bf05bc2ea91cf180,2022-01-01 00:00:56,4272465718946864,Grocery,85.45,33.52253,-117.70755,Laguna Niguel,US,0
2,e627f5d9a9739833bd52d2da51761fc3,2022-01-01 00:02:32,4104216579248948,Domestic Transport,21.63,37.60876,-77.37331,Mechanicsville,US,0


##Feature Engineering

Fraudulent transactions can differ from regular ones in many different ways. Typical red flags would for instance be a large transaction volume/frequency in the span of a few hours. It could also be the case that elderly people in particular are targeted by fraudsters. To facilitate model learning we will create additional features based on these patterns. In particular, we will create two types of features:
1. **Features that aggregate data from different data sources**. This could for instance be the age of a customer at the time of a transaction, which combines the `birthdate` feature from `profiles.csv` with the `datetime` feature from `transactions.csv`.
2. **Features that aggregate data from multiple time steps**. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.



In [None]:
import numpy as np

# Compute age at transaction.
age_df = trans_df.merge(profiles_df, on="cc_num", how="left")
trans_df["age_at_transaction"] = (age_df["datetime"] - age_df["birthdate"]) / np.timedelta64(1, "Y")

# Compute days until card expires.
card_expiry_df = trans_df.merge(credit_cards_df, on="cc_num", how="left")
card_expiry_df["expires"] = pd.to_datetime(card_expiry_df["expires"], format="%m/%y")
trans_df["days_until_card_expires"] = (card_expiry_df["expires"] - card_expiry_df["datetime"]) / np.timedelta64(1, "D")

trans_df[["age_at_transaction", "days_until_card_expires"]].head()

Unnamed: 0,age_at_transaction,days_until_card_expires
0,97.513297,1460.999722
1,33.752919,1733.999352
2,80.899681,242.998241
3,53.526088,150.997639
4,46.005059,515.99728


Next, we create features that for each credit card aggregate data from multiple time steps.

We start by computing the distance between consecutive transactions, which we will call `loc_delta`.
Here we use the [Haversine distance](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html?highlight=haversine#sklearn.metrics.pairwise.haversine_distances) to quantify the distance between two longitude and latitude coordinates.

In [None]:
from math import radians

# Do some simple preprocessing.
trans_df.sort_values("datetime", inplace=True)
trans_df[["longitude", "latitude"]] = trans_df[["longitude", "latitude"]].applymap(radians)

def haversine(long, lat):
    """Compute Haversine distance between each consecutive coordinate in (long, lat)."""

    long_shifted = long.shift()
    lat_shifted = lat.shift()
    long_diff = long_shifted - long
    lat_diff = lat_shifted - lat

    a = np.sin(lat_diff/2.0)**2
    b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
    c = 2*np.arcsin(np.sqrt(a + b))

    return c


trans_df["loc_delta"] = trans_df.groupby("cc_num")\
    .apply(lambda x : haversine(x["longitude"], x["latitude"]))\
    .reset_index(level=0, drop=True)\
    .fillna(0)

Next we compute windowed aggregates. Here we will use 4-hour windows, but feel free to experiment with different window lengths by setting `window_len` below to a value of your choice.

In [None]:
window_len = "4h"
cc_group = trans_df[["cc_num", "amount", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime")

# Moving average of transaction volume.
df_4h_mavg = pd.DataFrame(cc_group.mean())
df_4h_mavg.columns = ["trans_volume_mavg", "datetime"]
df_4h_mavg = df_4h_mavg.reset_index(level=["cc_num"])
df_4h_mavg = df_4h_mavg.drop(columns=["cc_num", "datetime"])
df_4h_mavg = df_4h_mavg.sort_index()

# Moving standard deviation of transaction volume.
df_4h_std = pd.DataFrame(cc_group.mean())
df_4h_std.columns = ["trans_volume_mstd", "datetime"]
df_4h_std = df_4h_std.reset_index(level=["cc_num"])
df_4h_std = df_4h_std.drop(columns=["cc_num", "datetime"])
df_4h_std = df_4h_std.fillna(0)
df_4h_std = df_4h_std.sort_index()
window_aggs_df = df_4h_std.merge(df_4h_mavg,left_index=True, right_index=True)

# Moving average of transaction frequency.
df_4h_count = pd.DataFrame(cc_group.mean())
df_4h_count.columns = ["trans_freq", "datetime"]
df_4h_count = df_4h_count.reset_index(level=["cc_num"])
df_4h_count = df_4h_count.drop(columns=["cc_num", "datetime"])
df_4h_count = df_4h_count.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_count,left_index=True, right_index=True)

# Moving average of location difference between consecutive transactions.
cc_group = trans_df[["cc_num", "loc_delta", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime").mean()
df_4h_loc_delta_mavg = pd.DataFrame(cc_group)
df_4h_loc_delta_mavg.columns = ["loc_delta_mavg", "datetime"]
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.reset_index(level=["cc_num"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.drop(columns=["cc_num", "datetime"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_loc_delta_mavg,left_index=True, right_index=True)

window_aggs_df = window_aggs_df.merge(trans_df[["cc_num", "datetime"]].sort_index(),left_index=True, right_index=True)
window_aggs_df.tail()

Unnamed: 0,trans_volume_mstd,trans_volume_mavg,trans_freq,loc_delta_mavg,cc_num,datetime
106015,73.08,73.08,73.08,0.045635,4032019521897961,2022-03-24 10:57:02
106016,287.33,287.33,287.33,0.045846,4032019521897961,2022-03-28 11:57:02
106017,53.88,53.88,53.88,0.00012,4032019521897961,2022-04-01 12:57:02
106018,279.73,279.73,279.73,0.045928,4032019521897961,2022-04-05 13:57:02
106019,73.66,73.66,73.66,0.045974,4032019521897961,2022-04-09 14:57:02


### Convert date time object to unix epoch in milliseconds

In [None]:
trans_df.datetime = trans_df.datetime.values.astype(np.int64) // 10 ** 6
window_aggs_df.datetime = window_aggs_df.datetime.values.astype(np.int64) // 10 ** 6

## Creating feature groups

A [feature group](https://docs.hopsworks.ai/feature-store-api/latest/generated/feature_group/) can be seen as a collection of conceptually related features. In our case, we will create a feature group for the transaction data and a feature group for the windowed aggregations on the transaction data. Both will have `cc_num` as primary key, which will allow us to join them when creating a dataset in the next tutorial.

Feature groups can also be used to define a namespace for features. For instance, in a real-life setting we would likely want to experiment with different window lengths. In that case, we can create feature groups with identical schema for each window length. 

Before we can create a feature group we need to connect to our feature store.

In [None]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated

Paste it here: zDbNxQYcq9mYmN5Z.8MnghQkzlJyEazYkW172GOrkEXGdeBa1JtvaSJyApDH72thyp1GqWc06aULkkJc3
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/295
2022-08-01 12:49:32,335 INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/Grammar.txt
2022-08-01 12:49:32,361 INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/PatternGrammar.txt
Connected. Call `.close()` to terminate connection gracefully.




To create a feature group we need to give it a name and specify a primary key. It is also good to provide a description of the contents of the feature group and a version number, if it is not defined it will automatically be incremented to `1`.

In [None]:
trans_fg = fs.get_or_create_feature_group(
    name="transactions_fraud_batch_fg",
    version=1,
    description="Transaction data",
    primary_key=['cc_num'],
    event_time=['datetime']
)

In [None]:
trans_fg.insert(trans_df)

Uploading Dataframe: 0.00% |          | Rows 0/106020 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/295/jobs/named/transactions_fraud_batch_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f34d0d23990>, None)

In [None]:
feature_descriptions = [
    {"name": "tid", "description": "Transaction id"},
    {"name": "datetime", "description": "Transaction time"},
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "category", "description": "Expense category"},
    {"name": "amount", "description": "Dollar amount of the transaction"},
    {"name": "latitude", "description": "Transaction location latitude"},
    {"name": "longitude", "description": "Transaction location longitude"},
    {"name": "city", "description": "City in which the transaction was made"},
    {"name": "country", "description": "Country in which the transaction was made"},
    {"name": "fraud_label", "description": "Whether the transaction was fraudulent or not"},
    {"name": "age_at_transaction", "description": "Age of the card holder when the transaction was made"},
    {"name": "days_until_card_expires", "description": "Card validity days left when the transaction was made"},
    {"name": "loc_delta", "description": "Haversine distance between this transaction location and the previous transaction location from the same card"},
]

for desc in feature_descriptions: 
    trans_fg.update_feature_description(desc["name"], desc["description"])

In [None]:
window_aggs_fg = fs.get_or_create_feature_group(
    name=f"transactions_{window_len}_aggs_fraud_batch_fg",
    version=1,
    description=f"Aggregate transaction data over {window_len} windows.",
    primary_key=['cc_num'],
    event_time=['datetime']
)

In [None]:
window_aggs_fg.insert(window_aggs_df)

Uploading Dataframe: 0.00% |          | Rows 0/106020 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/295/jobs/named/transactions_4h_aggs_fraud_batch_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f34d0c48110>, None)

In [None]:
feature_descriptions = [
    {"name": "datetime", "description": "Transaction time"},
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "loc_delta_mavg", "description": "Moving average of location difference between consecutive transactions from the same card"},
    {"name": "trans_freq", "description": "Moving average of transaction frequency from the same card"},
    {"name": "trans_volume_mavg", "description": "Moving average of transaction volume from the same card"},
    {"name": "trans_volume_mstd", "description": "Moving standard deviation of transaction volume from the same card"},
]

for desc in feature_descriptions: 
    window_aggs_fg.update_feature_description(desc["name"], desc["description"])