In [0]:

# Original Workbook: Price Anomaly Detection for Stock Exchange Data using Structured Streaming

######################################## Morphing into the stream of meteo data ##############################################

The goal of this exercise is to analyse real stock exchange data using Spark Structured Streaming and to detect anomalies in the price fluctuation. In order to achieve this, the Structured Streaming consumer (running in this notebook) will read financial data from a socket stream (make sure to start the Producer first in order to have this data available), compute a few aggregations on the stream and compare the new results with the historical values.

As a first step, we must parse the strings read from the socket stream so that the incoming data can be processed in a structured format. This parsing is already provided in the code block below. Look through the structure of the CSV input file used in the Producer to make sure you understand how the code below works.
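
As a rough illustration of what the parsing step does, the plain-Python sketch below splits one comma-separated line into named fields, following the column order used by the structured stream in this notebook. The `parse_line` helper and the sample line are made up for illustration; the real values come from the Producer's CSV file.

```python
# Hypothetical helper: the field order mirrors the columns of the structured stream.
FIELDS = ["starttime", "site", "masl", "magl", "x", "y", "sensor",
          "temperature", "humidity"]

def parse_line(line):
    """Split one comma-separated line into a dict of named fields."""
    values = line.strip().split(",")
    record = dict(zip(FIELDS, values))
    # Only the two measurements are cast to float; the rest stay strings.
    for name in ("temperature", "humidity"):
        record[name] = float(record[name])
    return record

# Made-up sample line, not taken from the real input file.
sample = "2018-02-14 13:00:00,Winterthur Bahnhof,440,2,697000,261000,s1,3.5,81.0"
record = parse_line(sample)
print(record["site"], record["temperature"])
```

The Spark version does the same thing column by column: each `split(lines.value, ",")[i]` picks one position and `alias` gives it a name.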

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import socket, time

# Simon's project: import the col function used for selecting the rows

from pyspark.sql.functions import col

# Too high share price increase in stock
#=======================================

# This code seeks through the NYSE stock exchange data to see which
# transactions are unusually priced, as compared to the overall previous price average.

# Look at the CSV input file to understand how the data is structured

# exchange company date price_open price_high price_low price_close stock_volume price_adj_close timestamp
# NYSE	ASP	2001-12-31	12.55	12.8	12.42	12.8	11300	6.91 2018-02-14 13:16:44.550444
   
  
# 1. Read input stream from socket (by default, sockets contain raw strings, which we must then parse in a structured format)

lines = spark.readStream.format("socket")\
  .option("host", "localhost")\
  .option("port", 9998)\
  .load()

structuredStream = lines.select(\
  split(lines.value, ",")[0].alias("starttime"),\
  split(lines.value, ",")[1].alias("site"),\
  split(lines.value, ",")[2].alias("masl"),\
  split(lines.value, ",")[3].alias("magl"),\
  split(lines.value, ",")[4].alias("x"),\
  split(lines.value, ",")[5].alias("y"),\
  split(lines.value, ",")[6].alias("sensor"),\
  split(lines.value, ",")[7].cast('Float').alias("temperature"),\
  split(lines.value, ",")[8].cast('Float').alias("humidity"))  

# Generate the running query

After reading in the raw stream, we need to first apply a few transformations in order to only keep the relevant data for the anomaly detection.

## 1. Window computation

First, we want to group and process streaming data in windows of time. For this exercise, we will apply transformations on each **4 seconds** of data. For this purpose, we need to group by a window of 4 seconds, based on the field that identifies the time of each row in the structured stream.

Fill in the code below in order to achieve this (you can see an example of how to achieve this in the lecture slides).
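
To make the idea of a tumbling window concrete, here is a small plain-Python sketch that maps a timestamp to the fixed-width window containing it. It assumes windows are aligned to the Unix epoch and that timestamps are naive UTC values; `window_bounds` is a hypothetical helper, not part of Spark.

```python
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch (naive datetimes read as UTC).
EPOCH = datetime(1970, 1, 1)

def window_bounds(ts, width):
    """Return (start, end) of the tumbling window of the given width containing ts."""
    # Count the whole windows elapsed since the epoch, then scale back up.
    index = (ts - EPOCH) // width
    start = EPOCH + index * width
    return start, start + width

ts = datetime(2018, 2, 14, 13, 16, 46)
start_4s, end_4s = window_bounds(ts, timedelta(seconds=4))
start_day, end_day = window_bounds(ts, timedelta(hours=24))
print(start_4s, end_4s)
print(start_day, end_day)
```

Every row whose timestamp falls into the same `(start, end)` interval lands in the same group, which is what grouping by a window achieves on the stream.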

In [0]:
# Simon's project: the slicing and dicing must happen here, before groupBy, because a 'GroupedData' object has no attribute 'select'

SliceDice01 = structuredStream.select("starttime","site","temperature")

SliceDice02 = SliceDice01.where(col("site") == "Winterthur Bahnhof")


In [0]:
# Generate window computation

# Simon's project: streaming window on the sliced and diced data

windowedStream = SliceDice02.groupBy(window("starttime","24 hours","24 hours"))


In [0]:
## 2. Aggregations

## Once we have generated a windowed computation on the raw stream, we also need to define the aggregations that we want to perform on the data. It is always good practice to use Spark's built-in aggregations whenever possible (for example, using the *agg* function) and to keep only the relevant data from the structured stream, as opposed to keeping the entire stream in memory, which might lead to memory issues.

## Discarding irrelevant data can be achieved in 2 ways:


## 1. always performing aggregations instead of keeping the raw data in memory (e.g. for a *price* column, only keep the average/max/min value per window instead of all the data points)


## Relevant for Simon's project:

## 2. in cases where this is not possible, always explicitly select only the fields of interest and drop the remaining ones

## These two measures ensure that you will not run into memory problems, even if the streaming dataframe grows continuously.

In this exercise we will use approach 1, namely storing only aggregations instead of raw data.

Fill in the transformations below, using the *avg*, *count* and *sum* aggregations. Keep in mind that the goal of the exercise is to:

  a. compute the average price per window - this can be done straightforwardly by using a Spark built-in function;

  b. compare the window average with the overall (historic) average (over all windows) - this will require also counting the number of elements in each window and their total price.
  
Finally, to make it easier to use the aggregation columns later on, make sure to also assign them intuitive names, by using the *alias* function.
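
The reason point b. needs the count and the total per window, and not just the window averages, can be seen in the plain-Python sketch below. The numbers are made up. It uses `math.fsum` rather than the built-in `sum`, because the notebook's wildcard import from `pyspark.sql.functions` shadows `sum`.

```python
from math import fsum

# Made-up per-window aggregates: (sum of values, number of values).
windows = [(30.0, 3), (100.0, 1)]

# Average inside each window.
window_averages = [total / n for total, n in windows]

# Naive approach: average the window averages, so every window weighs the same.
mean_of_averages = fsum(window_averages) / len(window_averages)

# Overall (historic) average: total sum divided by total count.
total_sum = fsum(total for total, _ in windows)
total_count = fsum(n for _, n in windows)
overall_average = total_sum / total_count

print(window_averages, mean_of_averages, overall_average)
```

The two results differ whenever the windows hold different numbers of elements, which is why the sums and counts have to be carried along with the averages.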

In [0]:
#### Reactivated aggregation and adjusted to catch average temperature per day:

agregationsStream = windowedStream.agg(avg("temperature").alias("window_temp_average"), count("starttime").alias("count_temps"))

## Starting the stream

Note that so far, all we have done is to create a logical plan for Spark to handle the structured stream. In order to start the actual computation, we need to use the *writeStream* function and then explicitly *start()* the corresponding stream. See an example in the lectures on how to achieve this. Additionally, make sure to assign a name for the resulting query handle (using the *queryName()* function), in order to be able to directly reference it as a table using the SQL context later on.

In [0]:
# NOTE: always use 'complete' outputMode for aggregations and 'append' outputMode to get complete records (the entire stream)

# Simon's project: last modification is the windowedStream, the slicing and dicing that replace the aggregations were performed prior to the windowedStream

# The original algorithm with aggregation as last data modification

streamingETLQuery =\
agregationsStream \
  .writeStream \
  .format("memory") \
  .queryName("aggDF") \
  .outputMode("complete")\
  .start()  

# Visualizing the stream contents

You can display a structured stream dataframe in the same way you would a regular dataframe. Databricks will automatically refresh the table displayed.

To try this out, first cancel the previous command (which started writing the stream) and then run the display command below on the stream variable. Once the stream starts, you will see an ID replacing the "Stream initializing" message (it might take a while). Click on the green icon that will appear underneath the command, in order to display statistics about the stream as well as its contents. 

Notice that data will not necessarily be displayed in order of arrival. In order to see the most recent data first, also sort the stream by the window start time when displaying.

NOTE: the *display* command will only work as presented above in the Databricks environment (not in a local installation).

In [0]:
from pyspark.sql.functions import desc
#display(agregationsStream)
display(agregationsStream.sort(desc("window.start")))

## Computation and visualization of the resulting streaming dataframe

Finally we are able to use the structured stream in order to detect anomalies in the stock exchange prices. Fill in the final TODOs below in order to achieve this. Make sure to start the Producer before running all the code blocks in this Notebook to get the results.
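
The anomaly test itself boils down to a relative-change check against the historic average. Below is a minimal plain-Python sketch of that check, using the 0.3 threshold from the code in this notebook. The helper names `relative_change` and `is_anomaly` are hypothetical, and the guard against a zero historic average is an added assumption.

```python
def relative_change(current_avg, historic_avg):
    """Relative deviation of the current window average from the historic one."""
    return abs(current_avg - historic_avg) / abs(historic_avg)

def is_anomaly(current_avg, historic_avg, threshold=0.3):
    """Flag a window whose average deviates by more than threshold (30% by default)."""
    # Assumption: a zero historic average gives no meaningful relative change.
    if historic_avg == 0:
        return False
    return relative_change(current_avg, historic_avg) > threshold

flag_large = is_anomaly(14.0, 10.0)   # deviation of 0.4
flag_small = is_anomaly(11.0, 10.0)   # deviation of 0.1
print(flag_large, flag_small)
```

Dividing by the absolute value keeps the check meaningful when the historic average is negative, which can happen with temperature data.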

In [0]:
import math
import sys
from pyspark.sql.functions import desc

#TODO: complete here the SQL statement required to get all the contents of the stream (use the name assigned earlier)
df = spark.sql("select * from aggDF")
print("Running SQL (initialization may take a while)...")

iter = 1

# initialize the historic average with some random value
prev_avg = 1.0
old_count = 0

while True:
  
  # only start computing once some data has been collected (note that the dataframe is automatically updated by Spark)
  if(df.count() != 0):
    while(old_count == df.count()):
      # no new data yet: pause briefly instead of busy-waiting (time is imported in the first cell)
      time.sleep(1)
      
    # update the total count in order to be able to use the condition above in the next iteration
    old_count = df.count()
    
    print("*********************************************************************************************************")
    print("\nIteration no. "+ str(iter) + "\n")
    print("Sample data from streaming dataframe:\n")
    
    # set the time to be the end of the window (just to have a visual indication of time)
    df = df.withColumn("time", df.window.end)
       
    # we sort by time first, because by default there is no ordering in the streaming dataframe and we want to only show the most recent results
    # the following allows showing the most recent 5 windows (the False parameter instructs Spark not to truncate the output)
    df = df.sort(desc("time"))
    df.show(5, False)

    print("Current number of windows processed in stream: "+ str(old_count) + "\n")

    # The aggregation above stores only the per-window average and count, so the
    # per-window temperature sum is reconstructed as average * count (assuming no null temperatures).
    # Adding these up over all windows gives the overall historic sum.
    val = df.select((col("window_temp_average") * col("count_temps")).alias("temp_sum")).agg(sum("temp_sum")).collect()[0][0]

    # the same for the counts of all temperature readings
    counts = df.select("count_temps").agg(sum("count_temps")).collect()[0][0]
    if counts is not None and counts != 0:
      # update the historic average taking into account the new aggregated values
      historic_avg = val/counts
    else:
      # no readings yet: keep the previous value so that historic_avg is always defined
      historic_avg = prev_avg
     
   
    # the following lines just handle pretty-printing the output...
    for indent in range(iter - 1):
      sys.stdout.write("|****|")
    print ("|----|  Average price before current window: " + str(prev_avg))
    sys.stdout.flush()
    
    # take the most recent window average, using the field name assigned to the average temperature per window
    # since we already ordered by time, you can simply take the first element of the result
    current_avg = df.select("window_temp_average").take(1)[0][0]
    
    # the following lines just handle pretty-printing the output...
    for indent in range(iter - 1):
      sys.stdout.write("      ") 
    sys.stdout.write("|****| " + " Current window average price: " + str(current_avg))
    print()

    # compute the relative change from the historic average up until this window (ignoring the first iteration, where the historic average is a random number)
    # the historic average can be zero or negative for temperatures, so divide by its absolute value and skip the check when it is zero
    if iter > 1 and prev_avg != 0:
      change = math.fabs(current_avg - prev_avg) / math.fabs(prev_avg)
      if change > 0.3:
        print("\n\t!!! ANOMALY DETECTED: price fluctuated by " + "{0:.2f}".format(change * 100) + " % !!!\n\n\n")
        
    prev_avg = historic_avg
    iter += 1