# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

## <font color='blue'>Mini-Projeto 6</font>

### <font color='blue'>Análise de Dados de Sensores IoT em Tempo Real com Apache Spark Streaming e Apache Kafka</font>

![title](imagens/MP6.png)

In [None]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

In [None]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
#!pip install -q -U watermark

In [None]:
#!pip install -q findspark

In [None]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [None]:
# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

> Precisamos incluir o conector de integração do Spark Streaming com o Apache Kafka. Fique atento à versão do PySpark que está sendo usada.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [None]:
# Conector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

In [None]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

## Criando a Sessão Spark

In [None]:
# Cria a sessão Spark
spark = SparkSession.builder.appName("Mini-Projeto6").getOrCreate()

## Leitura do Kafka Spark Structured Stream

In [None]:
# Vamos criar uma subscrição no tópico que tem o streaming de dados que desejamos "puxar" os dados.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "dsamp6") \
  .load()

## Definição do Schema da Fonte de Dados

In [None]:
# Definimos o schema dos dados que desejamos capturar para análise (temperatura)
esquema_dados_temp = StructType([StructField("leitura", 
                                             StructType([StructField("temperatura", DoubleType(), True)]), True)])

In [None]:
# Definimos o schema global dos dados no streaming
esquema_dados = StructType([ 
    StructField("id_sensor", StringType(), True), 
    StructField("id_equipamento", StringType(), True), 
    StructField("sensor", StringType(), True), 
    StructField("data_evento", StringType(), True), 
    StructField("padrao", esquema_dados_temp, True)
])

## Parse da Fonte de Dados

In [None]:
# Capturamos cada linha de dado (cada valor) como string
df_conversao = df.selectExpr("CAST(value AS STRING)")

In [None]:
# Parse do formato JSON em dataframe
df_conversao = df_conversao.withColumn("jsonData", from_json(col("value"), esquema_dados)).select("jsonData.*")

In [None]:
df_conversao.printSchema()

## Preparamos o Dataframe 

Esse dataframe está no formato que precisamos para análise.

In [None]:
# Renomeamos as colunas para simplificar nossa análise
df_conversao_temp_sensor = df_conversao.select(col("padrao.leitura.temperatura").alias("temperatura"), 
                                               col("sensor"))

In [None]:
df_conversao_temp_sensor.printSchema()

In [None]:
# Não podemos visualizar o dataframe, pois a fonte é de streaming
# df_conversao_temp_sensor.head()

## Análise de Dados em Tempo Real

In [None]:
# Aqui temos o objeto que irá conter nossa análise, o cálculo da média das temperaturas por sensor
df_media_temp_sensor = df_conversao_temp_sensor.groupby("sensor").mean("temperatura")

In [None]:
df_media_temp_sensor.printSchema()

In [None]:
# Renomeamos as colunas para simplificar nossa análise
df_media_temp_sensor = df_media_temp_sensor.select(col("sensor").alias("sensor"), 
                                                   col("avg(temperatura)").alias("media_temp"))

In [None]:
df_media_temp_sensor.printSchema()

Abaixo abrimos o streaming para análise de dados em tempo real, imprimindo o resultado no console.

In [None]:
# Objeto que inicia a consulta ao streaming com formato de console
query = df_media_temp_sensor.writeStream.outputMode("complete").format("console").start()

Envie novos arquivos para o Kafka a fim de ver a análise em tempo real por aqui. Clique no botão Stop no menu superior para interromper a célula a qualquer momento.

In [None]:
# Executamos a query do streaming e evitamos que o processo seja encerrado
query.awaitTermination()

In [None]:
query.status

In [None]:
query.lastProgress

In [None]:
query.explain()

## Análise de Dados em Tempo Real

In [None]:
# Objeto que inicia a consulta ao streaming com formato de memória (cria tabela temporária)
query_memoria = df_media_temp_sensor \
    .writeStream \
    .queryName("dsa") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [None]:
# Streams ativados
spark.streams.active

In [None]:
# Vamos manter a query executando por algum tempo e aplicando SQL aos dados em tempo real
from time import sleep

for x in range(10):
    
    spark.sql("select sensor, round(media_temp, 2) as media from dsa where media_temp > 65").show()
    sleep(3)
    
query_memoria.stop()

# Fim