In [1]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, TimestampType 	



# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('json-aggregator')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext




In [2]:
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:29092") # kafka server      
  .option("subscribe", "patient-data") # topic
  .option("startingOffsets", "earliest") # start from beginning
  .load())



In [3]:
# create schema for patient
mySchema = StructType([
 StructField("id", IntegerType()),
 StructField("nome", StringType()),
 StructField("idade", IntegerType()),
 StructField("sexo", IntegerType()),
 StructField("peso", DoubleType()),
 StructField("altura", IntegerType()),
 StructField("bpm", DoubleType()),
 StructField("pressao", DoubleType()),
 StructField("respiracao", DoubleType()),
 StructField("temperatura", DoubleType()),
 StructField("glicemia", DoubleType()),
 StructField("saturacao_oxigenio", DoubleType()),
 StructField("estado_atividade", IntegerType()),
 StructField("dia_de_semana", IntegerType()),
 StructField("periodo_do_dia", IntegerType()),
 StructField("semana_do_mes", IntegerType()),
 StructField("estacao_do_ano", IntegerType()),
 StructField("passos", IntegerType()),
 StructField("calorias", DoubleType()),
 StructField("distancia", DoubleType()),
 StructField("tempo", DoubleType()),
 StructField("total_sleep_last_24", DoubleType()),
 StructField("deep_sleep_last_24", DoubleType()),
 StructField("light_sleep_last_24", DoubleType()),
 StructField("awake_last_24", DoubleType()),
 StructField("fumante", IntegerType()),
 StructField("genetica", IntegerType()),
 StructField("gestante", IntegerType()),
 StructField("frutas", IntegerType()),
 StructField("vegetais", IntegerType()),
 StructField("alcool", IntegerType()),
 StructField("doenca_coracao", IntegerType()),     
 StructField("avc", IntegerType()),
 StructField("colesterol_alto", IntegerType()), 
 StructField("exercicio", IntegerType()), 
 StructField("timestampstr", TimestampType()),
 StructField("timestamp_epoch", StringType())       
 
])

# extract data and ensure `eventTime` is timestamp
df = (
    df.selectExpr("CAST(value as string)")
      .select(F.from_json(F.col("value"),mySchema).alias("json_value"))
      .selectExpr("json_value.*") # gives us a dataframe with columns (eventTime,temperatura, etc...)
      .select(
          F.expr("CAST(timestampstr as timestamp)").alias("eventTime"),
          F.col("nome"),
          F.col("temperatura"),
          F.col("bpm")
      )
      
)


In [4]:

# when using window you will get a range or value resembling [start,end]. 
# I have chosen the `start` for this example
from pyspark.sql.functions import col, window

windowedAvg = ( 
    df.withWatermark("eventTime", "5 minutes") 
      .groupBy(window(F.col("eventTime"), "5 minutes").alias('eventTimeWindow'), F.col("nome"))
      .agg(F.avg("temperatura").alias("avgtemperature"),F.avg("bpm").alias("avgbpm"))       
      .orderBy(F.col("eventTimeWindow"))
      .select(
          F.col("eventTimeWindow.start").alias("eventTime"),
          F.col("nome"),
          F.col("avgtemperature"),
          F.col("avgbpm")
          
      )
)


In [5]:

# continue with your code to write to your various streams
query = windowedAvg\
        .select(
            F.expr("CAST(eventTime AS STRING)").alias("key"),
            F.expr("'{\"eventTime\":\"' || CAST(eventTime AS STRING) || '\",' || '\"nome\":' || CAST(nome AS STRING) || ',' || '\"avgbpm\":' || CAST(avgbpm AS STRING) || ',' || '\"avgtemp\":' || CAST(avgtemperature AS STRING) || '}'").alias("value")            
        ) \
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'true')\
        .start()



In [6]:

# write on kafka topic avgtemperature
# here i've chosen as an example to use the eventTime as the key and the value to be the avgtemperature
qk = (windowedAvg 
        .select(
            F.expr("CAST(eventTime AS STRING)").alias("key"),
            F.expr("'{\"eventTime\":\"' || CAST(eventTime AS STRING) || '\",' || '\"nome\":\"' || nome || '\",' || '\"avgbpm\":' || CAST(avgbpm AS STRING) || ',' || '\"avgtemp\":' || CAST(avgtemperature AS STRING) || '}'").alias("value")            
        ) 
        .writeStream 
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:29092") 
        .option("checkpointLocation", "/home/jovyan/work/json/checkpoint") 
        .option("topic", "avg-data")        
        .outputMode("complete") 
        .start()
        .awaitTermination())
        #query.awaitTermination()

