In [1]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Create the Spark session, or reuse the kernel's existing one.
# The Kafka source connector is resolved from Maven through spark.jars.packages,
# and eager evaluation makes DataFrames render their rows in the notebook.
# NOTE(review): the connector version (3.0.0, Scala 2.12) presumably has to match
# the installed Spark build — confirm when upgrading PySpark.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a4acf3e4-c8a6-4058-a05b-fd3d3c400cb0;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 258ms :: artifacts dl 4m

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType

# JSON layout of each Kafka message value. from_json() applies it to the
# string-cast "value" column of the stream read below.
schema = StructType([
    StructField("name", StringType(), nullable=False),          # stock identifier
    StructField("price", DoubleType(), nullable=False),         # quoted price
    StructField("timestamp", TimestampType(), nullable=False),  # event time, used for windowing
])


In [3]:
from pyspark.sql.functions import from_json

# Kafka connection settings for the streaming source.
kafka_server = "kafka1:9092"
kafka_topic = "stock"

# Streaming source: raw Kafka records, with each message value parsed as JSON.
lines = (
    spark.readStream                                    # Get the DataStreamReader
    .format("kafka")                                    # Source format is Kafka
    .option("kafka.bootstrap.servers", kafka_server)    # Broker host:port
    .option("subscribe", kafka_topic)                   # Topic to consume
    .option("startingOffsets", "earliest")              # Where a newly started query begins reading
    .option("maxOffsetsPerTrigger", 100)                # Rate limit: max offsets per trigger interval
    .load()
    # The Kafka value column is binary; cast it to string before parsing JSON.
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
)

# Flatten the parsed struct into top-level columns (name, price, timestamp).
df = lines.select("parsed_value.*")


## The assignment starts here

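The cells below average each stock's price over 5-minute windows and report the stocks whose average fell compared with their previous window.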

## Select the stocks that lost value between consecutive windows

In [None]:
from pyspark.sql.functions import lag, window

def process_batch(df, epoch_id):
    """Show the stocks whose windowed average price dropped versus their previous window.

    Intended as a ``foreachBatch`` callback. It assumes every micro-batch holds all
    windows computed so far (as with the "complete" output mode used in this
    notebook), so that each window can be compared with the one before it.

    Args:
        df: micro-batch DataFrame with ``name``, ``window`` and ``avg(price)`` columns.
        epoch_id: micro-batch id passed by Spark; not used here.
    """
    # Order each stock's windows chronologically so lag() returns the preceding window.
    by_stock = Window.partitionBy("name").orderBy("window")

    with_previous = (
        df
        .withColumn("previous_price", lag("avg(price)").over(by_stock))
        .withColumn("previous_window", lag("window").over(by_stock))
    )

    lost_value = (
        with_previous
        # A stock's first window has a null previous_price; the comparison is then
        # null, so those rows are filtered out.
        .filter(col("previous_price") > col("avg(price)"))
        # The window function repartitions by name, so any upstream sort is lost;
        # sort explicitly by average price, highest first, for display.
        .orderBy(col("avg(price)").desc())
    )

    lost_value.show(truncate=False)
    

# Average price per stock over 5-minute tumbling windows on event time.
# NOTE: the query below uses the "complete" output mode, which keeps every
# aggregation result. In that mode the 30-second watermark does not drop old
# windows, so state grows for the lifetime of the query. Complete mode is still
# required here, because process_batch compares each window with the previous
# one and needs all windows in every micro-batch.
windowedDF_2 = (
    df
    .withWatermark("timestamp", "30 seconds")
    .groupBy(window("timestamp", "5 minutes"), "name")
    .agg({"price": "avg"})
)

# This name is kept for other cells. There is no streaming orderBy here:
# process_batch repartitions by stock name for its lag() window, so an
# upstream sort would only cost a shuffle and would not survive.
lost_value_stocks = windowedDF_2

# Run process_batch on every micro-batch. foreachBatch replaces the sink, so
# a .format("memory") or console .option("truncate", ...) setting would be
# silently ignored; none is set. queryName only labels the query.
query_2 = (
    lost_value_stocks.writeStream
    .outputMode("complete")
    .queryName("TheStocksThatLostValue1")
    .foreachBatch(process_batch)
    .start()
)

# Block this cell until the streaming query stops or fails.
query_2.awaitTermination()

23/11/06 20:00:30 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6d700f1c-cc93-4c37-bf51-638a7339d738. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.