# PySpark Streaming - Exemplos Práticos

Este notebook apresenta exemplos práticos de como utilizar o Spark Streaming com PySpark para o consumo de dados em tempo real.

## Configuração Inicial da Aplicação

In [None]:
# Importação das bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Criar SparkSession com configurações para streaming
spark = SparkSession.builder \
    .appName("SparkStreamingExamples") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

## Exemplo 1
## Consumindo dados em tempo-real a partir de arquivos CSV (File Source)

Neste exemplo, o código faz a leitura de dados de uma pasta contendo arquivos CSV, filtra apenas os registros de acordo com uma condição definida pelo usuário. 
Em seguida, a saída pode ser impressa em tela ou gravada no HDFS ou em um banco de dados. 

In [None]:
# Definir o schema dos dados
schema = StructType() \
    .add("id", IntegerType()) \
    .add("nome", StringType()) \
    .add("idade", IntegerType())

# Criar um dataframe a partir de uma pasta contendo arquivos csv
df_stream = spark.readStream \
    .option("sep", ",") \
    .schema(schema) \
    .csv("pasta_entrada")
# -> A pasta "pasta_entrada" é uma pasta armazenada no HDFS e fica sendo a pasta de interesse, onde os dados a serem consumidos pela aplicação devem ser inseridos/armazenados

# Aplicar alguma transformação sobre o dataframe de entrada (opcional)
df_filtrado = df_stream.filter("idade >= 18")

In [None]:
# Saída do streaming (em console)
query = df_filtrado.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="3 seconds") \
    .option("checkpointlocation", "pasta_checkpoint") \
    .option("truncate", False) \
    .start()

# -> O checkpointLocation permite que o sistema rastreie o progresso do processamento dos dados e possa retomar o processamento após uma falha.  Ele armazena metadados dos processamentos realizados em uma pasta, neste caso, no HDFS. 
# Evita que seja feito reprocessamento de dados anteriormente processados. 

# -> Neste exemplo de saída do Stream, os dados são apenas impressos em tela ("console"), não sendo salvos em nenhum lugar.

# Aguardar a finalização do streaming
query.awaitTermination()

# Aguarda o encerramento manual da aplicação (Ctrl+C)
#try:
#    query.awaitTermination()
#except KeyboardInterrupt:
#    query.stop()
#    print("Query parada com sucesso.")

In [None]:
# Encerrar o streaming (opcional)
query.stop()

In [None]:
# Saída do streaming (em csv no HDFS)
query = df_filtrado.writeStream \
        .format("csv") \
        .outputMode("append") \
        .trigger(processingTime="20 seconds") \
        .option("checkpointlocation", "pasta_checkpoint") \
        .option("path", "diretorio_saida") \
        .start()

In [None]:
# Saída do streaming (em um banco de dados PostgreSQL)

# Função para salvar os dados em um banco de dados PostgreSQL
def salva_postgresql(df, epoch_id):
    df.write.jdbc(
        url="jdbc:postgresql://db:5432/mydb",
        table="tabela",
        mode="append",
        properties={
            "user": "postgres",
            "password": "mypassword",
            "driver": "org.postgresql.Driver"
        }
    )

# Salvar o dataframe filtrado em um banco de dados PostgreSQL usando foreachBatch
# -> O método foreachBatch permite que você execute uma função personalizada em cada microbatch de dados.
# Neste caso, a função salva_postgresql é chamada para salvar os dados em um banco de dados PostgreSQL.
# Mas podem ser aplicadas outras lógicas de código dentro dessa função definida, não necessariamente para salvar dados.
query = df_filtrado.writeStream \
    .foreachBatch(salva_postgresql) \
    .option("checkpointLocation", "pasta_checkpoint") \
    .start()

query.awaitTermination()
# Complementos aos comandos anteriores:

