# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import from_json, col, window, to_timestamp
import time

# Build (or reuse) the SparkSession for this job. The Structured Streaming Kafka
# source is pulled in through spark.jars.packages (Scala 2.12 / Spark 3.5.0 artifact),
# and shuffle partitions / default parallelism are pinned to 1 to keep this small job
# on a single task.
#
# Note: `spark.streaming.*` keys (e.g. stopGracefullyOnShutdown,
# kafka.consumer.cache.enabled) belong to the legacy DStream API and its
# spark-streaming-kafka integration. Structured Streaming does not read them, so none
# are set here. Shutdown is handled by stopping the query explicitly.
#
# NOTE(review): getOrCreate() returns an already-running session if there is one
# (e.g. in a long-lived notebook kernel). In that case static settings such as
# spark.jars.packages from this builder will not take effect.
spark = (
    SparkSession.builder
    .appName("ClimateWatch Streaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.default.parallelism", "1")
    .getOrCreate()
)

# NOTE(review): DEBUG makes Spark extremely chatty. Its log lines will interleave with
# the console-sink tables and status prints below. Consider "WARN" once things work.
spark.sparkContext.setLogLevel("DEBUG")

# Schema of the JSON document carried in each Kafka message value. Every field is
# nullable. "timestamp" is kept as a string at this stage and converted to a real
# timestamp column after parsing.
schema = StructType(
    [
        StructField("timestamp", StringType(), True),
        StructField("location", StringType(), True),
    ]
    + [
        # Numeric measurements, all read as single-precision floats.
        StructField(measure, FloatType(), True)
        for measure in (
            "temperature",
            "humidity",
            "wind_speed",
            "precipitation",
            "pressure",
            "snow",
        )
    ]
)

print("\n" + "=" * 50, "Kafka'dan veri okumaya başlıyorum...", "=" * 50 + "\n", sep="\n")

# Streaming source over the "climate-data" topic on the local broker. Options:
# - start from the earliest available offset;
# - do not fail the query on lost data (failOnDataLoss=false);
# - relaxed request/session timeouts and offset-fetch retries;
# - at most 200 offsets consumed per trigger.
stream_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "localhost:9092",
            "subscribe": "climate-data",
            "startingOffsets": "earliest",
            "failOnDataLoss": "false",
            "kafka.security.protocol": "PLAINTEXT",
            "kafka.request.timeout.ms": "60000",
            "kafka.session.timeout.ms": "60000",
            "fetchOffset.numRetries": "5",
            "maxOffsetsPerTrigger": "200",
        }
    )
    .load()
)

# NOTE(review): load() above presumably only defines the source lazily. The broker is
# not actually contacted until the streaming query starts, so this banner is optimistic.
print("\n" + "=" * 50, "Kafka bağlantısı kuruldu.", "=" * 50 + "\n", sep="\n")

# Debug aid: dump the column layout of the raw Kafka source.
print("Kafka Stream Schema:")
stream_df.printSchema()

# Decode each Kafka record's value as a JSON string against `schema`, promote the
# struct's fields to top-level columns, and derive `event_time` from the string
# "timestamp" field for event-time windowing.
parsed = (
    stream_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("event_time", to_timestamp("timestamp"))
)

print("\nParsed Data Schema:")
parsed.printSchema()

# Per-location statistics over tumbling 5-minute event-time windows.
# The 10-minute watermark tells Spark how late an event may arrive, relative to the
# latest event_time seen. Whether old window state is actually evicted depends on the
# sink's output mode.
# The aggregate columns are named "<fn>(<column>)", e.g. "avg(temperature)".
windowed_stats = (
    parsed.withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "location")
    .agg(
        dict(
            temperature="avg",
            humidity="avg",
            precipitation="sum",
            wind_speed="avg",
            pressure="avg",
            snow="sum",
        )
    )
)

# Flatten the window struct and give each aggregate a descriptive name, casting it to
# decimal(10,2) so it shows two decimal places. The unit suffixes in the aliases
# (celsius, percent, mm, kmh, hpa) reflect the producer's presumed units; nothing here
# checks them.
formatted_stats = windowed_stats.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("location"),
    *(
        col(aggregate).cast("decimal(10,2)").alias(friendly)
        for aggregate, friendly in (
            ("avg(temperature)", "avg_temp_celsius"),
            ("avg(humidity)", "avg_humidity_percent"),
            ("sum(precipitation)", "total_precipitation_mm"),
            ("avg(wind_speed)", "avg_wind_speed_kmh"),
            ("avg(pressure)", "avg_pressure_hpa"),
            ("sum(snow)", "total_snow_mm"),
        )
    ),
)

print("\n" + "="*50)
print("Stream işleme başlıyor...")
print("="*50 + "\n")

# Print the aggregated windows to the console on a 5-second processing-time trigger.
#
# Output mode is "update", not "complete". In complete mode Spark must retain every
# window it has ever produced, and the watermark on event_time cannot evict any of
# them, so streaming state grows without bound over a long run. The console sink also
# shows at most numRows=100 rows of an unordered result. Once the table passes 100
# rows, the newest windows may not be displayed at all. In update mode each trigger
# prints only the windows whose aggregates changed in that micro-batch, and the
# watermark lets Spark drop windows that are too old to change.
query = formatted_stats \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 100) \
    .trigger(processingTime="5 seconds") \
    .start()

# Post-start banner, plus a quick check of how many streaming queries the session
# currently reports as active.
print("\n" + "=" * 50, "Stream başlatıldı, veriler gelmeye başlayacak...", sep="\n")
print("Aktif streaming sorgularının sayısı:", len(spark.streams.active))
print("=" * 50, end="\n\n")

# Poll the running query every 5 seconds (matching the trigger interval). Each poll
# prints the query's current status and its most recent progress report; lastProgress
# is None until the first progress update exists.
# NOTE(review): the second label reads "last processing time", but the whole progress
# report is printed, not just a time.
try:
    while query.isActive:
        print(f"\nSorgu durumu: {query.status}")
        print(f"Son işleme zamanı: {query.lastProgress}")
        time.sleep(5)
except KeyboardInterrupt:
    # Interrupting the kernel/process is the intended way to shut the stream down.
    print("\nUygulama kullanıcı tarafından durduruldu.")
    query.stop()
except Exception as e:
    # Stop the query so it does not keep running in the background, then re-raise
    # so the failure is reported instead of being swallowed after the print.
    print(f"\nBir hata oluştu: {str(e)}")
    query.stop()
    raise
finally:
    # At this point the query is no longer active: it either ended on its own or was
    # stopped above, so this call returns promptly. If the query itself failed,
    # awaitTermination() re-raises that failure. The polling loop only checks isActive,
    # so without this call a crashed query would end silently.
    query.awaitTermination()