# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"><img src="images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 01: Load, Engineer & Connect</span>

<span style="font-width:bold; font-size: 1.4rem;"> This is the first part of the quick start series of tutorials about Hopsworks Feature Store. As part of this first module, we will work with data related to credit card transactions. 
The objective of this tutorial is to demonstrate how to work with the **Hopsworks Feature Store** for batch data, with the goal of training and deploying a model that can predict fraudulent transactions.</span>

## **🗒️ This notebook is divided into 4 sections:** 
1. Load the data and do feature engineering.
2. Connect to the Hopsworks feature store.
3. Create feature groups and upload them to the feature store.
4. Explore feature groups from the UI.

![tutorial-flow](../images/01_featuregroups.png)

First of all we will load the data and do some feature engineering on it.

### 📝 Import libraries

In [None]:
# Import necessary libraries for feature engineering
# common libraries for hashing and date time conversions
import hashlib
import datetime

# pandas for feature engineering 
import pandas as pd
import numpy as np

## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

The data we will use comes from three different CSV files:

- `transactions.csv`: transaction information such as timestamp, location, and the amount. 
- `alert_transactions.csv`: Suspicious Activity Report (SAR) transactions.
- `party.csv`: User profile information.

In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **All three files have a customer id column `id` in common, which we can use for joins.**
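
To make the join idea concrete, here is a minimal sketch on made-up data. The frames `toy_party` and `toy_activity` and their values are assumptions for illustration only; they are not the tutorial's datasets.

```python
import pandas as pd

# Toy stand-ins for two separate sources (values are invented for illustration).
toy_party = pd.DataFrame(
    {"id": ["a1", "b2", "c3"], "type": ["Individual", "Organization", "Individual"]}
)
toy_activity = pd.DataFrame({"id": ["a1", "c3"], "monthly_out_count": [4, 1]})

# A left join on the shared `id` column keeps every party,
# including those without any recorded activity (their values become NaN).
toy_joined = toy_party.merge(toy_activity, on="id", how="left")
print(toy_joined)
```

A left join is used here so that parties without activity are not silently dropped; an inner join would keep only the ids present in both frames.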

Let's go ahead and load the data.

#### ⛳️ Transactions dataset

In [None]:
transactions_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/transactions.csv", parse_dates = ['tran_timestamp'])
transactions_df.head(5)

#### ⛳️ Alert Transactions dataset

In [None]:
alert_transactions = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/alert_transactions.csv")
alert_transactions.head()

#### ⛳️ Party dataset

In [None]:
party = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/party.csv")
party.head()

## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

#### To investigate patterns of suspicious activity you will compute time window aggregates such as the monthly frequency, total, mean, and standard deviation of the amounts of incoming and outgoing transactions.
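
As a small illustration of what a monthly window aggregate looks like, the sketch below groups a made-up frame by calendar month and sender. The frame `toy_tx` is an assumption for illustration, and it groups with `dt.to_period("M")` rather than `pd.Grouper`, which is a swapped-in but equivalent way to bucket by month.

```python
import pandas as pd

# Made-up transactions: three from sender "a", one from sender "b".
toy_tx = pd.DataFrame(
    {
        "source": ["a", "a", "a", "b"],
        "base_amt": [10.0, 30.0, 5.0, 7.0],
        "tran_timestamp": pd.to_datetime(
            ["2020-01-03", "2020-01-20", "2020-02-01", "2020-01-15"]
        ),
    }
)

# Bucket each transaction into its calendar month, then aggregate per (month, sender).
month = toy_tx["tran_timestamp"].dt.to_period("M").rename("month")
toy_monthly = (
    toy_tx.groupby([month, "source"])
    .agg(
        monthly_count=("base_amt", "count"),
        monthly_total_amount=("base_amt", "sum"),
        monthly_mean_amount=("base_amt", "mean"),
    )
    .reset_index()
)
print(toy_monthly)
```

Each output row describes one sender in one month, which is the grain used by the aggregates in this section.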


In [None]:
transactions_df.columns = ['tran_id', 'tx_type', 'base_amt', 'tran_timestamp', 'source', 'target']
transactions_df = transactions_df[["source","target","tran_timestamp","tran_id", "base_amt"]]
transactions_df.head()

##### Outgoing transactions

In [None]:
out_df = transactions_df.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'source'])\
                            .agg(monthly_count=('source','count'), 
                                 monthly_total_amount=('base_amt','sum'),
                                 monthly_mean_amount=('base_amt','mean'),
                                 monthly_std_amount=('base_amt','std')
                                )
out_df = out_df.reset_index(level=["source"])
out_df = out_df.reset_index(level=["tran_timestamp"])
out_df.columns  = ["tran_timestamp", "id", "monthly_out_count", "monthly_out_total_amount", "monthly_out_mean_amount", "monthly_out_std_amount"]
out_df.tran_timestamp = out_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
out_df.head(5)

##### Incoming transactions

In [None]:
in_df = transactions_df.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'target'])\
                            .agg(monthly_count=('target','count'), 
                                 monthly_total_amount=('base_amt','sum'),
                                 monthly_mean_amount=('base_amt','mean'),
                                 monthly_std_amount=('base_amt','std'))

in_df = in_df.reset_index(level=["target"])
in_df = in_df.reset_index(level=["tran_timestamp"])
in_df.columns  = ["tran_timestamp", "id", "monthly_in_count", "monthly_in_total_amount", "monthly_in_mean_amount", "monthly_in_std_amount"]
in_df.tran_timestamp = in_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
in_df.head(5)

##### Now let's join the incoming and outgoing transactions datasets

In [None]:
in_out_df = in_df.merge(out_df, on=['tran_timestamp', 'id'], how="outer")
in_out_df =  in_out_df.fillna(0)
in_out_df.head(5)

#### Assign labels to transactions that were identified as suspicious activity

In [None]:
alert_transactions.head(5)

In [None]:
transaction_labels = transactions_df[["source","target","tran_id","tran_timestamp"]].merge(alert_transactions[["is_sar", "tran_id"]], on=["tran_id"], how="left")
transaction_labels.is_sar = transaction_labels.is_sar.map({True: 1, np.nan: 0})
transaction_labels.sort_values('tran_id',inplace = True)
transaction_labels.head(5)
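
The labelling step relies on a left join: transactions without a matching alert get a missing value, which is then turned into label `0`. Below is a minimal sketch on made-up data. The names `toy_tx` and `toy_alerts` are assumptions, and the `eq(True).astype(int)` conversion is an alternative to the dictionary-based `map` used in the cell, not the notebook's own method.

```python
import pandas as pd

# Made-up transactions and a single alert on transaction 2.
toy_tx = pd.DataFrame({"tran_id": [1, 2, 3]})
toy_alerts = pd.DataFrame({"tran_id": [2], "is_sar": [True]})

# Left join: every transaction is kept, unmatched ones get NaN in `is_sar`.
toy_labels = toy_tx.merge(toy_alerts, on="tran_id", how="left")

# Anything that is not explicitly True (including NaN) becomes 0.
toy_labels["is_sar"] = toy_labels["is_sar"].eq(True).astype(int)
print(toy_labels)
```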

#### Now let's prepare the profile (party) dataset and assign labels indicating whether each party has been reported for suspicious activity

In [None]:
party.columns = ["id","type"]
party.type = party.type.map({"Individual": 0, "Organization": 1})

party.head(5)

In [None]:
alert_transactions = transaction_labels[transaction_labels.is_sar ==1]
alert_transactions.head()

In [None]:
alert_transactions = transaction_labels[transaction_labels.is_sar ==1]
alert_sources = alert_transactions[["source", "tran_timestamp"]]
alert_sources.columns = ["id", "tran_timestamp"]
alert_sources.head()
alert_targets = alert_transactions[["target", "tran_timestamp"]]
alert_targets.columns = ["id", "tran_timestamp"]
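sar_party = pd.concat([alert_sources, alert_targets], ignore_index=True)
# Sort so that the earliest alert of each id comes first, then reset the index
# so that the labels returned by idxmax() below match the positions used by iloc.
sar_party = sar_party.sort_values(["id", "tran_timestamp"], ascending = [False, True]).reset_index(drop=True)

# find the first occurrence of SAR per id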
sar_party = sar_party.iloc[[sar_party.id.eq(id).idxmax() for id in sar_party['id'].value_counts().index]]
sar_party = sar_party.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'id']).agg(monthly_count=('id','count'))
sar_party = sar_party.reset_index(level=["id"])
sar_party = sar_party.reset_index(level=["tran_timestamp"])
sar_party.drop(["monthly_count"], axis=1, inplace=True)

sar_party["is_sar"] = 1
sar_party

In [None]:
party_labels = party.merge(sar_party, on=["id"], how="left")
party_labels.is_sar = party_labels.is_sar.map({1.0: 1, np.nan: 0})
max_time_stamp = datetime.datetime.utcfromtimestamp(int(max(transaction_labels.tran_timestamp.values))/1e9)
party_labels = party_labels.fillna(max_time_stamp)

In [None]:
party_labels.head(5)

### Graph representation learning using a graph convolution layer

Financial transactions can be represented as a dynamic network graph. Graph representation learning 
gives us the opportunity to represent each transaction within a broader context. In this example we will perform node 
representation learning. 

The network architecture of the graph convolution layer for node representation learning was taken from 
[this Keras example](https://keras.io/examples/graph/gnn_citations/). It performs the following steps:

1. **Prepare**: The input node representations are processed using a FFN to produce a *message*. You can simplify
the processing by only applying linear transformation to the representations.
2. **Aggregate**: The messages of the neighbours of each node are aggregated with
respect to the `edge_weights` using a *permutation invariant* pooling operation, such as *sum*, *mean*, and *max*,
to prepare a single aggregated message for each node. See, for example, [tf.math.unsorted_segment_sum](https://www.tensorflow.org/api_docs/python/tf/math/unsorted_segment_sum)
APIs used to aggregate neighbour messages.
3. **Update**: The `node_repesentations` and `aggregated_messages`—both of shape `[num_nodes, representation_dim]`—
are combined and processed to produce the new state of the node representations (node embeddings).
If `combination_type` is `gru`, the `node_repesentations` and `aggregated_messages` are stacked to create a sequence,
then processed by a GRU layer. Otherwise, the `node_repesentations` and `aggregated_messages` are added
or concatenated, then processed using a FFN.
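
The three steps above can be sketched in plain NumPy on a tiny graph. This is a simplified illustration, not the Keras layer defined in the next cell: the prepare step is reduced to an identity transform scaled by the edge weights, the update step uses the `add` combination without an FFN, and the function name `message_passing_step` and all values are assumptions.

```python
import numpy as np

# Toy graph: 3 nodes, 4 directed edges stored as [receiving_nodes, neighbour_nodes].
edges = np.array([[0, 0, 1, 2],
                  [1, 2, 2, 0]])
node_repr = np.array([[1.0, 0.0],
                      [0.0, 2.0],
                      [3.0, 3.0]])
edge_weights = np.ones(edges.shape[1])


def message_passing_step(node_repr, edges, edge_weights, aggregation="mean"):
    node_idx, neighbour_idx = edges
    num_nodes = node_repr.shape[0]

    # Prepare: take each neighbour's representation and scale it by the edge weight.
    messages = node_repr[neighbour_idx] * edge_weights[:, None]

    # Aggregate: sum the messages per receiving node (order of edges does not matter).
    aggregated = np.zeros_like(node_repr)
    np.add.at(aggregated, node_idx, messages)
    if aggregation == "mean":
        counts = np.bincount(node_idx, minlength=num_nodes)
        aggregated = aggregated / np.maximum(counts, 1)[:, None]
    elif aggregation != "sum":
        raise ValueError(f"Invalid aggregation type: {aggregation}.")

    # Update: combine the old representation with the aggregated message by addition.
    return node_repr + aggregated


new_repr_mean = message_passing_step(node_repr, edges, edge_weights, "mean")
new_repr_sum = message_passing_step(node_repr, edges, edge_weights, "sum")
print(new_repr_mean)
```

Node 0 receives messages from nodes 1 and 2, so with `mean` aggregation its update is the average of their representations added to its own.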


In [None]:
# import libraries to compute graph embeddings 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

tf.config.optimizer.set_jit(True)

def create_ffn(hidden_units, dropout_rate, name=None):
    fnn_layers = []

    for units in hidden_units:
        fnn_layers.append(layers.BatchNormalization())
        fnn_layers.append(layers.Dropout(dropout_rate))
        fnn_layers.append(layers.Dense(units, activation=tf.nn.gelu))

    return keras.Sequential(fnn_layers, name=name)

class GraphConvLayer(layers.Layer):
    def __init__(
        self,
        hidden_units,
        dropout_rate=0.2,
        aggregation_type="mean",
        combination_type="concat",
        normalize=False,
        *args,
        **kwargs,
    ):
        super(GraphConvLayer, self).__init__(*args, **kwargs)

        self.aggregation_type = aggregation_type
        self.combination_type = combination_type
        self.normalize = normalize

        self.ffn_prepare = create_ffn(hidden_units, dropout_rate)
        if self.combination_type == "gated":
            self.update_fn = layers.GRU(
                units=hidden_units,
                activation="tanh",
                recurrent_activation="sigmoid",
                dropout=dropout_rate,
                return_state=True,
                recurrent_dropout=dropout_rate,
            )
        else:
            self.update_fn = create_ffn(hidden_units, dropout_rate)

    def prepare(self, node_repesentations, weights=None):
        # node_repesentations shape is [num_edges, embedding_dim].
        messages = self.ffn_prepare(node_repesentations)
        if weights is not None:
            messages = messages * tf.expand_dims(weights, -1)
        return messages

    def aggregate(self, node_indices, neighbour_messages):
        # node_indices shape is [num_edges].
        # neighbour_messages shape: [num_edges, representation_dim].
        num_nodes = tf.math.reduce_max(node_indices) + 1
        if self.aggregation_type == "sum":
            aggregated_message = tf.math.unsorted_segment_sum(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        elif self.aggregation_type == "mean":
            aggregated_message = tf.math.unsorted_segment_mean(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        elif self.aggregation_type == "max":
            aggregated_message = tf.math.unsorted_segment_max(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        else:
            raise ValueError(f"Invalid aggregation type: {self.aggregation_type}.")

        return aggregated_message

    def update(self, node_repesentations, aggregated_messages):
        # node_repesentations shape is [num_nodes, representation_dim].
        # aggregated_messages shape is [num_nodes, representation_dim].
        if self.combination_type == "gru":
            # Create a sequence of two elements for the GRU layer.
            h = tf.stack([node_repesentations, aggregated_messages], axis=1)
        elif self.combination_type == "concat":
            # Concatenate the node_repesentations and aggregated_messages.
            h = tf.concat([node_repesentations, aggregated_messages], axis=1)
        elif self.combination_type == "add":
            # Add node_repesentations and aggregated_messages.
            h = node_repesentations + aggregated_messages
        else:
            raise ValueError(f"Invalid combination type: {self.combination_type}.")

        # Apply the processing function.
        node_embeddings = self.update_fn(h)
        if self.combination_type == "gru":
            node_embeddings = tf.unstack(node_embeddings, axis=1)[-1]

        if self.normalize:
            node_embeddings = tf.nn.l2_normalize(node_embeddings, axis=-1)
        return node_embeddings

    def call(self, inputs):
        """Process the inputs to produce the node_embeddings.

        inputs: a tuple of three elements: node_repesentations, edges, edge_weights.
        Returns: node_embeddings of shape [num_nodes, representation_dim].
        """

        node_repesentations, edges, edge_weights = inputs
        # Get node_indices (source) and neighbour_indices (target) from edges.
        node_indices, neighbour_indices = edges[0], edges[1]
        # neighbour_repesentations shape is [num_edges, representation_dim].
        neighbour_repesentations = tf.gather(node_repesentations, neighbour_indices)

        # Prepare the messages of the neighbours.
        neighbour_messages = self.prepare(neighbour_repesentations, edge_weights)
        # Aggregate the neighbour messages.
        aggregated_messages = self.aggregate(node_indices, neighbour_messages)
        # Update the node embedding with the neighbour messages.
        return self.update(node_repesentations, aggregated_messages)


class GNNNodeClassifier(tf.keras.Model):
    def __init__(
        self,
        graph_info,
        hidden_units,
        aggregation_type="sum",
        combination_type="concat",
        dropout_rate=0.2,
        normalize=True,
        *args,
        **kwargs,
    ):
        super(GNNNodeClassifier, self).__init__(*args, **kwargs)

        # Unpack graph_info to three elements: node_features, edges, and edge_weight.
        node_features, edges, edge_weights = graph_info
        self.node_features = node_features
        self.edges = edges
        self.edge_weights = edge_weights
        # Set edge_weights to ones if not provided.
        if self.edge_weights is None:
            self.edge_weights = tf.ones(shape=edges.shape[1])
        # Scale edge_weights to sum to 1.
        self.edge_weights = self.edge_weights / tf.math.reduce_sum(self.edge_weights)

        # Create a process layer.
        self.preprocess = create_ffn(hidden_units, dropout_rate, name="preprocess")
        # Create the first GraphConv layer.
        self.conv1 = GraphConvLayer(
            hidden_units,
            dropout_rate,
            aggregation_type,
            combination_type,
            normalize,
            name="graph_conv1",
        )
        # Create the second GraphConv layer.
        self.conv2 = GraphConvLayer(
            hidden_units,
            dropout_rate,
            aggregation_type,
            combination_type,
            normalize,
            name="graph_conv2",
        )
        # Create a postprocess layer.
        self.postprocess = create_ffn(hidden_units, dropout_rate, name="postprocess")
        # Create a compute logits layer.
        self.compute_logits = layers.Dense(hidden_units[0],  activation=tf.nn.tanh, name="logits")
        

    def call(self, input_node_indices):
        # Preprocess the node_features to produce node representations.
        x = self.preprocess(self.node_features)
        # Apply the first graph conv layer.
        x1 = self.conv1((x, self.edges, self.edge_weights))
        # Skip connection.
        x = x1 + x
        # Apply the second graph conv layer.
        x2 = self.conv2((x, self.edges, self.edge_weights))
        # Skip connection.
        x = x2 + x
        # Postprocess node embedding.
        x = self.postprocess(x)
        # Fetch node embeddings for the input node_indices.
        node_embeddings = tf.gather(x, input_node_indices)
        # Compute logits
        return self.compute_logits(node_embeddings)


In [None]:
def construct_gruph(input_df):
    sampled_party = party_labels[party_labels.id.isin(input_df.source) | (party_labels.id.isin(input_df.target))]
    sampled_party = sampled_party [["id", "type", "is_sar"]]

    # assign unique integer ids to each node to be compatible with TensorFlow;
    # ids are sorted so that int_id matches the row order of node_features below
    unique_ids = set()
    for id in sampled_party.id.values:
        unique_ids.add(id)
    id_dict = {}

    for i, idn in enumerate(sorted(unique_ids)):
        id_dict[idn]=i

    sampled_party['int_id'] = sampled_party['id'].apply(lambda x : id_dict[x])
    input_df['source'] = input_df['source'].apply(lambda x : id_dict[x])
    input_df['target'] = input_df['target'].apply(lambda x : id_dict[x])

    # construct graph info
    feature_names = ["type"]
    x_train = sampled_party.int_id.to_numpy()

    # Create an edges array (sparse adjacency matrix) of shape [2, num_edges].
    edges = input_df[["source", "target"]].to_numpy().T

    # Create an edge weights array of ones.
    edge_weights = tf.ones(shape=edges.shape[1])
    # Create a node features array of shape [num_nodes, num_features].
    node_features = tf.cast(
        sampled_party.sort_values("id")[feature_names].to_numpy(), dtype=tf.dtypes.float32
    )
    # Create graph info tuple with node_features, edges, and edge_weights.
    graph_info = (node_features, edges, edge_weights)
    
    node_features, edges, edge_weights = graph_info

    # hyper parameter for graph embeddings model
    hidden_units = [32, 32]
    learning_rate = 0.01
    dropout_rate = 0.5
    num_epochs = 2
    batch_size = 256
    
    # Construct the model
    model = GNNNodeClassifier(
        graph_info=graph_info,
        hidden_units=hidden_units,
        dropout_rate=dropout_rate,
        name="gnn_model",
    )

    # Compile the model.
    model.compile(
            #optimizer=keras.optimizers.Adam(learning_rate),
            optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
            loss=keras.losses.MeanSquaredError(),    
            metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")],
        )
    
    # Fit the model.
    history = model.fit(
            x=x_train,
            y=x_train,
            epochs=num_epochs,
            batch_size=batch_size,
        )
    graph_embeddings = list(model.predict(x_train).reshape(node_features.shape[0], hidden_units[0]))
    # predict and return
    return {"id": sampled_party.id.to_numpy(), "graph_embeddings": graph_embeddings}

#### Compute time evolving graph embeddings

In [None]:
transaction_graphs_by_month = transaction_labels.groupby(pd.Grouper(key='tran_timestamp', freq='M')).apply(lambda x: construct_gruph(x))       

In [None]:
timestamps = transaction_graphs_by_month.index.values
graph_embeddings = transaction_graphs_by_month.tolist()

In [None]:
graph_embdeddings_df = pd.DataFrame()
for timestamp, graph_embedding in zip(timestamps, graph_embeddings):
    df_tmp = pd.DataFrame(graph_embedding)
    df_tmp["tran_timestamp"] = timestamp
    graph_embdeddings_df = pd.concat([graph_embdeddings_df, df_tmp])    
graph_embdeddings_df

#### Convert datetime to Unix epoch milliseconds 

In [None]:
transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6
graph_embdeddings_df.tran_timestamp = graph_embdeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)
party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)
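
The integer division by `10 ** 6` works because the timestamps are stored as nanoseconds since the Unix epoch. A minimal sketch on made-up timestamps (the name `toy_ts` is an assumption), with the resolution forced to nanoseconds explicitly so the arithmetic holds:

```python
import numpy as np
import pandas as pd

# Two made-up timestamps one second apart (treated as UTC).
toy_ts = pd.Series(pd.to_datetime(["2020-01-01 00:00:00", "2020-01-01 00:00:01"]))

# Force nanosecond resolution, view as integers, then convert ns -> ms.
toy_ns = toy_ts.astype("datetime64[ns]").values.astype(np.int64)
toy_ms = toy_ns // 10 ** 6
print(toy_ms.tolist())  # [1577836800000, 1577836801000]
```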

---

# 👮🏼‍♀️ Data Validation 

Before you define [feature groups](https://docs.hopsworks.ai/latest/generated/feature_group/), let's define [validation rules](https://docs.hopsworks.ai/latest/generated/feature_validation/) for features. You expect some features to comply with certain *rules* or *expectations*. For example, a transacted amount must be a positive value. If a transacted amount arrives as a negative value, you can decide whether to block the `write` into the feature group and throw an error, or to allow the write but raise a warning. In the next section you will create feature store `expectations`, attach them to feature groups, and apply them to dataframes being appended to those feature groups.

#### Data validation with Great Expectations in Hopsworks
You can use the Great Expectations (GE) library for validation in the Hopsworks feature store. 
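
To illustrate the two possible reactions to a failed rule (block the write with an error, or let it through with a warning), here is a minimal sketch in plain pandas. It does not use Great Expectations or the Hopsworks API; the helper `check_non_negative` and its `strict` flag are hypothetical names introduced only for this illustration.

```python
import warnings

import pandas as pd


def check_non_negative(df, column, strict=True):
    """Hypothetical helper: return True if every value in `column` is >= 0."""
    n_bad = int((df[column] < 0).sum())
    if n_bad == 0:
        return True
    message = f"{n_bad} negative value(s) found in column '{column}'"
    if strict:
        # Strict mode: stop here so the data is never written.
        raise ValueError(message)
    # Lenient mode: report the problem but let the caller continue.
    warnings.warn(message)
    return False


toy_amounts = pd.DataFrame({"base_amt": [10.0, 250.5, -3.0]})

# Lenient mode only warns.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    passed = check_non_negative(toy_amounts, "base_amt", strict=False)

# Strict mode raises, which would block the write.
try:
    check_non_negative(toy_amounts, "base_amt", strict=True)
    blocked = False
except ValueError:
    blocked = True
```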

##  <img src="../images/icon102.png" width="18px"></img> Hopsworks feature store

The Hopsworks feature store library is Apache V2 licensed and available [here](https://github.com/logicalclocks/feature-store-api). The library is currently available for Python and JVM languages such as Scala and Java.
In this notebook, we are going to cover the Python part.

You can find the complete documentation of the library here: 

The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you'll be working with. 

> By default, `project.get_feature_store()` returns the feature store of the project you are working with. However, it also accepts a project name as a parameter to select a different feature store.

In [None]:
import hopsworks

project = hopsworks.login()

# Get the feature store handle for the project's feature store
fs = project.get_feature_store()

### 🔬 Expectations suite

In [None]:
# Define Expectation Suite - no use of HSFS
import great_expectations as ge
from pprint import pprint
import json

expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="aml_project_validations")
pprint(expectation_suite.to_json_dict(), indent=2)

In [None]:
expectation_suite.add_expectation(
  ge.core.ExpectationConfiguration(
  expectation_type="expect_column_max_to_be_between",
  kwargs={"column": "monthly_in_count", "min_value": 0, "max_value": 10000000}) 
)

In [None]:
pprint(expectation_suite)

---

## <span style="color:#ff5f27;"> 🪄 Register Feature Groups </span>

### Feature Groups

A `Feature Group` is a logical grouping of features, and experience has shown that this grouping generally originates from the features being derived from the same data source. The `Feature Group` lets you save metadata along with features, which defines how the Feature Store interprets them, combines them, and reproduces training datasets created from them.

Generally, the features in a feature group are engineered together in an ingestion job. However, it is possible to have additional jobs to append features to an existing feature group. Furthermore, `feature groups` provide a way of defining a namespace for features, such that you can define features with the same name multiple times, but uniquely identified by the group they are contained in.

> It is important to note that `feature groups` are not groupings of features for immediate training of Machine Learning models. Instead, to ensure reusability of features, it is possible to combine features from any number of groups into training datasets.
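
As an analogy only, the namespace idea can be pictured with plain pandas: two groups may each hold a feature with the same name, and the group they come from disambiguates them when they are combined. This sketch is not the Hopsworks API; the frames `group_in` and `group_out` and their values are assumptions for illustration.

```python
import pandas as pd

# Two made-up "groups" that both contain a feature called `monthly_count`.
group_in = pd.DataFrame({"id": [1, 2], "monthly_count": [3, 5]})
group_out = pd.DataFrame({"id": [1, 2], "monthly_count": [7, 0]})

# When combining them into one training frame, the clashing names are
# disambiguated by the group they originate from.
training_df = group_in.merge(group_out, on="id", suffixes=("_in", "_out"))
print(training_df.columns.tolist())
```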

#### Transactions monthly aggregates feature group

In [None]:
transactions_fg = fs.get_or_create_feature_group(
    name = "transactions_monthly_aml_fg",
    version = 1,
    primary_key = ["id"],
    partition_key = ["tran_timestamp"],   
    description = "transactions monthly aggregates features",
    event_time = 'tran_timestamp',
    online_enabled = True,
    statistics_config = {"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": False},
    expectation_suite=expectation_suite
)   

transactions_fg.insert(in_out_df)

#### Alert Transaction labels feature group

In [None]:
transaction_labels_fg = fs.get_or_create_feature_group(
    name = "transaction_labels_aml_fg",
    version = 1,
    primary_key = ["tran_id"],
    description = "alert transactions",
    event_time = 'tran_timestamp',
    online_enabled = True,                                                
    statistics_config = {"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": False}
)

transaction_labels_fg.insert(transaction_labels)

#### Party feature group

In [None]:
party_fg = fs.get_or_create_feature_group(
    name = "party_aml_fg",
    version = 1,
    primary_key = ["id"],
    description = "party fg with labels",
    event_time = 'tran_timestamp',
    online_enabled = True,
    statistics_config = {"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": False}
)

party_fg.insert(party_labels)

#### Graph embeddings feature group

In [None]:
from hsfs import engine
features = engine.get_instance().parse_schema_feature_group(graph_embdeddings_df)
for f in features:
    if f.type == "array<float>":
        f.online_type = "VARBINARY(200)"   

In [None]:
graph_embeddings_fg = fs.get_or_create_feature_group(name="graph_embeddings_aml_fg",
                                       version=1,
                                       primary_key=["id"],
                                       description="node embeddings from transactions graph",
                                       event_time = 'tran_timestamp',
                                       online_enabled=True,                                                
                                       statistics_config={"enabled": False, "histograms": False, "correlations": False, "exact_uniqueness": False},
                                       features=features)

graph_embeddings_fg.insert(graph_embdeddings_df)

---

## <span style="color:#ff5f27;"> 👓 Exploration </span>

### Feature groups are now accessible and searchable in the UI
![fg-overview](images/fg_explore.gif)

## 📊 Statistics
We can explore feature statistics in the feature groups. If statistics were not enabled when the feature group was created, they can be enabled as follows:

```python
transactions_fg = fs.get_or_create_feature_group(
    name = "transactions_monthly_aml_fg", 
    version = 1)

transactions_fg.statistics_config = {
    "enabled": True,
    "histograms": True,
    "correlations": True
}

transactions_fg.update_statistics_config()
transactions_fg.compute_statistics()
```

![fg-stats](images/freature_group_stats.gif)

## <span style="color:#ff5f27;"> ⏭️ **Next:** Part 02 </span>
    
In the following notebook you will use feature groups to create feature views and training datasets.