### <b style="color:blue;"> Photovoltaic Energy Production Prediction using Structured Streaming </b>

**Goal**: receive the current production and current weather data, then apply the prediction model (from another notebook) to predict the energy production of the next hour

In [0]:
import socket, time
import sys
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler 

In [0]:
# Sample of the PV production data as seen in the source table:
#
#         timestamp          production
# 127776  01/11/2022 00:00
# 127777  01/11/2022 00:15
#
# NOTE(review): the sample above shows dd/MM/yyyy timestamps, which a plain cast to
# Timestamp would not parse (it yields null). Presumably the socket sender emits
# ISO-formatted timestamps -- TODO confirm against the sender.

# Step 1: open a streaming source on the local socket. Each received line arrives as a
# raw string in the `value` column and has to be parsed into typed columns below.
lines_PV = (
  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9996)
  .load()
)

# Step 2: split each line on commas once, pick the fields of interest, name them and
# cast them (everything is a string by default). Field 0 is not used.
pv_fields = split(lines_PV.value, ",")
structuredStream_PV = lines_PV.select(
  pv_fields[1].cast('Timestamp').alias("DateTime"),
  pv_fields[2].cast('Float').alias("production"),
)

In [0]:
# Sample of the weather data as seen in the source table:
#
#     DateTime                    rr_SMA  ss_SMA  dd_SMA       ff_SMA
# 0   2022-11-01 00:00:00+00:00   0       0       171.3333333  2
# 1   2022-11-01 00:30:00+00:00   0       0       230.3333333  1.6
# 2   2022-11-01 01:00:00+00:00   0       0       183.6666667  2.766666667

# Step 1: open a streaming source on the local socket. Each received line arrives as a
# raw string in the `value` column and has to be parsed into typed columns below.
lines_SMN = (
  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9998)
  .load()
)

# Step 2: split each line on commas once. Field 0 is not used, field 1 is the timestamp
# and fields 2..5 are the four measurements, all cast from string to Float.
smn_fields = split(lines_SMN.value, ",")
smn_measurements = ["rr_SMA", "ss_SMA", "dd_SMA", "ff_SMA"]
structuredStream_SMN = lines_SMN.select(
  smn_fields[1].cast('Timestamp').alias("DateTime"),
  *[
    smn_fields[position].cast('Float').alias(name)
    for position, name in enumerate(smn_measurements, start=2)
  ]
)

In [0]:
#display(structuredStream_PV)

In [0]:
#display(structuredStream_SMN)

#### <b style="color:blue;"> Generate the running query </b>

Group the streaming data into time windows and aggregate it, choosing the window size so that the windows of both streams cover the same time range:
- Streamed SolarEdge data with updates every 15' (real time), 1.5 seconds for the purpose of this exercise
- Streamed weather data with updates every 30' (real time), 3.0 seconds for the purpose of this exercise

i.e. aggregate every 60' (real time), 6 seconds for the purpose of this exercise

In [0]:
# Bucket both streams into the same one-hour windows on the event time column.
# Window length and slide are equal, so the windows do not overlap.
hourly_window = window("DateTime", "60 minutes", "60 minutes")

windowedStream_PV = structuredStream_PV.groupBy(hourly_window)
windowedStream_SMN = structuredStream_SMN.groupBy(hourly_window)

In [0]:
# Per-window aggregates. `sum`, `avg` and `count` are the Spark SQL column functions
# brought in by the star import, not the Python builtins.

# PV: total production in the window and the number of readings it contains.
pv_aggregates = [
  sum("production").alias("production"),
  count("DateTime").alias("count_PV_items"),
]
agregationsStream_PV = windowedStream_PV.agg(*pv_aggregates)

# Weather: rr and ss are summed over the window, dd and ff are averaged; the number of
# readings in the window is kept as well.
smn_aggregates = [
  sum("rr_SMA").alias("rr_SMA_w"),
  sum("ss_SMA").alias("ss_SMA_w"),
  avg("dd_SMA").alias("dd_SMA_w"),
  avg("ff_SMA").alias("ff_SMA_w"),
  count("DateTime").alias("count_SMN_items"),
]
agregationsStream_SMN = windowedStream_SMN.agg(*smn_aggregates)

In [0]:
# Start the PV aggregation query, writing into an in-memory table named "aggDF_PV".
# NOTE: 'complete' output mode is used here because the stream is aggregated (the whole
# result table is written on every update); 'append' would be the mode to use to get
# the individual records of the stream instead.
pv_writer = (
  agregationsStream_PV.writeStream
  .format("memory")
  .queryName("aggDF_PV")
  .outputMode("complete")
)
streamingETLQuery_PV = pv_writer.start()

In [0]:
# Start the weather aggregation query, writing into an in-memory table named
# "aggDF_SMN" ('complete' output mode, because the stream is aggregated).
smn_writer = (
  agregationsStream_SMN.writeStream
  .format("memory")
  .queryName("aggDF_SMN")
  .outputMode("complete")
)
streamingETLQuery_SMN = smn_writer.start()

In [0]:
#display(dbutils.fs)

In [0]:
#display(agregationsStream_PV.sort(desc("window.start")))

In [0]:
#display(agregationsStream_SMN.sort(desc("window.start")))

