# Streaming Pipeline
This example shows how to build a streaming pipeline in Hopsworks.

## 1. Login

We start by initiating the Spark application and logging in to Hopsworks.

Note that Hopsworks' streaming capabilities only work with a Spark/PySpark server, i.e. you must run your code on a Spark server.

In [1]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
1,application_1689280007503_0004,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://35.171.129.215/p/119
Connected. Call `.close()` to terminate connection gracefully.

## 2. Kafka Connection
Hopsworks can connect to your Kafka broker via a **storage connector**.

Currently there is no way to create a storage connector in code. All storage connectors must be created via the UI and then retrieved using the `get_storage_connector` function.


In [2]:
connector = fs.get_storage_connector("moneylion_kafka")

## 3. Transform / Aggregate
a. Read the Kafka events using the connector's `read_stream` function, specifying the topic you want to consume.

b. Parse the Kafka event data into the expected structure.

c. Define your transformation / aggregation using PySpark.

Note: `read_stream` returns a **streaming Spark dataframe** rather than a conventional Spark dataframe. A conventional dataframe is broadly similar to a `pandas` dataframe, whereas a streaming dataframe is unbounded: it keeps growing as new events arrive and is processed incrementally.
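To make step (b) more concrete, here is a minimal pure-Python sketch of what parsing a single Kafka message into the expected structure amounts to. It is not the Spark implementation: `parse_event` and `FIELDS` are hypothetical names, and the behaviour on malformed input (every field set to `None`) is an assumption that only approximates how `from_json` and casts produce nulls.

```python
import json
from datetime import datetime

# Hypothetical field list mirroring the schema used in this notebook.
FIELDS = ("user_id", "amount", "description", "created_at")


def parse_event(raw: bytes) -> dict:
    """Decode one Kafka message value into a flat record (illustrative only)."""
    empty = {name: None for name in FIELDS}
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return empty  # malformed message: every field is null
    if not isinstance(payload, dict):
        return empty

    record = {name: payload.get(name) for name in FIELDS}
    # Parse the timestamp string; a missing or unparseable value becomes None.
    try:
        record["created_at"] = datetime.fromisoformat(record["created_at"])
    except (TypeError, ValueError):
        record["created_at"] = None
    return record


good = parse_event(
    b'{"user_id": "u1", "amount": 25, '
    b'"description": "Pizza restaurant", "created_at": "2023-07-14 10:00:00"}'
)
bad = parse_event(b"not json")
print(good)
print(bad)
```

Keeping malformed messages as all-null records, rather than raising, means a single bad event does not stop the stream; such rows can be filtered out afterwards.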

In [3]:
from pyspark.sql.functions import from_json, window, col, sum, udf, when
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType, BooleanType

In [4]:
# Read data stream from Kafka
df = connector.read_stream(topic='user-transaction-full')

full_schema = StructType([StructField('user_id', StringType(), True),
                          StructField('amount', IntegerType(), True),
                          StructField('description', StringType(), True),
                          StructField('created_at', StringType(), True)])

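@udf(returnType=StringType())
def categorize(description):
    # Null descriptions cannot be matched, so treat them as "OTHERS".
    if description is None:
        return "OTHERS"
    text = description.lower()
    if "restaurant" in text:
        return "F&B"
    if "bill" in text:
        return "COMMITMENT"
    return "OTHERS"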

# Deserialise the Kafka messages and build the streaming query
df = df.selectExpr("CAST(value AS STRING)")\
                   .select(from_json("value", full_schema).alias("value"))\
                   .select("value.user_id", "value.amount", "value.description", "value.created_at")\
                   .selectExpr("CAST(user_id as string)", "CAST(amount as int)", "CAST(description as string)", "CAST(created_at as timestamp)")\
                   .withColumn("category", categorize(col("description")))\
                   .groupBy("user_id", window("created_at", "2 days", "1 days"))\
                   .agg(sum("amount").alias("total_amount"), sum(when(col("category") != "OTHERS", col("amount")).otherwise(0)).alias("essential_spending"))\
                   .select("user_id", "total_amount", "essential_spending", "window.end")\
                   .withColumnRenamed("end", "created_at")

df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- total_amount: long (nullable = true)
 |-- essential_spending: long (nullable = true)
 |-- created_at: timestamp (nullable = true)

You can always confirm the final schema of your streaming dataframe using the `printSchema` function.
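The `window("created_at", "2 days", "1 days")` call above defines 2-day windows that slide forward by 1 day, so the windows overlap and every event is counted in two of them. The pure-Python sketch below illustrates that assignment. `sliding_windows` is a hypothetical helper, and it assumes half-open `[start, end)` windows aligned to the Unix epoch in UTC, which is how Spark's defaults are understood to behave.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, duration, slide):
    """Return every half-open [start, end) window containing ts, earliest first."""
    # Start of the most recent window that begins at or before ts.
    start = EPOCH + ((ts - EPOCH) // slide) * slide
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start + duration > ts:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


event_time = datetime(2023, 7, 14, 10, 0, tzinfo=timezone.utc)
windows = sliding_windows(event_time, timedelta(days=2), timedelta(days=1))
for start, end in windows:
    print(start.date(), "->", end.date())
```

Because the pipeline keeps `window.end` as `created_at`, a transaction made on 2023-07-14 contributes to the rows whose `created_at` is 2023-07-15 and 2023-07-16.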

## 4. Feature Group
A **feature group** in Hopsworks is roughly similar to a batch / streaming feature view in Tecton. Hopsworks uses the same feature group for both batch and streaming data; it does not differentiate between them.

Creating a feature group is as simple as calling the `get_or_create_feature_group` function.

a. Set `online_enabled` to `True` if you want the data of this feature group to be saved to the online store.

b. Set `stream` to `True` to use the Hopsworks streaming write API (this is the preferred setup). This setting controls how data is written to the online store; it is unrelated to whether the feature itself is batch or streaming.

After creating the feature group, use the `insert_stream` function to ingest the transformed / aggregated streaming dataframe from step 3. The call starts a streaming Spark job that continuously ingests, transforms and saves the features to the online store until `stop` is called or an error occurs.

When you ingest the streaming dataframe into the feature group, Hopsworks infers a suitable data type for every feature in both the offline and online stores.
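The `insert_stream` call in this section uses `output_mode="update"`. In Spark Structured Streaming, update mode emits only the aggregate rows that changed in the latest micro-batch. The pure-Python sketch below illustrates the idea with a simplified running total per key. `run_update_mode` is a hypothetical name, and modelling the online store as a dictionary that is upserted by key is an assumption made for illustration, not a description of Hopsworks internals.

```python
from collections import defaultdict


def run_update_mode(micro_batches):
    """Yield, per micro-batch, only the aggregate rows that changed."""
    totals = defaultdict(int)  # running state: key -> total amount
    for batch in micro_batches:
        changed = set()
        for key, amount in batch:
            totals[key] += amount
            changed.add(key)
        yield {key: totals[key] for key in changed}


batches = [
    [("u1", 10), ("u2", 5)],  # first micro-batch touches both users
    [("u1", 7)],              # second micro-batch touches only u1
]

online_store = {}  # stand-in for a key-value online store (assumption)
emitted = []
for output in run_update_mode(batches):
    emitted.append(output)
    online_store.update(output)  # upsert the changed rows by key

print(emitted)
print(online_store)
```

The second micro-batch emits a row for `u1` only, yet the store still holds the latest total for both users.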

In [5]:
user_transaction_complex = fs.get_or_create_feature_group(
    name="user_transaction_complex",
    version=1,
    description="User transaction complex amount",
    primary_key=['user_id'],
    event_time='created_at',
    online_enabled=True,
    stream=True
)

user_transaction_complex.insert_stream(df, output_mode="update")

Feature Group created successfully, explore it at 
https://35.171.129.215/p/119/fs/67/fg/17
<pyspark.sql.streaming.StreamingQuery object at 0x7f8ce1caa1f0>
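The `StreamingQuery` object shown in the output can be kept in a variable and inspected to check that ingestion is healthy. The sketch below reads a few of its standard PySpark attributes (`id`, `isActive`, `status`, `lastProgress`). `summarize_query` is a hypothetical helper, and the stand-in object exists only so the example runs without a Spark cluster; the dictionary shapes are assumed to match what PySpark returns.

```python
from types import SimpleNamespace


def summarize_query(query):
    """Collect basic health information from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first micro-batch completes
    return {
        "id": str(query.id),
        "active": query.isActive,
        "status": query.status["message"],
        "rows_in_last_batch": progress["numInputRows"] if progress else None,
    }


# Stand-in object (assumption) mimicking a freshly started query.
fake_query = SimpleNamespace(
    id="00000000-0000-0000-0000-000000000000",
    isActive=True,
    status={
        "message": "Waiting for data to arrive",
        "isDataAvailable": False,
        "isTriggerActive": False,
    },
    lastProgress=None,
)

summary = summarize_query(fake_query)
print(summary)
```

In the notebook you would assign the return value of `insert_stream` to a variable, for example `query = user_transaction_complex.insert_stream(df, output_mode="update")`, and pass that to the helper.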

Feature groups also offer other capabilities, such as Great Expectations validation (not supported for streaming dataframes), alerts, feature correlation and monitoring metrics.

## 5. Feature chaining
Hopsworks does not really support feature stacking (chaining). If feature group B consumes data from feature group A, which in turn gets its data from a Kafka topic, feature group B is not updated when feature group A is updated.

However, you can always define the chain of features in the same streaming pipeline.

a. Consume data from Kafka.

b. Transform as feature group A and save it to online store.

c. Continue with the resulting streaming dataframe from (b) to do the transformation / aggregation for feature group B and save it to the online store.

This way, whenever new data comes in, both feature group A and feature group B are updated.
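As a rough illustration of this pattern, the pure-Python sketch below derives the feature group B row from the feature group A row inside the same processing step, so one incoming row updates both. The names `derive_b`, `on_new_row`, `store_a` and `store_b` are hypothetical, and the dictionaries are only stand-ins for the two feature groups.

```python
BIG_SPENT_THRESHOLD = 90  # same threshold as the `is_big_spent` UDF in this section


def derive_b(row_a):
    """Compute the feature group B row from a feature group A row."""
    non_essential = row_a["total_amount"] - row_a["essential_spending"]
    return {
        "user_id": row_a["user_id"],
        "is_big_spent": non_essential > BIG_SPENT_THRESHOLD,
        "created_at": row_a["created_at"],
    }


store_a, store_b = {}, {}  # stand-ins for the two online feature groups


def on_new_row(row_a):
    """One pipeline step: write A, then derive and write B from the same row."""
    store_a[row_a["user_id"]] = row_a
    store_b[row_a["user_id"]] = derive_b(row_a)


on_new_row({
    "user_id": "u1",
    "total_amount": 150,
    "essential_spending": 40,
    "created_at": "2023-07-15",
})
print(store_a["u1"]["total_amount"], store_b["u1"]["is_big_spent"])
```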

In [6]:
@udf(returnType=BooleanType())
def is_big_spent(non_essential_spending):
    # Propagate nulls instead of failing on a comparison with None.
    if non_essential_spending is None:
        return None
    return non_essential_spending > 90


df = df.withColumn("non_essential_spending", col("total_amount") - col("essential_spending"))\
       .withColumn("is_big_spent", is_big_spent(col("non_essential_spending")))\
       .select("user_id", "is_big_spent", "created_at")

df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- is_big_spent: boolean (nullable = true)
 |-- created_at: timestamp (nullable = true)

Here we save the new transformation result as another feature group named `user_transaction_big_spent`.

In [7]:
user_transaction_big_spent = fs.get_or_create_feature_group(
    name="user_transaction_big_spent",
    version=1,
    description="User transaction big spent flag",
    primary_key=['user_id'],
    event_time='created_at',
    online_enabled=True,
    stream=True
)

user_transaction_big_spent.insert_stream(df, output_mode="update")

Feature Group created successfully, explore it at 
https://35.171.129.215/p/119/fs/67/fg/18
<pyspark.sql.streaming.StreamingQuery object at 0x7f8ce1f1ad30>

To stop a running streaming Spark query, look it up by its query ID and call `stop` on it.

In [8]:
#spark.streams.get(spark.streams.active[0].id).stop()
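The commented line above stops only the first active query, while this notebook starts two (one per feature group). A minimal sketch for stopping all of them is shown below. `stop_all_queries` is a hypothetical helper that relies only on the `active` list of a stream manager such as `spark.streams` and on each query's `stop` method; the fake classes are stand-ins so the example runs without Spark.

```python
from types import SimpleNamespace


def stop_all_queries(stream_manager):
    """Stop every active streaming query and return the IDs that were stopped."""
    stopped = []
    # Copy the list first, since stopping a query may change the active set.
    for query in list(stream_manager.active):
        query.stop()
        stopped.append(str(query.id))
    return stopped


class FakeQuery:
    """Stand-in (assumption) exposing the two members the helper uses."""

    def __init__(self, query_id):
        self.id = query_id
        self.stopped = False

    def stop(self):
        self.stopped = True


fake_manager = SimpleNamespace(active=[FakeQuery("q1"), FakeQuery("q2")])
stopped_ids = stop_all_queries(fake_manager)
print(stopped_ids)
```

In the notebook, the equivalent call would be `stop_all_queries(spark.streams)`.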

To stop the Spark session, call `spark.stop()`.

In [9]:
#spark.stop()