In [0]:
# Importa o findspark e inicializa
# import findspark
# findspark.init()

# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

Precisamos incluir o conector de integração do Spark Streaming com Apache Kafka. Documentação ->: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [0]:
# Conector
import os 
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pysparkshell'

# Cria a Sessão Sparl
spark = SparkSession.builder.appName("IoT-Sensores").getOrCreate()

In [0]:
# Leitura do Kafka Spark Structured Stream
# Vamos criar uma subscrição (assinatura) no tópico que tem o streaming de dados que desejamos "puxar" os dados. 
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "Eric") \
  .load()

# Definição do Schema da Fonte de Dados
# Definimos o schema dos dados que desejamos capturar para análise (temperatura)
esquema_dados_temp = StructType([StructField("leitura",
                                            StructType([StructField("temperatura", DoubleType(), True)]), True)])

# Definimos o schema global dos dados no streaming
esquema_dados = StructType([
  StructField("id_sensor", StringType(), True),
  StructField("id_equipamento", StringType(), True),
  StructField("sensor", StringType(), True),
  StructField("data_evento", StringType(), True),
  StructField("padrao", esquema_dados_temp)
])

# Parse de Fonte de Dados
# Converte do formato JSON, para um formato tabular com o schema requerido

# Capturamos cada linha de dado (cada valor) como string
df_conversao = df.selectExpr("CAST(value AS  STRING)")

# Parse do formato JSON em dataframe
df_conversao = df_conversao.withColumn("jsonData", from_json(col("value"), esquema_dados)).select("jsonData.*")

df_conversao.printSchema()

In [0]:
# Preparando o Dataframe para o formato ideal para análise

# Renomenando as colunas para simplificar nossa análise
df_conversao_temp_sensor = df_conversao.select(col("padrao.leitura.temperatura").alias("temperatura"),
                                              col("sensor"))

df_conversao_temp_sensor.printSchema()

# Não podemos visualizar o dataframe, pois a fonte é de streaming
df_conversao_temp_sensor.head()

In [0]:
# Análise de Dados em Tempo Real
# Aqui temos o objeto que irá conter nossa análise, o cálculo da média das temperaturas por sensor
df_media_temp_sensor = df_conversao_temp_sensor.groupby("sensor").mean("temperatura")

# df_media_temp_sensor.printSchema()

# Renomeamos as colunas para simplificar nossa análise
df_media_temp_sensor = df_media_temp_sensor.select(col("sensor").alias("sensor"),
                                                 col("avg(temperatura)").alias("media_temp"))

df_media_temp_sensor.printSchema()

In [0]:
# Abaixo abrimos o streaming para análise de dados em tempo real, imprimindo o resultado no console

# Objeto que inicia a consulta ao streaming com formato de console
query = df_media_temp_sensor.writeStream.outputMode("complete").format("console").start()

O código anterior está rodando em segundo plano. Se o deixarmos inativo por muito tempo, derrubaremos o streaming. uma alternativa. é executar a query do seguinte modo: "query.awaitTermination()". É o que fizemos na cálula a seguir.

In [0]:
# Executa a query do streaming e evitamos que o processo seja encerrado
# Agora não executará em segundo plano. 
query.awaitTermination()

O problema do método anterior, é que ele somente encerra, quando clicarmos em stop. Mas veremos uma altertiva, para que consigamo parar e executar no momento que quisermos.

In [0]:
# Status da query
query.status

# Resumo de tudo que foi feito na ultima execução
query.lastProgress

# Explica a query
# Plano de execução da query
query.explain()

In [0]:
# SQL na Análise de Dados em Tempo Real
# Objeto que inicia a consulta ao streaming com formato de memória (cria tabela temporária)
query_memoria = df_media_temp_sensor \
  .writeStream \
  .queryName("Eric") \
  .outputMode("complete") \
  .format("memory") \
  .start()

# Streams ativados (quantos streams estão ativados)
spark.streams.active

In [0]:
# Vamos manter a query executando por algum tempo e aplicando SQL aos dados em tempo real
from time import sleep

for x in range(10):
  
  # select do sensor, arredonda a média para duas casas decimais, somente na onde a média de temperatura for maior que 65
  spark.sql("select sensor, round(media_temp, 2) as media from Eric where media_temp > 65").show
  sleep(3)

query_memoria.stop()
# FIM! 