In [1]:
import os 

import pandas as pd
from datetime import timedelta
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, TimestampType
from pyspark.sql.functions import window, col, avg, from_json, from_unixtime, current_timestamp
from pyspark.sql.streaming import StreamingQueryListener

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:

# Define your StreamingQueryListener
class MyStreamingQueryListener(StreamingQueryListener):
    """Listener that seeds a shared dict from the CSV snapshot when a query starts.

    On ``onQueryStarted`` the rows of the snapshot CSV (if the file exists)
    are copied into ``first_window_dict`` for every group key that is not
    already present. Existing entries are never overwritten.

    Args:
        first_window_dict: Mutable dict keyed by the nine-field group tuple.
            It is updated in place and never rebound.
    """

    # Snapshot file that is read back on query start.
    _CSV_PATH = '../shared/output/stream_result.csv'

    # Columns that identify a group, in key order.
    _GROUP_COLUMNS = (
        'brand', 'model', 'year', 'color', 'mileage', 'transmission',
        'body_health', 'engine_health', 'tires_health',
    )

    # Per-group payload stored next to the key columns.
    _VALUE_COLUMNS = ('start', 'end', 'average_price', 'processed_at')

    # Key columns that the stream schema declares as strings. They must be read
    # back as strings too: letting pandas infer a numeric dtype (e.g. mileage
    # "12000" -> 12000) would make reloaded keys differ from live ones.
    _STRING_KEY_COLUMNS = tuple(name for name in _GROUP_COLUMNS if name != 'year')

    def __init__(self, first_window_dict):
        self.first_window_dict = first_window_dict

    def onQueryStarted(self, query):
        """Reload the CSV snapshot into the shared dict when a query starts."""
        self.load_data_from_csv()

    def onQueryTerminated(self, query):
        """No action is taken when a query terminates."""
        ...

    def onQueryProgress(self, query):
        """No action is taken on progress updates."""
        ...

    def onQueryIdle(self, query):
        """No action is taken when a query is idle."""
        ...

    def load_data_from_csv(self):
        """Merge rows of the snapshot CSV into ``first_window_dict``.

        Does nothing when the file is missing or contains no columns. Rows
        whose group key already exists in the dict are skipped.

        Raises:
            KeyError: If the CSV lacks one of the expected columns.
        """
        csv_path = self._CSV_PATH
        if not os.path.exists(csv_path):
            return

        try:
            snapshot = pd.read_csv(
                csv_path,
                dtype={name: str for name in self._STRING_KEY_COLUMNS},
            )
        except pd.errors.EmptyDataError:
            # A snapshot written before any group existed has no header row.
            return

        columns = list(self._GROUP_COLUMNS + self._VALUE_COLUMNS)
        for record in snapshot[columns].to_dict('records'):
            group_key = tuple(record[name] for name in self._GROUP_COLUMNS)
            # setdefault keeps whatever is already stored for this group.
            self.first_window_dict.setdefault(group_key, record)


In [3]:
# Launcher arguments: the Kafka connector packages to resolve, then the
# mandatory trailing "pyspark-shell" token.
SUBMIT_ARGS = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,'
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,'
    'org.apache.kafka:kafka-clients:2.8.1 '
    'pyspark-shell'
)

# Export the arguments before the session below is built.
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

spark = (
    SparkSession.builder
    .appName("CarPricesStreaming")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark-3.5.0/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/hso/.ivy2/cache
The jars for the packages stored in: /Users/hso/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ad5b2238-4b61-436f-8fa5-58555c1e5a3b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central


In [4]:
# Shared per-group state: handed to the listener here and also used as a
# module-level global by write_to_csv.
first_window_dict = dict()

# Build the listener around that dict and register it with the session's
# streaming query manager.
listener = MyStreamingQueryListener(first_window_dict)
spark.streams.addListener(listener)

In [5]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is nullable; additional_info is a string -> string map.
stream_message_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("brand", StringType()),
        ("model", StringType()),
        ("year", IntegerType()),
        ("price", IntegerType()),
        ("additional_info", MapType(StringType(), StringType())),
        ("ad_publish_time", IntegerType()),
        ("producer_id", StringType()),
    )
])

In [6]:
# Read streaming data from Kafka
kafka_bootstrap_server = "127.0.0.1:9092"
kafka_topic = "car_prices"

# The Kafka source option is "startingOffsets" (plural). The previous
# "startingOffset" and the un-prefixed "auto.offset.reset" were not options
# the source recognises, so a new query started from the latest offsets.
# startingOffsets only applies when a query starts without a checkpoint;
# a resumed query continues from its checkpointed offsets.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_server)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .option("failOnDataLoss", "false")
    .load()
)

