# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
def spark_init():
    """Create (or reuse) the SparkSession used by the Kafka streaming job.

    The session is configured for local execution with two worker threads
    and requests the Kafka source connector and the PostgreSQL JDBC driver
    through ``spark.jars.packages``.

    Returns:
        SparkSession: the active session, with its log level set to WARN.
    """
    spark = (
        SparkSession.builder.appName("KafkaStreamReader")
        .master("local[2]")
        # Comma-separated Maven coordinates. The value must be one single
        # string; the two literals below are joined by the parser.
        .config(
            "spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "org.postgresql:postgresql:42.6.0",
        )
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        # NOTE(review): with master "local[2]" everything presumably runs in
        # the driver JVM, so the executor memory setting may be a no-op —
        # confirm before relying on it.
        .config("spark.executor.memory", "4g")
        .config("spark.driver.memory", "4g")
        .config("spark.sql.shuffle.partitions", "2")
        .getOrCreate()
    )
    # Keep console output readable: only warnings and errors from Spark.
    spark.sparkContext.setLogLevel("WARN")
    return spark



def read_stream(spark, topic, bootstrap_servers="kafka:9092", starting_offsets="earliest"):
    """Open a streaming DataFrame over a Kafka topic.

    Args:
        spark: SparkSession used to create the stream.
        topic: Value passed to the Kafka source's ``subscribe`` option.
        bootstrap_servers: Value for ``kafka.bootstrap.servers``. Defaults to
            ``"kafka:9092"``, the address this job used previously.
        starting_offsets: Value for ``startingOffsets``. Defaults to
            ``"earliest"``.

    Returns:
        A streaming DataFrame with ``key`` and ``value`` cast to string,
        plus the ``topic``, ``partition``, ``timestamp`` and ``offset``
        columns of the Kafka source.
    """
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", starting_offsets)
        .load()
    )
    # Cast key/value to string so downstream code can parse the payload
    # as text.
    return raw.select(
        col("key").cast("string"),
        col("value").cast("string"),
        "topic",
        "partition",
        "timestamp",
        "offset",
    )

def kafka_value_parsed(df):
    """Parse the JSON in ``value`` and flatten it into top-level columns.

    Args:
        df: DataFrame with ``key``, ``value``, ``timestamp``, ``topic``,
            ``partition`` and ``offset`` columns.

    Returns:
        DataFrame with ``key``, one column per schema field,
        ``kafka_timestamp`` (the renamed ``timestamp``), ``offset`` and
        ``partition``. The ``topic`` column is not carried over.
    """
    # (field name, type constructor) pairs, in output column order.
    # Every field is nullable.
    field_specs = [
        ("Price", DoubleType),
        ("District", StringType),
        ("City", StringType),
        ("Town", StringType),
        ("Type", StringType),
        ("EnergyCertificate", StringType),
        ("GrossArea", DoubleType),
        ("TotalArea", DoubleType),
        ("Parking", DoubleType),
        ("HasParking", BooleanType),
        ("Floor", StringType),
        ("ConstructionYear", DoubleType),
        ("EnergyEfficiencyLevel", StringType),
        ("PublishDate", TimestampType),
        ("Garage", BooleanType),
        ("Elevator", BooleanType),
        ("ElectricCarsCharging", BooleanType),
        ("TotalRooms", DoubleType),
        ("NumberOfBedrooms", DoubleType),
        ("NumberOfWC", DoubleType),
        ("ConservationStatus", StringType),
        ("LivingArea", DoubleType),
        ("LotSize", DoubleType),
        ("BuiltArea", DoubleType),
        ("NumberOfBathrooms", DoubleType),
    ]
    listing_schema = StructType(
        [StructField(name, make_type(), True) for name, make_type in field_specs]
    )

    payload = from_json(col("value"), listing_schema).alias("parsed_data")
    kafka_ts = col("timestamp").alias("kafka_timestamp")

    # First projection attaches the parsed struct; the second expands it.
    with_payload = df.select("key", payload, kafka_ts, "topic", "partition", "offset")
    return with_payload.select(
        "key", "parsed_data.*", "kafka_timestamp", "offset", "partition"
    )

