### 1. Definición de función

In [0]:
def promediarValores(df):
    df.createOrReplaceTempView("resultadoMedio")
    promedios = spark.sql("""SELECT tipo, AVG(total) AS promedio FROM resultadoMedio GROUP BY tipo ORDER BY promedio DESC""")
    return promedios

### 2. Subscripción al Topic

In [0]:
tiposStreamingDF = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "promedios")
  .load())

### 3. Definición del esquema de los datos a recibir

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F

esquema = StructType([\
  StructField("tipo", StringType()),\
  StructField("total", DoubleType())\
])

parsedDF = tiposStreamingDF.select("value").withColumn("value", F.col("value").cast(StringType())).withColumn("parejas", F.from_json(F.col("value"), esquema)).withColumn("tipo", F.col("parejas.tipo")).withColumn("total", F.col("parejas.total"))

### 4. Inicialización de stream de datos

In [0]:
promediosStreamingDF = promediarValores(parsedDF)
salida = promediosStreamingDF\
                    .writeStream\
                    .queryName("AgregacionPromedios")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()

### 5. Mostrar resultados

In [0]:
promediosDF = spark.sql("select * from AgregacionPromedios")
promediosDF.show()

+-------+-----------------+
|   tipo|         promedio|
+-------+-----------------+
|ingreso|5.833333333333333|
|  gasto|              3.0|
+-------+-----------------+

