## Project Template

In [None]:
from pyspark.sql.functions import explode, split, col, window

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession


# Create (or reuse) the Spark session and pull in the Kafka source connector.
# NOTE(review): the connector artifact version should match the installed
# Spark / Scala build — confirm against the cluster.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0')
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3a9babfe-3c9f-4cd7-94a2-c024c18030bb;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 720ms :: artifacts dl 16

Be sure the stock-price stream is being published to the `stock` Kafka topic before running the streaming cells below!

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType

# Layout of the JSON document carried in each Kafka record's value;
# every field is declared non-nullable.
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("name", StringType()),
        ("price", DoubleType()),
        ("timestamp", TimestampType()),
    )
])

In [None]:
from pyspark.sql.functions import from_json

kafka_server = "kafka1:9092"  # Kafka bootstrap server as host:port
kafka_topic = "stock"         # topic the stock-price records are published to

# Streaming DataFrame of Kafka records whose value is parsed as JSON with `schema`.
lines = (
    spark.readStream                                  # Get the DataStreamReader
    .format("kafka")                                  # Read from Kafka
    .option("kafka.bootstrap.servers", kafka_server)  # Broker host:port
    .option("subscribe", kafka_topic)                 # Subscribe to the stock-price topic
    .option("startingOffsets", "earliest")            # Start from the earliest offset when a query starts
    .option("maxOffsetsPerTrigger", 100)              # Rate limit: at most 100 offsets per trigger
    .load()
    # The Kafka `value` column is binary; cast it to string before JSON parsing.
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
)

# Flatten the parsed struct into top-level name / price / timestamp columns.
df = lines.select("parsed_value.*")


## The assignment starts here

You can create a

## Select the N most valuable stocks in a window

In [None]:
# Import required libraries
from pyspark.sql.functions import window, col, desc, rank
from pyspark.sql.window import Window

# Tumbling-window length used by the aggregation below.
window_duration = "80 seconds"

# Define the processing function
def process_batch_function(dataframe, epoch_id, top_n=10):
    """Print the ``top_n`` most valuable stocks per time window for one micro-batch.

    A stock's value in a window is its maximum observed price in that window.

    Args:
        dataframe: micro-batch DataFrame with ``name``, ``price`` and
            ``timestamp`` columns (as passed by ``foreachBatch``).
        epoch_id: micro-batch id supplied by ``foreachBatch``; only printed.
        top_n: number of stocks to keep per window (default 10). ``rank()``
            gives tied stocks the same rank, so a window may return more than
            ``top_n`` rows.

    NOTE(review): the aggregation only sees this micro-batch's rows, so a window
    that spans two batches is evaluated on partial data in each of them.
    """
    # Maximum price of each stock within each window.
    windowed_stocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).agg({"price": "max"}).withColumnRenamed("max(price)", "max_price")

    # Rank stocks inside each window, most valuable first.
    window_spec = Window.partitionBy("window").orderBy(desc("max_price"))
    ranked_stocks = windowed_stocks.withColumn("rank", rank().over(window_spec))

    top_n_stocks = (
        ranked_stocks
        .where(col("rank") <= top_n)
        .orderBy("window", "rank")  # group each window's ranking together in the output
    )

    print(f"Batch ID: {epoch_id}")
    # show() prints only 20 rows by default; several windows x top_n stocks can
    # exceed that, so display every row. Keep full cell contents for readability.
    top_n_stocks.show(n=top_n_stocks.count(), truncate=False)

# Run the streaming query for a bounded time and then stop it. The wait is
# bounded so that later sections can run; the query is stopped so it does not
# keep running alongside the next section's query.
query = df.writeStream.foreachBatch(process_batch_function).start()
try:
    query.awaitTermination(timeout=240)  # seconds, i.e. three 80-second windows
finally:
    # Stop explicitly: interrupting the wait alone leaves the query running in
    # the background.
    query.stop()

## Select the stocks that lost value between two windows

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, window, lag

window_duration = "80 seconds"  # tumbling-window length (80 seconds)