#### <b style="color:blue;"> Load the model </b>

In [0]:
# Folder of the saved pipeline model chosen for scoring.
# The path is a placeholder and has to point at the actual model folder.
mPath = "/path/to/model/folder/rf_model_wlag1h"

# Restore the fitted pipeline so it can be applied to the windowed data.
model = PipelineModel.load(mPath)

#### <b style="color:blue;"> Computation and visualization of the resulting streaming dataframe</b>

In [0]:
# Window specification for the lag feature: rows ordered by ascending DateTime.
# No partitioning is defined, so the ordering spans the whole dataframe.
lag_window = Window.orderBy(col("DateTime").asc())

In [0]:
# Continuously score the most recent hourly windows with the loaded model.
#
# The two dataframes below read the in-memory tables fed by the streaming queries; they
# are lazy, so every action (count/show/take) sees the current content of the tables.
# The loop runs until the cell is interrupted.
#
# NOTE: the star imports at the top of the notebook may shadow Python builtins, so this
# cell deliberately avoids builtins such as len/sum/min/max.
df_PV = spark.sql("select * from aggDF_PV")
df_SMN = spark.sql("select * from aggDF_SMN")
print("Running SQL (initialization may take a while)...")

POLL_INTERVAL_S = 1.0      # pause between checks for new weather windows
MIN_ITEMS_PER_WINDOW = 4   # minimum PV + weather readings for a window to be used
N_SCORED_WINDOWS = 4       # number of most recent windows passed to the model
PREDICTION_IDX = 15        # position of the model output in the scored rows
                           # NOTE(review): assumes the pipeline appends exactly two columns
                           # after the 14 input columns -- confirm against the saved model

iteration = 1
old_count = 0

while True:
  # Wait until the weather table holds data and its number of windows has changed.
  # Sleeping between polls avoids launching back-to-back Spark jobs while idle.
  current_count = df_SMN.count()
  if current_count == 0 or current_count == old_count:
    time.sleep(POLL_INTERVAL_S)
    continue

  # remember the count so the next iteration waits for new data again
  old_count = current_count

  print("*********************************************************************************************************")
  print("\nIteration no. "+ str(iteration) + "\n")
  print("Sample data from streaming dataframe:\n")

  # Join production and weather aggregates on the shared window. Windows without
  # weather data get a null item count and are removed by the filter below.
  windows_df = (
    df_PV.join(df_SMN, ['window'], 'left')
    # label each window by its start time
    .withColumn("DateTime", col("window.start"))
    # the streaming tables are unordered; most recent windows first
    .sort(desc("DateTime"))
    .withColumn("count_items", col("count_PV_items") + col("count_SMN_items"))
    .drop("count_PV_items", "count_SMN_items")
    .filter(col("count_items") >= MIN_ITEMS_PER_WINDOW)
  )

  # show the 5 most recent windows (False = do not truncate the output)
  windows_df.show(5, False)

  print("Current number of windows processed in stream: "+ str(old_count) + "\n")

  # Prepare the model input. The column order is kept exactly as the model output is
  # read by position further down.
  input_df = (
    windows_df
    .select(
      "DateTime",
      "production",
      col("rr_SMA_w").alias("rr_SMA"),
      col("ss_SMA_w").alias("ss_SMA"),
      col("dd_SMA_w").alias("dd_SMA"),
      col("ff_SMA_w").alias("ff_SMA"),
    )
    # one extra window so the oldest scored window still gets a lag value
    .limit(N_SCORED_WINDOWS + 1)
    .fillna(0)  # replace missing values
    .withColumn('date', col('DateTime').cast('date'))
    .withColumn("year", year("DateTime"))
    .withColumn("month", month("DateTime"))
    .withColumn("day", dayofmonth("DateTime"))
    .withColumn("hour", hour("DateTime"))
    .withColumn("quarter-of-year", quarter("DateTime"))
    .withColumn("week-of-year", weekofyear("DateTime"))
    .withColumn("lag_1h", lag("production", 1).over(lag_window))
    # The oldest row has no predecessor and therefore a null lag. With 5 rows it is
    # cut off by the limit below anyway; with fewer rows it must not reach the model.
    .filter(col("lag_1h").isNotNull())
    .sort(desc("DateTime"))
    .limit(N_SCORED_WINDOWS)
  )
  input_df.show()

  predictions_df = model.transform(input_df).sort(desc("DateTime"))
  predictions_df.show()

  # Fetch the two most recent scored windows once, so all printed values come from
  # the same snapshot and the pipeline is not re-run for every value.
  recent_rows = predictions_df.take(2)

  for _ in range(iteration - 1):
    sys.stdout.write("|****|")
  if recent_rows[1:]:
    previous_row = recent_rows[1]
    print ("|----|  Last hour's predicted production: ", str(previous_row[PREDICTION_IDX]))
    print ("|----|  Last hour's effective production: ", str(previous_row["production"]))
  if recent_rows:
    print ("|----|  Next hour's predicted production: ", str(recent_rows[0][PREDICTION_IDX]))
  else:
    print ("|----|  Not enough complete windows yet to make a prediction")
  sys.stdout.flush()

  iteration += 1