def clean_and_filter_kafka(df):
    """Normalise missing values, derive ``main_area`` and drop unusable rows.

    Steps:
      1. The strings ``'nan'``, ``'NaN'``, ``'null'`` and ``''`` are replaced
         by null.
      2. NaN in ``price`` and in every area column is treated as null.
      3. ``main_area`` is the first non-null area among a preference list
         that depends on the property type.
      4. Rows are kept only when price > 50000, main_area > 20 and both
         district and type are present.

    NOTE(review): columns are referenced in lowercase (``price``,
    ``lotsize``, ...). This presumably relies on Spark's case-insensitive
    column resolution — confirm ``spark.sql.caseSensitive`` is not enabled.

    Args:
        df: DataFrame with the price, type, district and area columns.

    Returns:
        DataFrame with columns ``property_type``, ``district``, ``price``
        and ``main_area``.
    """

    def _nan_to_null(name):
        """Return column *name* with floating-point NaN mapped to null."""
        column = col(name)
        return when(isnan(column), None).otherwise(column)

    df = df.replace(['nan', 'NaN', 'null', ''], None)
    df = df.withColumn("price", _nan_to_null("price"))

    # coalesce() skips nulls but NOT NaN. Without this normalisation a NaN
    # in the preferred area column would win over a valid fallback column,
    # and the row would then be discarded by the NaN filter below.
    lot, living, built, total, gross = (
        _nan_to_null(name)
        for name in ("lotsize", "livingarea", "builtarea", "totalarea", "grossarea")
    )

    property_type = col("type")
    main_area = (
        when(property_type.isin(["Land", "Farm"]),
             coalesce(lot, total, gross))
        .when(property_type.isin(["Apartment", "Studio", "Duplex"]),
              coalesce(living, total, gross))
        .when(property_type.isin(["House", "Mansion", "Manor", "Estate"]),
              coalesce(living, built, total, gross))
        .when(property_type.isin(["Office", "Store", "Warehouse", "Industrial",
                                  "Storage", "Hotel", "Building"]),
              coalesce(total, gross, built))
        .when(property_type == "Garage",
              coalesce(built, total, gross))
        .otherwise(coalesce(total, gross, living))
    )

    df = df.withColumn("main_area", main_area).filter(
        col("price").isNotNull() &
        (col("price") > 50000) &
        col("main_area").isNotNull() &
        # Kept as a safety net; NaN sorts above every number in Spark, so
        # it would otherwise pass the "> 20" comparison.
        ~isnan(col("main_area")) &
        (col("main_area") > 20) &
        col("district").isNotNull() &
        col("type").isNotNull()
    )

    return df.select(col("type").alias("property_type"), "district", "price", "main_area")

def calculate_aggregations(df):
    """Summarise listings per property type and district.

    Args:
        df: DataFrame with ``property_type``, ``district``, ``price`` and
            ``main_area`` columns.

    Returns:
        DataFrame with one row per (property_type, district), holding
        ``avg_price``, ``avg_area``, ``total_ads`` and ``avg_price_per_sqm``
        (total price divided by total area), each rounded to two decimals
        except the count, ordered by district and then property type.
    """
    metrics = [
        round(avg("price"), 2).alias("avg_price"),
        round(avg("main_area"), 2).alias("avg_area"),
        count("*").alias("total_ads"),
        round((sum("price") / sum("main_area")), 2).alias("avg_price_per_sqm"),
    ]
    per_group = df.groupBy(["property_type", "district"])
    summary = per_group.agg(*metrics)
    return summary.orderBy(["district", "property_type"])



    
# --- Driver: wire the pipeline stages together and start the stream ---

# Kafka topic to subscribe to.
topic = "csv-data"
spark = spark_init()
# Pipeline: raw Kafka records -> parsed JSON columns -> cleaned rows
# -> per (property_type, district) aggregates.
stream = read_stream(spark, topic)
parsed_df = kafka_value_parsed(stream)
cleaned_kafka = clean_and_filter_kafka(parsed_df)
aggregations = calculate_aggregations(cleaned_kafka)

# Write the full aggregate table ("complete" output mode) to the console
# sink, triggering a micro-batch every 30 seconds.
query = ( aggregations.writeStream
         .outputMode("complete")
         .format("console")
         .trigger(processingTime="30 seconds")
         .start()
)

# Block the calling thread until the streaming query terminates.
query.awaitTermination()