# Spark Streaming

El objetivo del ejercicio es utilizar el `fileStream` y el `SparkContext`. Se debe tomar el ejemplo anterior, buscar en la carpeta los archivos json que se fueron generando y hacer una agrupación de count para los valores de los features para ver si hay `sepal_width` repetidos. Los resultados pueden ser escritos a otros archivos json en formato `update` o mostrados en consola.


## Dependencias

Aquí se instalan las dependencias y descargan los archivos necesarios para correr este colab

In [None]:
!pip install pyspark==3.2.0
!wget https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv
!wget https://github.com/openscoring/openscoring/releases/download/2.1.0/openscoring-server-executable-2.1.0.jar
!wget https://downloads.apache.org/kafka/3.4.1/kafka_2.12-3.4.1.tgz
!tar -xzf kafka_2.12-3.4.1.tgz

In [None]:
!./kafka_2.12-3.4.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.4.1/config/zookeeper.properties
!./kafka_2.12-3.4.1/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.4.1/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 20
!ps -ef | grep kafka
# iniciando el tópico iris con replicación 1 y 1 partición
!./kafka_2.12-3.4.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic iris
!./kafka_2.12-3.4.1/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic iris

### Imports

In [None]:
import os
import time
import requests
from json import loads, dumps

from uuid import uuid4

from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.types import StructType, DoubleType, StringType, IntegerType
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf, from_json, to_json, struct, md5
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (
    MinMaxScaler,
    VectorAssembler,
    OneHotEncoder,
    StringIndexer,
    IndexToString
)

### Creando el cluster de Spark con las dependencias instaladas

En este caso, en vez de usar archivos JAR, estamos especificando los paquetes que necesitamos y Spark se encarga de descargarlos por nosotros (si no estuvieran presentes).

Adicionalmente, se crea el cluster de Spark con `local[*]` para que el cluster decida la cantidad de threads que necesita para correr el notebook.

In [None]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1 --master local[*] pyspark-shell'

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("Spark Streaming") \
    .getOrCreate()
sc = spark.sparkContext

### Importando el dataset

