## Project Template

In [2]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0') \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2f94de01-7ac3-4e77-9d0e-e3c4fa0ce4d6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 370ms :: artifacts dl 6m

Be sure to start the stream on Kafka!

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType

schema = StructType(
      [
        StructField("name", StringType(), False),
        StructField("price", DoubleType(), False),
        StructField("timestamp", TimestampType(), False),
      ]
    )

In [4]:
kafka_server = "kafka1:9092"   
from pyspark.sql.functions import from_json

lines = (spark.readStream                        # Get the DataStreamReader
  .format("kafka")                                 # Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafka_server) # Configure the Kafka server name and port
  .option("subscribe", "stock")                       # Subscribe to the "en" Kafka topic 
  .option("startingOffsets", "earliest")           # The start point when a query is started
  .option("maxOffsetsPerTrigger", 100)             # Rate limit on max offsets per trigger interval
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
# Load the DataFrame
)
df = lines.select("parsed_value.*")


## The assignment starts here

You can create a

## Select the N most valuable stocks in a window

In [6]:
#import required class
from pyspark.sql.functions import window
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank


#Define the window operation
window_duration = "80 seconds"  # 80 seconds window

#Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and name, and calculate the max price for each stock
    windowedStocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    #Define the window specification for ranking
    windowSpec = Window.partitionBy("window").orderBy(desc("max_price"))

    #Apply the rank function over the window specification
    windowedStocks = windowedStocks.withColumn("rank", rank().over(windowSpec))

    #Filter for the top N stocks
    N = 10  # For example, find the top 10 stocks
    topNStocks = windowedStocks.where(col("rank") <= N)

    #Show the results
    topNStocks.show()
    
    
#Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function)\
        .start()

query.awaitTermination()

