<a href="https://colab.research.google.com/github/ALXAVIER-DEV/Spark/blob/master/Aula_9_Structured_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# **Running Pyspark in Colab**

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 3.0.1 with hadoop 2.7 and Java 8. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python. 
Follow the steps to install the dependencies:

In [None]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
[33m0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.7[0m[33m0% [Waiting for headers] [Connected to cloud.r-project.org (65.8.186.116)] [Wai[0m[33m0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Connected to cloud.r-proje[0m                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait[0m                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
[33m0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [3 InRelease 3,626 B/3,626 [0m[33m0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait[0m             

Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Structured Streaming - Introdução
O objetivo desse tópico é mostrar uma introdução ao Structured Streaming e como utilizá-lo em conjunto com uma source Kafka. Vamos ver como ler e exibir os dados recebidos em real-time

## Lendo o dado bruto do Kafka
Ao configurar o Spark Session, basta passar como parâmetro a config relacionada ao package do kafka

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Teste")\
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')\
    .getOrCreate()

Criando o dataframe a partir do Kafka. Observe os parâmetros obrigatórios: **url do broker kafka** e o **nome do tópico**

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "13.82.0.139:9092") \
  .option("subscribe", "queueing.transactions") \
  .load()

Printando o schema do dado

In [None]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Escrevendo o Dataframe na memória, no formato de uma tabela temporária (utilizado apenas em ambiente de testes ou debug)

In [None]:
query = df \
  .writeStream \
  .format("memory") \
  .queryName("transactions_raw") \
  .start()

Vendo o dado recebido através do SparkSQL

In [None]:
spark.sql("""
SELECT count(*) FROM transactions_raw
""").show()

+--------+
|count(1)|
+--------+
|    9760|
+--------+



IMPORTANTE!!! Ao final do streaming, é necessário parar a query com o comando abaixo:

In [None]:
query.lastProgress

{'batchId': 456,
 'durationMs': {'addBatch': 34,
  'getBatch': 0,
  'latestOffset': 14,
  'queryPlanning': 7,
  'triggerExecution': 104,
  'walCommit': 27},
 'id': 'f9d4b4c5-8a87-408e-98bd-35a8b610e710',
 'inputRowsPerSecond': 833.3333333333334,
 'name': 'transactions_raw',
 'numInputRows': 20,
 'processedRowsPerSecond': 192.30769230769232,
 'runId': '246ce09c-0eda-4377-827f-14459597a1ef',
 'sink': {'description': 'MemorySink', 'numOutputRows': 20},
 'sources': [{'description': 'KafkaV2[Subscribe[queueing.transactions]]',
   'endOffset': {'queueing.transactions': {'0': 1433787}},
   'inputRowsPerSecond': 833.3333333333334,
   'numInputRows': 20,
   'processedRowsPerSecond': 192.30769230769232,
   'startOffset': {'queueing.transactions': {'0': 1433767}}}],
 'stateOperators': [],
 'timestamp': '2020-11-16T23:11:09.583Z'}

In [None]:
query.stop()

## Lendo o dado de forma legível (com schema)
Antes de iniciarmos, vamos definir um schema de acordo com o dado gerado pelo gerador de transações

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
import pyspark.sql.functions as f

schema = StructType([
    StructField("source", StringType(), True),
    StructField("target", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("currency", StringType(), True)
  ]
)

Agora, criaremos o Dataframe utilizando o Kafka como source

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "13.82.0.139:9092") \
  .option("subscribe", "queueing.transactions") \
  .load()

Utilizaremos agora o schema criado previamente para traduzir o dado binário em um dado legível, dado o schema criado anteriormente

In [None]:
identified_df = df.select(f.from_json(f.col("value").cast("string"), schema).alias("json")) \
  .selectExpr(
      "json.source", 
      "json.target",
      "json.amount",
      "json.x",
      "json.y",
      "json.timestamp",
      "json.currency"
  )

Printando o schema

In [None]:
identified_df.printSchema()

Escrevendo o dataframe na memória, em formato de tabela temporária e visualizando os resultados

In [None]:
query = identified_df \
  .writeStream \
  .format("memory") \
  .queryName("transactions") \
  .start()

In [None]:
def display_df(df, limit=20):
  return df.limit(limit).toPandas()

In [None]:
display_df(spark.sql("SELECT count(*) FROM transactions"))

In [None]:
query.stop()

Veja mais sources disponíveis no Spark Structured Streaming em: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

# Transformações básicas
Vamos decorrer essa seção executando algumas transformações básicas para agregar e exibir dados. Obs: considere a sigla NRT = Near Real Time

1) Quantidade de transações por source em NRT:

In [None]:
qnt_sources_df = identified_df \
  .groupBy("source") \
  .count()

In [None]:
query = qnt_sources_df \
  .writeStream \
  .outputMode("complete") \
  .format("memory") \
  .queryName("qnt_sources") \
  .start()

In [None]:
display_df(spark.sql("""
SELECT * FROM qnt_sources
"""))

In [None]:
query.stop()

2) O valor total movimentado por cada source começada com a letra **s**:

In [None]:
import pyspark.sql.functions as f

qnt_s_sources_df = identified_df \
  .groupBy("source") \
  .agg(f.sum("amount").alias("total_amount")) \
  .where("UPPER(source) LIKE 'S%'")

In [None]:
query = qnt_s_sources_df \
  .writeStream \
  .outputMode("complete") \
  .format("memory") \
  .queryName("qnt_s_sources") \
  .start()

In [None]:
display_df(spark.sql("""
SELECT * FROM qnt_s_sources
"""))

In [None]:
query.stop()

# Operações em Windows utilizando o conceito de event-time
Vamos ver agora como funcionam as operações em Janelas temporais (windows). Como explicação, considere o exemplo retirado da documentação do Spark Structured Streaming:

![](https://spark.apache.org/docs/latest/img/structured-streaming-window.png)


Vamos contar a quantidade de transações dentro de uma janela de 30 segundos, deslizante a cada 5 segundos.

In [None]:
import pyspark.sql.functions as f

windowed_count = identified_df \
  .groupBy(
      f.window("timestamp", "30 seconds", "5 seconds")
   ) \
  .count()

In [None]:
query = windowed_count \
  .writeStream \
  .outputMode("complete") \
  .format("memory") \
  .queryName("windowed_count") \
  .start()

In [None]:
display_df(spark.sql("""
SELECT * FROM windowed_count
"""))

In [None]:
spark.sql("""
SELECT * FROM windowed_count
""").printSchema()

In [None]:
query.stop()

Manuseando eventos atrasados com os **Watermark**. Veja mais em https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

In [None]:
windowed_watermark_count = identified_df \
  .withWatermark("timestamp", "30 seconds") \
  .groupBy(
      f.window("timestamp", "30 seconds", "5 seconds")
   ) \
  .count()

# Realizando a saída dos resultados

Sempre que o Structured Streaming realizar alguma operação de output dos dados, isso será feito em uma **sink**, ou seja, em um local fora do contexto do Spark.

Além disso, após definir o conjunto de transformações a qual o dado passará, é necessário colocar a query em streaming para executar e salvar os dados em algum local onde possam ser consumidos. Isso se dá através do método **writeStream**. Assim sendo, você pode especificar os seguintes parâmetros:
- Detalhes da output sink: formato, header, etc.
- Output mode: especifica o que será escrito na output sink.
- Query name: Opcional, um identificador único para a tabela, e assim poder realizar queries em cima do dado em memória.
- Trigger Interval:  Opcional; refere-se ao momento que ocorrerá a saída dos dados.
- Checkpoint location: local onde será salvo o checkpoint para garantir entrega end-to-end para algumas sinks.

Veja mais sobre os tipos de Output modes e suas implicações em determinadas queries no link https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes.

Veja também sobre as triggers disponíveis para uso: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

Vejamos agora algumas das sinks disponíveis para uso:


## File Sink

Modo append e sem trigger interval

In [None]:
qnt_sources_df = identified_df \
  .selectExpr("source", "amount")

In [None]:
query = qnt_sources_df \
  .writeStream \
  .outputMode("append") \
  .format("csv") \
  .option("path", "output/streaming_csv") \
  .option("checkpointLocation", "checkpoint") \
  .start()

In [None]:
query.stop()

Modo append e com trigger interval de 2 minutos

In [None]:
query = qnt_sources_df \
  .writeStream \
  .outputMode("append") \
  .trigger(processingTime='2 minutes') \
  .format("csv") \
  .option("path", "output/streaming_csv") \
  .option("checkpointLocation", "checkpoint_file") \
  .start()

In [None]:
query.stop()

## Kafka Sink

Modo complete x modo update

In [None]:
qnt_sources_df = identified_df \
  .groupBy("source") \
  .count() \
  .selectExpr("CAST(source AS STRING) AS key", "CAST(count AS STRING) AS value")

In [None]:
# trocar o modo para update

query = qnt_sources_df \
  .writeStream \
  .outputMode("complete") \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "13.82.0.139:9092") \
  .option("topic", "queueing.transactions.out")\
  .option("checkpointLocation", "checkpoint_kafka") \
  .start()

In [None]:
query.stop()

# Monitorando as suas queries de streaming ativas

Em alguns momentos, é interessante que você tenha acesso a todas as queries em execução naquele exato momento. Manter muitas conexões ativas pode ser prejudicial tanto para a estação que está executando as queries, quanto para o broker.

In [None]:
spark.streams.active

Para parar uma das seções ativas do comando anterior:

In [None]:
spark.streams.active[0].stop()

Veja uma lista de ações passíveis de serem realizadas utilizando o monitoramento das queries:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries

Exercícios

Utilizando os dados de transações financeiras coletados no broker Kafka, realize as seguintes queries em streaming:

1) Retornar a source, o valor da transação e o imposto pago, considerando 10% do valor da transação;

2) Implemente um contador de transações em streaming

3) Implemente um contador de transações utilizando uma janela de 2 minutos e um slide interval de 20 segundos: