# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

## <font color='blue'>Mini-Projeto 6</font>

### <font color='blue'>Análise de Dados de Sensores IoT em Tempo Real com Apache Spark Streaming e Apache Kafka</font>

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.13


In [2]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
#!pip install -q -U watermark

In [3]:
#!pip install -q findspark

In [4]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [5]:
# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

> Precisamos incluir o conector de integração do Spark Streaming com o Apache Kafka. Fique atento à versão do PySpark que está sendo usada.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [6]:
# Conector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 pyspark-shell'

In [7]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Andrew Borges" --iversions

Author: Andrew Borges

findspark: 2.0.1
pyspark  : 3.3.2



## Criando a Sessão Spark

In [8]:
# Cria a sessão Spark
spark = SparkSession.builder.appName("Mini-Projeto6").getOrCreate()

## Leitura do Kafka Spark Structured Stream

In [9]:
# Vamos criar uma subscrição no tópico que tem o streaming de dados que desejamos "puxar" os dados.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dsamp6") \
    .load()

## Definição do Schema da Fonte de Dados

In [10]:
# Definimos o schema dos dados que desejamos capturar para análise (temperatura)
esquema_dados_temp = StructType([StructField("leitura",
                                            StructType([StructField("temperatura", DoubleType(), True)]), True)])

In [11]:
# Definimos o schema global dos dados no streaming
esquema_dados = StructType([
    StructField("id_sensor", StringType(), True),
    StructField("id_equipamento", StringType(), True),
    StructField("sensor", StringType(), True),
    StructField("data_evento", StringType(), True),
    StructField("padrao", esquema_dados_temp, True)
])

## Parse da Fonte de Dados

In [12]:
# Capturamos cada linha de dado (cada valor) como string
df_conversao = df.selectExpr("CAST(value AS STRING)")

In [13]:
# Parse do formato JSON em dataframe
df_conversao = df_conversao.withColumn("jsonData", from_json(col("value"), esquema_dados)).select("jsonData.*")

In [14]:
df_conversao.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- id_equipamento: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- data_evento: string (nullable = true)
 |-- padrao: struct (nullable = true)
 |    |-- leitura: struct (nullable = true)
 |    |    |-- temperatura: double (nullable = true)



## Preparamos o Dataframe

Esse dataframe está no formato que precisamos para análise.

In [15]:
# Renomeamos as colunas para simplificar nossa análise
df_conversao_temp_sensor = df_conversao.select(col("padrao.leitura.temperatura").alias("temperatura"),
                                              col("sensor"))

In [16]:
df_conversao_temp_sensor.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- sensor: string (nullable = true)



## Análise de Dados em Tempo Real

In [17]:
# Aqui temos o objeto que irá conter nossa análise, o cálculo da média das temperaturas por sensor
df_media_temp_sensor = df_conversao_temp_sensor.groupby("sensor").mean("temperatura")

In [18]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg(temperatura): double (nullable = true)



In [19]:
# Renomeamos as colunas para simplificar nossa análise
df_media_temp_sensor = df_media_temp_sensor.select(col("sensor").alias("sensor"),
                                                  col("avg(temperatura)").alias("media_temp"))

In [20]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- media_temp: double (nullable = true)



Abaixo abrimos o streaming para análise de dados em tempo real, imprimindo o resultado no console.

In [32]:
# Objeto que inicia a consulta ao streaming com formato de console
query = df_media_temp_sensor.writeStream.outputMode("complete").format("console").start()

Envie novos arquivos para o Kafka a fim de ver a análise em tempo real por aqui. Clique no botão Stop no menu superior para interromper a célula a qualquer momento.

In [33]:
# Executamos a query do streaming e evitamos que o processo seja encerrado
# query.awaitTermination()
query.stop()

In [34]:
query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [35]:
query.lastProgress

{'id': '0efbd42f-34aa-4e31-baa1-893b5e0ba356',
 'runId': '4fca966b-a184-4138-999d-950364ca1631',
 'name': None,
 'timestamp': '2023-04-17T15:59:14.723Z',
 'batchId': 3,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1, 'triggerExecution': 1},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 50,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 591,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 176835,
   'memoryUsedBytes': 101688,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 800,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 30552}}],
 'sources': [{'description': 'KafkaV2[Subscribe[dsamp6]]',
   'startOffset': {'dsamp6': {'0': 30000}},
   'endOffset': {'dsamp6': {'0': 30000}},
   'latestOffset': {'dsamp6': {'0': 30000}},
   'numInputRows': 0,
   'inp

In [36]:
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4696a783, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2899/0x0000000101349440@5669b645
+- *(4) HashAggregate(keys=[sensor#28], functions=[avg(temperatura#36)])
   +- StateStoreSave [sensor#28], state info [ checkpoint = file:/C:/Users/andrew/AppData/Local/Temp/temporary-f11d88a4-9539-4cba-886c-0625e84c91e0/state, runId = 4fca966b-a184-4138-999d-950364ca1631, opId = 0, ver = 2, numPartitions = 200], Complete, 0, 2
      +- *(3) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperatura#36)])
         +- StateStoreRestore [sensor#28], state info [ checkpoint = file:/C:/Users/andrew/AppData/Local/Temp/temporary-f11d88a4-9539-4cba-886c-0625e84c91e0/state, runId = 4fca966b-a184-4138-999d-950364ca1631, opId = 0, ver = 2, numPartitions = 200], 2
            +- *(2) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperatura#36)])
               +- 

## Análise de Dados em Tempo Real

In [37]:
# Objeto que inicia a consulta ao streaming com formato memória (cria tabela temporária)
query_memoria = df_media_temp_sensor \
    .writeStream \
    .queryName("dsa") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [38]:
# Streams ativados
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x2336e111160>]

In [39]:
# Vamos manter a query executando por algum tempo e aplicando SQL aos dados em tempo real
from time import sleep

for x in range(10):
    
    spark.sql("select sensor, round(media_temp, 2) as media from dsa where media_temp > 50").show()
    sleep(3)
    
query_memoria.stop()

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+------+-----+
|sensor|media|
+------+-----+
+------+-----+

+--------+-----+
|  sensor|media|
+--------+-----+
| sensor7|78.93|
|sensor34|84.57|
|sensor41| 62.9|
|sensor50|56.88|
|sensor38|58.98|
|sensor30|73.26|
|sensor10| 60.8|
| sensor4|76.83|
| sensor5|72.55|
|sensor19|57.42|
| sensor8|52.13|
|sensor43|56.17|
|sensor47| 51.7|
|sensor26|50.42|
|sensor12| 53.6|
|sensor18|54.65|
|sensor15|50.95|
| sensor6|64.59|
|sensor28|74.03|
|sensor33|51.22|
+--------+-----+
only showing top 20 rows

+--------+-----+
|  sensor|media|
+--------+-----+
| sensor7|80.54|
|sensor34|83.95|
|sensor41|63.98|
|sensor50|58.18|
|sensor38|58.28|
|se

# Fim