def process_batch_function(dataframe, epoch_id):
    """Print the stocks whose max price dropped compared with their previous window.

    For every (window, stock) pair in the micro-batch, the maximum price is
    compared with that stock's maximum price in its preceding window.

    Args:
        dataframe: micro-batch DataFrame with ``name``, ``price`` and
            ``timestamp`` columns (as passed by ``foreachBatch``).
        epoch_id: micro-batch id supplied by ``foreachBatch``; only printed.

    NOTE(review): only windows present in this micro-batch are compared.
    """
    per_window_max = (
        dataframe
        .groupBy(window(col("timestamp"), window_duration), col("name"))
        .agg({"price": "max"})
        .withColumnRenamed("max(price)", "max_price")
        .withColumn("window_start", col("window.start"))  # sortable window key
    )

    # Each stock's windows in chronological order.
    chronological = Window.partitionBy("name").orderBy("window_start")

    with_previous = per_window_max.withColumn(
        "prev_max_price", lag("max_price").over(chronological)
    )

    # A stock's first window has a null previous price; the comparison is then
    # null, so that row is filtered out.
    dropped = with_previous.where(col("prev_max_price") > col("max_price"))

    print(f"Batch ID: {epoch_id}")
    dropped.show(truncate=False)  # full cell contents, no truncation

# Run the streaming query for a bounded time and then stop it. The wait is
# bounded so that later sections can run; the query is stopped so it does not
# keep running alongside the next section's query.
query = df.writeStream.foreachBatch(process_batch_function).start()
try:
    query.awaitTermination(timeout=240)  # seconds, i.e. three 80-second windows
finally:
    # Stop explicitly: interrupting the wait alone leaves the query running in
    # the background.
    query.stop()

## Select the stock that gained the most (between windows)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, window, lead

# Tumbling-window length used by the aggregation below.
window_duration = "80 seconds"

# Define the processing function
def process_batch_function(dataframe, epoch_id):
    """Print the stock with the largest price gain between consecutive windows.

    For each stock, the maximum price in a window is compared with the same
    stock's maximum price in its next window. Among all positive gains, the row
    with the largest absolute gain is printed, together with both window starts.

    Args:
        dataframe: micro-batch DataFrame with ``name``, ``price`` and
            ``timestamp`` columns (as passed by ``foreachBatch``).
        epoch_id: micro-batch id supplied by ``foreachBatch``; only printed.

    NOTE(review): only windows present in this micro-batch are compared, and if
    several stocks tie for the largest gain one of them is picked arbitrarily.
    """
    # Maximum price of each stock within each window.
    windowed_stocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).agg({"price": "max"}).withColumnRenamed("max(price)", "max_price")

    # Window start as a sortable column for ordering each stock's windows.
    windowed_stocks = windowed_stocks.withColumn("window_start", col("window.start"))
    window_spec = Window.partitionBy("name").orderBy("window_start")

    # Pair each window with the same stock's next window and compute the gain.
    windowed_stocks = (
        windowed_stocks
        .withColumn("next_window_start", lead("window_start").over(window_spec))
        .withColumn("next_max_price", lead("max_price").over(window_spec))
        .withColumn("gain", col("next_max_price") - col("max_price"))
    )

    # Keep real gains only; a stock's last window has no successor (null gain).
    gained_value_stocks = windowed_stocks.where(col("gain") > 0)

    # The section asks for the single biggest gainer, not every stock that gained.
    top_gainer = gained_value_stocks.orderBy(col("gain").desc()).limit(1)

    print(f"Batch ID: {epoch_id}")
    top_gainer.show(truncate=False)  # Show full data without truncation

# Run the streaming query for a bounded time and then stop it. The wait is
# bounded so that later sections can run; the query is stopped so it does not
# keep running alongside the next section's query.
query = df.writeStream.foreachBatch(process_batch_function).start()
try:
    query.awaitTermination(timeout=240)  # seconds, i.e. three 80-second windows
finally:
    # Stop explicitly: interrupting the wait alone leaves the query running in
    # the background.
    query.stop()

## Implement a check that flags stocks losing too much value within a period of time

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, when, window

# Tumbling-window length used by the aggregation below.
window_duration = "80 seconds"

# Order each stock's windows chronologically so lag() yields the previous window.
window_spec = Window.partitionBy("name").orderBy("window.start")

# Default maximum tolerated drop between consecutive windows, as a fraction
# (0.05 == 5%).
loss_threshold = 0.05

