# Procesado de votos

Este script modela la rama real-time (es decir, la única rama que hay) de la aplicación. Spark nos genera un dataframe continuo sobre el que podemos aplicar filtros y expresiones como en un dataframe normal, con la salvedad de que el resultado no se devuelve en el mismo instante de ejecutar un `df.show()` o `df.write()`, sino que debemos decidir en el modo de escritura con `.outputMode()`. Además, no todos los formatos de escritura soportan todos los modos de salida: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts y https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#output-modes.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

PACKAGES = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
spark = SparkSession \
    .builder \
    .appName("StructuredVotes") \
    .config("spark.jars.packages", PACKAGES)\
    .getOrCreate()



In [None]:
from ejercicios.votes import TOPIC_VOTES, TOPIC_VOTES_ENRICHED

Obtenemos el dataframe de manera similar a los dataframes de orígenes estáticos: indicamos el origen, Kafka, y varios parámetros de configuración: la dirección del broker, el topic y el offset inicial: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries. Debemos subscribirnos desde el offset más antiguo para poder recalcular el resultado sobre todos los votos recibidos.

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", TOPIC_VOTES_ENRICHED) \
  .load()


Los mensajes llegan en formato JSON, pero al contrario que con `spark.read.csv`, debemos indicar el esquema completo.

In [None]:
schema = StructType([
    StructField("CODIGO", IntegerType()),
    StructField("COMUNIDAD", StringType()),
    StructField("PROVINCIA", StringType()),
    StructField("MUNICIPIO", StringType()),
    StructField("PARTIDO", StringType())
])

Podemos usar el modo de salida por consola para hacer debugging. Si usamos el modo de salida `append`, en cada microbatch sólo se imprimen los mensajes procesados en ese batch. For formato por consola no soporta modo `complete`.

In [None]:
query = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING) AS value") \
    .withColumn("value_json", fn.from_json(col('value'), schema)) \
    .select('topic', 'value_json') \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()


Al contrario que los clientes `Consumer` de otros ejercicios, `query.start()` arranca la query en el cluster de Spark sin bloquear el proceso python. Podemos pararla expresamente con `query.stop()` o bloquear el proceso hasta que termine la query con `query.awaitTermination()`.

In [None]:
query.stop()

In [None]:
query.awaitTermination()

También podemos volcar los datos en una vista SQL en memoria. Esta vista se puede leer como un dataframe por otros procesos de Spark.

In [None]:
query = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING) AS value") \
    .withColumn("value_json", fn.from_json(col('value'), schema)) \
    .select('topic', 'value_json') \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName('preview') \
    .start()


In [None]:
spark.sql('SELECT * FROM preview').show()

La tabla de destino y su dataframe asociada tiene un esquema, al igual que cualquier otro dataframe. La función `to_json` genera una columna de tipo complejo que podemos simplificar con una operación select.

In [None]:
spark.sql('SELECT * FROM preview').printSchema()

In [None]:
query = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING) AS value") \
    .withColumn("value_json", fn.from_json(col('value'), schema)) \
    .select('value_json.COMUNIDAD', 'value_json.PROVINCIA', 'value_json.PARTIDO') \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName('preview2') \
    .start()


In [None]:
spark.sql('SELECT * FROM preview2').show()

In [None]:
query.stop()

Tras leer el mensaje, una de las tareas es validar la _firma del mensaje_. El enunciado nos indica que asumamos que la firma digital está incluida en el mensaje y podemos hacer uso de una función que devuelva un booleano si la firma es satisfactoria. Como ya sabemos usar [UDFs](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.udf), podemos escribir la función en python y convertirla a UDF:

In [None]:
def process_signature(comunidad, provincia, municipio):
    return 'OK'

udf_process_signature = fn.udf(process_signature)

Ya estamos en disposición de agregar los resultados por comunidad autónoma y provincia. Al ejecutar una agregación debemos cambiar al modo de salida `complete` o `update`. El formato `memory` no soporta `update`, así que actualizaremos la tabla completa en cada iteración.

In [None]:
query = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING) AS value") \
    .withColumn("value_json", fn.from_json(col('value'), schema)) \
    .select('value_json.COMUNIDAD', 'value_json.PROVINCIA', 'value_json.MUNICIPIO', 'value_json.PARTIDO') \
    .withColumn('SIGNATURE', udf_process_signature(col('COMUNIDAD'), col('PROVINCIA'), col('MUNICIPIO'))) \
    .where(~ fn.isnull(col('SIGNATURE'))) \
    .groupBy('COMUNIDAD', 'PROVINCIA', 'PARTIDO') \
    .agg(fn.count('*').alias('VOTOS')) \
    .sort(col('COMUNIDAD').asc(), col('PROVINCIA').asc(), col('VOTOS').desc()) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName('dashboard') \
    .start()


# Resultados

Sobre esta tabla podemos aplicar más filtros y podríamos usarla como origen de datos para escribir en un fichero externo (por ejemplo, al terminar las votaciones) o para mostrar gráficos con la composición de escaños.

In [None]:
spark.sql("""
SELECT COMUNIDAD, PARTIDO, sum(VOTOS) as VOTOS
  FROM dashboard
  WHERE VOTOS > 2 and COMUNIDAD LIKE 'And%'
  GROUP BY COMUNIDAD, PARTIDO
  ORDER BY VOTOS DESC
""").show(100, False)

In [None]:
query.stop()

# Arquitectura Kappa: cambio de core

El enunciado nos indica que a mitad de jornada se detecta que en una de las provincias hay problemas técnicos y la firma de los votos puede haber sido manipulada. Debemos recalcular los votos sin detener el sistema de recepción, ya que el resto de mesas electorales deben seguir funcionando.

En este caso podemos calcular una segunda tabla en la que descartamos todos los votos de la provincia afectada. Sólo es necesario cambiar la lógica de la función que valida las firmas. En un caso real reescribiríamos el código y arrancaríamos un segundo proceso (o contenedor, o llamada a pyspark-submit). En este caso podemos ejecutar la query desde el mismo notebook por simplificar.

Si la provincia afectada es, por ejemplo, Granada:

In [None]:
def process_signature_v2(comunidad, provincia, municipio):
    if provincia == 'Granada':
        return None
    else:
        return 'OK'

udf_process_signature_v2 = fn.udf(process_signature_v2)

In [None]:
query2 = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING) AS value") \
    .withColumn("value_json", fn.from_json(col('value'), schema)) \
    .select('value_json.COMUNIDAD', 'value_json.PROVINCIA', 'value_json.MUNICIPIO', 'value_json.PARTIDO') \
    .withColumn('SIGNATURE', udf_process_signature_v2(col('COMUNIDAD'), col('PROVINCIA'), col('MUNICIPIO'))) \
    .where(~ fn.isnull(col('SIGNATURE'))) \
    .groupBy('COMUNIDAD', 'PROVINCIA', 'PARTIDO') \
    .agg(fn.count('*').alias('VOTOS')) \
    .sort(col('COMUNIDAD').asc(), col('PROVINCIA').asc(), col('VOTOS').desc()) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName('dashboard_v2') \
    .start()


In [None]:
spark.sql("""
SELECT COMUNIDAD, PARTIDO, sum(VOTOS) as VOTOS
  FROM dashboard_v2
  WHERE VOTOS > 2 and COMUNIDAD LIKE 'And%'
  GROUP BY COMUNIDAD, PARTIDO
  ORDER BY VOTOS DESC
""").show(100, False)

In [None]:
query2.stop()