# Spark Streaming com Kafka para Geração de Streaming

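Configurando o Spark Structured Streaming para ler os tópicos Kafka de origem (leak-detection, predictive-maintenance, flow-regulation e system-control), serializar cada registro como JSON e publicá-lo no Kafka.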

In [1]:
# Make the local Spark installation importable as `pyspark` before any pyspark import.
import findspark

findspark.init()

In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json
import os


In [3]:
# Ask spark-submit to pull the Kafka connector package when the Spark JVM is launched.
# This has to run before the SparkSession below is created.
# NOTE(review): the connector version (3.5.0, Scala 2.12) presumably must match the installed Spark build.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
)

In [4]:
# Create (or reuse) the application's Spark session.
spark = (
    SparkSession.builder
    .appName("SmartPipeNet")
    .getOrCreate()
)

In [None]:
# Reading data from the source

# Each topic is read into its own streaming DataFrame, and each of those DataFrames is then
# written back to the corresponding Kafka topic.
# FIXME(review): the reads below subscribe to the same topic names the writes publish to,
# so every published record is consumed again and re-wrapped in JSON (a feedback loop).
# Confirm the intended destination topics.
from pyspark.sql.functions import to_json, struct
from pyspark.sql import SparkSession

# Initialize the Spark session. getOrCreate reuses the session already created earlier in the notebook.
spark = SparkSession.builder.appName("SmartPipeNet").getOrCreate()

def write_to_kafka_topic(df, topic, checkpoint_dir, bootstrap_servers="localhost:9092"):
    """Start a streaming query that publishes every row of ``df`` to a Kafka topic.

    All columns of ``df`` are packed into a struct and serialized into one JSON
    string column named ``value``. That is the payload column the Kafka sink writes.

    Args:
        df: Streaming DataFrame to publish.
        topic: Destination Kafka topic.
        checkpoint_dir: Checkpoint location for this query. Each query needs its own directory.
        bootstrap_servers: Kafka bootstrap servers. Defaults to "localhost:9092",
            the address the original code hard-coded.

    Returns:
        The started StreamingQuery, so callers can monitor or stop it.
        Previously the handle was discarded and None was returned.
    """
    # Use a distinct loop name so the imported pyspark `col` function is not shadowed.
    # NOTE(review): for a raw Kafka source DataFrame the `key`/`value` columns are binary.
    # to_json presumably emits them base64-encoded, so cast them to string first if the
    # original payload text is wanted downstream.
    json_df = df.select(
        to_json(struct(*[df[column_name] for column_name in df.columns])).alias("value")
    )

    return (
        json_df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )

# Define one streaming DataFrame per source Kafka topic.
def _read_kafka_topic(topic_name):
    """Return a streaming DataFrame subscribed to ``topic_name`` on localhost:9092."""
    reader = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic_name)
    )
    return reader.load()


df_leak_detection = _read_kafka_topic("leak-detection")
df_predictive_maintenance = _read_kafka_topic("predictive-maintenance")
df_flow_regulation = _read_kafka_topic("flow-regulation")
df_system_control = _read_kafka_topic("system-control")

# Publish each stream to Kafka. Every streaming query gets its own checkpoint directory.
# Use a raw string here: the original literals ("C:\\Projeto12\checkpoint\leak-detection", ...)
# relied on invalid escape sequences, which Python 3.12+ reports as SyntaxWarning.
CHECKPOINT_ROOT = r"C:\Projeto12\checkpoint"

# FIXME(review): each stream is written back to the same topic it subscribes to, so
# published records are read again and re-published indefinitely. Point the destination
# at a distinct topic once the intended sink topics are confirmed.
streams_to_publish = [
    (df_leak_detection, "leak-detection"),
    (df_predictive_maintenance, "predictive-maintenance"),
    (df_flow_regulation, "flow-regulation"),
    (df_system_control, "system-control"),
]

for stream_df, topic_name in streams_to_publish:
    # Build checkpoint paths from the topic name.
    # This fixes the stray space the flow-regulation path had ("...checkpoint\ flow-regulation").
    # Note: that query will therefore start from a fresh checkpoint directory.
    write_to_kafka_topic(stream_df, topic_name, CHECKPOINT_ROOT + "\\" + topic_name)

# Block this cell until any of the running streaming queries terminates.
spark.streams.awaitAnyTermination()



