# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"><img src="../images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 01: Load, Engineer & Connect</span>

<span style="font-width:bold; font-size: 1.4rem;"> This is the first part of the AML tutorial. As part of this first module, you will work with data related to credit card transactions. 
The objective of this tutorial is to demonstrate how to work with the **Hopsworks Feature Store** with the goal of training and deploying a model that can predict fraudulent transactions.</span>

## **🗒️ This notebook is divided into the following sections:** 
1. **Data Loading**: Load the data. 
2. **Feature Engineering**.
3. **Hopsworks Feature Store Connection**.
4. **Feature Groups Creation**: Create feature groups and upload them to the feature store.
5. **Explore feature groups from the UI**.

![tutorial-flow](../images/01_featuregroups.png)

First, we will load the data and do some feature engineering on it.

## <span style="color:#ff5f27;"> 📝 Imports </span>

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd
import numpy as np

def create_ffn(hidden_units: list, dropout_rate: float, name: str = None) -> keras.Sequential:
    """
    Create a feedforward neural network layer.

    Parameters:
    - hidden_units (list): List of integers specifying the number of units in each hidden layer.
    - dropout_rate (float): Dropout rate for regularization.
    - name (str): Name of the layer.

    Returns:
    keras.Sequential: Feedforward neural network layer.
    """
    fnn_layers = []

    for units in hidden_units:
        fnn_layers.append(layers.BatchNormalization())
        fnn_layers.append(layers.Dropout(dropout_rate))
        fnn_layers.append(layers.Dense(units, activation=tf.nn.gelu))

    return keras.Sequential(fnn_layers, name=name)

class GraphConvLayer(layers.Layer):
    """
    Graph Convolutional Layer.

    Parameters:
    - hidden_units (list): List of integers specifying the number of units in each hidden layer.
    - dropout_rate (float): Dropout rate for regularization.
    - aggregation_type (str): Type of aggregation for neighbor messages ('sum', 'mean', 'max').
    - combination_type (str): Type of combination for node embeddings ('gru', 'concat', 'add').
    - normalize (bool): Flag to normalize node embeddings.
    """

    def __init__(
        self,
        hidden_units: list,
        dropout_rate: float = 0.2,
        aggregation_type: str = "mean",
        combination_type: str = "concat",
        normalize: bool = False,
        *args,
        **kwargs,
    ):
        super(GraphConvLayer, self).__init__(*args, **kwargs)

        self.aggregation_type = aggregation_type
        self.combination_type = combination_type
        self.normalize = normalize

        self.ffn_prepare = create_ffn(hidden_units, dropout_rate)
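        if self.combination_type == "gru":
            # GRU over the length-2 sequence (node representation, aggregated message);
            # return_sequences=True lets update() keep the last step. `units` must be an int.
            self.update_fn = layers.GRU(
                units=hidden_units[-1],
                activation="tanh",
                recurrent_activation="sigmoid",
                dropout=dropout_rate,
                return_sequences=True,
                recurrent_dropout=dropout_rate,
            )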
        else:
            self.update_fn = create_ffn(hidden_units, dropout_rate)

    def prepare(self, node_repesentations, weights=None) -> tf.Tensor:
        """
        Prepare neighbor messages.

        Parameters:
        - node_repesentations (tf.Tensor): Node representations.
        - weights (tf.Tensor): Weights for neighbor messages.

        Returns:
        tf.Tensor: Prepared neighbor messages.
        """
        messages = self.ffn_prepare(node_repesentations)
        if weights is not None:
            messages = messages * tf.expand_dims(weights, -1)
        return messages

    def aggregate(self, node_indices, neighbour_messages) -> tf.Tensor:
        """
        Aggregate neighbor messages.

        Parameters:
        - node_indices (tf.Tensor): Node indices.
        - neighbour_messages (tf.Tensor): Neighbor messages.

        Returns:
        tf.Tensor: Aggregated messages.
        """
        num_nodes = tf.math.reduce_max(node_indices) + 1
        if self.aggregation_type == "sum":
            aggregated_message = tf.math.unsorted_segment_sum(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        elif self.aggregation_type == "mean":
            aggregated_message = tf.math.unsorted_segment_mean(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        elif self.aggregation_type == "max":
            aggregated_message = tf.math.unsorted_segment_max(
                neighbour_messages, node_indices, num_segments=num_nodes
            )
        else:
            raise ValueError(f"Invalid aggregation type: {self.aggregation_type}.")

        return aggregated_message

    def update(self, node_repesentations, aggregated_messages) -> tf.Tensor:
        """
        Update node embeddings.

        Parameters:
        - node_repesentations (tf.Tensor): Node representations.
        - aggregated_messages (tf.Tensor): Aggregated neighbor messages.

        Returns:
        tf.Tensor: Updated node embeddings.
        """
        if self.combination_type == "gru":
            h = tf.stack([node_repesentations, aggregated_messages], axis=1)
        elif self.combination_type == "concat":
            h = tf.concat([node_repesentations, aggregated_messages], axis=1)
        elif self.combination_type == "add":
            h = node_repesentations + aggregated_messages
        else:
            raise ValueError(f"Invalid combination type: {self.combination_type}.")

        node_embeddings = self.update_fn(h)
        if self.combination_type == "gru":
            node_embeddings = tf.unstack(node_embeddings, axis=1)[-1]

        if self.normalize:
            node_embeddings = tf.nn.l2_normalize(node_embeddings, axis=-1)
        return node_embeddings

    def call(self, inputs) -> tf.Tensor:
        """
        Process inputs to produce node embeddings.

        Parameters:
        inputs: a tuple of three elements: node_repesentations, edges, edge_weights.

        Returns:
        tf.Tensor: Node embeddings.
        """
        node_repesentations, edges, edge_weights = inputs
        node_indices, neighbour_indices = edges[0], edges[1]
        neighbour_repesentations = tf.gather(node_repesentations, neighbour_indices)

        neighbour_messages = self.prepare(neighbour_repesentations, edge_weights)
        aggregated_messages = self.aggregate(node_indices, neighbour_messages)
        return self.update(node_repesentations, aggregated_messages)


class GNNNodeClassifier(tf.keras.Model):
    """
    Graph Neural Network Node Classifier.

    Parameters:
    - graph_info: Tuple of node_features, edges, and edge_weights.
    - hidden_units (list): List of integers specifying the number of units in each hidden layer.
    - aggregation_type (str): Type of aggregation for neighbor messages ('sum', 'mean', 'max').
    - combination_type (str): Type of combination for node embeddings ('gru', 'concat', 'add').
    - dropout_rate (float): Dropout rate for regularization.
    - normalize (bool): Flag to normalize node embeddings.
    """

    def __init__(
        self,
        graph_info: tuple,
        hidden_units: list,
        aggregation_type: str = "sum",
        combination_type: str = "concat",
        dropout_rate: float = 0.2,
        normalize: bool = True,
        *args,
        **kwargs,
    ):
        super(GNNNodeClassifier, self).__init__(*args, **kwargs)

        node_features, edges, edge_weights = graph_info
        self.node_features = node_features
        self.edges = edges
        self.edge_weights = edge_weights

        if self.edge_weights is None:
            self.edge_weights = tf.ones(shape=edges.shape[1])

        self.edge_weights = self.edge_weights / tf.math.reduce_sum(self.edge_weights)

        self.preprocess = create_ffn(hidden_units, dropout_rate, name="preprocess")
        self.conv1 = GraphConvLayer(
            hidden_units,
            dropout_rate,
            aggregation_type,
            combination_type,
            normalize,
            name="graph_conv1",
        )
        self.conv2 = GraphConvLayer(
            hidden_units,
            dropout_rate,
            aggregation_type,
            combination_type,
            normalize,
            name="graph_conv2",
        )
        self.postprocess = create_ffn(hidden_units, dropout_rate, name="postprocess")
        self.compute_logits = layers.Dense(hidden_units[0], activation=tf.nn.tanh, name="logits")

    def call(self, input_node_indices) -> tf.Tensor:
        """
        Make predictions.

        Parameters:
        - input_node_indices (tf.Tensor): Input node indices.

        Returns:
        tf.Tensor: Predictions.
        """
        x = self.preprocess(self.node_features)
        x1 = self.conv1((x, self.edges, self.edge_weights))
        x = x1 + x
        x2 = self.conv2((x, self.edges, self.edge_weights))
        x = x2 + x
        x = self.postprocess(x)
        node_embeddings = tf.gather(x, input_node_indices)
        return self.compute_logits(node_embeddings)


def construct_graph(input_df: pd.DataFrame, data_party_labels: pd.DataFrame) -> dict:
    """
    Construct a graph and generate node embeddings.

    Parameters:
    - input_df (pd.DataFrame): Input transaction DataFrame.
    - data_party_labels (pd.DataFrame): DataFrame containing party labels.

    Returns:
    dict: Dictionary with keys 'id' and 'graph_embeddings'.
    """
    # Work on a copy so that the frame passed in by groupby().apply() is not mutated
    input_df = input_df.copy()
    sampled_party = data_party_labels[data_party_labels.id.isin(input_df.source) | (data_party_labels.id.isin(input_df.target))]
    sampled_party = sampled_party[["id", "type", "is_sar"]].copy()

    # Sort the IDs so that integer ID i lines up with row i of the id-sorted node features below
    unique_ids = sorted(set(sampled_party.id.values))
    id_dict = {idn: i for i, idn in enumerate(unique_ids)}

    sampled_party['int_id'] = sampled_party['id'].apply(lambda x: id_dict[x])
    input_df['source'] = input_df['source'].apply(lambda x: id_dict[x])
    input_df['target'] = input_df['target'].apply(lambda x: id_dict[x])

    feature_names = ["type"]
    x_train = sampled_party.int_id.to_numpy()

    edges = input_df[["source", "target"]].to_numpy().T
    edge_weights = tf.ones(shape=edges.shape[1])
    node_features = tf.cast(
        sampled_party.sort_values("id")[feature_names].to_numpy(), dtype=tf.dtypes.float32
    )
    graph_info = (node_features, edges, edge_weights)

    # Hyperparameters for graph embeddings model
    hidden_units = [32, 32]
    learning_rate = 0.01
    dropout_rate = 0.5
    num_epochs = 2
    batch_size = 256

    # Construct the model
    model = GNNNodeClassifier(
        graph_info=graph_info,
        hidden_units=hidden_units,
        dropout_rate=dropout_rate,
        name="gnn_model",
    )

    # Compile the model.
    model.compile(
        optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
        loss=keras.losses.MeanSquaredError(),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")],
    )

    # Fit the model.
    history = model.fit(
        x=x_train,
        y=x_train,
        epochs=num_epochs,
        batch_size=batch_size,
    )

    graph_embeddings = list(model.predict(x_train).reshape(node_features.shape[0], hidden_units[0]))
    return {"id": sampled_party.id.to_numpy(), "graph_embeddings": graph_embeddings}




In [2]:
import datetime
import pandas as pd
import numpy as np

def get_party_labels(data_transaction_labels: pd.DataFrame, data_party: pd.DataFrame) -> pd.DataFrame:
    """
    Assign SAR (Suspicious Activity Report) labels to parties based on transaction data.

    Parameters:
    - data_transaction_labels (pd.DataFrame): DataFrame containing transaction labels, including SAR information.
    - data_party (pd.DataFrame): DataFrame with party information.

    Returns:
    pd.DataFrame: DataFrame with party labels indicating SAR occurrences.
    """
    alert_transactions = data_transaction_labels[data_transaction_labels.is_sar == 1]

    # Both the source and the target of a SAR transaction count as SAR parties
    alert_sources = alert_transactions[["source", "tran_timestamp"]].rename(columns={"source": "id"})
    alert_targets = alert_transactions[["target", "tran_timestamp"]].rename(columns={"target": "id"})
    # DataFrame.append is deprecated (removed in pandas 2.0), so use pd.concat
    sar_party = pd.concat([alert_sources, alert_targets], ignore_index=True)

    # Find the first (earliest) occurrence of SAR per ID
    sar_party = sar_party.sort_values(["id", "tran_timestamp"]).drop_duplicates(subset="id", keep="first")
    # Snap each first-occurrence timestamp to its month (month-end label)
    sar_party = sar_party.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'id']).agg(monthly_count=('id', 'count'))
    sar_party = sar_party.reset_index(level=["id"])
    sar_party = sar_party.reset_index(level=["tran_timestamp"])
    sar_party = sar_party.drop(columns=["monthly_count"])

    sar_party["is_sar"] = 1

    # Parties without any SAR transaction get is_sar = 0 and the latest transaction timestamp
    party_labels = data_party.merge(sar_party, on=["id"], how="left")
    party_labels["is_sar"] = party_labels["is_sar"].fillna(0).astype(int)
    max_time_stamp = datetime.datetime.utcfromtimestamp(int(max(data_transaction_labels.tran_timestamp.values)) / 1e9)
    party_labels = party_labels.fillna(max_time_stamp)

    return party_labels


In [3]:
import numpy as np
import pandas as pd

def get_transaction_labels(data_transactions: pd.DataFrame, data_alert_transactions: pd.DataFrame) -> pd.DataFrame:
    """
    Merge transaction data with alert transaction data to get labels indicating SAR occurrences.

    Parameters:
    - data_transactions (pd.DataFrame): DataFrame containing transaction information.
    - data_alert_transactions (pd.DataFrame): DataFrame with alert transaction information, including SAR labels.

    Returns:
    pd.DataFrame: Merged DataFrame with transaction labels indicating SAR occurrences.
    """
    transaction_labels = data_transactions[
        ["source", "target", "tran_id", "tran_timestamp"]
    ].merge(
        data_alert_transactions[["is_sar", "tran_id"]],
        on=["tran_id"],
        how="left",
    )
    transaction_labels.is_sar = transaction_labels.is_sar.map({
        True: 1,
        np.nan: 0,
    })
    transaction_labels.sort_values(
        'tran_id',
        inplace=True,
    )
    transaction_labels.rename(columns={"tran_id": "id"}, inplace=True)
    return transaction_labels

In [4]:
import pandas as pd
import numpy as np

def get_out_transactions(data: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate monthly outgoing transaction statistics for each source ID.

    Parameters:
    - data (pd.DataFrame): DataFrame containing transaction information.

    Returns:
    pd.DataFrame: DataFrame with monthly outgoing transaction statistics.
    """
    out_df = data.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'source']) \
        .agg(monthly_count=('source', 'count'),
             monthly_total_amount=('base_amt', 'sum'),
             monthly_mean_amount=('base_amt', 'mean'),
             monthly_std_amount=('base_amt', 'std')
             )
    out_df = out_df.reset_index(level=["source"])
    out_df = out_df.reset_index(level=["tran_timestamp"])
    out_df.columns = ["tran_timestamp", "id", "monthly_out_count", "monthly_out_total_amount",
                      "monthly_out_mean_amount", "monthly_out_std_amount"]
    out_df.tran_timestamp = out_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
    return out_df


def get_in_transactions(data: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate monthly incoming transaction statistics for each target ID.

    Parameters:
    - data (pd.DataFrame): DataFrame containing transaction information.

    Returns:
    pd.DataFrame: DataFrame with monthly incoming transaction statistics.
    """
    in_df = data.groupby([pd.Grouper(key='tran_timestamp', freq='M'), 'target']) \
        .agg(monthly_count=('target', 'count'),
             monthly_total_amount=('base_amt', 'sum'),
             monthly_mean_amount=('base_amt', 'mean'),
             monthly_std_amount=('base_amt', 'std'))

    in_df = in_df.reset_index(level=["target"])
    in_df = in_df.reset_index(level=["tran_timestamp"])
    in_df.columns = ["tran_timestamp", "id", "monthly_in_count", "monthly_in_total_amount",
                     "monthly_in_mean_amount", "monthly_in_std_amount"]
    in_df.tran_timestamp = in_df.tran_timestamp.values.astype(np.int64) // 10 ** 6
    return in_df


def get_in_out_transactions(data_transactions: pd.DataFrame) -> pd.DataFrame:
    """
    Merge monthly incoming and outgoing transaction statistics.

    Parameters:
    - data_transactions (pd.DataFrame): DataFrame containing transaction information.

    Returns:
    pd.DataFrame: Merged DataFrame with monthly incoming and outgoing transaction statistics.
    """
    out_df = get_out_transactions(data_transactions)
    in_df = get_in_transactions(data_transactions)
    in_out_df = in_df.merge(out_df, on=['tran_timestamp', 'id'], how="outer")
    in_out_df = in_out_df.fillna(0)
    return in_out_df


In [5]:
import hashlib
import datetime
import pandas as pd
import numpy as np

from pprint import pprint
import json

# from features.transactions import get_in_out_transactions
# from features.transaction_labels import get_transaction_labels
# from features.party import get_party_labels
# from features.graph_embeddings import construct_graph

## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

The data you will use comes from three different CSV files:

- `transactions.csv`: Transaction information such as the transaction ID, type, amount, timestamp, and the source (`src`) and destination (`dst`) accounts.
- `alert_transactions.csv`: Suspicious Activity Report (SAR) transactions.
- `party.csv`: User profile information.

In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **The files share join keys: `tran_id` links `transactions.csv` to `alert_transactions.csv`, and the `src`/`dst` account IDs in `transactions.csv` refer to `partyId` in `party.csv`.**

Let's go ahead and load the data.

### <span style="color:#ff5f27;"> ⛳️ Transactions dataset </span>

In [6]:
transactions_df = pd.read_csv(
    "https://repo.hops.works/master/hopsworks-tutorials/data/aml/transactions.csv", 
    parse_dates = ['tran_timestamp'],
)
transactions_df.head(3)

| | tran_id | tx_type | base_amt | tran_timestamp | src | dst |
|---|---|---|---|---|---|---|
| 0 | 496 | TRANSFER-FanOut | 858.77 | 2020-01-01 00:00:00+00:00 | 3aa9646b | 1e46e726 |
| 1 | 1342 | TRANSFER-Mutual | 386.86 | 2020-01-01 00:00:00+00:00 | 49203bc3 | a74d1101 |
| 2 | 1580 | TRANSFER-FanOut | 616.43 | 2020-01-02 00:00:00+00:00 | 616d4505 | 99af2455 |


### <span style="color:#ff5f27;"> ⛳️ Alert Transactions dataset </span>

In [7]:
alert_transactions = pd.read_csv(
    "https://repo.hops.works/master/hopsworks-tutorials/data/aml/alert_transactions.csv",
)
alert_transactions.head(3)

| | alert_id | alert_type | is_sar | tran_id |
|---|---|---|---|---|
| 0 | 47 | gather_scatter | True | 11873 |
| 1 | 47 | gather_scatter | True | 11874 |
| 2 | 47 | gather_scatter | True | 11875 |


### <span style="color:#ff5f27;"> ⛳️ Party dataset </span>

In [8]:
party = pd.read_csv(
    "https://repo.hops.works/master/hopsworks-tutorials/data/aml/party.csv",
)
party.head(3)

| | partyId | partyType |
|---|---|---|
| 0 | 5628bd6c | Organization |
| 1 | a1fcba39 | Organization |
| 2 | f56c9501 | Individual |


## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

To investigate patterns of suspicious activity, you will compute time-window aggregates such as the monthly count, total, mean, and standard deviation of the amounts of incoming and outgoing transactions.
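As a rough illustration of what these monthly aggregates compute, here is a minimal pure-Python sketch for the outgoing direction (the incoming side is symmetric, keyed by the target instead of the source). It is not the notebook's pandas implementation: the function name `monthly_out_stats` and the tuple input format are assumptions made for illustration, and months are keyed by `(year, month)` rather than by the month-end timestamp that `pd.Grouper(freq='M')` produces. The standard deviation is the sample standard deviation, which is undefined for a single transaction; the sketch reports 0.0 in that case, which is what the notebook ends up with after `fillna(0)`.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev


def monthly_out_stats(transactions):
    """Aggregate (source, timestamp, amount) tuples per calendar month and source.

    Returns {(year, month, source): {...}} with the count, total, mean and
    sample standard deviation of the amounts sent by that source in that month.
    """
    buckets = defaultdict(list)
    for source, ts, amount in transactions:
        # Key each transaction by calendar month and sending account
        buckets[(ts.year, ts.month, source)].append(amount)

    stats = {}
    for key, amounts in buckets.items():
        stats[key] = {
            "monthly_out_count": len(amounts),
            "monthly_out_total_amount": sum(amounts),
            "monthly_out_mean_amount": mean(amounts),
            # stdev (ddof=1) needs at least two values; report 0.0 otherwise
            "monthly_out_std_amount": stdev(amounts) if len(amounts) > 1 else 0.0,
        }
    return stats


# Two January transfers and one February transfer from account "a"
example = monthly_out_stats([
    ("a", datetime(2020, 1, 3), 100.0),
    ("a", datetime(2020, 1, 20), 300.0),
    ("a", datetime(2020, 2, 1), 50.0),
])
```

For January, account "a" gets a count of 2, a total of 400.0, a mean of 200.0 and a standard deviation of about 141.42; the single February transfer gets a standard deviation of 0.0.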


In [9]:
# Renaming columns for clarity
transactions_df.columns = ['tran_id', 'tx_type', 'base_amt', 'tran_timestamp', 'source', 'target']

# Reordering columns for better readability
transactions_df = transactions_df[["source", "target", "tran_timestamp", "tran_id", "base_amt"]]

# Displaying the first few rows of the DataFrame
transactions_df.head(3)

| | source | target | tran_timestamp | tran_id | base_amt |
|---|---|---|---|---|---|
| 0 | 3aa9646b | 1e46e726 | 2020-01-01 00:00:00+00:00 | 496 | 858.77 |
| 1 | 49203bc3 | a74d1101 | 2020-01-01 00:00:00+00:00 | 1342 | 386.86 |
| 2 | 616d4505 | 99af2455 | 2020-01-02 00:00:00+00:00 | 1580 | 616.43 |


### <span style="color:#ff5f27;">⛳️ Incoming and Outgoing transactions </span>

In [10]:
# Generating a DataFrame with monthly incoming and outgoing transaction statistics
in_out_df = get_in_out_transactions(transactions_df)

# Displaying the first few rows of the resulting DataFrame
in_out_df.head(3)

| | tran_timestamp | id | monthly_in_count | monthly_in_total_amount | monthly_in_mean_amount | monthly_in_std_amount | monthly_out_count | monthly_out_total_amount | monthly_out_mean_amount | monthly_out_std_amount |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1580428800000 | 0016359b | 4.0 | 1872.92 | 468.23 | 175.2747 | 4.0 | 1843.32 | 460.83 | 252.951744 |
| 1 | 1580428800000 | 001dcc27 | 9.0 | 5874.64 | 652.737778 | 271.236889 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2 | 1580428800000 | 00298665 | 1.0 | 755.64 | 755.64 | 0.0 | 1.0 | 521.11 | 521.11 | 0.0 |


### <span style="color:#ff5f27;"> ⛳️ Transactions identified as suspicious activity </span>

Assign labels to transactions that were identified as suspicious activity.
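Conceptually, labelling transactions is a left join on `tran_id` with a default: a transaction is labelled 1 if it appears in the alert table and 0 otherwise. Here is a minimal sketch of that reading using plain Python containers; `label_transactions` is a hypothetical helper, not part of the notebook, and it treats a missing or false `is_sar` as 0.

```python
def label_transactions(tran_ids, alerts):
    """Left-join transactions onto alerts by tran_id.

    `alerts` maps tran_id -> is_sar (bool). Transactions without an alert
    row get 0, mirroring how the notebook turns missing values (NaN) into 0.
    """
    return {tran_id: int(bool(alerts.get(tran_id, False))) for tran_id in tran_ids}


# Transaction 11873 is in the alert table, the other two are not
labels = label_transactions([496, 1342, 11873], {11873: True, 11874: True})
```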

In [11]:
# Displaying the first few rows of the 'alert_transactions' DataFrame
alert_transactions.head(3)

| | alert_id | alert_type | is_sar | tran_id |
|---|---|---|---|---|
| 0 | 47 | gather_scatter | True | 11873 |
| 1 | 47 | gather_scatter | True | 11874 |
| 2 | 47 | gather_scatter | True | 11875 |


In [12]:
# Generating transaction labels based on transaction and alert transaction data
transaction_labels = get_transaction_labels(
    transactions_df, 
    alert_transactions,
)

# Displaying the first three rows of the resulting DataFrame
transaction_labels.head(3)

| | source | target | id | tran_timestamp | is_sar |
|---|---|---|---|---|---|
| 322886 | cee9cf6d | 79c248ae | 2 | 2020-01-01 00:00:00+00:00 | 0 |
| 307052 | 65ab2f44 | b20ce84b | 3 | 2020-01-01 00:00:00+00:00 | 0 |
| 181198 | 2a39b731 | a07edae4 | 4 | 2020-01-01 00:00:00+00:00 | 0 |


### <span style="color:#ff5f27;"> ⛳️ Party dataset </span>

Now let's prepare the profile (party) dataset and assign labels indicating whether each party has been reported for suspicious activity.
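The party labelling rule in `get_party_labels` can be sketched in plain Python: every party that appears as the source or the target of a SAR transaction is marked as SAR, with the timestamp of its earliest such transaction, and all other parties get `is_sar = 0` and a default timestamp (the notebook uses the latest transaction timestamp). `label_parties` is a hypothetical helper for illustration; unlike the notebook, it does not snap timestamps to month ends.

```python
from datetime import date
from typing import Dict, Hashable, Iterable, Tuple


def label_parties(party_ids: Iterable[Hashable],
                  sar_transactions: Iterable[Tuple[Hashable, Hashable, date]],
                  default_ts: date) -> Dict[Hashable, Tuple[int, date]]:
    """Return {party_id: (is_sar, timestamp)}.

    A party is SAR if it is the source or the target of any SAR transaction;
    its timestamp is the earliest such transaction. Other parties get
    is_sar = 0 and `default_ts`.
    """
    first_seen: Dict[Hashable, date] = {}
    for source, target, ts in sar_transactions:
        # Both ends of a SAR transaction are flagged; keep the earliest date
        for party in (source, target):
            if party not in first_seen or ts < first_seen[party]:
                first_seen[party] = ts
    return {
        pid: ((1, first_seen[pid]) if pid in first_seen else (0, default_ts))
        for pid in party_ids
    }


labels = label_parties(
    ["a", "b", "c"],
    [("a", "b", date(2020, 1, 9)), ("b", "a", date(2020, 1, 5))],
    default_ts=date(2021, 12, 20),
)
```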

In [13]:
# Renaming columns for clarity
party.columns = ["id", "type"]

# Mapping 'type' values to numerical values for better representation
party.type = party.type.map({"Individual": 0, "Organization": 1})

# Displaying the first three rows of the DataFrame
party.head(3)

| | id | type |
|---|---|---|
| 0 | 5628bd6c | 1 |
| 1 | a1fcba39 | 1 |
| 2 | f56c9501 | 0 |


In [14]:
# Filtering transactions with SAR (Suspicious Activity Report) labels from the generated transaction labels DataFrame
alert_transactions = transaction_labels[transaction_labels.is_sar == 1]

# Displaying the first few rows of transactions flagged as SAR
alert_transactions.head(3)

| | source | target | id | tran_timestamp | is_sar |
|---|---|---|---|---|---|
| 41322 | 5e7442f1 | 0bffd1da | 11873 | 2020-01-09 00:00:00+00:00 | 1 |
| 62128 | 65c7b5a1 | 0bffd1da | 11874 | 2020-01-09 00:00:00+00:00 | 1 |
| 57575 | 04128f28 | 0bffd1da | 11875 | 2020-01-09 00:00:00+00:00 | 1 |


In [15]:
# Generating party labels based on transaction labels and party information
party_labels = get_party_labels(
    transaction_labels, 
    party,
)

# Displaying the first three rows of the resulting DataFrame
party_labels.head(3)



| | id | type | tran_timestamp | is_sar |
|---|---|---|---|---|
| 0 | 5628bd6c | 1 | 2021-12-20 00:00:00 | 0 |
| 1 | a1fcba39 | 1 | 2021-12-20 00:00:00 | 0 |
| 2 | f56c9501 | 0 | 2021-12-20 00:00:00 | 0 |


## <span style="color:#ff5f27;">🧬 Graph representational learning using Graph Neural Network</span>

Financial transactions can be represented as a dynamic network graph. Graph representation learning gives us
the opportunity to represent each transaction within a broader context. In this example you will perform node
representation learning.

The network architecture of the graph convolution layer used for node representation learning was taken from
[this Keras example](https://keras.io/examples/graph/gnn_citations/). It performs the following steps:

1. **Prepare**: The input node representations are processed using a feed-forward network (FFN) to produce a *message*. You can simplify
the processing by only applying a linear transformation to the representations.
2. **Aggregate**: The messages of the neighbours of each node are aggregated with
respect to the `edge_weights` using a *permutation invariant* pooling operation, such as *sum*, *mean*, and *max*,
to prepare a single aggregated message for each node. See, for example, the [tf.math.unsorted_segment_sum](https://www.tensorflow.org/api_docs/python/tf/math/unsorted_segment_sum)
API, one of the segment operations used to aggregate neighbour messages.
3. **Update**: The `node_repesentations` and `aggregated_messages`, both of shape `[num_nodes, representation_dim]`,
are combined and processed to produce the new state of the node representations (node embeddings).
If `combination_type` is `gru`, the `node_repesentations` and `aggregated_messages` are stacked to create a sequence,
then processed by a GRU layer. Otherwise, the `node_repesentations` and `aggregated_messages` are added
or concatenated, then processed using an FFN.
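The three steps can be sketched without TensorFlow on a toy graph. This is a simplified illustration, not the layer itself: the identity function stands in for the FFN in *Prepare*, the GRU combination is left out, and nodes that receive no messages get a zero vector (TensorFlow's segment sum and mean also return zeros for empty segments, whereas `tf.math.unsorted_segment_max` fills them with the lowest representable value). As in `GraphConvLayer.call`, `edges[0]` holds the nodes that receive messages and `edges[1]` the neighbours whose representations are sent.

```python
from typing import Callable, List

Vector = List[float]


def prepare(node_reprs: List[Vector], neighbour_indices: List[int],
            edge_weights: List[float], transform: Callable[[Vector], Vector]) -> List[Vector]:
    """One message per edge: transform the neighbour's representation, scale by the edge weight."""
    return [[w * x for x in transform(node_reprs[j])]
            for j, w in zip(neighbour_indices, edge_weights)]


def aggregate(node_indices: List[int], messages: List[Vector], num_nodes: int,
              kind: str = "sum") -> List[Vector]:
    """Pool the messages arriving at each node with a permutation-invariant operation."""
    dim = len(messages[0])
    buckets: List[List[Vector]] = [[] for _ in range(num_nodes)]
    for i, msg in zip(node_indices, messages):
        buckets[i].append(msg)

    pooled = []
    for msgs in buckets:
        if not msgs:
            # Simplification: nodes with no incoming messages get a zero vector
            pooled.append([0.0] * dim)
            continue
        columns = list(zip(*msgs))  # one tuple per embedding dimension
        if kind == "sum":
            pooled.append([sum(c) for c in columns])
        elif kind == "mean":
            pooled.append([sum(c) / len(c) for c in columns])
        elif kind == "max":
            pooled.append([max(c) for c in columns])
        else:
            raise ValueError(f"Invalid aggregation type: {kind}.")
    return pooled


def update(node_reprs: List[Vector], aggregated: List[Vector], kind: str = "concat") -> List[Vector]:
    """Combine each node's own representation with its aggregated message."""
    if kind == "concat":
        return [h + a for h, a in zip(node_reprs, aggregated)]
    if kind == "add":
        return [[x + y for x, y in zip(h, a)] for h, a in zip(node_reprs, aggregated)]
    raise ValueError(f"Invalid combination type: {kind}.")


# Toy graph with 3 nodes: edges[0] are receivers, edges[1] are senders
nodes = [[1.0, 0.0], [0.0, 2.0], [3.0, 3.0]]
edges = ([0, 0, 1], [1, 2, 0])
messages = prepare(nodes, edges[1], [1.0, 1.0, 1.0], transform=lambda v: list(v))
aggregated = aggregate(edges[0], messages, num_nodes=len(nodes), kind="sum")
embeddings = update(nodes, aggregated, kind="concat")
```

Node 0 receives messages from nodes 1 and 2, so its summed message is `[3.0, 5.0]`; node 2 receives nothing and gets `[0.0, 0.0]`. Swapping `kind` shows why the pooling must be permutation invariant: the order in which edges are listed does not change the result.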


### <span style="color:#ff5f27;">🔮 Compute time evolving graph embeddings</span>

In [16]:
# Grouping transaction labels by month using pandas Grouper
transaction_graphs_by_month = transaction_labels.groupby(
    pd.Grouper(key='tran_timestamp', freq='M')
).apply(lambda x: construct_graph(x, party_labels))

# The resulting 'transaction_graphs_by_month' is a pandas Series indexed by month
# (month-end timestamps). Each value is the dictionary returned by 'construct_graph',
# holding the party 'id's and their 'graph_embeddings' for that month; the embeddings
# capture the graph structure of transactions during that month.



Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2


In [17]:
# Extracting timestamps and graph embeddings
timestamps = transaction_graphs_by_month.index.values
graph_embeddings = transaction_graphs_by_month.tolist()

In [18]:
# Creating an empty DataFrame to store graph embeddings
graph_embeddings_df = pd.DataFrame()

# Iterating through timestamps and corresponding graph embeddings
for timestamp, graph_embedding in zip(timestamps, graph_embeddings):
    # Creating a temporary DataFrame for each month's graph embeddings
    df_tmp = pd.DataFrame(graph_embedding)
    
    # Adding a 'tran_timestamp' column to store the timestamp for each row
    df_tmp["tran_timestamp"] = timestamp
    
    # Concatenating the temporary DataFrame to the main DataFrame
    graph_embeddings_df = pd.concat([graph_embeddings_df, df_tmp])

# Displaying the first three rows of the resulting DataFrame
graph_embeddings_df.head(3)

| | id | graph_embeddings | tran_timestamp |
|---|---|---|---|
| 0 | 5628bd6c | [0.99970543, 0.9996497, 0.9996238, 0.99984884,...] | 2020-01-31 |
| 1 | a1fcba39 | [0.9997054, 0.99964976, 0.9996239, 0.99984896,...] | 2020-01-31 |
| 2 | f56c9501 | [0.9997054, 0.99964976, 0.9996239, 0.99984884,...] | 2020-01-31 |


In [19]:
# Converting 'tran_timestamp' values to milliseconds for consistency
transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6
graph_embeddings_df.tran_timestamp = graph_embeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6

# Converting 'tran_timestamp' values in 'party_labels' to milliseconds
party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)
party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)

## <span style="color:#ff5f27;">👮🏻‍♂️ Data Validation</span>

Before you define [feature groups](https://docs.hopsworks.ai/latest/generated/feature_group/), let's define [validation rules](https://docs.hopsworks.ai/latest/generated/feature_validation/) for features. You expect some of the features to comply with certain *rules* or *expectations*. For example, a transacted amount must be a positive value. If a transacted amount arrives as a negative value, you can decide whether to block the `write` into the feature group and raise an error, or to allow the write but emit a warning. In the next section you will create feature store `expectations`, attach them to feature groups, and apply them to dataframes being appended to said feature group.

#### Data validation with Great Expectations in Hopsworks
You can use the Great Expectations (GE) library for validation in the Hopsworks feature store.
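The write-or-block decision described above can be sketched in plain Python. The helpers `column_max_between` and `validated_insert` are hypothetical and only mimic the idea: the check corresponds to the `expect_column_max_to_be_between` expectation used later in this notebook, and `strict` switches between refusing the write and writing with a warning. As far as we know, Hopsworks exposes a comparable choice through the validation ingestion policy of a feature group's expectation suite (`"STRICT"` or `"ALWAYS"`); check the documentation for your Hopsworks version.

```python
import warnings
from typing import Callable, Dict, List, Tuple

Row = Dict[str, float]


def column_max_between(rows: List[Row], column: str, min_value: float, max_value: float) -> bool:
    """True if the maximum of `column` over `rows` lies in [min_value, max_value]."""
    observed_max = max(row[column] for row in rows)
    return min_value <= observed_max <= max_value


def validated_insert(table: List[Row], rows: List[Row],
                     checks: List[Tuple[str, Callable[[List[Row]], bool]]],
                     strict: bool = True) -> List[str]:
    """Append `rows` to `table` after running every check (hypothetical helper).

    strict=True: refuse the write and raise if any check fails.
    strict=False: write anyway and emit a warning listing the failed checks.
    Returns the names of the failed checks.
    """
    failed = [name for name, check in checks if not check(rows)]
    if failed and strict:
        raise ValueError(f"Validation failed, nothing written: {failed}")
    if failed:
        warnings.warn(f"Validation failed, data written anyway: {failed}")
    table.extend(rows)
    return failed


# Same bounds as the expectation added to the suite later in this notebook
checks = [("monthly_in_count max in [0, 10000000]",
           lambda rs: column_max_between(rs, "monthly_in_count", 0, 10_000_000))]
```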

##  <img src="../images/icon102.png" width="18px"></img> Hopsworks feature store

The Hopsworks feature store library is Apache V2 licensed and available [here](https://github.com/logicalclocks/feature-store-api). The library is currently available for Python and JVM languages such as Scala and Java.
In this notebook, we are going to cover the Python part.

You can find the complete documentation of the library here: 

The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you'll be working with. 

> By default, `project.get_feature_store()` returns the feature store of the project we are working with. However, it also accepts a project name as a parameter to select a different feature store.

In [20]:
# !pip install --quiet hopsworks

In [21]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()



Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/282773


## <span style="color:#ff5f27;">🔬 Expectations suite</span>


In [22]:
import great_expectations as ge

In [23]:
# Creating an Expectation Suite named "aml_project_validations"
expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aml_project_validations",
)

# Displaying the JSON representation of the Expectation Suite
pprint(expectation_suite.to_json_dict(), indent=2)

{ 'data_asset_type': None,
  'expectation_suite_name': 'aml_project_validations',
  'expectations': [],
  'ge_cloud_id': None,
  'meta': {'great_expectations_version': '0.14.13'}}


In [24]:
# Adding an expectation to the Expectation Suite
expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "monthly_in_count", 
            "min_value": 0, 
            "max_value": 10000000,
        }
    )
)

# Displaying the updated Expectation Suite
pprint(expectation_suite.to_json_dict(), indent=2)

{ 'data_asset_type': None,
  'expectation_suite_name': 'aml_project_validations',
  'expectations': [ { 'expectation_type': 'expect_column_max_to_be_between',
                      'kwargs': { 'column': 'monthly_in_count',
                                  'max_value': 10000000,
                                  'min_value': 0},
                      'meta': {}}],
  'ge_cloud_id': None,
  'meta': {'great_expectations_version': '0.14.13'}}
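To make the expectation concrete: `expect_column_max_to_be_between` passes when the maximum of the column lies within `[min_value, max_value]` (inclusive by default). The pandas sketch below mimics that check locally on toy data. `column_max_between` is a hypothetical helper for illustration only; it is not part of Great Expectations or Hopsworks, which run the real validation when data is inserted.

```python
import pandas as pd


def column_max_between(df: pd.DataFrame, column: str, min_value: float, max_value: float) -> dict:
    """Hypothetical helper: succeed if the column's maximum lies in [min_value, max_value]."""
    observed = df[column].max()
    return {
        "success": bool(min_value <= observed <= max_value),
        "observed_value": observed,
    }


# Toy data for illustration only (not the tutorial's dataset)
toy = pd.DataFrame({"monthly_in_count": [3, 12, 7]})

# Same bounds as the expectation added above
result = column_max_between(toy, "monthly_in_count", 0, 10_000_000)
print(result["success"])
```

Lowering `max_value` below the observed maximum (here 12) makes the check fail, which is the situation that would cause Hopsworks to report a failed validation on insert.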


---

## <span style="color:#ff5f27;"> 🪄 Feature Groups Creation</span>

### Feature Groups

A `Feature Group` is a logical grouping of features; experience has shown that this grouping generally arises because the features are derived from the same data source. A `Feature Group` lets you save metadata alongside features, which defines how the Feature Store interprets them, combines them, and reproduces training datasets created from them.

Generally, the features in a feature group are engineered together in an ingestion job. However, additional jobs can append features to an existing feature group. Furthermore, `feature groups` provide a namespace for features: you can define features with the same name multiple times, each uniquely identified by the group that contains it.

> It is important to note that `feature groups` are not groupings of features for immediate training of Machine Learning models. Instead, to ensure reusability of features, it is possible to combine features from any number of groups into training datasets.
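As a rough illustration of both ideas (namespacing and combining groups), the pandas sketch below joins two toy "feature groups" on the shared primary key `id`. Both define a feature called `score`, so each group's columns are prefixed with the group name to keep them distinct. The frames, column names, and the `namespaced` helper are hypothetical; in Hopsworks itself you would combine feature groups with its query `join` (which accepts a prefix) and feature views rather than with pandas.

```python
import pandas as pd

# Hypothetical toy frames standing in for two feature groups keyed by "id"
transactions = pd.DataFrame({"id": [1, 2, 3], "monthly_in_count": [5, 0, 2], "score": [0.1, 0.4, 0.9]})
party = pd.DataFrame({"id": [1, 2, 3], "label": [0, 1, 0], "score": [10, 20, 30]})


def namespaced(df: pd.DataFrame, group: str, key: str = "id") -> pd.DataFrame:
    """Prefix every non-key column with the group name (hypothetical helper)."""
    return df.rename(columns={c: f"{group}_{c}" for c in df.columns if c != key})


# Combine features from both groups into one training DataFrame
training_df = namespaced(transactions, "transactions").merge(
    namespaced(party, "party"), on="id", how="inner"
)
print(training_df.columns.tolist())
```

Neither group was designed for this particular training set; the join on the primary key is what assembles the features, which is why the same groups can be reused across many training datasets.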

### <span style="color:#ff5f27;">⛳️ Transactions monthly aggregates Feature Group</span>


In [25]:
# Get or create the 'transactions_monthly' feature group
transactions_fg = fs.get_or_create_feature_group(
    name="transactions_monthly",
    version=1,
    primary_key=["id"],
    partition_key=["tran_timestamp"],
    description="transactions monthly aggregates features",
    event_time="tran_timestamp",
    online_enabled=True,
    stream=True,
    statistics_config={
        "enabled": True, 
        "histograms": True, 
        "correlations": True, 
        "exact_uniqueness": False,
    },
    expectation_suite=expectation_suite,
)   
# Insert data into the feature group
transactions_fg.insert(in_out_df)



Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/282773/fs/282692/fg/309629
2023-12-12 16:44:29,093 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/282773/fs/282692/fg/309629



Uploading Dataframe: 0.00% | | Rows 0/170876 | Elapsed Time: 00:00 | Remaining T

KafkaException: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Unable to produce message: Broker: Topic authorization failed"}
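With `online_enabled=True`, the online store serves the latest feature values per primary key, while the offline store keeps the full history, with `event_time` marking when each row happened. The pandas sketch below imitates the two views on hypothetical toy data, assuming rows are written in event-time order (the online store upserts by primary key, so it holds the last value written, not necessarily the one with the latest timestamp).

```python
import pandas as pd

# Hypothetical event log: several rows per primary key "id"
history = pd.DataFrame({
    "id": [1, 1, 2, 2, 2],
    "tran_timestamp": pd.to_datetime(
        ["2023-01-01", "2023-02-01", "2023-01-01", "2023-03-01", "2023-02-01"]
    ),
    "monthly_in_count": [4, 6, 1, 9, 3],
})

# Offline-style view: full history, ordered by key and event time
offline_view = history.sort_values(["id", "tran_timestamp"])

# Online-style view: only the most recent row per primary key
online_view = offline_view.drop_duplicates(subset="id", keep="last").set_index("id")
print(online_view)
```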

### <span style="color:#ff5f27;">⛳️ Party Feature Group</span>

In [None]:
# Get or create the 'party_labels' feature group
party_fg = fs.get_or_create_feature_group(
    name="party_labels",
    version=1,
    primary_key=["id"],
    description="party fg with labels",
    event_time="tran_timestamp",
    online_enabled=True,
    stream=True,
    statistics_config={
        "enabled": True, 
        "histograms": True, 
        "correlations": True, 
        "exact_uniqueness": False,
    },
)
# Insert data into the feature group
party_fg.insert(party_labels)

### <span style="color:#ff5f27;">⛳️ Graph embeddings Feature Group</span>

In [None]:
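from hsfs import engine

# Infer the feature schema from the DataFrame
features = engine.get_instance().parse_schema_feature_group(graph_embeddings_df)

# Override the online-store column type of the embedding (array<float>) features
for f in features:
    if f.type == "array<float>":
        f.online_type = "VARBINARY(200)"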
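When choosing the size of the `VARBINARY` column, it helps to estimate how many bytes an embedding needs. The sketch below computes a lower bound, assuming each embedding is stored as raw float32 values (4 bytes per dimension). This is an assumption: the exact serialization Hopsworks uses for complex online features may add overhead, so leave some headroom. `raw_float32_bytes` and the dimensions tried are hypothetical.

```python
import numpy as np

ONLINE_COLUMN_BYTES = 200  # matches the VARBINARY(200) override above


def raw_float32_bytes(embedding) -> int:
    """Size of an embedding encoded as raw float32 values (4 bytes each)."""
    return np.asarray(embedding, dtype=np.float32).nbytes


# Hypothetical embedding dimensions
for dim in (16, 32, 64):
    size = raw_float32_bytes(np.zeros(dim))
    print(dim, size, size <= ONLINE_COLUMN_BYTES)
```

Under this assumption, 32-dimensional embeddings (128 bytes) fit comfortably, while 64-dimensional ones (256 bytes) would not.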

In [None]:
# Get or create the 'graph_embeddings' feature group
graph_embeddings_fg = fs.get_or_create_feature_group(
    name="graph_embeddings",
    version=1,
    primary_key=["id"],
    description="node embeddings from transactions graph",
    event_time="tran_timestamp",
    online_enabled=True,
    stream=True,
    statistics_config={
        "enabled": False, 
        "histograms": False, 
        "correlations": False, 
        "exact_uniqueness": False,
    },
    features=features,
)
# Insert data into the feature group
graph_embeddings_fg.insert(graph_embeddings_df)



---
## <span style="color:#ff5f27;"> 👓 Exploration </span>

### Feature groups are now accessible and searchable in the UI
![fg-overview](images/fg_explore.gif)

## <span style="color:#ff5f27;"> 📊 Statistics</span>
We can explore feature statistics in the feature groups. If statistics were not enabled when the feature group was created, they can be enabled and computed as follows:

```python
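# Retrieve the existing feature group created above
transactions_fg = fs.get_feature_group(
    name="transactions_monthly",
    version=1,
)

# Enable descriptive statistics, histograms and correlations
transactions_fg.statistics_config = {
    "enabled": True,
    "histograms": True,
    "correlations": True,
}

# Persist the new configuration, then compute statistics on the current data
transactions_fg.update_statistics_config()
transactions_fg.compute_statistics()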
```
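To get a feel for what these settings produce, the pandas/NumPy sketch below computes the same kinds of statistics locally on toy data: descriptive statistics, a histogram per feature, and a correlation matrix. It only approximates the categories of output; Hopsworks computes its statistics itself, and its exact metrics and binning may differ. The column `monthly_out_count` is a hypothetical name.

```python
import numpy as np
import pandas as pd

# Hypothetical toy feature data
df = pd.DataFrame({
    "monthly_in_count": [1, 4, 4, 8, 10],
    "monthly_out_count": [2, 3, 5, 7, 12],
})

# Descriptive statistics: count, mean, std, min, quartiles, max
descriptive = df.describe()

# Histogram of one feature ("histograms": True)
counts, bin_edges = np.histogram(df["monthly_in_count"], bins=3)

# Pearson correlation matrix between features ("correlations": True)
correlations = df.corr()

print(descriptive, counts, bin_edges, correlations, sep="\n\n")
```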

![fg-stats](images/freature_group_stats.gif)

---
## <span style="color:#ff5f27;"> ⏭️ **Next:** Part 02 </span>
    
In the next notebook, you will create a training dataset, then train and deploy a model.