<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
# <font color='blue'>Data Science Academy</font>
## <font color='blue'>PySpark e Apache Kafka Para Processamento de Dados em Batch e Streaming</font>
## <font color='blue'>Projeto 8</font>
### <font color='blue'>Simulação de Erros e Recuperação de Falhas em Multi-Node Kafka Cluster</font>
### <font color='blue'>Kafka Consumer</font>

In [None]:
# Imports
import os
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sum, to_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from datetime import datetime

In [None]:
# Cria a sessão Spark com as classes para o Kafka
spark = SparkSession.builder \
    .appName("DSAProjeto8") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.0") \
    .config("spark.streaming.kafka.consumer.cache.enabled", "false") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

In [None]:
# Altera o nível de log para ERROR
spark.sparkContext.setLogLevel("ERROR")

## Leitura do Stream do Kafka em Tempo Real

In [None]:
# Cria o schema para os dados
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("id_produto", IntegerType()),
    StructField("quantidade", IntegerType()),
    StructField("preco", DoubleType())
])

In [None]:
# Extrai os nomes dos brokers
kafka_broker = os.environ.get('KAFKA_BROKER')

In [None]:
# Nome do tópico
kafka_topic = "dsa_p8_topico"

In [None]:
# Cria o readStream para ler os dados do Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "PLAINTEXT") \
    .load()

## Processamento do Stream com PySpark em Tempo Real

In [None]:
# Parse do formato JSON para preparar o dataframe
parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

In [None]:
# Converte o registro de data e hora para o formato adequado e calcula o total acumulado para 1 dia 
# (você pode alterar a janela para o período de sua preferência)
df_vendas = parsed_df \
    .withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("vendas", col("quantidade") * col("preco")) \
    .groupBy(window("timestamp", "1 day")) \
    .agg(sum("vendas").alias("total_acumulado")) \
    .select(col("window.end").alias("window_end"), col("total_acumulado"))

In [None]:
# Variáveis de conexão do PostgreSQL
pg_host = os.environ.get('POSTGRES_HOST')
pg_db = os.environ.get('POSTGRES_DB')
pg_user = os.environ.get('POSTGRES_USER')
pg_password = os.environ.get('POSTGRES_PASSWORD')

In [None]:
# Função para salvar o stream de dados no banco de dados (após o processamento)
def dsa_salva_stream_database(df, epoch_id):

    # Cria conexão ao banco de dados
    conn = psycopg2.connect(host = pg_host, 
                            database = pg_db, 
                            user = pg_user, 
                            password = pg_password)

    # Cria o cursor
    cur = conn.cursor()

    # Cria a tabela no banco de dados (se não existir)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS dsa_total_acumulado (
        timestamp TIMESTAMP PRIMARY KEY,
        total_acumulado_real_time DOUBLE PRECISION
    )
    """)

    print(f"\n--- Total Acumulado Atualizado em {datetime.now()} ---\n")
    print("Timestamp | Total Acumulado Real Time")
    print("-----------+---------------")

    # Loop pelas linhas extraídas do stream de dados
    for row in df.collect():

        # window_end se refere ao timestamp que marca o final de uma janela de tempo usada no processamento de streaming.
        # Ele é gerado automaticamente pelo Spark ao realizar agregações em janelas de tempo.
        window_end = row.window_end

        # Como não especificamos o tamanho da janela, cada evento gerado será acumulado com o próximo gerando o valor agregado.
        total_acumulado = row.total_acumulado

        # Se diferente de None
        if window_end is not None and total_acumulado is not None:

            # Insere os dados na tabela
            cur.execute("""
            INSERT INTO dsa_total_acumulado (timestamp, total_acumulado_real_time)
            VALUES (%s, %s)
            ON CONFLICT (timestamp) DO UPDATE
            SET total_acumulado_real_time = EXCLUDED.total_acumulado_real_time
            """, (window_end, total_acumulado))
            
            print(f"{window_end} | {total_acumulado:.2f}")
        else:
            print(f"Ignorando linha devido a valores None: window_end = {window_end}, total_acumulado = {total_acumulado}")

    conn.commit()
    cur.close()
    conn.close()
    
    print("\n")

In [None]:
# Inicia a extração do writeStream e gravação no banco de dados
query = df_vendas.writeStream \
    .outputMode("complete") \
    .foreachBatch(dsa_salva_stream_database) \
    .trigger(processingTime = '10 seconds') \
    .start()

query.awaitTermination()

# Fim