# Define the processing function
def process_batch_function(dataframe, epoch_id, threshold=None):
    """Print stocks whose max price fell by more than a threshold between windows.

    For each stock, the maximum price in a window is compared with its maximum
    price in the preceding window. A stock is reported when the relative change
    is below ``-threshold``.

    Args:
        dataframe: micro-batch DataFrame with ``name``, ``price`` and
            ``timestamp`` columns (as passed by ``foreachBatch``).
        epoch_id: micro-batch id supplied by ``foreachBatch``; only printed.
        threshold: maximum allowed fractional drop (e.g. 0.05 for 5%). When
            ``None``, the module-level ``loss_threshold`` is read at call time.

    NOTE(review): only windows present in this micro-batch are compared.
    """
    if threshold is None:
        threshold = loss_threshold

    # Maximum price of each stock within each window.
    windowed_stocks = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).max("price").withColumnRenamed("max(price)", "max_price")

    # Expose the window start as its own column for readability of the output.
    windowed_stocks = windowed_stocks.withColumn("window_start", col("window.start"))

    # Max price of the same stock in its previous window (null for the first one).
    windowed_stocks = windowed_stocks.withColumn(
        "prev_max_price",
        lag("max_price").over(window_spec)
    )

    # Relative change from the previous window; negative means the price fell.
    # Despite its name, "loss_percentage" is a fraction (-0.05 == -5%). A stock's
    # first window has no predecessor and is treated as no change.
    windowed_stocks = windowed_stocks.withColumn(
        "loss_percentage",
        when(
            col("prev_max_price").isNotNull(),
            (col("max_price") - col("prev_max_price")) / col("prev_max_price")
        ).otherwise(0.0)
    )

    # Keep stocks whose drop is larger than the tolerated threshold.
    high_loss_stocks = windowed_stocks.where(col("loss_percentage") < -threshold)

    print(f"Batch ID: {epoch_id}")
    high_loss_stocks.show(truncate=False)

# Run the streaming query for a bounded time and then stop it. The wait is
# bounded so that later sections can run; the query is stopped so it does not
# keep running alongside the next section's query.
query = df.writeStream.foreachBatch(process_batch_function).start()
try:
    query.awaitTermination(timeout=240)  # seconds, i.e. three 80-second windows
finally:
    # Stop explicitly: interrupting the wait alone leaves the query running in
    # the background.
    query.stop()

## Compute your assets

In [None]:
from pyspark.sql.functions import col, window

# Tumbling-window length used by the aggregation below.
window_duration = "80 seconds"

# Define the processing function
def process_batch_function(dataframe, epoch_id, holdings=None):
    """Print the total portfolio value per time window for one micro-batch.

    Each stock is valued once per window, at its latest observed price in that
    window multiplied by the number of shares held. The per-window values are
    then summed into ``total_assets``.

    Args:
        dataframe: micro-batch DataFrame with ``name``, ``price`` and
            ``timestamp`` columns (as passed by ``foreachBatch``).
        epoch_id: micro-batch id supplied by ``foreachBatch``; only printed.
        holdings: optional mapping of stock name -> number of shares. When
            ``None``, one share of every stock seen in the batch is assumed.
            Stocks absent from the mapping are left out of the total.

    NOTE(review): only this micro-batch's rows are aggregated, so a window that
    spans two batches is valued separately in each of them.
    """
    from pyspark.sql.functions import expr, sum as spark_sum

    # Value each stock once per window. Summing raw ticks would count a stock
    # once per price update instead of once per position.
    # max_by(price, timestamp) picks the price at the latest timestamp.
    latest_prices = dataframe.groupBy(
        window(col("timestamp"), window_duration),
        col("name")
    ).agg(expr("max_by(price, timestamp)").alias("last_price"))

    if holdings is None:
        positions = latest_prices.withColumn("position_value", col("last_price"))
    else:
        # Explicit DDL schema so an empty mapping still yields a valid DataFrame;
        # quantities are cast to float to satisfy the double column type.
        shares = spark.createDataFrame(
            [(name, float(qty)) for name, qty in holdings.items()],
            "name string, shares double",
        )
        positions = (
            latest_prices
            .join(shares, on="name", how="inner")
            .withColumn("position_value", col("last_price") * col("shares"))
        )

    total_assets_df = positions.groupBy("window").agg(
        spark_sum("position_value").alias("total_assets")
    )

    print(f"Batch ID: {epoch_id}")
    total_assets_df.show(truncate=False)

# Run the streaming query for a bounded time and then stop it, so it does not
# keep running in the background after this section finishes.
query = df.writeStream.foreachBatch(process_batch_function).start()
try:
    query.awaitTermination(timeout=240)  # seconds, i.e. three 80-second windows
finally:
    # Stop explicitly: interrupting the wait alone leaves the query running in
    # the background.
    query.stop()
