In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, StringType
import prometheus_client
from prometheus_client import Gauge, CollectorRegistry

In [2]:
# Register the metric in prometheus_client's default registry. That is the
# registry prometheus_client.start_http_server() serves when no `registry`
# argument is given. With a private CollectorRegistry the metric was never
# exposed on /metrics.
registry = prometheus_client.REGISTRY

# The default registry lives for the whole kernel session. Re-running this cell
# would otherwise try to register the same name twice, which prometheus_client
# rejects with a ValueError. In that case, reuse the collector that is already
# registered.
# NOTE: `_names_to_collectors` is a private attribute of CollectorRegistry.
# NOTE(review): this value is only ever incremented. A Counter is the
# conventional Prometheus type for that, but switching would rename the exposed
# series (Counter adds a `_total` suffix), so it is left as a Gauge here.
if 'temperatura_acima_limite' not in registry._names_to_collectors:
    temperatura_acima_limite = Gauge(
        'temperatura_acima_limite',
        'Contagem de eventos de temperatura acima do limite',
        registry=registry
    )
else:
    temperatura_acima_limite = registry._names_to_collectors['temperatura_acima_limite']

In [3]:
# Create (or reuse) the SparkSession, pulling the Kafka connector onto the
# classpath via spark.jars.packages.
# NOTE(review): the connector coordinate (Spark 3.4.0, Scala 2.12) should match
# the installed PySpark/Scala build — confirm.
# NOTE(review): getOrCreate() hands back an already-running session if there is
# one; jar packages are presumably only resolved when the JVM is first started,
# so restart the kernel after changing this coordinate.
spark = (
    SparkSession.builder
    .appName("KafkaIntegration")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .getOrCreate()
)

24/12/01 18:13:11 WARN Utils: Your hostname, Mac-mini-de-Valter.local resolves to a loopback address: 127.0.0.1; using 192.168.0.100 instead (on interface en0)
24/12/01 18:13:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/valter/.ivy2/cache
The jars for the packages stored in: /Users/valter/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f2e62fba-6114-4af2-a88e-52ef5a161fe0;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/valter/anaconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 201ms :: artifacts dl 5ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#ha

In [4]:
# Schema of the JSON payload produced by the sensors; every field is declared
# nullable.
# NOTE(review): `timestamp` is a 32-bit IntegerType — enough for epoch seconds,
# but epoch milliseconds would not fit. Confirm against the producer.
schema = StructType([
    StructField(nome, tipo, True)
    for nome, tipo in (
        ("sensor_id", IntegerType()),
        ("temperatura", DoubleType()),
        ("umidade", DoubleType()),
        ("timestamp", IntegerType()),
    )
])

In [None]:
# Kafka source configuration.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "sensor-data"
# Readings strictly above this temperature are treated as anomalies.
LIMITE_TEMPERATURA = 25

# Raw Kafka stream; the message payload is in the `value` column (cast to a
# string below before JSON parsing).
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .load()
)

# Decode the payload as a JSON string, parse it with `schema`, and flatten the
# parsed struct into top-level columns.
# NOTE(review): messages that do not match `schema` presumably come out with
# null fields rather than failing the query. A null `temperatura` never passes
# the filter below, because a comparison with null is not true.
df = (
    df_kafka
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("dados"))
    .select("dados.*")
)

# Keep only readings above the threshold.
df_anomalias = df.filter(col("temperatura") > LIMITE_TEMPERATURA)

def processar_anomalias(row):
    """Report one anomalous reading and bump the Prometheus metric.

    Args:
        row: a record exposing `sensor_id` and `temperatura` attributes
            (presumably a Spark Row from the anomaly stream).
    """
    sensor, temperatura = row.sensor_id, row.temperatura
    print(f"Anomalia detectada: {sensor} - Temperatura: {temperatura}")
    # One increment per anomalous reading.
    temperatura_acima_limite.inc()

def processar_batch(df, epoch_id):
    """Handle one micro-batch of the stream (foreachBatch sink).

    Each row is reported and counted through `processar_anomalias`, so the
    per-row print/metric logic lives in a single place.

    Args:
        df: the micro-batch DataFrame supplied by Spark.
        epoch_id: the micro-batch id supplied by Spark (not used here).
    """
    # collect() brings the whole micro-batch to the driver.
    # NOTE(review): fine while anomaly batches stay small; consider
    # df.toLocalIterator() if they can grow large.
    for row in df.collect():
        processar_anomalias(row)

# Port on which prometheus_client serves the /metrics endpoint.
PROMETHEUS_PORT = 8000

# Start the metrics endpoint *before* the streaming query. If binding the port
# fails (e.g. it is still held by a previous run of this cell), no query has
# been started yet that would be left running without a handle.
# Pass the registry the metric was registered in explicitly: without the
# argument, start_http_server serves only prometheus_client's default registry.
prometheus_client.start_http_server(PROMETHEUS_PORT, registry=registry)

# Send each micro-batch of anomalies to processar_batch.
query = df_anomalias.writeStream.foreachBatch(processar_batch).start()

# Block until the stream ends. Interrupting the kernel only interrupts this
# wait, not the query itself, which otherwise keeps polling Kafka in the
# background. So stop the query explicitly on the way out.
try:
    query.awaitTermination()
finally:
    query.stop()


24/12/01 18:13:14 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/tr/w0rq3fxn30994p9q1_3rct700000gn/T/temporary-24c450c1-503c-4c80-8549-2a686ef98ef1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/01 18:13:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/12/01 18:13:14 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Anomalia detectada: 67 - Temperatura: 28.16
Anomalia detectada: 35 - Temperatura: 25.7
Anomalia detectada: 91 - Temperatura: 25.86


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/valter/anaconda3/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/valter/anaconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/valter/anaconda3/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

24/12/01 22:38:28 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/12/01 22:38:29 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/12/01 22:38:31 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/12/01 22:38:32 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/12/01 22:38:33 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
24/12/01 22:38:34 WARN NetworkClient: [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127