In [None]:
import hopsworks
from hops import kafka, tls

from pyspark.sql.types import * 
from pyspark.sql.functions import * 

In [None]:
# Log in to Hopsworks and keep a handle on the project's feature store;
# `fs` is used by later cells to look up the Kafka storage connector and
# to get or create the feature group.
project = hopsworks.login()
fs = project.get_feature_store()

In [None]:
# Name of the Kafka topic that the streaming read below subscribes to.
KAFKA_TOPIC_NAME = "live_card_transactions"

In [None]:
# Schema used to parse the JSON payload of each Kafka message.
# All four fields are nullable.
# NOTE(review): `amount` is declared as a string although it is later fed to
# avg()/stddev(); this presumably relies on Spark's implicit string -> double
# cast - confirm that this is intended.
parse_schema = (
    StructType()
    .add('tid', StringType(), True)
    .add('datetime', TimestampType(), True)
    .add('cc_num', StringType(), True)
    .add('amount', StringType(), True)
)

In [None]:
# Fetch the storage connector registered under the name "demo_kafka" in the
# feature store; it is used below to open the streaming read.
kafka_connector = fs.get_storage_connector("demo_kafka")

In [None]:
# Open a streaming read on the Kafka topic through the storage connector.
#
# The Spark Kafka source option is spelled `startingOffsets` (plural). The
# singular `startingOffset` is not a recognised option: Spark ignores it
# silently and the stream falls back to the default (`latest`), so records
# already in the topic would never be read.
# `startingOffsets` only applies when the query starts without an existing
# checkpoint; a resumed query continues from its checkpointed offsets.
df_read = kafka_connector.read_stream(
    topic=KAFKA_TOPIC_NAME,
    options={"kafka.group.id": "live-1", "startingOffsets": "earliest"},
)

In [None]:
# Deserialize the Kafka messages in three steps:
#   1. cast the `value` column to a string,
#   2. parse that string as JSON using `parse_schema`,
#   3. flatten the parsed struct into top-level columns.
value_as_text = df_read.selectExpr("CAST(value AS STRING)")
value_as_struct = value_as_text.select(
    from_json("value", parse_schema).alias("value")
)
df_deser = value_as_struct.select(
    "value.tid", "value.datetime", "value.cc_num", "value.amount"
)

In [None]:
# Per-card activity statistics over a sliding window.

# Declare a 10-minute watermark on the `datetime` column.
transactions_with_watermark = df_deser.withWatermark("datetime", "10 minutes")

# Group by card number and by 10-minute windows that slide every minute.
transactions_by_card_window = transactions_with_watermark.groupBy(
    window("datetime", "10 minutes", "1 minute"), "cc_num"
)

# Average and standard deviation of the amount, plus the transaction count,
# for every (window, cc_num) group.
card_window_stats = transactions_by_card_window.agg(
    avg("amount").alias("avg_amt_per_10m"),
    stddev("amount").alias("stdev_amt_per_10m"),
    count("cc_num").alias("num_trans_per_10m"),
)

# Keep the card number and the three aggregates; the window column is dropped
# and `datetime` is filled with current_timestamp().
# NOTE(review): `datetime` therefore is not derived from the window bounds -
# confirm this is the intended event time for the feature group.
windowed10mSignalDF = card_window_stats.selectExpr(
    "cc_num",
    "current_timestamp() as datetime",
    "num_trans_per_10m",
    "avg_amt_per_10m",
    "stdev_amt_per_10m",
)

In [None]:
# Print the schema of the 10-minute aggregate stream built earlier in the
# notebook. No `windowed5mSignalDF` is defined anywhere in it, so that name
# raises NameError on a fresh kernel; `windowed10mSignalDF` is used instead.
windowed10mSignalDF.printSchema()

In [None]:
# Get (or create, as the method name indicates) the feature group that will
# receive the 10-minute per-card aggregates.
#   - primary_key: rows are keyed by card number (`cc_num`).
#   - event_time: the `datetime` column is used as the event-time column.
#   - online_enabled / stream: both flags are switched on for this group.
#   - statistics_config: histograms and correlations are requested.
profiles_activity_10m = fs.get_or_create_feature_group(
    name="profiles_activity_10m",
    version=7,
    description="Credit card activity over 10 minutes window (sliding window 1 minute)",
    primary_key=['cc_num'],
    event_time="datetime",
    online_enabled=True,
    statistics_config={'histograms': True, 'correlations': True},
    stream=True
)

In [None]:
# Start writing the aggregate stream into the feature group; the returned
# handle is kept in `query` so the following cells can inspect it.
query = profiles_activity_10m.insert_stream(windowed10mSignalDF)

In [None]:
# Display whether the streaming query is still active.
query.isActive

In [None]:
# Display the current status reported by the streaming query.
query.status