This is the first part of the quick start series of tutorials about Hopsworks Feature Store. As part of this first module, you will work with data related to credit card transactions. 
The objective of this tutorial is to demonstrate how to work with the **Hopsworks Feature Store** for online data, with the goal of training and deploying a model that can predict fraudulent transactions.


## 🗒️ This notebook is divided into 3 sections:
1. Load the data and do feature engineering,
2. Connect to the Hopsworks Feature Store,
3. Create feature groups and upload them to the Feature Store.


First of all you will load the data and do some feature engineering on it.

## <span style='color:#ff5f27'> 📝 Imports </span>

In [11]:
import hopsworks

from mlopstemplate.features import transactions, profile
from mlopstemplate.synthetic_data import synthetic_data
from mlopstemplate.synthetic_data.data_sources import get_datasets


## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

The data you will use comes from 2 different CSV files:

- `transactions.csv`: events containing information about when a credit card was used, such as a timestamp, location, and the amount spent. A boolean `fraud_label` variable (True/False) tells us whether a transaction was fraudulent or not.
- `profiles.csv`: credit card user information such as birthdate and city of residence.

In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **These files have a common credit card number column, `cc_num`, which you will use later to join features together from the different datasets.**

Now, you can go ahead and load the data.

In [12]:
# get data from the source
trans_df, labels_df, profiles_df = get_datasets()

---

## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

Fraudulent transactions can differ from regular ones in many ways. A typical red flag is a large transaction volume or frequency within a few hours. Fraudsters may also target elderly people in particular. To help the model learn, you will create additional features based on these patterns. In particular, you will create two types of features:
1. **Features that aggregate data from different data sources**. This could for instance be the age of a customer at the time of a transaction, which combines the `birthdate` feature from `profiles.csv` with the `datetime` feature from `transactions.csv`.
2. **Features that aggregate data from multiple time steps**. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.
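
To make the second category concrete, here is a minimal plain-Python sketch of a trailing-window count: for each transaction of one card, it counts how many transactions fell within the preceding window. This is only an illustration, not the tutorial's implementation; `trailing_counts`, the sample timestamps, and the 4-hour window are all assumptions made for the example.

```python
from collections import deque
from datetime import datetime, timedelta


def trailing_counts(timestamps, window):
    """For each timestamp (sorted ascending), count events in (t - window, t]."""
    recent = deque()
    counts = []
    for t in timestamps:
        recent.append(t)
        # Drop events that have fallen out of the trailing window.
        while recent[0] <= t - window:
            recent.popleft()
        counts.append(len(recent))
    return counts


# Hypothetical transaction times for a single card
times = [
    datetime(2023, 4, 23, 10, 0),
    datetime(2023, 4, 23, 11, 30),
    datetime(2023, 4, 23, 13, 0),
    datetime(2023, 4, 23, 18, 0),
]
counts = trailing_counts(times, timedelta(hours=4))
```

In a real pipeline the same idea is usually expressed per card with a DataFrame window function; the deque version only shows the logic.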

Let's start with the first category.
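
As a rough illustration of the first category, the sketch below looks up a card holder's birthdate by `cc_num` and combines it with a transaction timestamp to get the age at transaction time. It is not the code behind `transactions.card_owner_age`: the records, the made-up card number, and the 365.25-day year are assumptions chosen for the example.

```python
from datetime import date, datetime

DAYS_PER_YEAR = 365.25  # averaging convention; an assumption of this sketch


def age_at_transaction(birthdate, transaction_time):
    """Age in fractional years on the day of the transaction."""
    return (transaction_time.date() - birthdate).days / DAYS_PER_YEAR


# Hypothetical records; the card number is made up
profiles = {1111222233334444: {"birthdate": date(1990, 1, 1)}}
transaction = {
    "cc_num": 1111222233334444,
    "datetime": datetime(2023, 1, 1, 14, 8, 53),
}

# Join the two sources on cc_num, then derive the feature.
birthdate = profiles[transaction["cc_num"]]["birthdate"]
age = age_at_transaction(birthdate, transaction["datetime"])
```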

You can now compute the distance between consecutive transactions; let's call it `loc_delta`.
Here you will use the [Haversine distance](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html?highlight=haversine#sklearn.metrics.pairwise.haversine_distances) to quantify the distance between two longitude and latitude coordinates.
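
For reference, the Haversine formula itself fits in a few lines of plain Python. The sketch below returns the central angle in radians, which is also the unit used by the scikit-learn function linked above; multiplying by an Earth radius converts it to a length. The function name and the 6371 km radius are assumptions of this sketch, and the tutorial's own helper may be implemented differently.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_radians(lat1, lon1, lat2, lon2):
    """Central angle (radians) between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    )
    return 2 * math.asin(math.sqrt(a))


# A quarter of the equator: from (0°, 0°) to (0°, 90°)
angle = haversine_radians(0.0, 0.0, 0.0, 90.0)
distance_km = angle * EARTH_RADIUS_KM
```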

In [15]:
# compute profile features
# select final features
profiles_df = profile.select_features(profiles_df)

In [14]:
# Compute transaction features

# compute distance between the current and previous transaction locations
trans_df = transactions.loc_delta_t_minus_1(trans_df)

# compute time difference between the current and previous transaction
trans_df = transactions.time_delta_t_minus_1(trans_df)

# Compute year and month string from datetime column.
trans_df["month"] = transactions.get_year_month(trans_df.datetime)

# compute on demand features
# customer's age at transaction
trans_df = transactions.card_owner_age(trans_df, profiles_df)

# days until card expires at the time of transaction
trans_df = transactions.expiry_days(trans_df, profiles_df)


In [16]:
# labels
labels_df["month"] = transactions.get_year_month(labels_df.datetime)
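
The `month` values in the DataFrame printed below (for example `2023-4` and `2023-10`) suggest that `get_year_month` joins the year with the unpadded month number. A minimal sketch of that formatting, assuming this is what the helper does (its source is not shown here, and `year_month` is a hypothetical stand-in):

```python
from datetime import datetime


def year_month(ts):
    """Format a timestamp as 'YYYY-M' without zero padding."""
    return f"{ts.year}-{ts.month}"


april = year_month(datetime(2023, 4, 23, 14, 8, 53))
october = year_month(datetime(2023, 10, 2, 11, 56, 44))
```

Because the month is not zero-padded, these strings do not sort chronologically as text (`"2023-10"` sorts before `"2023-4"`), which is worth keeping in mind if you ever order partitions by this key.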

In [18]:
trans_df

Unnamed: 0,tid,datetime,cc_num,category,amount,latitude,longitude,city,country,loc_delta_t_minus_1,time_delta_t_minus_1,month,age_at_transaction,days_until_card_expires
0,2e4968a4b5e70e4508c95bc3f41b1a19,2023-04-23 14:08:53,4117769962104727,Grocery,44.48,40.728160,-74.077640,Jersey City,US,1.548302,17.211887,2023-4,95.636706,1379.410498
1,bfa5db90f9189466714709845186f54a,2023-04-23 14:16:15,4359690750021930,Domestic Transport,41.89,44.990120,-123.026210,Keizer,US,0.465695,6.591088,2023-4,83.622784,1773.405382
2,a63e93fa9699a8137e21ebb0cb36b8ec,2023-04-23 14:20:16,4865352294091955,Grocery,34.67,47.380930,-122.234840,Kent,US,2.534995,9.718252,2023-4,27.643545,312.402593
3,fe8a86dd38334a6822cd328af1c16ab1,2023-04-23 14:24:32,4725403619791355,Clothing,45.34,33.950150,-118.039170,South Whittier,US,0.310804,1.753056,2023-4,25.280739,1042.399630
4,b9606d13115bab47a8af639c14e59d3f,2023-04-23 14:28:37,4564997137285299,Restaurant/Cafeteria,6.70,33.522530,-117.707550,Laguna Niguel,US,1.928053,4.200220,2023-4,46.967708,1560.396794
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
74803,a35cd443d55abd1dbdadd9416337df08,2023-10-02 11:56:44,4895642366200763,Cash Withdrawal,19.74,37.529688,-77.554699,Bon Air,US,0.005806,-1.958333,2023-10,91.644586,668.502269
74804,be925750cd5f8d03f307dc177de39747,2023-09-30 12:56:44,4895642366200763,Cash Withdrawal,2.88,37.530976,-77.548956,Bon Air,US,0.008057,-1.958333,2023-9,91.639224,670.460602
74805,6d31780dae3c266116693f16286b211a,2023-09-28 13:56:44,4895642366200763,Cash Withdrawal,440.57,37.538403,-77.552121,Bon Air,US,0.006713,-1.958333,2023-9,91.633863,672.418935
74806,d368ee38be7eaa52f7e64934d3a712f5,2023-09-26 14:56:44,4895642366200763,Cash Withdrawal,0.38,37.534508,-77.557662,Bon Air,US,0.004854,-1.958333,2023-9,91.628501,674.377269


In [19]:
# https://github.com/logicalclocks/hopsworks-tutorials/blob/master/integrations/great_expectations/fraud_batch_data_validation.ipynb
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

In [20]:
ge_trans_df = ge.from_pandas(trans_df)

expectation_suite_trans = ge_trans_df.get_expectation_suite()
expectation_suite_trans.expectation_suite_name = "transaction_suite"
print(expectation_suite_trans)


2023-10-20 14:22:08,119 INFO: 	0 expectation(s) included in expectation_suite.
{
  "expectations": [],
  "data_asset_type": "Dataset",
  "meta": {
    "great_expectations_version": "0.15.12"
  },
  "expectation_suite_name": "transaction_suite",
  "ge_cloud_id": null
}


In [21]:
# Check for errors which could lead to technical issues
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column":"tid", "result_format":"COMPLETE"}
    )
)

# Assess data correctness
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"amount",
            "min_value": 0
        }
    )
)

expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"age_at_transaction",
            "min_value": 17,
            "max_value": 101
        }
    )
)

# Monitor data statistics and quality, e.g. number of null values
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
      expectation_type="expect_column_mean_to_be_between",
      kwargs={"column":"age_at_transaction", "min_value": 55, "max_value": 70}
    )
)

expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column":"category"}
    )
)

ge_trans_df = ge.from_pandas(trans_df, expectation_suite=expectation_suite_trans)

validation_report_trans = ge_trans_df.validate()

2023-10-20 14:22:09,167 INFO: 	5 expectation(s) included in expectation_suite.
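
A full validation report is long, so it can help to condense it into a pass/fail summary. The sketch below works on a plain dictionary and assumes the usual Great Expectations layout, where each entry in `results` carries a `success` flag and an `expectation_config`. The helper `summarize_validation` and the hand-written `sample_report` are hypothetical.

```python
def summarize_validation(report):
    """Return (passed, total, failed_expectation_types) for a report dict."""
    results = report.get("results", [])
    failed = [
        r["expectation_config"]["expectation_type"]
        for r in results
        if not r["success"]
    ]
    return len(results) - len(failed), len(results), failed


# Hypothetical, hand-written report with the same overall shape
sample_report = {
    "success": False,
    "results": [
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_be_unique"
            },
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_mean_to_be_between"
            },
        },
    ],
}
passed, total, failed = summarize_validation(sample_report)
```

With the real report you would first convert it to a dictionary, for example with `validation_report_trans.to_json_dict()`, assuming your Great Expectations version provides that method.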


In [22]:
validation_report_trans

{
  "evaluation_parameters": {},
  "meta": {
    "great_expectations_version": "0.15.12",
    "expectation_suite_name": "transaction_suite",
    "run_id": {
      "run_time": "2023-10-20T14:22:09.167187+00:00",
      "run_name": null
    },
    "batch_kwargs": {
      "ge_batch_id": "0da126fe-6f54-11ee-8767-06e8b8b59184"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20231020T142209.167135Z",
    "expectation_suite_meta": {
      "great_expectations_version": "0.15.12"
    }
  },
  "results": [
    {
      "result": {
        "element_count": 74808,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_list": [],
        "partial_unexpected_index_list": [],
        "partial_unexpected_counts": [],
        "unexpected_list": [],
        "unexpected_index_list": []
  

In [None]:
# Generate an expectation suite from your data using the Great Expectations profiler.
# For complex DataFrames, Great Expectations offers a profiler that generates a basic
# expectation suite tailored to your data. You can then use this suite as you would
# any other expectation suite with Hopsworks.

# ignore deprecation warnings
expectation_suite_profiled, validation_report = ge_trans_df.profile(profiler=ge.profile.BasicSuiteBuilderProfiler)

print(f"The suite contains {len(expectation_suite_profiled['expectations'])} expectations for {len(trans_df.columns.values)} columns. See sample below\n" + ge_trans_df.get_expectation_suite().__repr__()[:455])

# Note that you cannot register the report generated by the profiler, because the
# suite was not registered with Hopsworks before the validation was run.

# Set up automatic validation on insert and upload a DataFrame.
# Register the expectation suite corresponding to a feature group with the backend.
# NOTE: `trans_fg` is created in the "Creating Feature Groups" section below;
# run that cell before this one.

# The "ALWAYS" ingestion policy inserts data even when validation fails,
# which avoids data loss and suits rapid prototyping.

trans_fg.save_expectation_suite(expectation_suite_trans, validation_ingestion_policy="ALWAYS")

# Once the suite is registered in the backend, data validation runs on every insert
# without additional boilerplate. The suite is retrieved from the backend and used to
# validate the DataFrame, and the resulting validation report is uploaded. Depending on
# the ingestion policy and validation success, the data are then inserted into the
# feature group. The example below illustrates the "ALWAYS" case, where insertion is
# performed despite a validation failure.

trans_job, trans_report = trans_fg.insert(trans_df)
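
To picture how the ingestion policy interacts with the validation result, here is a plain-Python sketch of the decision. It is not Hopsworks code: `should_insert` is a hypothetical helper, and the `"STRICT"` policy (insert only when validation succeeds) comes from the Hopsworks documentation rather than from this notebook.

```python
def should_insert(policy, validation_succeeded):
    """Decide whether data is written, given the policy and validation outcome."""
    if policy == "ALWAYS":
        return True  # insert regardless of the validation result
    if policy == "STRICT":
        return validation_succeeded  # insert only when validation passes
    raise ValueError(f"unknown ingestion policy: {policy!r}")


always_on_failure = should_insert("ALWAYS", validation_succeeded=False)
strict_on_failure = should_insert("STRICT", validation_succeeded=False)
```

`"ALWAYS"` is convenient while prototyping, whereas a stricter policy is the safer choice once downstream models depend on the data.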


## <span style="color:#ff5f27;"> 📡 Connecting to Hopsworks Feature Store </span>

### <span style="color:#ff5f27;"> 🪄 Creating Feature Groups </span>

A [feature group](https://docs.hopsworks.ai/3.0/concepts/fs/feature_group/fg_overview/) can be seen as a collection of conceptually related features. In this case, you will create one feature group for the transaction data, one for the fraud labels, and one for the card holder profiles. All of them have `cc_num` as primary key, which will allow you to join them when creating a dataset in the next tutorial.

Feature groups can also be used to define a namespace for features. For instance, in a real-life setting you would likely want to experiment with different window lengths. In that case, you can create feature groups with identical schema for each window length. 

Before you can create a feature group, you need to connect to the Hopsworks Feature Store.

In [None]:
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

To create a feature group, you need to give it a name and specify a primary key. It is also good practice to provide a description of its contents and a version number; if the version is not defined, it is incremented automatically, starting at `1`.

In [None]:
# get or create feature group
trans_fg = fs.get_or_create_feature_group(
    name="transactions",
    version=1,
    description="Transaction data",
    primary_key=['cc_num'],
    event_time='datetime',
    partition_key=['month'],
    stream=True,
    online_enabled=True
)

# materialize feature data into the feature group
trans_fg.insert(trans_df)

Here you have also set `online_enabled=True`, which enables low-latency access to the data. A full list of arguments can be found in the [documentation](https://docs.hopsworks.ai/feature-store-api/latest/generated/api/feature_store_api/#create_feature_group).

In [None]:
feature_descriptions = [
    {"name": "tid", "description": "Transaction id"},
    {"name": "datetime", "description": "Transaction time"},
    {"name": "month", "description": "Month of the transaction"},
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "amount", "description": "Dollar amount of the transaction"},
    {"name": "country", "description": "Country in which the transaction was made"},
    {"name": "loc_delta_t_minus_1", "description": "Distance between the current and previous transaction locations"},
    {"name": "time_delta_t_minus_1", "description": "Time difference between the current and previous transaction"},
]

for desc in feature_descriptions: 
    trans_fg.update_feature_description(desc["name"], desc["description"])

In [None]:
# get or create feature group
labels_fg = fs.get_or_create_feature_group(
    name="fraud_labels",
    version=1,
    description="Transaction fraud labels",
    primary_key=['cc_num'],
    event_time='datetime',
    partition_key=['month'],
    stream=True,
    online_enabled=True
)


In [None]:
# materialize feature data into the feature group
labels_fg.insert(labels_df)

You can move on and do the same thing for the profile feature group.

In [None]:
# get or create feature group
profile_fg = fs.get_or_create_feature_group(
    name="profile",
    version=1,
    description="Credit card holder demographic data",
    primary_key=["cc_num"],
    partition_key=["sex"],
    stream=True,
    online_enabled=True
)


In [None]:
# materialize feature data into the feature group
profile_fg.insert(profiles_df)

In [None]:
feature_descriptions = [
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "sex", "description": "Gender of the credit card holder"},
    {"name": "birthdate", "description": "Birth date"},
]

for desc in feature_descriptions: 
    profile_fg.update_feature_description(desc["name"], desc["description"])

Click on the hyperlink printed in the cell output above to inspect your feature group in the UI.

### <span style="color:#ff5f27;"> 👓  Exploration</span>
In the Hopsworks Feature Store, the metadata allows for multiple levels of exploration and review. Here you will explore a few of those capabilities.

### <span style="color:#ff5f27;"> 🔎 Search</span>
Using the search function in the UI, you can query any aspect of the feature groups, feature views, and training data that were previously created.

### <span style="color:#ff5f27;"> 📊 Statistics</span>
You can also enable statistics for one or all of the feature groups.

In [None]:
trans_fg = fs.get_or_create_feature_group("transactions", version=1)
trans_fg.statistics_config = {
    "enabled": True,
    "histograms": True,
    "correlations": True
}

trans_fg.update_statistics_config()
trans_fg.compute_statistics()

### <span style="color:#ff5f27;"> ⛓️ Lineage </span>
For every feature group and feature view, you can inspect how the abstractions relate to each other: which feature group produced which training dataset, and which model uses that dataset.
This gives a clear understanding of the pipeline and of how each element fits into it.