In [7]:
# Decode the Kafka value as a string and parse it as JSON into one struct
# column named car_prices_data.
car_prices_raw_df = (
    kafka_stream_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", stream_message_schema).alias('car_prices_data'))
)

# Add a top-level ad_publish_time column: the integer publish time is cast to
# double, rendered by from_unixtime, then cast to a timestamp.
processed_df = car_prices_raw_df.withColumn(
    "ad_publish_time",
    from_unixtime(
        col("car_prices_data.ad_publish_time").cast("double")
    ).cast(TimestampType()),
)

In [8]:
# Flatten the parsed struct into plain top-level columns.
flattened_columns = [
    # Scalar fields of the message
    "car_prices_data.brand",
    "car_prices_data.model",
    "car_prices_data.year",
    "car_prices_data.price",
    # Entries looked up in the additional_info map
    "car_prices_data.additional_info.color",
    "car_prices_data.additional_info.mileage",
    "car_prices_data.additional_info.transmission",
    "car_prices_data.additional_info.body_health",
    "car_prices_data.additional_info.engine_health",
    "car_prices_data.additional_info.tires_health",
    # Timestamp column added in the previous cell
    "ad_publish_time",
]
processed_df = processed_df.select(*flattened_columns)

In [9]:
def write_to_csv(df, epoch_id):
    """Fold one micro-batch into ``first_window_dict`` and rewrite the CSV snapshot.

    For every group in the batch, the row with the smallest ``start`` replaces
    that group's entry in the module-level ``first_window_dict``. The whole
    dict is then written to ``../shared/output/stream_result.csv``.

    Args:
        df: Micro-batch DataFrame exposing ``toPandas()``. It must contain the
            nine group columns plus ``start``, ``end``, ``average_price`` and
            ``processed_at``.
        epoch_id: Micro-batch identifier passed by ``foreachBatch``; unused.

    Returns:
        None.
    """
    group_columns = [
        "brand", "model", "year", "color", "mileage", "transmission",
        "body_health", "engine_health", "tires_health",
    ]
    value_columns = ["start", "end", "average_price", "processed_at"]

    pandas_df = df.toPandas()
    if pandas_df.empty:
        # Nothing changed in this batch. Returning here also avoids writing a
        # column-less CSV, which pd.read_csv cannot parse on the next start.
        return

    # Earliest window per group within this batch. pandas groupby drops rows
    # whose key contains a null, so such rows never reach the dict.
    first_window_df = (
        pandas_df
        .sort_values(by=['start'])
        .groupby(group_columns)
        .first()
        .reset_index()
    )

    # NOTE(review): an existing entry is overwritten unconditionally, so a later
    # batch can replace a group's window with a later one — confirm intent.
    for record in first_window_df[group_columns + value_columns].to_dict('records'):
        group_key = tuple(record[name] for name in group_columns)
        first_window_dict[group_key] = record

    if not first_window_dict:
        # Every row was dropped for null keys and no earlier state exists.
        return

    # Specify the path to the output CSV file
    output_path = "../shared/output/stream_result.csv"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Write the aggregated data for all groups to the CSV file
    first_window_all_groups_df = pd.DataFrame(list(first_window_dict.values()))
    first_window_all_groups_df.to_csv(output_path, mode="w", index=False)

In [None]:
window_duration = "5 minutes"
slide_duration = "1 minute"
sliding_window_spec = window(col("ad_publish_time"), window_duration, slide_duration).alias("window")

# Watermark delay on ad_publish_time. This constant used to be "24 hours" but
# was never used: the query hard-coded "5 minutes". It now holds the value that
# was actually in effect and is passed to withWatermark below.
watermark_duration = "5 minutes"

# Average price per car group and sliding window, with the watermark applied
# to the event-time column before grouping.
grouped_df = (
    processed_df
    .withWatermark("ad_publish_time", watermark_duration)
    .groupBy(
        "brand", "model", "year", "color", "mileage", "transmission",
        "body_health", "engine_health", "tires_health",
        sliding_window_spec,
    )
    .agg(avg("price").alias("average_price"))
)

# Flatten the window struct into start/end columns.
result_df = grouped_df.select(
    "brand", "model", "year", "color", "mileage", "transmission",
    "body_health", "engine_health", "tires_health",
    "window.start", "window.end", "average_price",
)

# Add a processed_at column holding the current timestamp.
result_df = result_df.withColumn("processed_at", current_timestamp().cast(TimestampType()))

# Start the streaming query: each micro-batch is handed to write_to_csv.
query = (
    result_df.writeStream
    .outputMode("update")
    .foreachBatch(write_to_csv)
    .option("checkpointLocation", "./checkpoints")
    .start()
)

# Wait for the streaming query to terminate
query.awaitTermination()

                                                                                