## Project Template

In [1]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0') \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6712703b-b25c-4e78-a1f4-d62662ca8729;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 762ms :: artifacts dl 34

Be sure to start the stream on Kafka!

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType

schema = StructType(
      [
        StructField("name", StringType(), False),
        StructField("price", DoubleType(), False),
        StructField("timestamp", TimestampType(), False),
      ]
    )

In [3]:
kafka_server = "kafka1:9092"   
from pyspark.sql.functions import from_json

lines = (spark.readStream                        # Get the DataStreamReader
  .format("kafka")                                 # Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafka_server) # Configure the Kafka server name and port
  .option("subscribe", "stock")                       # Subscribe to the "en" Kafka topic 
  .option("startingOffsets", "earliest")           # The start point when a query is started
  .option("maxOffsetsPerTrigger", 100)             # Rate limit on max offsets per trigger interval
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
# Load the DataFrame
)
df = lines.select("parsed_value.*")


## The assignment starts here

You can create a

## Select the N most valuable stocks in a window

In [5]:
from pyspark.sql.functions import window, avg
import time

# Configuration
N = 8
windowDuration = "5 minutes"
slideDuration = "2 minutes"

# Assign watermarks
df_with_watermark = df.withWatermark("timestamp", "5 minutes")

# Group the data by the defined window and the stock name, then compute the average price for each group.
topN_stocks_windowed = (df_with_watermark
    .groupBy(
        window(df_with_watermark.timestamp, windowDuration, slideDuration),
        df_with_watermark.name)
    .agg(avg("price").alias("avgPrice")))

# Write the aggregated result into memory for querying
query1 = (topN_stocks_windowed.writeStream
    .outputMode("update")
    .format("memory")
    .queryName("Aggr")
    .start())

# Allow the above query to populate some data
time.sleep(10)

# Fetch top N stocks from the in-memory table
result = spark.sql(f"SELECT window, name, avgPrice FROM Aggr ORDER BY window DESC, avgPrice DESC LIMIT {N}")

# Show the result
result.show()

