In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Esquema para os dados JSON dentro da mensagem Kafka (ajuste conforme seu dado)
# Por exemplo, se sua mensagem Kafka for um JSON como {"nome": "Teste", "valor": 123}
schema = StructType([
    StructField("nome", StringType(), True),
    StructField("valor", IntegerType(), True)
])

# 1. Inicializa a SparkSession
try:
    spark = SparkSession.builder \
        .appName("KafkaSparkIntegration") \
        .master("spark://spark-master:7077") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
        .getOrCreate()

    print("SparkSession para integração Kafka-Spark criada com sucesso!")
    print(f"Versão do Spark: {spark.version}")

    # 2. Ler dados do Kafka usando Structured Streaming
    # Certifique-se de que 'kafka-broker:9092' é acessível pelo container Spark
    kafka_stream_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker:9092") \
        .option("subscribe", "test_topic") \
        .option("startingOffsets", "earliest") \
        .load()

    print("\nStreaming DataFrame do Kafka criado. Colunas brutas:")
    kafka_stream_df.printSchema()

    # O Kafka retorna as mensagens como binário. Precisamos convertê-las.
    # 'value' é a mensagem, 'key' é a chave (se houver), 'topic', 'partition', 'offset', 'timestamp'
    # Decodificando a coluna 'value' de binário para string e aplicando o esquema JSON
    processed_df = kafka_stream_df.selectExpr("CAST(value AS STRING) as json_value") \
        .select(from_json(col("json_value"), schema).alias("data")) \
        .select("data.*") # Expande o JSON para colunas individuais

    print("\nStreaming DataFrame processado (JSON decodificado):")
    processed_df.printSchema()

    # 3. Exibir os dados no console para teste
    # Para um ambiente de produção, você salvaria em HDFS, Delta Lake, etc.
    query = processed_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    print("\nStreaming de dados do Kafka iniciado. Novas mensagens aparecerão abaixo.")
    print("Para parar o streaming, execute 'query.stop()' em uma nova célula.")

    # Opcional: Esperar por um tempo para ver algumas mensagens
    # query.awaitTermination(60) # Espera 60 segundos ou até a query ser terminada manualmente
    # print("Aguardando novas mensagens ou terminação manual...")

except Exception as e:
    print(f"Erro na integração Kafka-Spark: {e}")
    print("Verifique se o Kafka Broker está ativo e se 'spark-sql-kafka' está no classpath do Spark.")

# Lembre-se de parar a query de streaming quando terminar para liberar recursos:
# query.stop()

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ae936707-4724-4e1a-b575-8337a007cd45;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.6 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.6 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found o

In [2]:
spark

In [3]:
spark.stop()