# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

## <font color='blue'>Mini-Projeto 6</font>

### <font color='blue'>Análise de Dados de Sensores IoT em Tempo Real com Apache Spark Streaming e Apache Kafka</font>

![title](imagens/MP6.png)

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.12


In [2]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
#!pip install -q -U watermark

In [3]:
#!pip install -q findspark

In [4]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [5]:
# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

> Precisamos incluir o conector de integração do Spark Streaming com o Apache Kafka. Fique atento à versão do PySpark que está sendo usada.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [6]:
# Conector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

In [7]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

Author: Data Science Academy

findspark: 2.0.1
pyspark  : 3.3.1



## Criando a Sessão Spark

In [8]:
# Cria a sessão Spark
spark = SparkSession.builder.appName("Mini-Projeto6").getOrCreate()

## Leitura do Kafka Spark Structured Stream

In [9]:
# Vamos criar uma subscrição no tópico que tem o streaming de dados que desejamos "puxar" os dados.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "dsamp6") \
  .load()

## Definição do Schema da Fonte de Dados

In [10]:
# Definimos o schema dos dados que desejamos capturar para análise (temperatura)
esquema_dados_temp = StructType([StructField("leitura", 
                                             StructType([StructField("temperatura", DoubleType(), True)]), True)])

In [11]:
# Definimos o schema global dos dados no streaming
esquema_dados = StructType([ 
    StructField("id_sensor", StringType(), True), 
    StructField("id_equipamento", StringType(), True), 
    StructField("sensor", StringType(), True), 
    StructField("data_evento", StringType(), True), 
    StructField("padrao", esquema_dados_temp, True)
])

## Parse da Fonte de Dados

In [12]:
# Capturamos cada linha de dado (cada valor) como string
df_conversao = df.selectExpr("CAST(value AS STRING)")

In [13]:
# Parse do formato JSON em dataframe
df_conversao = df_conversao.withColumn("jsonData", from_json(col("value"), esquema_dados)).select("jsonData.*")

In [14]:
df_conversao.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- id_equipamento: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- data_evento: string (nullable = true)
 |-- padrao: struct (nullable = true)
 |    |-- leitura: struct (nullable = true)
 |    |    |-- temperatura: double (nullable = true)



## Preparamos o Dataframe 

Esse dataframe está no formato que precisamos para análise.

In [15]:
# Renomeamos as colunas para simplificar nossa análise
df_conversao_temp_sensor = df_conversao.select(col("padrao.leitura.temperatura").alias("temperatura"), 
                                               col("sensor"))

In [16]:
df_conversao_temp_sensor.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- sensor: string (nullable = true)



In [17]:
# Não podemos visualizar o dataframe, pois a fonte é de streaming
# df_conversao_temp_sensor.head()

## Análise de Dados em Tempo Real

In [18]:
# Aqui temos o objeto que irá conter nossa análise, o cálculo da média das temperaturas por sensor
df_media_temp_sensor = df_conversao_temp_sensor.groupby("sensor").mean("temperatura")

In [19]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg(temperatura): double (nullable = true)



In [20]:
# Renomeamos as colunas para simplificar nossa análise
df_media_temp_sensor = df_media_temp_sensor.select(col("sensor").alias("sensor"), 
                                                   col("avg(temperatura)").alias("media_temp"))

In [21]:
df_media_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- media_temp: double (nullable = true)



Abaixo abrimos o streaming para análise de dados em tempo real, imprimindo o resultado no console.

In [22]:
# Objeto que inicia a consulta ao streaming com formato de console
query = df_media_temp_sensor.writeStream.outputMode("complete").format("console").start()

Py4JJavaError: An error occurred while calling o70.start.
: java.io.IOException: Cannot run program "C:\hadoop\bin\winutils.exe": CreateProcess error=216, Esta versão de %1 não é compatível com a versão do Windows sendo executada. Verifique as informações de sistema do computador e contate o fornecedor do software
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:934)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
	at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:79)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:140)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:138)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:279)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:427)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:406)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: CreateProcess error=216, Esta versão de %1 não é compatível com a versão do Windows sendo executada. Verifique as informações de sistema do computador e contate o fornecedor do software
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:487)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:154)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 47 more


Envie novos arquivos para o Kafka a fim de ver a análise em tempo real por aqui. Clique no botão Stop no menu superior para interromper a célula a qualquer momento.

In [None]:
# Executamos a query do streaming e evitamos que o processo seja encerrado
query.awaitTermination()

In [None]:
query.status

In [None]:
query.lastProgress

In [None]:
query.explain()

## Análise de Dados em Tempo Real

In [None]:
# Objeto que inicia a consulta ao streaming com formato de memória (cria tabela temporária)
query_memoria = df_media_temp_sensor \
    .writeStream \
    .queryName("dsa") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [None]:
# Streams ativados
spark.streams.active

In [None]:
# Vamos manter a query executando por algum tempo e aplicando SQL aos dados em tempo real
from time import sleep

for x in range(10):
    
    spark.sql("select sensor, round(media_temp, 2) as media from dsa where media_temp > 65").show()
    sleep(3)
    
query_memoria.stop()

# Fim