# 01: Feature Pipeline

## Create a Feldera Feature Pipeline

In [None]:
from feldera import FelderaClient, SQLContext, SQLSchema

## Set Hopsworks API KEY

In [None]:
# Import only the helper that is actually used instead of star-importing the
# whole module, so no unrelated names leak into the notebook namespace.
from ipython_secrets import get_secret

# Hopsworks API key, looked up under the name 'HOPSWORKS_API_KEY'.
KEY = get_secret('HOPSWORKS_API_KEY')

In [None]:
# Connect to the Feldera API.
#
# Option 1 -- Feldera online sandbox (uncomment to use it instead of the local instance):
# client = FelderaClient("https://try.feldera.com", api_key = get_secret('FELDERA_API_KEY'))
#
# Option 2 -- local Feldera instance (the one used here).
FELDERA_URL = "http://localhost:8080"
PIPELINE_NAME = "hopsworks_kafka"

client = FelderaClient(FELDERA_URL)

# Fetch the SQL context called PIPELINE_NAME through get_or_create().
sql = SQLContext(PIPELINE_NAME, client).get_or_create()

## Register Feldera Input Tables

In [None]:
# Feldera input table names: index 0 is the transactions table,
# index 1 is the profiles table.
TBL_NAMES = ["transactions", "profiles"]

# Column name -> SQL type for the transactions input table.
TRANSACTIONS_COLUMNS = {
    "tid": "STRING",
    "date_time": "TIMESTAMP",
    "cc_num": "STRING",
    "category": "STRING",
    "amount": "DOUBLE",
    "latitude": "DOUBLE",
    "longitude": "DOUBLE",
    "city": "STRING",
    "country": "STRING",
    "fraud_label": "INT",
}

# Column name -> SQL type for the profiles input table.
PROFILES_COLUMNS = {
    "cc_num": "STRING",
    "cc_provider": "STRING",
    "cc_type": "STRING",
    "cc_expiration_date": "STRING",
    "name": "STRING",
    "mail": "STRING",
    "birthdate": "TIMESTAMP",
    "age": "INT",
    "city": "STRING",
    "country_of_residence": "STRING",
}

# Register both input tables in Feldera, in the same order as TBL_NAMES.
for table_name, table_columns in zip(TBL_NAMES, (TRANSACTIONS_COLUMNS, PROFILES_COLUMNS)):
    sql.register_table(table_name, SQLSchema(table_columns))

## Define Views to Compute in Feldera

### Feature Engineering

Here we will create two different kinds of features: 

#### 1. **Features that aggregate data from different sources**

This view converts the credit card expiration date from `MM/YY` formatted string to a `TIMESTAMP`, so that we can perform computations on it. 

In [None]:
# Local view over the profiles table: rebuilds the expiration string as
# '20' + chars 4-5 + '-' + chars 1-2 + '-01 00:00:00' and casts the result
# to TIMESTAMP, keeping cc_num as the key.
cc_expiration_query = f"""
    SELECT
        cc_num,
        CAST(
            CONCAT(
                '20',
                SUBSTRING(
                    cc_expiration_date,
                    4,
                    2
                ),
                '-',
                SUBSTRING(
                    cc_expiration_date,
                    1,
                    2
                ),
                '-01 00:00:00'
            ) AS TIMESTAMP
        ) AS cc_expiration_date
    FROM {TBL_NAMES[1]}"""

sql.register_local_view("cc_expiration", cc_expiration_query)


Compute the card holder's age at the time of each transaction and the number of days until the credit card expires, using the `profiles` and `transactions` tables.

In [None]:
# Output view: every transaction column plus the parsed expiration timestamp
# (from the cc_expiration view), the years between birthdate and date_time,
# and the days between date_time and the expiration timestamp. Both joins are
# LEFT JOINs on cc_num.
combined_query = f"""
    SELECT
        T1.*,
        T2.cc_expiration_date,
        TIMESTAMPDIFF(YEAR, T3.birthdate, T1.date_time) age_at_transaction,
        TIMESTAMPDIFF(DAY, T1.date_time, T2.cc_expiration_date) days_until_card_expires
    FROM
        {TBL_NAMES[0]} T1 LEFT JOIN cc_expiration T2
        ON
            T1.cc_num = T2.cc_num
        LEFT JOIN {TBL_NAMES[1]} T3
    ON
        T1.cc_num = T3.cc_num"""

sql.register_output_view("combined", combined_query)