# Existem três modos de saída disponíveis para o writeStream
# Define-se .outputMode(parâmetro), onde o parâmetro pode ser:
# - "append": apenas as novas linhas acrescentadas à tabela
#             de resultados desde o último acionamento serão
#             escritas no armazenamento externo. 
# - "update": apenas as linhas que foram atualizadas na tabela
#             de resultados desde o último acionamento serão
#             alteradas no armazenamento externo. 
# - "complete": a tabela de resultados atualizada inteira será
#               escrita no armazenamento externo. 
#
# Sintaxe:
# query.writeStream.outputMode("MODO")

# É possível definir a frequência de acionamento do processamento do fluxo de dados usando o método trigger(). 
# Por padrão, o Spark Structured Streaming executa o processamento de fluxo de dados sempre que novos dados chegam à fonte de dados. 
# No entanto, é possível definir essa frequência de acionamento, como no exemplo a seguir: 
#
# Sintaxe:
# query.writeStream.trigger(processingTime='10 seconds')

# Para parar um fuxo de processamento de dados
# Sintaxe:
#query.stop()

## Exemplo 2
## Consumindo streaming de dados em tempo-real a partir de sockets (Socket Streaming)

Este exemplo conecta a um socket TCP e processa dados de texto em tempo real. Este servidor socket simula a recepção de dados em tempo real, enviando mensagens a cada 3 segundos.

In [None]:
# Importação das bibliotecas necessárias para o servidor socket
import time
import threading
import socket
import random

# Função para criar um servidor socket simples
def socket_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('localhost', 9999))
    server.listen(1)
    
    print("Servidor socket iniciado na porta 9999")
    
    try:
        conn, addr = server.accept()
        print(f"Conexão aceita de {addr}")
        
        # Enviar dados de exemplo
        messages = [
            "ola mundo spark streaming",
            "apache spark eh legal",
            "streaming em tempo real",
            "tempo-real dados",
            "dados spark streaming"
        ]
        
        for msg in messages:
            conn.send(f"{msg}\n".encode())
            time.sleep(5)
            
    except Exception as e:
        print(f"Erro no servidor: {e}")
    finally:
        server.close()

# Iniciar servidor em thread separada
server_thread = threading.Thread(target=socket_server)
server_thread.daemon = True
server_thread.start()

# Aguardar servidor iniciar
time.sleep(2)
print("Servidor socket em execução...")

In [None]:
# Criar streaming DataFrame que lê do socket
socket_stream = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Processar dados: dividir linhas em palavras e contar
words = socket_stream.select(
    explode(split(socket_stream.value, " ")).alias("word")
)

word_counts = words.groupBy("word").count().orderBy("count", ascending=False)

print("Stream de socket configurado!")

In [None]:
# Iniciar query de streaming
socket_query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="5 seconds") \
    .start()

print("Query de socket streaming iniciada! Aguarde...")

# Aguardar para ver os resultados
time.sleep(20)

# Parar a query
#socket_query.stop()
#print("Query de socket parada.")

## Exemplo 3
## Geração de fluxos de dados (e consumo) baseado em um Rate Source (recomendado para testes)

O rate source gera registros em uma taxa constante e configurável. Ideal para testes de desempenho.

Cada registro geralmente contém:
- timestamp: O momento em que o registro foi gerado.
- value: Um número sequencial, que aumenta a cada novo registro.

In [None]:
# Criar streaming DataFrame com rate source
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 2) \
    .option("rampUpTime", "5s") \
    .load()

# Adicionar colunas computadas
enriched_stream = rate_stream.select(
    "timestamp",
    "value",
    (col("value") % 3).alias("categoria"),
    (col("value") * 10).alias("resultado"),
    when(col("value") % 2 == 0, "par").otherwise("impar").alias("paridade")
)

print("Rate stream configurado!")
enriched_stream.printSchema()

In [None]:
# Agregar dados por categoria
aggregated = enriched_stream \
    .groupBy("categoria", "paridade") \
    .agg(
        count("*").alias("contador"),
        avg("resultado").alias("media"),
        max("value").alias("maximoe")
    )