23/11/07 14:24:15 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-bf6bd9dd-3529-4bb2-945c-56d48f552050. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-11-06 05:54...| AZO|   384.01|   1|
|[2023-11-06 05:54...|ISRG| 192.6465|   2|
|[2023-11-06 05:54...|  RE|   128.37|   3|
|[2023-11-06 05:54...| PXD|    128.0|   4|
|[2023-11-06 05:54...| CMI|   119.79|   5|
|[2023-11-06 05:54...| KMB|    90.08|   6|
|[2023-11-06 05:54...|  LH|   90.065|   7|
|[2023-11-06 05:54...| ACN|     77.0|   8|
|[2023-11-06 05:54...| VTR|     72.1|   9|
|[2023-11-06 05:54...| ARE|   71.155|  10|
+--------------------+----+---------+----+

+--------------------+-----+---------+----+
|              window| name|max_price|rank|
+--------------------+-----+---------+----+
|[2023-11-06 05:56...|  GWW|   228.67|   1|
|[2023-11-06 05:56...|  BXP|    110.1|   2|
|[2023-11-06 05:56...|  AGN|    93.13|   3|
|[2023-11-06 05:56...|  CXO|    92.59|   4|
|[2023-11-06 05:56...|   CB|    90.07|   5|
|[2023-11-06 05:56...|  SNA|    82.26|   6|
|

KeyboardInterrupt: 

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-11-07 14:41...| CCL|    48.07|   1|
+--------------------+----+---------+----+

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-11-07 14:41...| PFG|    51.66|   1|
+--------------------+----+---------+----+

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-11-07 14:41...| PNC|    93.12|   1|
+--------------------+----+---------+----+

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-11-07 14:41...|CDNS|    18.78|   1|
+--------------------+----+---------+----+

+--------------------+----+---------+----+
|              window|name|max_price|rank|
+--------------------+----+---------+----+
|[2023-

## Select the stocks that lost value between two windows

In [None]:
# remember you can register another stream


In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, from_json, window, lag

# Define the window specification
window_duration = "80 seconds"  # 80 seconds window
windowSpec = Window.partitionBy("name").orderBy("window")

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and name, and calculate the max price for each stock
    windowedStocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    # Add a lag column to calculate the previous max price
    windowedStocks = windowedStocks.withColumn("prev_max_price", lag("max_price").over(windowSpec))

    # Filter for stocks that lost value
    lostValueStocks = windowedStocks.where(col("max_price") < col("prev_max_price"))
    
    # Show or store the results
    lostValueStocks.show()

# Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function).start()
query.awaitTermination()


23/11/07 10:23:50 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a59af894-cb35-4a71-913b-8859c48817e1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 05:56...| DHI|    23.44|          24.5|
|[2023-11-06 05:56...|NFLX|  26.0499|       28.1514|
|[2023-11-06 05:56...| DAL|    15.69|         17.04|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+



                                                                                

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:05...| MCD|    120.8|       123.965|
|[2023-11-06 14:05...| HPQ|    12.62|         13.36|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:10...| DHI|    23.44|          24.5|
+--------------------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:12...|  SO|    46.36|         48.35|
+--------------------+----+---------+---------

                                                                                

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:40...| PKI|    53.12|         54.37|
|[2023-11-06 14:40...| COP|    59.76|         63.35|
|[2023-11-06 14:40...|  DE|    96.46|         96.89|
+--------------------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:52...|ALXN| 198.2299|        205.11|
|[2023-11-06 14:52...| COG|     25.6|         27.23|
|[2023-11-06 14:53...| STX|   51.115|       51.8372|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+



                                                                                

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:54...| ADS|   254.55|        254.99|
|[2023-11-06 14:54...|LRCX|    65.54|         65.82|
|[2023-11-06 14:54...| ALK|    80.49|          80.6|
|[2023-11-06 14:54...| MCO|   100.04|        103.24|
+--------------------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|prev_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 14:56...|NCLH|   58.128|         60.67|
|[2023-11-06 14:56...| APC|    60.86|         72.26|
|[2023-11-06 14:56...| CHD|    41.81|        44.685|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+---------

KeyboardInterrupt: 

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+-----+---------+--------------+
|              window| name|max_price|prev_max_price|
+--------------------+-----+---------+--------------+
|[2023-11-07 10:28...| FLIR|    31.79|        33.229|
|[2023-11-07 10:28...|  ADM|    34.88|         37.57|
|[2023-11-07 10:28...|DISCK|    72.94|         75.95|
|[2023-11-07 10:28...|  RMD|    48.91|         50.43|
|[2023-11-07 10:28...| TSCO|    60.66|         60.99|
+--------------------+-----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|wi

## Select the stock that gained the most (between windows)

In [None]:
# remember you can register another stream


In [9]:
from pyspark.sql.functions import lead

# Define the window specification
window_duration = "80 seconds"  # 80 seconds window
windowSpec = Window.partitionBy("name").orderBy("window")

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and name, and calculate the max price for each stock
    windowedStocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    # Add a lead column to calculate the next max price
    windowedStocks = windowedStocks.withColumn("next_max_price", lead("max_price").over(windowSpec))

    # Filter for stocks that gained value
    gainedValueStocks = windowedStocks.where(col("max_price") < col("next_max_price"))
    
    # Show or store the results
    gainedValueStocks.show()

# Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function).start()
query.awaitTermination()


+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+



23/11/07 10:32:24 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0a2dae31-5485-4406-8cc2-d66c0d6ddf8c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 08:42...| VMC|    78.15|         83.19|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 08:50...| EMN|  69.7399|         70.12|
|[2023-11-06 08:50...| AEP|    55.85|         57.75|
|[2023-11-06 08:50...| AJG|    47.48|         47.72|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|n

                                                                                

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 06:20...| DRI|    69.01|         70.03|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+-----

                                                                                20]

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 06:33...| EXR|    72.72|         73.08|
|[2023-11-06 06:33...| NSC|    105.3|        107.17|
|[2023-11-06 06:33...|EBAY|    28.95|         29.87|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 09:41...| UNM|    31.44|         32.95|
|[2023-11-06 09:41...| YUM|    66.95|         71.74|
+--------------------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+---------

KeyboardInterrupt: 

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|    

                                                                                

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 06:42...| PFG|     68.9|        69.455|
|[2023-11-06 06:42...|MCHP|    91.87|         95.62|
|[2023-11-06 06:42...| CCI|   105.13|        107.27|
|[2023-11-06 06:42...|  IP|    57.56|         57.65|
+--------------------+----+---------+--------------+

+--------------------+----+---------+--------------+
|              window|name|max_price|next_max_price|
+--------------------+----+---------+--------------+
|[2023-11-06 09:56...|VRTX|   107.79|        113.35|
+--------------------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+------------

## Compute your assets

In [10]:
# Define the window specification
window_duration = "80 seconds"  # 80 seconds window
windowSpec = Window.partitionBy("window")

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and calculate the total assets
    assets = dataframe.groupBy(window(col("timestamp"), window_duration)).sum("price").withColumnRenamed("sum(price)", "total_assets")
    
    # Show or store the results
    assets.show()

# Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function).start()
query.awaitTermination()


23/11/07 10:34:15 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3d758731-3541-4ebd-b5bd-1e0c2621401e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


+--------------------+-----+---------+--------------+
|              window| name|max_price|next_max_price|
+--------------------+-----+---------+--------------+
|[2023-11-06 06:56...|GOOGL| 432.8925|       436.436|
|[2023-11-06 06:56...|  EQT|    84.24|         84.93|
+--------------------+-----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|next_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+

+------+----+---------+--------------+
|window|name|max_price|prev_max_price|
+------+----+---------+--------------+
+------+----+---------+--------------+



23/11/07 10:34:16 ERROR MicroBatchExecution: Query [id = c85e597e-dd46-49a6-bd13-638eb3f1b408, runId = 5fcdd2fd-b54f-4b59-9e69-96a98968aacd] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_19/4088841042.py", line 16, in process_batch
    windowedStocks = windowedStocks.withColumn("next_max_price", lead("max_price").over(windowSpec))
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/dataframe.py", line 2096, in withColumn
    return DataFrame(self._jdf.withColumn(colName, c

+--------------------+------------+
|              window|total_assets|
+--------------------+------------+
|[2023-11-06 05:54...|   5003.1127|
+--------------------+------------+



23/11/07 10:34:17 ERROR MicroBatchExecution: Query [id = 4c3d094d-6ae6-4e0d-81a2-c0714272429c, runId = 618d8965-9833-4fef-bb36-fcd6d86821ac] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_19/2772434745.py", line 17, in process_batch
    windowedStocks = windowedStocks.withColumn("prev_max_price", lag("max_price").over(windowSpec))
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/dataframe.py", line 2096, in withColumn
    return DataFrame(self._jdf.withColumn(colName, co

+--------------------+------------------+
|              window|      total_assets|
+--------------------+------------------+
|[2023-11-06 05:56...|2860.9793000000004|
|[2023-11-06 05:54...|2437.3747000000003|
+--------------------+------------------+

+--------------------+------------------+
|              window|      total_assets|
+--------------------+------------------+
|[2023-11-06 05:56...|7870.3907999999965|
+--------------------+------------------+

+--------------------+-----------------+
|              window|     total_assets|
+--------------------+-----------------+
|[2023-11-06 05:56...|           107.03|
|[2023-11-06 05:57...|7746.949400000001|
+--------------------+-----------------+

+--------------------+------------------+
|              window|      total_assets|
+--------------------+------------------+
|[2023-11-06 05:57...|3738.5724999999993|
|[2023-11-06 05:58...|2522.3180000000007|
+--------------------+------------------+

+--------------------+--------------

KeyboardInterrupt: 

+--------------------+------------+
|              window|total_assets|
+--------------------+------------+
|[2023-11-06 07:37...|   9845.6981|
+--------------------+------------+

+--------------------+-----------------+
|              window|     total_assets|
+--------------------+-----------------+
|[2023-11-06 07:38...|6535.310199999997|
|[2023-11-06 07:37...|          3065.25|
+--------------------+-----------------+

+--------------------+------------------+
|              window|      total_assets|
+--------------------+------------------+
|[2023-11-06 07:38...|10858.612299999999|
+--------------------+------------------+

+--------------------+------------+
|              window|total_assets|
+--------------------+------------+
|[2023-11-06 07:38...|     223.845|
|[2023-11-06 07:40...|  10625.7716|
+--------------------+------------+

+--------------------+------------+
|              window|total_assets|
+--------------------+------------+
|[2023-11-06 07:41...|   4414.9519|


## 4. Implement a control that checks if a stock does not lose too much value in a period of time (feel free to choose the value you prefer).

In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, when
from pyspark.sql.functions import window
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank

# Define the window specification
window_duration = "80 seconds"  # 80 seconds window
windowSpec = Window.partitionBy("name").orderBy("window")

# Define the threshold for allowable loss (e.g., 5%)
loss_threshold = 0.05

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and name, and calculate the max price for each stock
    windowedStocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    # Add a lag column to calculate the previous max price
    windowedStocks = windowedStocks.withColumn("prev_max_price", lag("max_price").over(windowSpec))

    # Calculate the loss percentage for each stock within the window
    windowedStocks = windowedStocks.withColumn(
        "loss_percentage",
        when(col("prev_max_price").isNotNull(), (col("max_price") - col("prev_max_price")) / col("prev_max_price")).otherwise(0.0)
    )

    # Filter for stocks that exceed the loss threshold
    highLossStocks = windowedStocks.where(col("loss_percentage") < -loss_threshold)

    # Show or store the results
    highLossStocks.show()

# Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function).start()
query.awaitTermination()




23/11/07 14:56:58 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b8f27ff7-5090-460d-82e0-5cae5e2dbfa0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+--------------------+----+---------+--------------+--------------------+
|              window|name|max_price|prev_max_price|     loss_percentage|
+--------------------+----+---------+--------------+--------------------+
|[2023-11-06 05:56...|NFLX|  26.0499|       28.1514|-0.07464992860035373|
|[2023-11-06 05:56...| DAL|    15.69|         17.04|-0.07922535211267603|
+--------------------+----+---------+--------------+--------------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+--

KeyboardInterrupt: 

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+------+----+---------+--------------+---------------+
+------+----+---------+--------------+---------------+

+------+----+---------+--------------+---------------+
|window|name|max_price|prev_max_price|loss_percentage|
+-----

## 5. Imagine you own some stocks (stored in a data frame with the schema <name,amount of stocks owned>). Compute how your asset changes with the fluctuation of the market.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
from pyspark.sql.functions import window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank



# Define the window specification
window_duration = "80 seconds"  # 80 seconds window
windowSpec = Window.partitionBy("name").orderBy("window")

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    # Group the data by window and name, and calculate the max price for each stock
    windowedStocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    # Add a lag column to calculate the previous max price
    windowedStocks = windowedStocks.withColumn("prev_max_price", lag("max_price").over(windowSpec))

    # Calculate the number of stocks owned
    stocks_owned = dataframe.groupBy("name").agg({"amount": "sum"}).withColumnRenamed("sum(amount)", "stocks_owned")

    # Join the dataframes to compute asset value
    asset_change = windowedStocks.join(stocks_owned, "name", "inner")
    asset_change = asset_change.withColumn("asset_change", (col("max_price") - col("prev_max_price")) * col("stocks_owned"))

    # Show or store the results
    asset_change.show()

# Apply the function to each micro-batch
query = df.writeStream.foreachBatch(process_batch_function).start()
query.awaitTermination()