#### 2. **Features that aggregate data from multiple time steps.** 
Compute the frequency of transactions and other metrics in the span of a few hours using a hopping window aggregate.

Create a 4-hour hopping window over the data in the `transactions` table.

In [None]:
# Local view applying the HOP table function to the transactions table,
# windowed on the date_time column.
# NOTE(review): Calcite documents HOP as (data, DESCRIPTOR(timecol), slide, size);
# if Feldera follows that order, this is a 1-hour window every 4 hours rather
# than a 4-hour window every hour -- confirm the intended argument order.
hop_query = f"""
    SELECT * FROM TABLE(HOP(TABLE {TBL_NAMES[0]}, DESCRIPTOR(date_time), INTERVAL 4 HOURS, INTERVAL 1 HOURS))"""

sql.register_local_view("hop", hop_query)

Compute per-card aggregates over each window.

In [None]:
# Local view over `hop`: for each (cc_num, window_start) group, compute the
# average amount, the standard deviation of amount (cast to DECIMAL(25,5)),
# the number of cc_num values, and the array of date_time values.
agg_query = """
    SELECT
        AVG(amount) AS avg_amt,
        STDDEV(CAST(amount AS DECIMAL(25,5))) as stddev_amt,
        COUNT(cc_num) as trans,
        ARRAY_AGG(date_time) as moments,
        cc_num
    FROM hop
    GROUP BY cc_num, window_start"""

sql.register_local_view("agg", agg_query)


Produce the final output view to send to the feature store.

In [None]:
# Output view: expand each aggregate row into one row per collected date_time
# (UNNEST of `moments`), replacing a NULL stddev_amt with 0 and casting it to
# DOUBLE.
windowed_query = """
    SELECT
        avg_amt,
        trans,
        CAST(COALESCE(stddev_amt, 0) AS DOUBLE) as stddev_amt,
        date_time,
        cc_num
    FROM agg CROSS JOIN UNNEST(moments) as date_time"""

sql.register_output_view("windowed", windowed_query)

## Connect to Hopsworks Feature Store

In [None]:
import hopsworks
from hsfs import engine

# Log in to Hopsworks with the API key stored in KEY.
project = hopsworks.login(api_key_value=KEY, host="c.app.hopsworks.ai")

# Handles used by the following cells: the project's feature store and its
# Kafka API.
fs = project.get_feature_store()
kafka_api = project.get_kafka_api()

#### Get the feature group from Hopsworks

In [None]:
# Version 1 of the "profile" feature group, obtained via get_or_create.
profile_fg = fs.get_or_create_feature_group(name="profile", version=1)

# Read the feature group's contents into profile_df.
profile_df = profile_fg.read()

#### Connect this DataFrame as Input to Feldera

In [None]:
# Attach profile_df as the pandas source for the table named by TBL_NAMES[1]
# (the second entry of the table-name list).
sql.connect_source_pandas(TBL_NAMES[1], profile_df)

#### Get Hopsworks Kafka config to stream transactions data to Feldera

In [None]:
# Get the Kafka configuration (including the public Kafka servers) for this feature store.
# NOTE(review): `_get_kafka_config` is an underscore-prefixed, non-public hsfs
# engine method, so its signature may change between releases -- re-check on upgrade.
kafka_config = engine.get_instance()._get_kafka_config(fs.id, {})

In [None]:
# Version 1 of the "raw_transactions" feature group. Only its Kafka topic name
# is needed here: that topic becomes the Feldera input for transactions.
trans_fg = fs.get_or_create_feature_group(name="raw_transactions", version=1)

#### Connect the Kafka server to Feldera

In [None]:
from feldera.formats import JSONFormat, JSONUpdateFormat

# JSON input format using the Raw update format, with the array option off.
in_fmt = (
    JSONFormat()
    .with_update_format(JSONUpdateFormat.Raw)
    .with_array(False)
)

# The topic to consume is the one backing the raw_transactions feature group.
KAFKA_INPUT_TOPIC = trans_fg.topic_name

# Hopsworks Kafka settings plus the topic list and the offset-reset policy;
# keys given here override same-named keys in kafka_config.
source_overrides = {"topics": [KAFKA_INPUT_TOPIC], "auto.offset.reset": "earliest"}
source_config = kafka_config | source_overrides

# Connect the transactions feature group Kafka topic to the Feldera transactions table.
sql.connect_source_kafka(TBL_NAMES[0], "hopsworks_transactions_kafka_in", source_config, in_fmt)

#### Create Feature Groups that represent Feldera Outputs

