In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,TimestampType
from pyspark.sql.functions import from_json, col, window, avg, count
import pandas as pd
import matplotlib.pyplot as plt
import time


# Initialize the Spark session.
# The Kafka source and the Cassandra connector are pulled in through
# spark.jars.packages (Scala 2.12 builds, version 3.3.0). These coordinates
# should match the installed Spark version — TODO confirm for your cluster.
# NOTE: getOrCreate() reuses an already-running session, in which case package
# settings changed here may not take effect until the kernel is restarted.
spark = (
    SparkSession.builder
    .appName("KafkaCassandraIntegration")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
        "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0",
    )
    # Cassandra contact point used by the connector when writing below.
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)

# Read the raw sensor topics from Kafka: one streaming DataFrame per topic.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"


def read_kafka_topic(topic, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS):
    """Return a streaming DataFrame subscribed to a single Kafka topic.

    Args:
        topic: name of the Kafka topic to subscribe to.
        bootstrap_servers: Kafka broker list passed as `kafka.bootstrap.servers`.

    Returns:
        The Kafka source DataFrame; its `key`/`value` columns are binary and are
        cast to strings downstream.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )


temperature_stream = read_kafka_topic("temperature")
humidity_stream = read_kafka_topic("humidity")

# Schema of the JSON document carried in each Kafka message value. The same
# schema is used for both the temperature and the humidity topic; `reading`
# holds the measured value. Every field is nullable.
# NOTE(review): `reading` is an IntegerType, so a decimal payload such as 21.5
# would parse to null — confirm the producer only sends integers.
_SENSOR_FIELDS = (
    ("measurementId", StringType()),
    ("reading", IntegerType()),
    ("timestamp", TimestampType()),
    ("device", IntegerType()),
)
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _SENSOR_FIELDS]
)


In [None]:
# Decode the raw Kafka records: cast the binary key/value to strings, parse the
# JSON value with `schema`, and expose the event time as a top-level column.

# Event-time lateness tolerated before window state is finalized.
# NOTE(review): an earlier comment said "5 seconds" while the code used
# 1 second; the code value is kept here — confirm which one is intended.
WATERMARK_DELAY = "1 seconds"


def parse_sensor_stream(raw_stream, value_schema, watermark_delay=WATERMARK_DELAY):
    """Parse a raw Kafka stream into sensor records with an event-time watermark.

    Args:
        raw_stream: streaming DataFrame from the Kafka source (with `key` and
            `value` columns).
        value_schema: StructType describing the JSON carried in `value`.
        watermark_delay: delay threshold passed to `withWatermark`.

    Returns:
        Streaming DataFrame with string `key`/`value`, the parsed struct in
        `parsed_value`, and a watermarked top-level `timestamp` column.
    """
    messages = raw_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    parsed = messages.withColumn("parsed_value", from_json(col("value"), value_schema))
    # Promote the event time so withWatermark/window can address it by name.
    with_event_time = parsed.withColumn("timestamp", col("parsed_value.timestamp"))
    return with_event_time.withWatermark("timestamp", watermark_delay)


# Bind new names instead of re-binding the raw *_stream variables, so re-running
# this cell always starts from the unparsed Kafka streams.
temperature_stream_with_watermark = parse_sensor_stream(temperature_stream, schema)
humidity_stream_with_watermark = parse_sensor_stream(humidity_stream, schema)


def select_sensor_columns(stream, timestamp_alias, reading_alias):
    """Project a parsed sensor stream onto id, event time, reading and device.

    The generic `timestamp` and `reading` fields are renamed per sensor so the
    temperature and humidity columns stay distinguishable after the join.
    """
    return stream.select(
        col("parsed_value.measurementId").alias("id"),
        col("timestamp").alias(timestamp_alias),
        col("parsed_value.reading").alias(reading_alias),
        col("parsed_value.device").alias("device"),
    )


# Keep only the columns needed downstream.
temperature_processed_stream = select_sensor_columns(
    temperature_stream_with_watermark, "timestamp_t", "temperature"
)
humidity_processed_stream = select_sensor_columns(
    humidity_stream_with_watermark, "timestamp_h", "humidity"
)


# Cassandra schema expected by the streaming sink. These are CQL statements,
# not Python, so they are kept as strings (raw CQL in a code cell is a
# SyntaxError that prevents the whole cell from running). Run them once, e.g.
# in `cqlsh`, before starting the streaming query.
# Unquoted CQL identifiers are case-insensitive and stored in lowercase, so the
# keyspace is spelled `postgradobigdata`, matching the name the writer uses.
CASSANDRA_KEYSPACE_CQL = """
CREATE KEYSPACE IF NOT EXISTS postgradobigdata
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
"""

# One row per aggregation window; window_start is the primary key.
CASSANDRA_TABLE_CQL = """
CREATE TABLE IF NOT EXISTS postgradobigdata.sensor_data (
    window_start TIMESTAMP PRIMARY KEY,
    temperature_average DOUBLE,
    humidity_average DOUBLE
);
"""

In [None]:
# Join temperature and humidity readings that share a measurement id, then
# average both per 1-second event-time window.
#
# NOTE(review): this stream-stream inner join has no event-time range
# condition, so Spark cannot expire join state and it grows without bound.
# If matching readings are known to arrive close together, join on a condition
# that also bounds timestamp_h relative to timestamp_t.
joined_stream = temperature_processed_stream.join(
    humidity_processed_stream, on="id", how="inner"
)

# Keep only the columns the aggregation needs (both sides' `device` columns are
# dropped). Reference the alias exactly as defined (`timestamp_t`) instead of
# relying on Spark's default case-insensitive column resolution.
joined_stream_cols = joined_stream.select(
    col("timestamp_t"),
    col("temperature"),
    col("humidity"),
)

joined_stream_windowed = joined_stream_cols.groupBy(
    window(col("timestamp_t"), "1 seconds")
).agg(
    avg("temperature").alias("temperature_average"),
    avg("humidity").alias("humidity_average"),
)

# Flatten the window struct into the column used as the Cassandra primary key.
df_to_write = joined_stream_windowed.withColumn(
    "window_start", col("window.start")
).select("window_start", "temperature_average", "humidity_average")

def write_to_cassandra(batch_df, batch_id):
    """Append one micro-batch of windowed averages to Cassandra.

    Used as the `foreachBatch` sink of the streaming query below.

    Args:
        batch_df: micro-batch DataFrame with `temperature_average` and
            `humidity_average` plus either a `window_start` column or the raw
            `window` struct produced by `groupBy(window(...))`.
        batch_id: micro-batch identifier supplied by Spark (unused).
    """
    # Also accept un-flattened aggregation output, so callers passing
    # `joined_stream_windowed` batches keep working.
    if "window_start" not in batch_df.columns:
        batch_df = batch_df.withColumn("window_start", col("window.start"))
    (
        batch_df
        .select("window_start", "temperature_average", "humidity_average")
        .write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        # Unquoted CQL identifiers are stored lowercase, hence the keyspace spelling.
        .options(table="sensor_data", keyspace="postgradobigdata")
        .save()
    )


# Stream the already-flattened `df_to_write` so the projection is defined once.
# For debugging, replace foreachBatch(...) with .format("console").
# NOTE(review): no checkpointLocation is set, so query progress is not
# recovered across restarts — consider adding one.
# awaitTermination() blocks this cell until the query stops or fails.
cassandra_query = (
    df_to_write.writeStream
    .foreachBatch(write_to_cassandra)
    .outputMode("append")
    .start()
)
cassandra_query.awaitTermination()