En el siguiente bloque se define el schema. En la mayoría de los casos esto no es necesario, pero como las columnas del dataset `iris.csv` tienen puntos en los nombres: `sepal.width` Spark entiende que es un `Struct` o un objeto y trata de descomponerlo. Como no puede, este falla. Lo que hacemos para solucionar esto es cambiarle el nombre agregando *backticks* (el siguiente caracter: `)

En este caso, vamos a usar la función `cache()` al final de la definición del dataset. Esto sirve para mantener el dataset en memoria y que las operaciones sean mucho más rapidas. Hacemos esto ya que luego vamos a ver como se pueden usar un dataset estático y streaming en conjunto.

In [None]:
iris_schema = StructType().add('sepal.length', DoubleType()) \
  .add('sepal.width', DoubleType()) \
  .add('petal.length', DoubleType()) \
  .add('petal.width', DoubleType()) \
  .add('variety', StringType())

# renaming columns to remove dot for better compatibility
iris_df = spark.read.format('csv') \
  .schema(iris_schema) \
  .option('header', 'true') \
  .load('iris.csv') \
  .select(
      col('`sepal.width`').alias('sepal_width'),
      col('`sepal.length`').alias('sepal_length'),
      col('`petal.width`').alias('petal_width'),
      col('`petal.length`').alias('petal_length'),
      col('variety')
    ).cache()
iris_df.show()
iris_df.printSchema()

## Openscoring y copia de modelos

**NOTA IMPORTANTE**: en los comandos `cp` de las celdas que siguen, deben ponerse la dirección de su drive donde apunte a estos archivos. Los archivos estan disponibles en la carpeta del colab. Para conectar colab con drive, abrir los archivos (botón arriba a la izquierda que es una carpeta) y arriba de todo habrá un ícono de drive. Si se le da click se conecta y se agrega una carpeta llamada **drive** en la dirección `/content/drive`.

In [None]:
!nohup java -jar /content/openscoring-server-executable-2.1.0.jar --port 8081 &
!sleep 10

nohup: appending output to 'nohup.out'


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**NOTA IMPORTANTE**: el path debe ser donde cada uno guardo la carpeta_index_to_class y RandomForestIris.pmml
El link donde encuentran la carpeta index_to_class y el archivo PMML es: https://drive.google.com/drive/folders/135dwjynARvhTEAtdS2dm85aiYqLXLKzR?usp=sharing

In [None]:
!cp -r ./drive/MyDrive/humai/spark/index_to_class ./index_to_string
!cp -r ./drive/MyDrive/humai/spark/RandomForestIris.pmml .

### Data Locallity o Localidad de la Data

Las personas que diseñaron Spark notaron que es más costoeficiente "mover los cómputos" que "mover la data". Es decir, es más barato ejecutar los computos donde esta la data que mover la data a donde esta el computo. Por eso, no solamente Spark es procesamiento distribuido, sino que usa un patrón crucial para su funcionamiento óptimo. Este es, tener en cuenta la **Localidad de los datos**. Esto significa que los procesamientos que se envían al cluster de Spark, deben intentar poder ser performados por las máquinas en donde la data esta y evitar el *shuffling* (que los datos de una maquina termine en otra, que vimos que es costoso).

Es importante tener esto en cuenta al momento de diseñar un sistema utilizando las tecnologías vistas en este colab. Se podría pensar en una arquitectura con los modelos desplegados en la misma máquina donde esta la data, de esta manera las consultas no saldrían de esta y sería extremadamente rápido, a pesar de que fuera HTTP.

Los invito a considerar diferentes opciones y conversarlas en el discord.

In [None]:
!curl -X PUT --data-binary @RandomForestIris.pmml -H "Content-type: text/xml" http://localhost:8081/openscoring/model/RandomForestIris

## Se define la UDF para la inferencia en tiempo real

In [None]:
def make_model_prediction(sepal_width, sepal_length, petal_width, petal_length):
  body = {
    'id': f'record-{uuid4()}',
    'arguments': {'sepal_width': sepal_width, 'sepal_length': sepal_length,
             'petal_width': petal_width, 'petal_length': petal_length}
          }

  headers = {"Content-type": "application/json"}
  response = requests.post(url='http://localhost:8081/openscoring/model/RandomForestIris', json=body, headers=headers)

  return response.json()['results']['prediction']


make_model_prediction_udf = udf(make_model_prediction)

## Se carga el modelo de `IndexToString` para pasar de la predicción numérica a la clase real

In [None]:
index_to_class = IndexToString.load('./index_to_string')

In [None]:
!mkdir results
!mkdir results2
!mkdir filestream_results
!mkdir checkpoint
!mkdir checkpoint2

## Leyendo de Kafka

En este paso se ejecuta la lectura de Kafka, pero no automáticamente ni directamente, sino que se comienza un proceso que va a leer la data cuando llegue, va a desarmar el json, va a ejecutar la predicción, y va a hacer un join con la data original para ver si se predijo bien o no. Esto se ejecuta de manera asincrónica.

In [None]:
source_schema = StructType().add('sepal_length', DoubleType()) \
  .add('sepal_width', DoubleType()) \
  .add('petal_length', DoubleType()) \
  .add('petal_width', DoubleType())

streaming_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", "iris") \
  .load() \
  .select(from_json(col('value').cast('string'), source_schema).alias('value')) \
  .select(col('value.sepal_length').alias('sepal_length'),
          col('value.sepal_width').alias('sepal_width'),
          col('value.petal_length').alias('petal_length'),
          col('value.petal_width').alias('petal_width')) \
  .select('*', make_model_prediction_udf('sepal_width', 'sepal_length',
                                    'petal_width', 'petal_length') \
          .cast(IntegerType()).alias('prediction')) \
  .join(iris_df, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']) \
  .withColumnRenamed('variety', 'original_class')

streaming_df = index_to_class.transform(streaming_df)

streaming_df.writeStream \
  .outputMode('append') \
  .format('json') \
  .option('path', 'results') \
  .option('header', 'true') \
  .option("checkpointLocation", "checkpoint") \
  .start()

<pyspark.sql.streaming.StreamingQuery at 0x7ca161b380d0>

In [None]:
!sleep 10

def write_result(rdd):
  if not rdd.isEmpty():
    spark.read.json(rdd) \
      .write \
      .format('json') \
      .option('path',  f'filestream_results/{time.time()}') \
      .option('header', 'true') \
      .save()

ssc = StreamingContext(sc, 1)

lines = ssc.textFileStream('file:///content/results2')

lines.map(lambda x: loads(x)) \
  .map(lambda x: (x['sepal_width'], 1)) \
  .reduceByKey(lambda a, b: a + b) \
  .map(lambda x: dumps({'sepal_width': x[0], 'count': x[1]})) \
  .foreachRDD(lambda rdd: write_result(rdd))

lines.pprint()

ssc.start()

## Escribiendo a Kafka

Ya iniciado el paso anterior, se escribe a Kafka el dataset completo. Esto llegara al proceso anterior y se hara la predicción. Para ver los resutlados, ir a la carpeta ubicada en `/content/results` y buscar los archivos que comienzan en `part-000...`. Ahi estan los resultados en formato json.

En este caso como clave se eligió el hash de los valores de entrada. La realidad es que no hace diferencia ya que solo hay una partición.

In [None]:
!sleep 10

iris_df.select(to_json(struct('sepal_width', 'sepal_length', 'petal_width', 'petal_length')).alias('value')) \
  .withColumn('key', md5('value')) \
  .selectExpr("key", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("topic", "iris") \
  .save()

In [None]:
!sleep 10
!cp results/*.json ./results2
ssc.awaitTermination()