# Revenue Fluctuation modelling

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import socket, time


lines = spark.readStream.format("socket")\
  .option("host", "localhost")\
  .option("port", 9975)\
  .load()

structuredStream = lines.select(\
  split(lines.value, ",")[0].cast('Timestamp').alias("Timestamp"),\
  split(lines.value, ",")[1].alias("vertical"),\
  split(lines.value, ",")[2].cast('Float').alias("revenue"),\
  split(lines.value, ",")[3].cast('Int').alias("impressions"))


## 1. Define the window computation

4 days for overall computation period

In [0]:
# Generate window computation for 4 days

windowedStream = structuredStream.groupBy(window("Timestamp", "96 hours", "96 hours"))


## 2. Aggregations

Define aggregations for the window stream

In [0]:
agregationsStream = windowedStream.agg(avg("revenue").alias("revenue_average"), sum("revenue").alias("total_revenue"), count("Timestamp").alias("count"))


## 3. Starting the stream

In [0]:
streamingETLQuery =\
agregationsStream \
  .writeStream \
  .format("memory") \
  .queryName("aggDF") \
  .outputMode("complete")\
  .start()

## 4. Stopping the stream after 1 iteration to stop data from looping over (didn't work because the aggregation computation below has priority over the stop function)

In [0]:
#Defining the timeout
streamingETLQuery.awaitTermination(timeout = 210) 
streamingETLQuery.stop() 

## 5. Visualizing the stream contents to see if it makes sense 

Check if the stream works correctly and if the data is aggregated correctly

In [0]:
from pyspark.sql.functions import desc
#display(agregationsStream)
display(agregationsStream.sort(asc("window.start")))

## 6. Computation and visualization of the resulting streaming dataframe

In [0]:
import math
import sys
from pyspark.sql.functions import asc
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

df = spark.sql("select * from aggDF")
print("Running SQL....")

#Defining variables for iteration and the sliding window 
iter = 1
n = 1 
x = 4


prev_avg = 1.0
old_count = 0

while True:
  
  #
  if(df.count() != 0):
    while(old_count == df.count()):
      #
      continue
      
    # 
    old_count = df.count()
    
    print("*********************************************************************************************************")
    print("\nIteration no. "+ str(iter) + "\n")
    print("Sample data from streaming dataframe:\n")
    
    # set the time to be the end of the window and give an ID to each window
    df = df.withColumn("time", df.window.end)
    df = df.withColumn("row_idx",row_number().over(Window.orderBy(monotonically_increasing_id())))
       
    # Sort ascendingly in batches of 4
    df = df.sort(asc("time"))
    df.filter(df.row_idx.between(n,x)).show(truncate=False)

    print("Current number of windows processed in stream: "+ str(old_count) + "\n")

    #Aggregate the revenue  
    val = df.select('total_revenue').agg(sum('total_revenue')).collect()[0][0]

    #Calculate historical average
    counts = df.select('count').agg(sum('count')).collect()[0][0]
    if(counts != None and counts != 0):
      # update the historic average taking into account the new aggregated values
      historic_avg = val/counts

    #Use pretty printing
    for indent in range(iter - 1):
      sys.stdout.write("|****|")
    print ("|----|  Average price before current window: " + str(prev_avg))
    sys.stdout.flush()
    
    
    #Create a rule to iterate over 
    if iter == 1: 
        current_avg = df.select("revenue_average").take(1)[0][0]
    else: 
        current_avg = df.select("revenue_average").take(n)[n-1][0]
    
    #Use pretty printing
    for indent in range(iter - 1):
      sys.stdout.write("      ") 
    sys.stdout.write("|****| " + " Current window average price: " + str(current_avg))
    print

    #Calculate revenue 
    if((math.fabs(current_avg - prev_avg)/prev_avg) > 0.45 and iter > 1):
        print("\n\t!!! PRICE FLUCTUATION: price fluctuated by "+ str(float("{0:.2f}".format(math.fabs(current_avg - prev_avg)/prev_avg)) * 100) + " % !!!\n\n\n")
        
    #Add 4 to the variables for next batch    
    prev_avg = historic_avg
    n += 4
    x += 4
    iter += 1

Running SQL....
*********************************************************************************************************

Iteration no. 1

Sample data from streaming dataframe:

+------------------------------------------+------------------+------------------+-----+-------------------+-------+
|window                                    |revenue_average   |total_revenue     |count|time               |row_idx|
+------------------------------------------+------------------+------------------+-----+-------------------+-------+
|{2022-09-01 00:00:00, 2022-09-05 00:00:00}|242.5999959309896 |727.7999877929688 |3    |2022-09-05 00:00:00|1      |
|{2022-09-17 00:00:00, 2022-09-21 00:00:00}|284.8249969482422 |1139.2999877929688|4    |2022-09-21 00:00:00|2      |
|{2022-09-29 00:00:00, 2022-10-03 00:00:00}|246.70000076293945|986.8000030517578 |4    |2022-10-03 00:00:00|4      |
|{2022-10-15 00:00:00, 2022-10-19 00:00:00}|327.2249984741211 |1308.8999938964844|4    |2022-10-19 00:00:00|3      |
+-