23/11/06 11:32:34 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6ebfa87c-ee11-483b-a82d-b4dec7abde4d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+--------------------+-----+--------+
|              window| name|avgPrice|
+--------------------+-----+--------+
|[2023-10-28 20:12...|   RE|  128.37|
|[2023-10-28 20:12...|  PXD|   128.0|
|[2023-10-28 20:12...| WYNN|  120.24|
|[2023-10-28 20:12...|  PVH|  114.07|
|[2023-10-28 20:12...|  FRT|  106.58|
|[2023-10-28 20:12...|  MTB|  104.87|
|[2023-10-28 20:12...|BRK.B|  102.57|
|[2023-10-28 20:12...|  AGN|   93.13|
+--------------------+-----+--------+



                                                                                

## Select the stocks that lost value between two windows

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, last
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, TimestampType

# Use a dictionary to hold the state of each stock
stock_states = {}
# Set to hold names of stocks that lost value
stocks_that_lost_value = set()

# Define a class to hold the state of each stock
class StockState:
    def __init__(self, name):
        self.name = name
        self.last_price = None
        self.last_window_end = None
        self.lost_value = False  # Add a flag to track if the stock lost value

    def update(self, price, window_end):
        if self.last_window_end is None or window_end > self.last_window_end:
            if self.last_price is not None and price < self.last_price:
                self.lost_value = True  # Stock lost value
                stocks_that_lost_value.add(self.name)
            else:
                self.lost_value = False  # Stock did not lose value
            self.last_price = price
            self.last_window_end = window_end
        # Return the state including whether the stock lost value
        return (self.name, self.last_price, self.last_window_end, self.lost_value)


# Define the watermark and window operation on the streaming DataFrame
windowedDF = df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        col("name"),
        window(col("timestamp"), "5 minutes")
    ) \
    .agg(
        last("price").alias("last_price")
    )

def process_batch(df, epoch_id):
    # Process the DataFrame row by row
    for row in df.collect():
        stock_name = row['name']
        price = row['last_price']
        window_end = row['window'].end

        # Get the stock state or create it if it doesn't exist
        stock_state = stock_states.get(stock_name, StockState(stock_name))

        # Update the state and check if the stock lost value
        result = stock_state.update(price, window_end)
        stock_states[stock_name] = stock_state  # Update the state in the dictionary

    print(stocks_that_lost_value)

# Define the streaming query using the process_batch function
query = windowedDF.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .start()  # Start the streaming query

# Await termination of the streaming query (this is running continuously)
query.awaitTermination(timeout=60)
query.stop()

# Stop the Spark session
spark.stop()


23/11/06 13:21:30 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-843acb5f-2c7e-4ff1-aa68-1338757dc5ce. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

set()


                                                                                

set()


                                                                                

set()


                                                                                

set()


                                                                                

{'XYL', 'AVY', 'HD', 'LB', 'CAG', 'CME'}


                                                                                

{'APA', 'SCG', 'LB', 'ORCL', 'TIF', 'KO', 'XYL', 'CLX', 'AEE', 'SLB', 'SBAC', 'HCP', 'AVY', 'HD', 'CBG', 'BEN', 'CAG', 'MPC', 'CME'}


                                                                                

{'APA', 'PHM', 'SCG', 'LB', 'ORCL', 'TIF', 'KO', 'XYL', 'BLL', 'CLX', 'AEE', 'SLB', 'SBAC', 'HCP', 'DHI', 'AVY', 'VRSK', 'HD', 'CBG', 'MAR', 'PEP', 'BEN', 'CAG', 'MPC', 'CME', 'ACN'}


                                                                                

{'XEL', 'APA', 'PHM', 'SCG', 'LB', 'ORCL', 'TIF', 'KO', 'MAA', 'XYL', 'BLL', 'CLX', 'AEE', 'SLB', 'SBAC', 'GIS', 'HCP', 'DHI', 'AVY', 'VRSK', 'HD', 'CBG', 'AMT', 'MAR', 'BXP', 'PEP', 'KMI', 'PPL', 'BEN', 'JNPR', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'HCN'}


                                                                                

{'XEL', 'APA', 'PHM', 'SCG', 'T', 'LB', 'ORCL', 'TIF', 'MON', 'KO', 'MAA', 'XYL', 'BLL', 'CLX', 'AEE', 'SLB', 'AIV', 'SBAC', 'GIS', 'HCP', 'DHI', 'AVY', 'VRSK', 'HD', 'CBG', 'AMT', 'XOM', 'MAR', 'BXP', 'PEP', 'KMI', 'SPG', 'PPL', 'BEN', 'JNPR', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'HCN', 'MAC'}


                                                                                

{'XEL', 'MCD', 'APA', 'PHM', 'SCG', 'T', 'LB', 'ORCL', 'TIF', 'MON', 'KO', 'MAA', 'XYL', 'BLL', 'UDR', 'CLX', 'AEE', 'SLB', 'AIV', 'SBAC', 'GIS', 'HCP', 'DHI', 'AVY', 'VRSK', 'HD', 'CBG', 'AMT', 'XOM', 'IBM', 'MAR', 'BXP', 'PEP', 'KMI', 'SPG', 'PPL', 'BEN', 'JNPR', 'NEM', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'VTR', 'HCN', 'MAC'}


                                                                                

{'XEL', 'MCD', 'CHRW', 'APA', 'PHM', 'SCG', 'T', 'LB', 'ETR', 'ORCL', 'TIF', 'MON', 'KO', 'MAA', 'XYL', 'BLL', 'NTAP', 'UDR', 'CLX', 'AEE', 'SLB', 'AIV', 'SBAC', 'GIS', 'HCP', 'DHI', 'AVY', 'VRSK', 'WMB', 'HD', 'CBG', 'AMT', 'XOM', 'IBM', 'MAR', 'BXP', 'EW', 'PEP', 'KMI', 'SPG', 'PPL', 'BEN', 'JNPR', 'NEM', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'VTR', 'CINF', 'HCN', 'MAC'}


                                                                                

{'XEL', 'MCD', 'CHRW', 'APA', 'PHM', 'SCG', 'T', 'MRO', 'ARE', 'LB', 'ETR', 'ORCL', 'CMS', 'TIF', 'MON', 'KO', 'MAA', 'XYL', 'BLL', 'NTAP', 'UDR', 'CLX', 'HUM', 'RE', 'AEE', 'SLB', 'AIV', 'SBAC', 'KR', 'RL', 'GIS', 'SO', 'HCP', 'DHI', 'O', 'ES', 'AVY', 'VRSK', 'WMB', 'HD', 'CBG', 'AMT', 'XOM', 'IBM', 'MAR', 'BXP', 'EW', 'PEP', 'KMI', 'SPG', 'PPL', 'FRT', 'BEN', 'JNPR', 'KSU', 'ED', 'NEM', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'VTR', 'CINF', 'HCN', 'MAC'}


                                                                                

{'XEL', 'MCD', 'CHRW', 'APA', 'PHM', 'SCG', 'T', 'MRO', 'ARE', 'LB', 'ETR', 'ORCL', 'CMS', 'TIF', 'MON', 'KO', 'MAA', 'XYL', 'BLL', 'NTAP', 'UDR', 'CLX', 'HUM', 'RE', 'AEE', 'FOX', 'SLB', 'AIV', 'SBAC', 'KR', 'RL', 'GIS', 'SO', 'HCP', 'DHI', 'O', 'PH', 'ES', 'AVY', 'VRSK', 'WMB', 'HD', 'CBG', 'AMT', 'XOM', 'IBM', 'EQIX', 'SWK', 'MAR', 'BXP', 'EW', 'PEP', 'AMD', 'KMI', 'SPG', 'PPL', 'FRT', 'COTY', 'BEN', 'IRM', 'JNPR', 'KSU', 'ED', 'NEM', 'CAG', 'MPC', 'CME', 'ACN', 'CCL', 'VTR', 'CINF', 'HCN', 'MAC'}


                                                                                

{'MCD', 'SCG', 'CTXS', 'T', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'NTAP', 'HUM', 'SBAC', 'O', 'VRSK', 'AMT', 'IBM', 'EQIX', 'DG', 'BXP', 'EW', 'FRT', 'NEM', 'CHRW', 'PHM', 'CNP', 'ARE', 'MON', 'KO', 'C', 'UDR', 'SLB', 'KR', 'HCP', 'DHI', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'MAA', 'XYL', 'BLL', 'CLX', 'AEE', 'AIV', 'RL', 'GIS', 'SO', 'WMB', 'CBG', 'SWK', 'AMD', 'SPG', 'COTY', 'JNPR', 'ED', 'CAG', 'ACN', 'CCL', 'TIF', 'RE', 'FOX', 'VZ', 'PH', 'HD', 'DISCA', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'SCG', 'CTXS', 'DLR', 'T', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'NTAP', 'HUM', 'SBAC', 'O', 'VRSK', 'AMT', 'IBM', 'EQIX', 'DG', 'GPS', 'BXP', 'EW', 'FRT', 'NEM', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'MON', 'KO', 'C', 'SRCL', 'UDR', 'SLB', 'KR', 'HCP', 'DHI', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'UHS', 'MAA', 'XYL', 'BLL', 'CLX', 'AEE', 'AIV', 'RL', 'GIS', 'SO', 'WMB', 'CBG', 'SWK', 'AMD', 'SPG', 'COTY', 'JNPR', 'ED', 'CAG', 'ACN', 'CCL', 'TIF', 'MAT', 'RE', 'FOX', 'VZ', 'PH', 'HD', 'DISCA', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'SCG', 'CTXS', 'DLR', 'T', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'NTAP', 'HUM', 'SBAC', 'O', 'VRSK', 'AMT', 'IBM', 'EQIX', 'DG', 'GPS', 'BXP', 'EW', 'FRT', 'NEM', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'UDR', 'ZION', 'PGR', 'SLB', 'KR', 'HCP', 'DHI', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'UHS', 'MAA', 'XYL', 'BLL', 'CLX', 'AEE', 'AIV', 'RL', 'GIS', 'SO', 'WMB', 'CBG', 'SWK', 'AMD', 'SPG', 'COTY', 'JNPR', 'ED', 'CAG', 'ACN', 'CCL', 'ANSS', 'ULTA', 'TIF', 'MAT', 'RE', 'FOX', 'VZ', 'PH', 'HD', 'DISCA', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'NTAP', 'HUM', 'SBAC', 'O', 'BSX', 'VRSK', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPS', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'STT', 'UDR', 'ZION', 'PGR', 'SLB', 'KR', 'EXC', 'HCP', 'DHI', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CLX', 'AEE', 'LKQ', 'AIV', 'RL', 'GIS', 'SO', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'AMD', 'SPG', 'COTY', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ANSS', 'NOV', 'ULTA', 'TIF', 'CSCO', 'MAT', 'RE', 'FOX', 'VZ', 'PH', 'HD', 'DISCA', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'PKG', 'NTAP', 'HUM', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPS', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'RHT', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'STT', 'UDR', 'ZION', 'PGR', 'SLB', 'KR', 'EXC', 'HCP', 'DHI', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'RJF', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CLX', 'ALXN', 'AEE', 'LKQ', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'AMD', 'SPG', 'COTY', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ANSS', 'NOV', 'ULTA', 'TIF', 'CSCO', 'MAT', 'RE', 'FOX', 'VZ', 'PH', 'HD', 'DISCA', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'EL', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'PKG', 'PG', 'NTAP', 'HUM', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'RHT', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'STT', 'UDR', 'ZION', 'PGR', 'SLB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'ES', 'AVY', 'XOM', 'MAR', 'DOV', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'RJF', 'CBOE', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CLX', 'ALXN', 'AEE', 'LKQ', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'ORLY', 'AMD', 'SPG', 'COTY', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ANSS', 'NOV', 'ULTA', 'TIF', 'CSCO', 'MAT', 'RE', 'FOX', 'VZ', 'CERN', 'PH', 'HD', 'DISCA', 'FBHS', 'LEN', 'KMI', 'KSU', 'RSG', 'HCN', 'VTR'}


                                                                                

{'MCD', 'EL', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'PKG', 'PG', 'NTAP', 'HUM', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'RHT', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'STT', 'UDR', 'ZION', 'PGR', 'SLB', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'ES', 'AVY', 'XOM', 'HP', 'MAR', 'DOV', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'MAC', 'XEL', 'APA', 'XLNX', 'RJF', 'CBOE', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CLX', 'ALXN', 'AEE', 'LKQ', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'ORLY', 'COP', 'AMD', 'SPG', 'COTY', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ADI', 'ANSS', 'NOV', 'ULTA', 'GE', 'A', 'FITB', 'TIF', 'CSCO', 'MAT', 'AFL', 'RE', 'FOX', 'VZ', 'CERN', 'GM', 'PH', 'HD', 'DISCA', 'FBHS', 'LEN', 'KMI', 'KSU', 'RSG', 'ETN', 'HCN', 'VTR'}


                                                                                

{'MCD', 'EL', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'NTAP', 'HUM', 'LLL', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'RHT', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'SLB', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'ES', 'AVY', 'TGT', 'XOM', 'HP', 'MAR', 'DOV', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'MPC', 'CME', 'CINF', 'PCAR', 'MAC', 'XEL', 'APA', 'XLNX', 'RJF', 'CBOE', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CLX', 'ALXN', 'AEE', 'LKQ', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', 'ORLY', 'COP', 'AMD', 'SPG', 'COTY', 'KIM', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ADI', 'ANSS', 'NOV', 'ULTA', 'GE', 'A', 'FITB', 'TIF', 'CSCO', 'MAT', 'UPS', 'AFL', 'RE', 'FOX', 'F', 'VZ', 'CERN', 'GM', 'PH', 'HD', 'DISCA', 'FB

                                                                                

{'MCD', 'EL', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'LLL', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'SLB', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'XOM', 'PKI', 'CPB', 'HP', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'MAC', 'XEL', 'APA', 'XLNX', 'RJF', 'CBOE', 'PWR', 'MCHP', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', 'ORLY', 'COP', 'AMD', 'SPG', 'COTY', 'KIM', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'ADI', 'ANSS', 'NOV', 'ULTA', 'GE', 'A',

                                                                                

{'MCD', 'EL', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'FAST', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'LLL', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'SLB', 'FTI', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'XOM', 'PKI', 'CPB', 'HP', 'MLM', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'MAC', 'XEL', 'APA', 'SNI', 'XLNX', 'RJF', 'KORS', 'CBOE', 'PWR', 'MCHP', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', 'ORLY', 'COP', 'AMD', 'SPG', 'COTY', 'KIM', 'BAX', 'JNPR', 'ED', 'CAG', 'ACN', 'PNW', 'CCL', 'AD

                                                                                

{'MCD', 'EL', 'CMI', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'FAST', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'LLL', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'SLB', 'FTI', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'XOM', 'PKI', 'ALB', 'CPB', 'HP', 'MLM', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FMC', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'CAT', 'MAC', 'XEL', 'APA', 'SNI', 'XLNX', 'RJF', 'KORS', 'OKE', 'CBOE', 'PWR', 'MCHP', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'ALLE', 'HRS', 'AIV', 'RL', 'DE', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', 'ORLY', 'COP', 'AMD', 'SPG', 'COTY', 'KIM', 'B

                                                                                

{'MCD', 'EL', 'CMI', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'FAST', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'LLL', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'SLB', 'FTI', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'XOM', 'PKI', 'ALB', 'CPB', 'HP', 'MLM', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FMC', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'CAT', 'MAC', 'XEL', 'APA', 'SNI', 'XLNX', 'RJF', 'KORS', 'OKE', 'CBOE', 'PWR', 'MCHP', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'ALLE', 'HRS', 'AIV', 'RL', 'DE', 'GOOGL', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', 'ORLY', 'HAL', 'COP', 'AMD', 'SPG', '

                                                                                

{'MCD', 'EL', 'CMI', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'FAST', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'DISCK', 'LLL', 'PX', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'DVN', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'JNJ', 'SLB', 'FTI', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'XOM', 'PKI', 'ALB', 'CPB', 'HP', 'MLM', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FMC', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'CAT', 'MAC', 'XEL', 'APA', 'SNI', 'XLNX', 'RJF', 'KORS', 'OKE', 'CBOE', 'PWR', 'MCHP', 'HOG', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'ALLE', 'HRS', 'AIV', 'RL', 'DE', 'GOOGL', 'GIS', 'SO', 'HSY', 'MOS', 'WMB', 'CBG', 'SWK', 'FLR', 'WU', 'GWW', '

                                                                                

{'MCD', 'EL', 'CMI', 'SCG', 'CTXS', 'DLR', 'OMC', 'T', 'VRSN', 'PNR', 'MRO', 'LB', 'ETR', 'FAST', 'ORCL', 'CMS', 'PKG', 'FLIR', 'PG', 'MDLZ', 'NTAP', 'HUM', 'DISCK', 'LLL', 'PX', 'SBAC', 'O', 'BSX', 'VRSK', 'K', 'TJX', 'AMT', 'IBM', 'EQIX', 'DG', 'GPN', 'GPS', 'CTSH', 'APC', 'BXP', 'EW', 'PVH', 'FRT', 'MKC', 'NEM', 'BWA', 'NBL', 'EMN', 'RHT', 'COF', 'CHRW', 'PHM', 'CNP', 'ARE', 'ZTS', 'CMA', 'BAC', 'MON', 'KO', 'C', 'DVN', 'SRCL', 'JPM', 'LNC', 'STT', 'UDR', 'ZION', 'UTX', 'PGR', 'JNJ', 'SLB', 'FTI', 'VIAB', 'KR', 'EXC', 'HCP', 'DHI', 'PDCO', 'PRU', 'ES', 'AVY', 'TGT', 'CVS', 'XOM', 'PKI', 'ALB', 'AIZ', 'CPB', 'HP', 'MLM', 'ABBV', 'MAR', 'DOV', 'CVX', 'PEP', 'CA', 'FMC', 'FE', 'PPL', 'IRM', 'BEN', 'FLS', 'MPC', 'CME', 'CINF', 'PCAR', 'CAT', 'MAC', 'XEL', 'RF', 'APA', 'SNI', 'XLNX', 'RJF', 'KORS', 'OKE', 'CBOE', 'PWR', 'MCHP', 'HOG', 'UHS', 'MAA', 'LOW', 'XYL', 'BLL', 'CHK', 'CLX', 'ALXN', 'AEE', 'LKQ', 'EMR', 'ALLE', 'HRS', 'AIV', 'RL', 'DE', 'GOOGL', 'GIS', 'SO', 'HSY', 'CF', 'MOS', '

23/11/06 13:22:45 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:422)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:408)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:324)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor

23/11/06 11:38:28 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-505e894d-1478-4ff9-9639-a5fc214c1877. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
[Stage 479:(184 + 8) / 200][Stage 481:>  (0 + 0) / 1][Stage 483:>(0 + 0) / 200] 

+----+--------+-----------+
|name|avgPrice|priceChange|
+----+--------+-----------+
| PPG|  69.885|        0.0|
| WEC|    39.9|        0.0|
| HAS|   40.93|        0.0|
| PNW|   54.63|        0.0|
|KORS|   64.04|        0.0|
| TEL|   41.17|        0.0|
| EOG|   63.53|        0.0|
| DHR|   61.46|        0.0|
+----+--------+-----------+



                                                                                

## Select the stock that gained the most (between windows)

In [8]:
from pyspark.sql.functions import from_json, col, max, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
import time


# Assign watermarks
df_with_watermark = df.withWatermark("timestamp", "5 minutes")

# Calculate the price change for each stock within the time window
price_change = df_with_watermark.groupBy("name").agg(max(col("price")).alias("maxPriceChange"))

# Define the query with Complete output mode
query1 = (price_change.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("MOST_GAINED_STOCK13")
    .start())

# Allow the above query to populate some data
time.sleep(60)

# Fetch the stock with the maximum price change from the in-memory table
result = spark.sql(f"SELECT name, maxPriceChange FROM MOST_GAINED_STOCK13 ORDER BY maxPriceChange DESC LIMIT 1")

# Show the result
result.show()

23/11/06 12:37:43 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2d65840a-7d43-4962-90ef-bcebd3672b9b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+----+--------------+
|name|maxPriceChange|
+----+--------------+
|PCLN|       1165.58|
+----+--------------+



[Stage 423:====>         (62 + 8) / 200][Stage 424:>                (0 + 0) / 1]

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, last
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, TimestampType

# Use a dictionary to hold the state of each stock
stock_states = {}

# Define a class to hold the state of each stock
class StockState:
    def __init__(self, name):
        self.name = name
        self.last_price = None
        self.max_gain = 0.0

    def update(self, price):
        if self.last_price is not None:
            gain = price - self.last_price
            if gain > self.max_gain:
                self.max_gain = gain
        self.last_price = price

# Define the watermark and window operation on the streaming DataFrame
windowedDF = df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        col("name"),
        window(col("timestamp"), "5 minutes")
    ) \
    .agg(
        last("price").alias("last_price")
    )

def process_batch(df, epoch_id):
    # Process the DataFrame row by row
    for row in df.collect():
        stock_name = row['name']
        price = row['last_price']

        # Get the stock state or create it if it doesn't exist
        stock_state = stock_states.get(stock_name, StockState(stock_name))

        # Update the state
        stock_state.update(price)
        stock_states[stock_name] = stock_state  # Update the state in the dictionary

    # Find the stock with the maximum gain
    max_gain_stock = max(stock_states, key=lambda stock_name: stock_states[stock_name].max_gain)
    max_gain = stock_states[max_gain_stock].max_gain

    print(f"Stock with the maximum gain: {max_gain_stock}, Gain: {max_gain}")

# Define the streaming query using the process_batch function
query = windowedDF.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .start()  # Start the streaming query

# Await termination of the streaming query (this is running continuously)
query.awaitTermination(timeout=60)
query.stop()

23/11/06 13:32:23 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b1165f76-5b6f-4843-9ce3-57a8defa145e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

Stock with the maximum gain: LLY, Gain: 0.0


                                                                                

Stock with the maximum gain: TGT, Gain: 5.619999999999997


                                                                                

Stock with the maximum gain: ADP, Gain: 6.0249999999999915


                                                                                

Stock with the maximum gain: GWW, Gain: 32.66999999999999


                                                                                

Stock with the maximum gain: GWW, Gain: 32.66999999999999


                                                                                

Stock with the maximum gain: GWW, Gain: 32.66999999999999


                                                                                

Stock with the maximum gain: GWW, Gain: 32.66999999999999


                                                                                

Stock with the maximum gain: PCLN, Gain: 162.14980000000003


                                                                                

Stock with the maximum gain: PCLN, Gain: 162.14980000000003


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


                                                                                

Stock with the maximum gain: PCLN, Gain: 183.45019999999988


## Implement a control that checks if a stock does not lose too much value in a period of time (feel free to choose the value you prefer).

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, last
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, TimestampType

# Use a dictionary to hold the state of each stock
stock_states = {}

# Set to hold names of stocks that exceeded the maximum allowable percentage loss
stocks_with_large_losses = set()

# Define the maximum allowable percentage loss (e.g., 5%)
max_allowable_percentage_loss = 5.0

class StockState:
    def __init__(self, name):
        self.name = name
        self.last_price = None
        self.last_window_end = None

    def update(self, price, window_end):
        if self.last_window_end is not None and window_end > self.last_window_end:
            if self.last_price is not None:
                price_change = price - self.last_price
                percentage_loss = (price_change / self.last_price) * 100
                if percentage_loss < -max_allowable_percentage_loss:
                    stocks_with_large_losses.add(self.name)
            self.last_price = price
        self.last_window_end = window_end

# Modify the watermark and window operation on the streaming DataFrame
windowedDF = df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        col("name"),
        window(col("timestamp"), "5 minutes")
    ) \
    .agg(
        last("price").alias("last_price")
    )

def process_batch(df, epoch_id):
    # Process the DataFrame row by row
    for row in df.collect():
        stock_name = row['name']
        price = row['last_price']
        window_end = row['window'].end

        # Get the stock state or create it if it doesn't exist
        stock_state = stock_states.get(stock_name, StockState(stock_name))

        # Update the state and check for large percentage losses
        stock_state.update(price, window_end)
        stock_states[stock_name] = stock_state  # Update the state in the dictionary

    # Log or take action on stocks with large losses
    for stock in stocks_with_large_losses:
        print(f"Stock '{stock}' exceeded the maximum allowable percentage loss within the time window.")
    stocks_with_large_losses.clear()

# Define the streaming query using the process_batch function
query = windowedDF.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .start()  # Start the streaming query

# Await termination of the streaming query (this is running continuously)
query.awaitTermination(timeout=60)
query.stop()


23/11/06 19:40:42 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6f5379f1-698b-428a-9050-f14cb3451ade. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

Stock 'MRO' exceeded the maximum allowable percentage loss within the time window.
Stock 'CMS' exceeded the maximum allowable percentage loss within the time window.
Stock 'SCG' exceeded the maximum allowable percentage loss within the time window.
Stock 'NEM' exceeded the maximum allowable percentage loss within the time window.
Stock 'KR' exceeded the maximum allowable percentage loss within the time window.
Stock 'RE' exceeded the maximum allowable percentage loss within the time window.


                                                                                

Stock 'UPS' exceeded the maximum allowable percentage loss within the time window.
Stock 'KO' exceeded the maximum allowable percentage loss within the time window.


                                                                                

Stock 'DOV' exceeded the maximum allowable percentage loss within the time window.


                                                                                

Stock 'KMI' exceeded the maximum allowable percentage loss within the time window.




Stock 'ULTA' exceeded the maximum allowable percentage loss within the time window.
Stock 'ANSS' exceeded the maximum allowable percentage loss within the time window.
Stock 'ZION' exceeded the maximum allowable percentage loss within the time window.


                                                                                

## Imagine you own some stocks (stored in a data frame with the schema <name,amount of stocks owned>). Compute how your asset changes with the fluctuation of the market

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, last, sum

# Initialize a Spark session
spark = SparkSession.builder.appName("AssetFluctuation").getOrCreate()

# Load stock price data as a streaming DataFrame
stock_price_streaming = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "stock_prices") \
    .load()

# Define the schema for the streaming DataFrame
schema = StructType([
    StructField("name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Parse the JSON value and select relevant columns
stock_price_data = stock_price_streaming.selectExpr("CAST(value AS STRING) as json") \
    .selectExpr("from_json(json, 'name STRING, price DOUBLE, timestamp TIMESTAMP') as parsed_value") \
    .select("parsed_value.*")

# Load the data frame with the amount of stocks you own
your_stocks = spark.read.csv("your_stocks.csv", header=True, schema=schema)

# Join the streaming DataFrame with your stocks data using 'name' as the common key
joined_data = stock_price_data.join(your_stocks, "name")

# Calculate the total value of your assets
asset_value = joined_data.withColumn("asset_value", col("price") * col("amount"))

# Group by timestamp to calculate the sum of your assets over time
agg_data = asset_value.groupBy(window(col("timestamp"), "5 minutes")).agg(sum("asset_value").alias("total_asset_value"))

# Define the query to write the results to an output sink (e.g., console)
query = agg_data.writeStream.outputMode("update") \
    .format("console") \
    .start()

# Await termination of the streaming query
query.awaitTermination()

AnalysisException: Path does not exist: file:/opt/workspace/your_stocks.csv;