## Structured Streaming com Kafka e Spark

#### 1. Ler o tópico do kafka “topic-kvspark” em modo batch

In [1]:
from pyspark.sql.functions import *

In [2]:
topic_read = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .load()

#### 2. Visualizar o schema do tópico

In [3]:
topic_read.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### 3. Visualizar o tópico com o campo key e value convertidos em string

In [4]:
topic_string = topic_read.select(col("key").cast("string"),col("value").cast("string"))
topic_string.show()

+---+--------------------+
|key|               value|
+---+--------------------+
|  1|                Msg1|
|  1|                Msg1|
|  3|                Msg3|
|   |                Msg1|
|  3|                Msg3|
|  1|             Marcelo|
|  3|Estudo de domingo...|
|  1|             Marcelo|
|  3|Estudo de domingo...|
|  1|             Marcelo|
|  3|Estudo de domingo...|
|  2|                Msg2|
|  2|                Msg2|
|  2|           Feliciani|
|  2|           Feliciani|
|  2|           Feliciani|
+---+--------------------+



#### 4. Ler o tópico do kafka “topic-kvspark” em modo streaming

In [5]:
# se for preciso ler todas as mensagens desde o ínicio adicinar    .option("startingOffsets","earliest")\
# da forma abaixo, vai pegar as novas mensagens de stream

topic_read_stream = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .load()

#### 5. Visualizar o schema do tópico em streaming

In [6]:
topic_read_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### 6. Alterar o tópico em streaming com o campo key e value convertidos para string

In [13]:
topic_string_stream = topic_read_stream.select(col("key").cast("string"),col("value").cast("string"))

#### 7. Salvar o tópico em streaming no tópico topic-kvspark-output a cada 5 segundos

In [8]:
# salvará esse topico (topic-kvspark-output) lá no broker do Kafka
topic_string_stream_output = topic_string_stream.writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("topic","topic-kvspark-output")\
    .option("checkpointLocation","/user/feliciani/kafka_checkpoint")\
    .trigger(continuous='5 seconds')\
    .start()

In [9]:
topic_string_stream_output.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': True}

Para que fosse criado este novo tópico no Kafka foi necessário criar novas mensagens no Kafka Producer para que o Spark fizesse a captura, realizasse o processamento e gravasse o novo tópico no Kafka.

### Topic criado com sucesso no Kafka. Print do Kafka abaixo:
#### bash-4.4# kafka-topics.sh --bootstrap-server kafka:9092 --list
#### --topic-kvspark
#### --topic-spark
#### __consumer_offsets
#### topic-kvspark
#### topic-kvspark-output
#### topic-spark
#### bash-4.4#

#### 8. Salvar o tópico na pasta hdfs://namenode:8020/user/<nome>/Kafka/topic-kvspark-output

In [15]:
# cada aquivo que chegar, será salvo no HDFS

topic_string_stream_output = topic_string_stream.writeStream\
    .format("csv")\
    .option("checkpointLocation","/user/feliciani/kafka_checkpoint1")\
    .option("path","/user/feliciani/kafka/topic-kvspark-output")\
    .start()

In [19]:
# arquivo gravado no HDFS (hdfs://namenode:8020)

!hdfs dfs -ls /user/feliciani/kafka/topic-kvspark-output

Found 2 items
drwxr-xr-x   - root supergroup          0 2021-07-05 13:05 /user/feliciani/kafka/topic-kvspark-output/_spark_metadata
-rw-r--r--   2 root supergroup          0 2021-07-05 13:05 /user/feliciani/kafka/topic-kvspark-output/part-00000-1cf3eb20-3799-4ba1-9408-2db1e58d4435-c000.csv


In [20]:
# Conteúdo do arquivo está em string

!hdfs dfs -cat /user/feliciani/kafka/topic-kvspark-output/part-00000-1cf3eb20-3799-4ba1-9408-2db1e58d4435-c000.csv