In [None]:
from hsfs.feature import Feature
import json

# Output topic names, each suffixed with the project id. They double as the
# feature group names: index 0 is used for the `combined` feature group,
# index 1 for the `windowed` one.
KAFKA_OUTPUT_TOPICS = [
    f"transactions_fraud_streaming_fg_{project.id}",
    f"transactions_aggs_fraud_streaming_fg_{project.id}",
]

# (feature name, feature type) pairs for the `combined` feature group.
COMBINED_FEATURE_TYPES = [
    ("tid", "string"),
    ("date_time", "timestamp"),
    ("cc_num", "string"),
    ("category", "string"),
    ("amount", "double"),
    ("latitude", "double"),
    ("longitude", "double"),
    ("city", "string"),
    ("country", "string"),
    ("fraud_label", "int"),
    ("age_at_transaction", "int"),
    ("days_until_card_expires", "int"),
    ("cc_expiration_date", "timestamp"),
]

combined_topic = KAFKA_OUTPUT_TOPICS[0]

# Create a feature group for the `combined` Feldera output view.
combined_fg = fs.get_or_create_feature_group(
    name=combined_topic,
    version=1,
    primary_key=["cc_num"],
    event_time="date_time",
    online_enabled=True,
    stream=True,
    topic_name=combined_topic,
    features=[
        Feature(feature_name, type=feature_type)
        for feature_name, feature_type in COMBINED_FEATURE_TYPES
    ],
)

# Best-effort save: a failure is printed and the notebook carries on.
try:
    combined_fg.save()
except Exception as save_error:
    print(save_error)

# Create the schema and topic only when no existing topic already has this name.
topic_exists = any(topic.name == combined_topic for topic in kafka_api.get_topics())
if not topic_exists:
    kafka_api.create_schema(combined_topic, json.loads(combined_fg.avro_schema))
    kafka_api.create_topic(combined_topic, combined_topic, 1, replicas=1, partitions=1)

In [None]:
# Show the Avro schema of the combined feature group as the cell output.
combined_fg.avro_schema

In [None]:
def create_sink_config(kafka_config: dict, fg, project_id):
    """Build the Kafka sink configuration for one feature group.

    Returns a new mapping made of ``kafka_config`` plus three entries:
    ``"topic"`` (the feature group's ``topic_name``), ``"auto.offset.reset"``
    (always ``"earliest"``) and ``"headers"``. The headers are a list of
    ``{'key': ..., 'value': ...}`` dicts carrying the project id, the feature
    group id and the id of the feature group's subject, each converted to a
    string. On a key clash the entries added here take precedence, and
    ``kafka_config`` itself is left unmodified.

    Args:
        kafka_config: Base Kafka settings to extend.
        fg: Object exposing ``topic_name``, ``id`` and ``subject["id"]``.
        project_id: Identifier placed in the ``projectId`` header.
    """
    topic = fg.topic_name

    # Header order is kept as projectId, featureGroupId, subjectId.
    header_values = (
        ('projectId', project_id),
        ('featureGroupId', fg.id),
        ('subjectId', fg.subject["id"]),
    )
    headers = [{'key': key, 'value': str(value)} for key, value in header_values]

    sink_overrides = {
        "topic": topic,
        "auto.offset.reset": "earliest",
        "headers": headers,
    }
    return kafka_config | sink_overrides

#### Setup Hopsworks as a data sink for Feldera

In [None]:
from feldera.formats import AvroFormat

# Avro output format for the `combined` view: use the feature group's own
# Avro schema and skip the schema id while serializing.
trans_out_fmt = (
    AvroFormat()
    .with_schema(combined_fg.avro_schema)
    .with_skip_schema_id(True)
)

# Kafka sink settings (topic plus Hopsworks headers) for the combined feature group.
combined_sink_config = create_sink_config(kafka_config, combined_fg, project.id)

sql.connect_sink_kafka("combined", "hopsworks_combined_kafka_out", combined_sink_config, trans_out_fmt)

In [None]:
# (feature name, feature type) pairs for the `windowed` feature group.
WINDOWED_FEATURE_TYPES = [
    ("avg_amt", "double"),
    ("trans", "bigint"),
    ("stddev_amt", "double"),
    ("date_time", "timestamp"),
    ("cc_num", "string"),
]

windowed_topic = KAFKA_OUTPUT_TOPICS[1]

# Feature group for the `windowed` Feldera output view; the topic name is
# also used as the feature group name.
# NOTE(review): unlike the combined feature group, no Kafka schema/topic is
# created explicitly for this one -- confirm that is intentional.
windowed_fg = fs.get_or_create_feature_group(
    name=windowed_topic,
    version=1,
    primary_key=["cc_num"],
    event_time="date_time",
    online_enabled=True,
    stream=True,
    topic_name=windowed_topic,
    features=[
        Feature(feature_name, type=feature_type)
        for feature_name, feature_type in WINDOWED_FEATURE_TYPES
    ],
)

# Best-effort save: a failure is printed and the notebook carries on.
try:
    windowed_fg.save()
except Exception as save_error:
    print(save_error)

In [None]:
# Show the Avro schema of the windowed feature group as the cell output.
windowed_fg.avro_schema

In [None]:
# Avro output format for the `windowed` view: use the feature group's own
# Avro schema and skip the schema id while serializing.
win_out_fmt = (
    AvroFormat()
    .with_schema(windowed_fg.avro_schema)
    .with_skip_schema_id(True)
)

# Kafka sink settings (topic plus Hopsworks headers) for the windowed feature group.
windowed_sink_config = create_sink_config(kafka_config, windowed_fg, project.id)

sql.connect_sink_kafka("windowed", "hopsworks_windowed_kafka_out", windowed_sink_config, win_out_fmt)

## Run Feldera Pipeline

In [None]:
# Start the pipeline defined on the SQL context (tables, views, sources and sinks above).
sql.start()

In [None]:
import time

# Let the pipeline run for a fixed period, then shut it down.
PIPELINE_RUN_SECONDS = 60

time.sleep(PIPELINE_RUN_SECONDS)
sql.shutdown()

## Set a materialization job for the feature group in Hopsworks

In [None]:
import datetime

# Schedule the materialization job of both feature groups to run every
# 10 minutes, starting from the current UTC time (taken separately per group).
for scheduled_fg in (combined_fg, windowed_fg):
    scheduled_fg.materialization_job.schedule(
        cron_expression="0 /10 * ? * * *",
        start_time=datetime.datetime.now(tz=datetime.timezone.utc),
    )

In [None]:
# Trigger one materialization run for each feature group, combined first.
for materialized_fg in (combined_fg, windowed_fg):
    materialized_fg.materialization_job.run()

# Great Expectations

In [None]:
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

# Empty expectation suite named "transactions_suite"; expectations are added
# to it in the following cell.
expectation_suite_transactions = ExpectationSuite(expectation_suite_name="transactions_suite")

In [None]:
# Check binary fraud_label column to be in set [0,1]
expectation_suite_transactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={
            "column": "fraud_label",
            "value_set": [0, 1],
        }
    )
)

# Check amount column to be not negative
expectation_suite_transactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0.0,
        }
    )
)

# Require the key columns ('tid', 'date_time', 'cc_num') to be populated.
# The previous form, "expect_column_values_to_be_null" with mostly=0.0, only
# asked for at least 0% of the values to be null, so it passed on any data and
# checked nothing. The not-null expectation with mostly=1.0 requires every
# value to be present.
for column in ['tid', 'date_time', 'cc_num']:
    expectation_suite_transactions.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={
                "column": column,
                "mostly": 1.0,
            }
        )
    )

In [None]:
# Update the combined feature group to store the "transactions_suite"
# expectation suite built above.
combined_fg.save_expectation_suite(expectation_suite_transactions)

In [None]:
# Update feature descriptions: one entry per feature of the combined feature
# group. `cc_expiration_date` is included so that every feature declared on
# the group gets a description.
feature_descriptions = [
    {"name": "tid", "description": "Transaction id"},
    {"name": "date_time", "description": "Transaction time"},
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "category", "description": "Expense category"},
    {"name": "amount", "description": "Dollar amount of the transaction"},
    {"name": "latitude", "description": "Transaction location latitude"},
    {"name": "longitude", "description": "Transaction location longitude"},
    {"name": "city", "description": "City in which the transaction was made"},
    {"name": "country", "description": "Country in which the transaction was made"},
    {"name": "fraud_label", "description": "Whether the transaction was fraudulent or not"},
    {"name": "age_at_transaction", "description": "Age of the card holder when the transaction was made"},
    {"name": "days_until_card_expires", "description": "Card validity days left when the transaction was made"},
    {"name": "cc_expiration_date", "description": "Expiration date of the credit card"},
]

# Push each description to the feature group, one call per feature.
for desc in feature_descriptions:
    combined_fg.update_feature_description(desc["name"], desc["description"])