In [0]:
// State store: use the RocksDB-backed provider and enable its changelog checkpointing.
Seq(
  "spark.sql.streaming.stateStore.providerClass" ->
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> "true"
).foreach { case (key, value) => spark.conf.set(key, value) }

// Slot budget on this 16-core Databricks cluster:
//   3 slots for the Kafka source (the topic has 3 partitions and Spark reads them 1:1),
//   13 slots for stage 1, the post-shuffle stage created by groupByKey/transformWithState.
spark.conf.set("spark.sql.shuffle.partitions", "13")

In [0]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders

// All of the TransformWithState code and tests are in here
import demo.heat._

In [0]:
// Confluent Cloud bootstrap endpoint, and the topic carrying the raw player signals.
val kafkaBootstrap: String = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
val eventsTopic: String    = "player_signals"

In [0]:
// Schemas for the JSON signal envelope. Every field is nullable: a "snapshot" fills the
// snapshot fields of `payload`, a "pulse" fills the pulse fields, and the rest parse as null.

// One inventory entry inside a snapshot.
val itemSchema = StructType(Seq(
  StructField("slot", IntegerType),
  StructField("itemID", LongType),
  StructField("displayName", StringType),
  StructField("totalCost", IntegerType),   // from DDragon enrichment
  StructField("count", IntegerType)
))

// K/D/A and CS counters carried by snapshots.
val scoresSchema = StructType(Seq(
  StructField("kills", IntegerType),
  StructField("deaths", IntegerType),
  StructField("assists", IntegerType),
  StructField("creepScore", IntegerType)
))

// Union of snapshot and pulse fields; which subset is populated depends on `kind`.
val payloadSchema = StructType(Seq(
  // snapshot fields
  StructField("level", IntegerType),
  StructField("items", ArrayType(itemSchema)),
  StructField("scores", scoresSchema),
  // pulse fields
  StructField("etype", StringType),        // ChampionKill, DragonKill, ...
  StructField("role", StringType),         // killer | victim | assister
  StructField("eventId", LongType),
  StructField("timeSec", DoubleType),
  StructField("dragonType", StringType),
  StructField("stolen", BooleanType),
  StructField("turret", StringType),
  StructField("firstBlood", BooleanType),
  StructField("championName", StringType),
  StructField("isDead", BooleanType)
))

// ---- top-level envelope ----
val signalSchema = StructType(Seq(
  StructField("kind", StringType),         // "snapshot" | "pulse"
  StructField("tsMillis", LongType),
  StructField("gameId", StringType),
  StructField("riotId", StringType),
  StructField("team", StringType),
  StructField("payload", payloadSchema)
))

In [0]:
/**
 * Returns the raw (unparsed) streaming source of player signals.
 *
 * The Delta option is useful for testing: it avoids having to run the game, or to worry
 * about what is currently on the topic.
 *
 * @param useDelta        when true, stream from the Delta table
 *                        `users.alex_vanadio.player_signals`; otherwise read `eventsTopic`
 *                        from Kafka.
 *                        NOTE(review): the parsing cell selects Kafka columns (key, value,
 *                        topic, partition, offset, timestamp). Confirm the Delta table
 *                        exposes the same columns.
 * @param startingOffsets Kafka `startingOffsets` option (ignored for Delta); defaults to "latest".
 * @return an unparsed streaming DataFrame
 * @throws IllegalStateException on the Kafka path if CONFLUENT_API_KEY or
 *                               CONFLUENT_API_SECRET is not set in the environment
 */
def getStreamingDF(useDelta: Boolean = false, startingOffsets: String = "latest"): DataFrame = {
  // Credentials come from the driver environment (e.g. cluster env vars backed by a
  // Databricks secret scope), never from source. Any key committed to a notebook is exposed
  // and should be rotated.
  def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"Environment variable $name is not set"))

  if (useDelta) {
    spark.readStream.table("users.alex_vanadio.player_signals")
  } else {
    val jaasConfig =
      "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required " +
        s"username='${requiredEnv("CONFLUENT_API_KEY")}' password='${requiredEnv("CONFLUENT_API_SECRET")}';"

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrap)
      .option("startingOffsets", startingOffsets)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config", jaasConfig)
      .option("subscribe", eventsTopic)
      .load()
  }
}

In [0]:
val playerSignalsStreamingDF = getStreamingDF(useDelta = false)

// Parse the Kafka envelope into one flat row per signal.
// - gameId and team are trimmed and upper-cased; riotId is only trimmed.
// - payload.* is expanded in place, so fields that do not apply to a row's `kind` are null.
// - Kafka coordinates (topic/partition/offset) are kept for traceability.
// If `value` is not valid JSON for signalSchema, the s.* columns come out null. Such rows are
// removed downstream by the not-null filter on the identity columns.
val playerSignals = playerSignalsStreamingDF
  .selectExpr(
    "topic", "partition", "offset",
    "CAST(key AS STRING) as key",
    "CAST(value AS STRING) AS json",
    "timestamp AS kafkaTs")
  // Append only the parsed struct. kafkaTs is already carried through by the selectExpr
  // above, so selecting it again would create a second, identically named column.
  .withColumn("s", from_json(col("json"), signalSchema))
  .select(
    col("key").as("kafkaKey"),
    col("kafkaTs"),
    col("s.kind"),
    col("s.tsMillis"),
    upper(trim(col("s.gameId"))).as("gameId"),
    trim(col("s.riotId")).as("riotId"),
    upper(trim(col("s.team"))).as("team"),
    col("s.payload.*"),   // expands union payload fields; unrelated ones will be null
    col("topic"),
    col("partition"),
    col("offset")
  )

// display(playerSignals)

In [None]:
// Adds `items_str`: a deterministic text rendering of a snapshot's inventory, with entries
// "slot:itemID:displayName" joined by ", ". The column is NULL for non-snapshot rows and for
// snapshots whose items array is NULL.
val playerSignalsWithItems = {
  // Trim each item to (slot, itemID, displayName). array_sort compares structs field by field,
  // so this field order sorts by slot, then itemID, then displayName.
  val toSortKey: Column => Column = item =>
    struct(
      item.getField("slot").as("slot"),
      item.getField("itemID").as("itemID"),
      item.getField("displayName").as("displayName")
    )

  // One entry as "slot:itemID:displayName"; a missing displayName renders as "".
  val render: Column => Column = item =>
    format_string(
      "%s:%s:%s",
      item.getField("slot"),
      item.getField("itemID"),
      coalesce(item.getField("displayName"), lit(""))
    )

  val itemsStr: Column =
    array_join(transform(array_sort(transform(col("items"), toSortKey)), render), ", ")

  val isSnapshotWithItems = col("kind") === "snapshot" && col("items").isNotNull

  playerSignals.withColumn(
    "items_str",
    when(isSnapshotWithItems, itemsStr)
      .otherwise(lit(null).cast("string")) // => NULL for non-snapshots (and when items is NULL)
  )
}

// display(playerSignalsWithItems)

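This is a useful optimization because the item table is tiny: League of Legends has fewer than 1,000 items. The whole price table can therefore be collected to the driver and embedded in the query as a literal map.

The alternative is a stream-static join against the item table. The literal-map lookup avoids that join entirely and should perform noticeably better. The trade-off is that the map is captured once, when the cell runs, so changes to the item table are not seen until the stream is restarted.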

In [0]:
// Build the item-price lookup on the driver once, when this cell runs. The map is embedded in
// the query plan as a literal, so the stream keeps these prices until it is restarted; later
// changes to ddragon_items are not picked up.
val costMapLocal: Map[Long, Double] =
  spark.table("users.alex_vanadio.ddragon_items")
    .select($"itemId".cast(LongType), $"totalCost".cast(DoubleType))
    // Row.getLong / Row.getDouble throw NullPointerException on SQL NULL. Skip items whose id
    // or cost is missing (or cast to NULL) instead of failing the whole notebook.
    .na.drop()
    .collect()
    // NOTE(review): if itemId is not unique, toMap keeps whichever collected row comes last.
    .map(r => r.getLong(0) -> r.getDouble(1))
    .toMap

val costMapCol = typedLit(costMapLocal) // MapType(LongType, DoubleType)

// invValue = sum over items of price(itemID) * count, computed only on snapshots (no joins).
// Unknown itemIDs contribute 0.0, a null count is treated as 1, and a null items array gives
// 0.0. Non-snapshot rows get NULL. The raw items array is dropped afterwards; items_str keeps
// a readable copy.
val playerSignalsWithInv = playerSignalsWithItems
  .withColumn(
    "invValue",
    when($"kind" === "snapshot",
      aggregate(
        coalesce($"items", array()),           // items: array<struct<itemID, count, ...>>
        lit(0.0),                              // zero init
        (acc, it) =>
          acc +
            coalesce(                            // price * count
              element_at(costMapCol, it.getField("itemID").cast(LongType)),
              lit(0.0)
            ) * coalesce(it.getField("count").cast(IntegerType), lit(1))
      )
    ).otherwise(lit(null).cast(DoubleType))
  )
  .drop("items")

// display(playerSignalsWithInv)

In [0]:
import demo.heat._
implicit val heatInEnc : Encoder[HeatIn]  = Encoders.product[HeatIn]
implicit val heatOutEnc: Encoder[HeatOut] = Encoders.product[HeatOut]

// Keep only the columns HeatIn is built from, and drop rows missing the player identity
// (gameId, riotId, team) or the envelope fields (kind, tsMillis).
val heatInDS = {
  val mustBePresent = Seq("gameId", "riotId", "kind", "tsMillis", "team")
  val complete      = mustBePresent.map(name => col(name).isNotNull).reduce(_ && _)

  playerSignalsWithInv
    .filter(complete)
    .select(
      col("gameId"), col("team"), col("riotId"), col("tsMillis"), col("kind"),
      col("championName"), col("isDead"),
      col("etype"), col("role"),
      col("level"), col("invValue"), col("items_str"),
      col("eventId"),
      // nested fields surface as kills / deaths / assists / creepScore
      col("scores.kills"), col("scores.deaths"), col("scores.assists"), col("scores.creepScore")
    )
    .as[HeatIn]
}

// display(heatInDS)

In [0]:
import demo.heat._

// Run HeatProcessor per (gameId, riotId, team), emitting updated rows only.
// The processor receives a 2-hour TTLConfig; presumably it applies this to its state
// variables (see demo.heat) — confirm.
val heatDS = {
  val stateTtl  = TTLConfig(java.time.Duration.ofHours(2))
  val processor = new HeatProcessor(stateTtl)

  heatInDS
    .groupByKey(in => s"${in.gameId}|${in.riotId}|${in.team}")
    .transformWithState(processor, TimeMode.ProcessingTime, OutputMode.Update)
}

// display(heatDS)

In [0]:
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import java.nio.file.Files

// Sink: publish every HeatOut row as JSON to the `heat` topic. Rows are keyed by
// "gameId|riotId|team", the same identity the state is grouped by.
//
// NOTE(review): the checkpoint goes to a fresh random directory on every run, so a restart
// discards the previous offsets and state instead of resuming. That may be intended for a
// demo; use a stable path if restarts should resume.
val uuid = java.util.UUID.randomUUID.toString
val cols = heatDS.columns.map(col).toIndexedSeq
val toKafka =
  heatDS.select(
    concat_ws("|", $"gameId", $"riotId", $"team").as("key"),
    to_json(struct(cols: _*)).as("value")
  )

// SASL credentials come from the driver environment (e.g. cluster env vars backed by a
// Databricks secret scope), never from source. Any key committed to a notebook is exposed
// and should be rotated.
val sinkJaasConfig: String = {
  def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"Environment variable $name is not set"))

  "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required " +
    s"username='${requiredEnv("CONFLUENT_API_KEY")}' password='${requiredEnv("CONFLUENT_API_SECRET")}';"
}

toKafka
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrap)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", sinkJaasConfig)
  .option("topic", "heat")
  .option("checkpointLocation", s"/tmp/alexv/$uuid")
  .trigger(RealTimeTrigger.apply())
  .outputMode(OutputMode.Update())
  .start()
