In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Directory that the streaming reader below watches for incoming CSV files.
stream_input_path = "/Volumes/workspace/main/project_data/streaming_input"
# Create the directory up front (presumably so the streaming read does not
# fail on a missing path the first time the notebook runs — confirm).
dbutils.fs.mkdirs(stream_input_path)


## Define Streaming Schema

In [0]:
# Explicit schema for the incoming transaction CSV files. Streaming file
# sources need the schema up front, and the CSV reader applies it by column
# position, so the field order here must match the order the files are
# written in. Every field is nullable (the default).
stream_schema = (
    StructType()
    .add("customer_id", StringType())
    .add("transaction_id", StringType())
    .add("event_time", TimestampType())
    .add("category", StringType())
    .add("merchant_id", StringType())
    .add("amount", DoubleType())
    .add("merch_lat", DoubleType())
    .add("merch_long", DoubleType())
    .add("label", IntegerType())
)


## Read Streaming Data

In [0]:
# Streaming DataFrame over the CSV files in the input directory. Each file
# carries a header row, which is skipped; columns are typed by stream_schema.
stream_df = (
    spark.readStream
    .format("csv")
    .schema(stream_schema)
    .option("header", True)
    .load(stream_input_path)
)


In [0]:
# Static (batch) table joined to the stream on customer_id below; it supplies
# the avg_amount and amount_std columns used by the risk scoring.
customer_features = spark.read.table("customer_features")


## Real-Time Risk Scoring Logic

In [0]:
# Stream-static left join: every incoming transaction is kept and enriched
# with its customer's feature columns, then scored by how far the amount sits
# above the customer's average, measured in standard deviations.
#
# NOTE(review): with a left join, a customer that has no row in
# customer_features (or a null amount_std) makes every comparison below
# evaluate to null, so the transaction falls through to the lowest score (10).
# Confirm that "unknown customer => lowest risk" is the intended behaviour.
stream_scored = (
    stream_df
    .join(customer_features, on="customer_id", how="left")
    .withColumn(
        "risk_score",
        # Branches are tested top-down, so the widest band (3 sigma) is first.
        when(col("amount") > col("avg_amount") + 3 * col("amount_std"), 90)
        .when(col("amount") > col("avg_amount") + 2 * col("amount_std"), 70)
        .when(col("amount") > col("avg_amount") + col("amount_std"), 40)
        .otherwise(10),
    )
)


## Simulate Live Streaming Input

In [0]:
# Simulate a burst of live traffic: append 50 randomly chosen historical
# transactions to the streaming input directory as CSV.
#
# The column list and the target path are taken from stream_schema and
# stream_input_path (defined in the cells above) rather than repeated here.
# The streaming CSV reader applies stream_schema by column position, so the
# files written here must use exactly the schema's field order; deriving both
# from the same definitions keeps the writer and the reader in sync.
(
    spark.table("transactions")
    .select(*stream_schema.fieldNames())
    .orderBy(rand())  # unseeded on purpose: each run drops a different sample
    .limit(50)
    .write
    .mode("append")
    .option("header", True)
    .csv(stream_input_path)
)


## Write Streaming Output to Managed Delta Table

In [0]:
# Score every input file that has not been processed yet, append the results
# to the managed Delta table, then stop (availableNow trigger). The checkpoint
# records which files were already handled, so re-running this cell only
# processes files added since the previous run.
query = (
    stream_scored.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .outputMode("append")
    .option(
        "checkpointLocation",
        "/Volumes/workspace/main/project_data/checkpoints/streaming_scored",
    )
    .toTable("streaming_scored_transactions")
)

# toTable() only starts the query and returns immediately. Block until the
# available-now run has finished so that later cells (e.g. the display below)
# see the newly written rows instead of racing the stream.
query.awaitTermination()


### Re-run cell 13 (the streaming write above), then run the cell below to display the results

In [0]:
# Show the 20 most recent scored transactions (newest event_time first).
display(
    spark.table("streaming_scored_transactions")
    .orderBy(col("event_time").desc())
    .limit(20)
)


### Clean Demo Cycle

In [0]:
# Remove all rows from the scored-output table so the demo can be repeated.
# Only the table is emptied by this cell; the streaming input directory and
# the checkpoint location are not touched.
# NOTE(review): because the checkpoint is kept, input files that were already
# processed will presumably not be re-scored on the next run — confirm that
# this is the intended demo cycle.
spark.sql("TRUNCATE TABLE streaming_scored_transactions")