StreamingQueryException: Partition patient-data-0's offset was changed from 201 to 10, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
    
=== Streaming Query ===
Identifier: [id = 8c7b0016-4ea7-4e8f-bb3f-9c7bbe718911, runId = ef23a8d2-61b2-429c-b2b6-1953f718971b]
Current Committed Offsets: {KafkaV2[Subscribe[patient-data]]: {"patient-data":{"0":201}}}
Current Available Offsets: {KafkaV2[Subscribe[patient-data]]: {"patient-data":{"0":10}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.kafka010.KafkaStreamingWrite@76965309
+- Project [cast(eventTime#119 as string) AS key#131, concat(concat(concat(concat(concat(concat(concat(concat(concat(concat(concat({"eventTime":", cast(eventTime#119 as string)), ",), "nome":"), nome#26), ",), "avgbpm":), cast(avgbpm#113 as string)), ,), "avgtemp":), cast(avgtemperature#111 as string)), }) AS value#132]
   +- Project [eventTimeWindow#105-T300000ms.start AS eventTime#119, nome#26, avgtemperature#111, avgbpm#113]
      +- Sort [eventTimeWindow#105-T300000ms ASC NULLS FIRST], true
         +- Aggregate [window#114-T300000ms, nome#26], [window#114-T300000ms AS eventTimeWindow#105-T300000ms, nome#26, avg(temperatura#34) AS avgtemperature#111, avg(bpm#31) AS avgbpm#113]
            +- Filter isnotnull(eventTime#99-T300000ms)
               +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(eventTime#99-T300000ms, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 300000000) + 0) + 300000000), LongType, TimestampType)) AS window#114-T300000ms, eventTime#99-T300000ms, nome#26, temperatura#34, bpm#31]
                  +- EventTimeWatermark eventTime#99: timestamp, 5 minutes
                     +- Project [cast(timestampstr#60 as timestamp) AS eventTime#99, nome#26, temperatura#34, bpm#31]
                        +- Project [json_value#23.id AS id#25, json_value#23.nome AS nome#26, json_value#23.idade AS idade#27, json_value#23.sexo AS sexo#28, json_value#23.peso AS peso#29, json_value#23.altura AS altura#30, json_value#23.bpm AS bpm#31, json_value#23.pressao AS pressao#32, json_value#23.respiracao AS respiracao#33, json_value#23.temperatura AS temperatura#34, json_value#23.glicemia AS glicemia#35, json_value#23.saturacao_oxigenio AS saturacao_oxigenio#36, json_value#23.estado_atividade AS estado_atividade#37, json_value#23.dia_de_semana AS dia_de_semana#38, json_value#23.periodo_do_dia AS periodo_do_dia#39, json_value#23.semana_do_mes AS semana_do_mes#40, json_value#23.estacao_do_ano AS estacao_do_ano#41, json_value#23.passos AS passos#42, json_value#23.calorias AS calorias#43, json_value#23.distancia AS distancia#44, json_value#23.tempo AS tempo#45, json_value#23.total_sleep_last_24 AS total_sleep_last_24#46, json_value#23.deep_sleep_last_24 AS deep_sleep_last_24#47, json_value#23.light_sleep_last_24 AS light_sleep_last_24#48, ... 13 more fields]
                           +- Project [from_json(StructField(id,IntegerType,true), StructField(nome,StringType,true), StructField(idade,IntegerType,true), StructField(sexo,IntegerType,true), StructField(peso,DoubleType,true), StructField(altura,IntegerType,true), StructField(bpm,DoubleType,true), StructField(pressao,DoubleType,true), StructField(respiracao,DoubleType,true), StructField(temperatura,DoubleType,true), StructField(glicemia,DoubleType,true), StructField(saturacao_oxigenio,DoubleType,true), StructField(estado_atividade,IntegerType,true), StructField(dia_de_semana,IntegerType,true), StructField(periodo_do_dia,IntegerType,true), StructField(semana_do_mes,IntegerType,true), StructField(estacao_do_ano,IntegerType,true), StructField(passos,IntegerType,true), StructField(calorias,DoubleType,true), StructField(distancia,DoubleType,true), StructField(tempo,DoubleType,true), StructField(total_sleep_last_24,DoubleType,true), StructField(deep_sleep_last_24,DoubleType,true), StructField(light_sleep_last_24,DoubleType,true), ... 15 more fields) AS json_value#23]
                              +- Project [cast(value#8 as string) AS value#21]
                                 +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6d2dd8ae, KafkaV2[Subscribe[patient-data]]