# Executar streaming query
rate_query = aggregated.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="3 seconds") \
    .start()

print("Rate streaming iniciado! Aguarde...")

# Aguardar para ver os resultados
time.sleep(15)

# Parar a query
rate_query.stop()
print("Rate streaming parado.")

## Exemplo 4
## Watermarks (Marca d'água) para lidar com dados atrasados

Suponha que estamos processando eventos que enviam dados a cada minuto, e queremos que os resultados de processamento estejam sempre atualizados com no máximo um atraso de 5 minutos em relação aos eventos mais recentes. Para isso, podemos definir uma janela (watermark) de 5 minutos usando o withWatermark(). 

Isso significa que, se um evento chegar com um timestamp que está mais de 5 minutos atrasado em relação ao watermark, ele será descartado, já que não é considerado como atualizado. 

Quando um dado é recebido pelo Spark Streaming, ele é carimbado com um timestamp que indica quando esse dado foi gerado. Com o withWatermark(), é possível especificar uma janela de tempo a partir desse timestamp, dentro da qual os dados são considerados atualizados e são processados.

In [None]:
# Define o schema dos dados de entrada, com os tipos esperados para cada coluna
meuschema = StructType([
    StructField("ID_TRANSACAO", StringType(), True),
    StructField("ID_USUARIO", StringType(), True),
    StructField("DATA_HORA_EVENTO", TimestampType(), True),
    StructField("TIPO_EVENTO", StringType(), True),
    StructField("VALOR_TRANSACAO", DoubleType(), True),
    StructField("MOEDA", StringType(), True),
    StructField("ITEM_ID", StringType(), True),
    StructField("QUANTIDADE", IntegerType(), True),
    StructField("LOCALIZACAO_IP", StringType(), True),
    StructField("STATUS_EVENTO", StringType(), True)
])

# Cria um DataFrame de streaming lendo arquivos CSV de uma pasta
# O Spark vai processar 1 arquivo por vez a cada trigger
dfEntrada = spark.readStream \
    .format("csv") \
    .schema(meuschema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("maxFilesPerTrigger", 1) \
    .load("pasta_csv")

# Aplica watermark para considerar dados com atraso de até 1 hora, usando a coluna de tempo do evento
# Em seguida, agrupa os eventos em janelas de 2 minutos e conta quantos eventos de cada tipo ocorreram
# O resultado inclui início e fim da janela, tipo do evento e total de eventos
dfEventTime = dfEntrada \
    .withWatermark("DATA_HORA_EVENTO", "1 hours") \
    .groupBy(
        window("DATA_HORA_EVENTO", "2 minutes"),
        "TIPO_EVENTO"
    ) \
    .count() \
    .select(
        col("window.start").alias("janela_inicio"),
        col("window.end").alias("janela_fim"),
        col("TIPO_EVENTO"),
        col("count").alias("total_eventos")
    ).orderBy("janela_inicio", "TIPO_EVENTO")

# Define a saída do stream para o console
# O modo "complete" imprime o resultado completo a cada trigger
# A trigger está configurada para rodar a cada 20 segundos
# O checkpoint é usado para manter o estado entre execuções (falhas/reinícios)
query = dfEventTime.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 100) \
    .trigger(processingTime='20 seconds') \
    .option("checkpointLocation", "pasta_checkpoint") \
    .start()

# Aguarda o encerramento manual da aplicação (Ctrl+C)
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    print("Query parada com sucesso.")

## Limpeza e Encerramento

In [None]:
# Parar todas as queries ativas
for query in spark.streams.active:
    print(f"Parando query: {query.name or query.id}")
    query.stop()

print("Todas as queries foram paradas.")

# Parar SparkSession
spark.stop()
print("SparkSession encerrada.")

### Recursos Adicionais

- [Documentação Oficial Spark Streaming](https://spark.apache.org/docs/latest/streaming/index.html)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/)
- [Spark Streaming + Kafka Integration](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)