# spotter

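This notebook reads spot-price records from the `spot-price-topic` Kafka topic and decodes their Avro-encoded values using the schema registered in the Confluent Schema Registry.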

In [1]:
// Connection settings shared by the cells below: where the Schema Registry lives
// and which Kafka topic carries the spot-price records.
val schemaRegistryURL: String = "http://schema-registry:8081"
val topic: String = "spot-price-topic"

In [2]:
// Batch (non-streaming) read of the topic, starting from the earliest available offset.
// The resulting frame has Kafka's fixed columns (key/value as binary, plus metadata).
val kafkaReadOptions = Map(
  "kafka.bootstrap.servers" -> "broker:29092",
  "subscribe"               -> topic,
  "startingOffsets"         -> "earliest" // From starting
)

val df = spark.read
  .format("kafka")
  .options(kafkaReadOptions)
  .load()

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [3]:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Capacity argument passed to the client's internal schema cache.
val schemaCacheCapacity = 128
val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryURL, schemaCacheCapacity)

// The value schema is looked up under the "<topic>-value" subject; take its latest version.
val valueSubject = s"$topic-value"
val valueSchema = schemaRegistryClient.getLatestSchemaMetadata(valueSubject).getSchema

println(valueSchema)

{"type":"record","name":"SpotPrice","namespace":"io.github.ahappypie.spotter","fields":[{"name":"provider","type":"string"},{"name":"zone","type":"string"},{"name":"instance","type":"string"},{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"price","type":"double"}]}


In [4]:
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.types.DataTypes

// Decode the Kafka value with Spark's built-in from_avro using the schema fetched above.
//
// The messages are written in the Confluent wire format (the same column is decoded with
// abris' from_confluent_avro further down): each payload begins with a 5-byte header
// (magic byte 0x0 + 4-byte schema id) followed by the Avro body. Spark's from_avro expects
// the bare Avro body, so strip the header first; otherwise decoding fails or yields garbage
// as soon as rows are actually read (printSchema alone never touches the data).
val avroDf = df.select(
  col("key").cast(DataTypes.StringType),
  from_avro(expr("substring(value, 6, length(value) - 5)"), valueSchema).as("value")
)

In [5]:
avroDf.printSchema

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- provider: string (nullable = false)
 |    |-- zone: string (nullable = false)
 |    |-- instance: string (nullable = false)
 |    |-- timestamp: timestamp (nullable = false)
 |    |-- price: double (nullable = false)



In [6]:
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Abris settings: which registry to query, which topic the schema belongs to, and how the
// subject name is derived (from the topic name). The schema id "latest" makes abris use
// the most recent registered version of the value schema.
val schemaRegistryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> schemaRegistryURL,
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> topic,
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME,
  SchemaManager.PARAM_VALUE_SCHEMA_ID              -> "latest"
)

// Decode the Confluent-framed value into a struct column, then flatten its fields.
val decodedValue = from_confluent_avro(col("value"), schemaRegistryConfig).as("data")
val data = df.select(decodedValue).select("data.*")

data.printSchema()

root
 |-- provider: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- instance: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)



In [7]:
// Project the columns of interest (drops `provider`).
data.select("zone", "instance", "price", "timestamp")

[zone: string, instance: string ... 2 more fields]

In [8]:
// Expose the decoded records to Spark SQL under the name "data".
data.createOrReplaceTempView("data")

// Per-instance price statistics (min / mean / max / std), ordered by instance name.
// Only the resulting DataFrame's schema is echoed; append .show(20, false) to print rows.
val priceStatsQuery =
  "select instance, min(price), avg(price), max(price), std(price) from data group by instance order by instance asc"
spark.sql(priceStatsQuery)

[instance: string, min(price): double ... 3 more fields]