# DEBS 2022 Grand Challenge 
## Real-time Market Analytics for Rapid Deployment and High Accessibility using PySpark and Docker

Group 4: Suyeon Wang, JaeKyeong Kim, Sejin Chun

This code is also available on [GitHub](https://github.com/developsu/debs2022gc)

Before executing the code:
- upload the **challenger.proto** file
- log in with your Google account
- authorize Google Drive access so that the drive can be mounted
- set your token key (`TOKEN_KEY`)
- set **folderDir** to the folder that stores the CSV files built from the event stream of the challenger system
- finally, select **Runtime** → **Run all** (Ctrl + F9)


**Dataset License.** Sebastian Frischbier, Jawad Tahir, Christoph Doblander, Arne Hormann, Ruben Mayer. 
The DEBS 2022 grand challenge: Detecting Trading Trends in Real Financial Tick Data. 
Proceedings of the 16th ACM International Conference on Distributed and Event-based Systems (DEBS'22) 

In [1]:
TOKEN_KEY = 'YOUR_TOKEN_KEY'  # replace with your token from the challenger system

## Installation

In [2]:
!apt-get install openjdk-8-jdk-headless # jdk


Download and extract Apache Spark 3.1.2 (prebuilt for Hadoop 3.2)

In [3]:
!wget -q -nc https://mirror.navercorp.com/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz


In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [5]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [6]:
import findspark
findspark.init()

Mount Google Drive to use it as cloud storage

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
%cd /content/drive/MyDrive/Colab Notebooks

/content/drive/MyDrive/Colab Notebooks


Install the gRPC library for Python

In [9]:
!pip install grpcio grpcio-tools  # grpcio-tools provides grpc_tools.protoc, used below to generate the stubs



Import the required libraries

In [10]:
import csv
import os
from datetime import datetime

import grpc  # if missing: pip install grpcio
import pandas as pd
from tqdm import tqdm
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, last, lit, to_timestamp, window
from pyspark.sql.types import (DoubleType, LongType, StringType, StructField,
                               StructType, TimestampType)
# The stubs challenger_pb2 / challenger_pb2_grpc are generated from challenger.proto
# with grpcio-tools (pip install grpcio-tools):
# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. challenger.proto

In [11]:
!python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. challenger.proto



In [12]:
import challenger_pb2 as ch
import challenger_pb2_grpc as api

## Core functions

`read_stream` reads, as a Spark stream, the CSV files written to `folderDir` from the batches received from the remote benchmark system

In [13]:
"""## Core functions
This function reads the stream of csv files from remote benchmark system
"""
def read_stream(folderDir):
    """
    Read the CSV files arriving in folderDir as a streaming DataFrame with the DEBS 2022 schema
    :param folderDir: directory that receives one CSV file per batch
    :return: streaming DataFrame
    """
    stream_df = spark.readStream.format("csv") \
        .option("header", "true") \
        .schema(get_schema()) \
        .load(path=folderDir)
    
    return stream_df

`get_schema` defines the schema of the stock market events stored in the CSV files

In [14]:
"""This function represents the schema of stock market data"""
def get_schema():
    """
    return schema for DEBS 2022
    :return: StructType
    """
    return StructType([
        StructField("id", LongType(), True),
        StructField("symbol", StringType(), True),
        StructField("Sectype", StringType(), True),
        StructField("lasttradeprice", DoubleType(), True),
        StructField("lastTrade", TimestampType(), True),
        StructField("seconds", LongType(), True),
        StructField("nanos", LongType(), True)
    ])

### Calc EMA

In [15]:
"""The following functions calculate EMA38 and EMA100 indicators, respectively."""
def calcEMA_func(symbol, lastTradePrice, prev_ema, days):
  current_EMA = ( lastTradePrice * (2 / (days + 1)) ) + ( prev_ema  * (1 - 2 / (days + 1)) )
  return current_EMA
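As a worked example of the update rule in `calcEMA_func`, EMA = close * 2/(j+1) + prev_EMA * (1 - 2/(j+1)), the sketch below applies it to a short sequence of window closing prices, starting from 0 as the notebook does for a symbol's first window. The helper name `ema_series` and the prices are illustrative assumptions, not challenge data.

```python
def ema_series(closes, days, initial=0.0):
    """Return the EMA after each window close, using smoothing factor 2 / (days + 1).

    Same rule as calcEMA_func: EMA_w = close_w * alpha + EMA_{w-1} * (1 - alpha),
    seeded with `initial` (0.0, as for a symbol's first window in the notebook).
    """
    alpha = 2 / (days + 1)
    emas = []
    prev = initial
    for close in closes:
        prev = close * alpha + prev * (1 - alpha)
        emas.append(prev)
    return emas


# Hypothetical closing prices of one symbol over four 5-minute windows
closes = [100.0, 101.0, 99.5, 102.0]
ema38 = ema_series(closes, 38)
ema100 = ema_series(closes, 100)
for w, (e38, e100) in enumerate(zip(ema38, ema100)):
    print(f"window {w}: EMA38={e38:.4f} EMA100={e100:.4f}")
```

Because both EMAs start at 0, they climb toward the price level over many windows; EMA38 moves faster than EMA100 because its smoothing factor (2/39) is larger than 2/101.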

### EMA_Comp

In [16]:
"""This function determines the choice of buy, sell, or stay(=hold)."""
def EMA_Comp(nEMA38_MINUS_EMA100, pEMA38_MINUS_EMA100):
    minus_prev = pEMA38_MINUS_EMA100
    minus_now = nEMA38_MINUS_EMA100
    if minus_now * minus_prev >= 0:
      return "Stay"
    else:
      if minus_now > minus_prev:
          return "Buy"

    return "Sell"

## Transformation

In [37]:
def transform_calcEMA(stream):
    """Keep the last trade of each symbol per 5-minute tumbling window."""
    stream_df = stream.select("symbol", "Sectype", "lastTrade", "lasttradeprice", "seconds", "nanos") \
        .withWatermark("lastTrade", "5 minutes") \
        .groupBy(window("lastTrade", "5 minutes", "5 minutes"), col("symbol")) \
        .agg(last("lasttradeprice").alias("lasttradeprice"),  # last() follows arrival order
             last("Sectype").alias("Sectype"),
             last("seconds").alias("seconds"),
             last("nanos").alias("nanos")) \
        .orderBy("window")

    # the end of each window identifies it in the EMA and crossover tables
    stream_df = stream_df.withColumn("lastWindowTime", to_timestamp(stream_df.window.end, "yyyy-MM-dd HH:mm:ss"))

    return stream_df
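For intuition about what `transform_calcEMA` keeps, this plain-Python sketch groups (symbol, epoch seconds, price) events into 5-minute tumbling windows and keeps the last price per symbol and window, similar to `groupBy(window(...), "symbol").agg(last(...))`. Assumptions: events arrive in time order, windows are aligned to multiples of 300 seconds since the epoch (Spark's default when no start offset is given), and the sample events and symbols are invented.

```python
WINDOW_SECONDS = 5 * 60


def last_price_per_window(events):
    """Map (symbol, window_end_seconds) -> last price seen in that window.

    events: iterable of (symbol, epoch_seconds, price) in arrival order.
    """
    result = {}
    for symbol, seconds, price in events:
        window_start = seconds - seconds % WINDOW_SECONDS
        window_end = window_start + WINDOW_SECONDS  # plays the role of lastWindowTime
        result[(symbol, window_end)] = price  # later events overwrite earlier ones
    return result


events = [
    ("ABC.FR", 1_000, 10.0),
    ("ABC.FR", 1_150, 10.5),  # same window [900, 1200)
    ("ABC.FR", 1_210, 10.2),  # next window [1200, 1500)
    ("XYZ.NL", 1_100, 55.0),
]
for (symbol, end), price in sorted(last_price_per_window(events).items()):
    print(symbol, end, price)
```

Like this sketch, the notebook's query keeps every window: with the complete output mode used for the memory sink, Spark retains all aggregation state and the watermark does not drop old windows.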

## Results of Query 1

In [18]:
def send_to_message_query1(lookup_symbols):
  """Build the Q1 indicators (EMA38, EMA100) for each lookup symbol of the batch."""
  global prevEMA
  list_indicators = list()

  for s in lookup_symbols:  # one indicator per requested symbol
    if s in prevEMA.index:  # prevEMA already holds EMAs for this symbol
      row = prevEMA.loc[s]
      indicator = ch.Indicator(symbol=row['symbol'],
                               ema_38=row['EMA38'],
                               ema_100=row['EMA100'])
    else:  # no EMAs for this symbol yet: send 0s
      indicator = ch.Indicator(symbol=s, ema_38=0, ema_100=0)
    list_indicators.append(indicator)

  result_Q1 = ch.ResultQ1()
  result_Q1.indicators.extend(list_indicators)
  return result_Q1.indicators

## Results of Query 2

In [19]:
def send_to_message_query2(lookup_symbols):
    """Build the Q2 crossover events: the last three Buy/Sell crossovers of each lookup symbol."""
    global crossover
    crossover = crossover[crossover['BuyOrSell'] != 'Stay']  # keep only real crossovers
    requested = crossover[crossover['symbol'].isin(lookup_symbols)]
    tail_crossover = requested.groupby(['symbol']).tail(3).reset_index(drop=True)

    result_Q2 = ch.ResultQ2()
    list_crossover = list()

    for i in range(len(tail_crossover)):  # one CrossoverEvent per crossover row
        cross_over_event = ch.CrossoverEvent()
        row = tail_crossover.loc[i, :]
        cross_over_event.symbol = row["symbol"]

        if row["BuyOrSell"] == "Buy":
            cross_over_event.signal_type = ch.CrossoverEvent.SignalType.Buy
        elif row["BuyOrSell"] == "Sell":
            cross_over_event.signal_type = ch.CrossoverEvent.SignalType.Sell

        # make_CSV_file writes the numeric security type, so "0" stands for Equity
        if row["Sectype"] == "E" or row["Sectype"] == "0":
            cross_over_event.security_type = ch.SecurityType.Equity
        else:
            cross_over_event.security_type = ch.SecurityType.Index

        # protobuf integer fields reject floats, so cast explicitly
        cross_over_event.ts.seconds = int(row['seconds'])
        cross_over_event.ts.nanos = int(row['nanos'])
        list_crossover.append(cross_over_event)

    result_Q2.crossover_events.extend(list_crossover)
    return result_Q2.crossover_events
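Query 2 reports, per symbol, only the three most recent crossovers; `send_to_message_query2` selects them with `groupby(['symbol']).tail(3)` on the non-"Stay" rows. A stdlib-only sketch of the same bookkeeping, assuming rows arrive in time order, keeps a bounded `collections.deque(maxlen=3)` per symbol so older crossovers fall off automatically. The function name and the sample rows are hypothetical.

```python
from collections import defaultdict, deque


def last_three_crossovers(rows):
    """rows: (symbol, window_end, signal) in time order; signal is Buy, Sell or Stay."""
    recent = defaultdict(lambda: deque(maxlen=3))
    for symbol, window_end, signal in rows:
        if signal != "Stay":  # only real crossovers are reported
            recent[symbol].append((window_end, signal))
    return {symbol: list(events) for symbol, events in recent.items()}


rows = [
    ("ABC.FR", 300, "Buy"), ("ABC.FR", 600, "Stay"), ("ABC.FR", 900, "Sell"),
    ("ABC.FR", 1200, "Buy"), ("ABC.FR", 1500, "Sell"), ("XYZ.NL", 300, "Stay"),
]
print(last_three_crossovers(rows))
```

A bounded deque keeps memory constant per symbol, whereas the pandas table in the notebook grows until rows are dropped by `reset_dataframe`.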

## Streamed file processing
This function saves each batch of stock market events as a CSV file in `folderDir`, where the Spark file stream picks it up

In [20]:
"""## Streamed file processing
This funtions saves the stream of stock market data as csv files.
"""
def make_CSV_file(batch_no, events):
    """Write one batch of events to folderDir/stock_stream-<batch_no>.csv."""
    fieldnames = ["id", "symbol", "Sectype", "lasttradeprice", "lastTrade", "seconds", "nanos"]
    event_list_dict = []
    for i, event in enumerate(events):
        event_list_dict.append({
            'id': i,
            'symbol': event.symbol,
            'Sectype': event.security_type,
            'lasttradeprice': event.last_trade_price,
            'lastTrade': datetime.fromtimestamp(event.last_trade.seconds).strftime("%Y-%m-%d %H:%M:%S"),
            'seconds': event.last_trade.seconds,
            'nanos': event.last_trade.nanos,
        })

    with open(os.path.join(folderDir, f'stock_stream-{batch_no}.csv'), 'w', encoding='UTF8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(event_list_dict)
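The Structured Streaming programming guide notes that files must be placed atomically in the directory watched by the file source, which on most file systems can be done with a file move. `make_CSV_file` writes directly into `folderDir`, so a micro-batch could in principle see a half-written file. One possible variant, sketched under the assumption that a staging directory next to the watched folder is acceptable (the names `write_batch_atomically` and `.staging` are hypothetical), writes to a temporary file and then moves it into place with `os.replace`.

```python
import csv
import os
import tempfile

FIELDNAMES = ["id", "symbol", "Sectype", "lasttradeprice", "lastTrade", "seconds", "nanos"]


def write_batch_atomically(folder, batch_no, rows):
    """Write rows (dicts keyed by FIELDNAMES) to folder/stock_stream-<batch_no>.csv.

    The data goes to a temporary file in a staging directory next to `folder`
    (outside the watched directory) and is then renamed into place, so a reader
    of `folder` never lists a partially written file.
    """
    staging = os.path.join(os.path.dirname(os.path.abspath(folder)), ".staging")
    os.makedirs(staging, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=staging)
    with os.fdopen(fd, "w", encoding="UTF8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)
    final_path = os.path.join(folder, f"stock_stream-{batch_no}.csv")
    os.replace(tmp_path, final_path)  # atomic rename within one POSIX file system
    return final_path
```

On a local POSIX file system, `os.replace` within one file system is atomic; whether the same holds on the Google Drive mount used by this notebook is not guaranteed.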

## Main Program

Set up global variables

In [21]:
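# Names shared across cells. At module level `global` is a no-op; the functions
# above declare `global` themselves for the names they reassign.
global spark, sc
global calcEMA, detectCrossover
global prevEMA
global prevCrossover, crossover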

### Initialize Spark

In [22]:
# start the Spark application and get the Spark session
# (with master local[*], Spark runs in a single JVM, so the executor and
#  dynamic-allocation settings below have no effect)
spark = SparkSession.builder.master("local[*]") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "2") \
        .config('spark.ui.port', '4050') \
        .config("spark.driver.memory", "20g")\
        .config("spark.executor.memory", "20g")\
        .config("spark.dynamicAllocation.enabled", "true") \
        .appName("Spark for DEBS 2022 GC").getOrCreate()
# keep Arrow disabled for toPandas(); this key replaces the deprecated spark.sql.execution.arrow.enabled
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

sc = spark.sparkContext

folderDir = "/content/drive/MyDrive/debs2022/nano_second_stream/"

### 1. Define the streaming input source

In [23]:
# define the streaming source (nothing is read until a streaming query is started)
stream = read_stream(folderDir)

In [24]:
prevEMA = pd.DataFrame({'symbol': pd.Series(dtype='str'),
                   'EMA38': pd.Series(dtype='float'),
                   'EMA100': pd.Series(dtype='float')})

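# set_index returns a new DataFrame, so this line only displays the table; prevEMA keeps
# its 'symbol' column, and rows added later use the symbol as their index label
prevEMA.set_index('symbol')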



In [25]:
prevEMA = prevEMA.iloc[0:0]
prevEMA



In [26]:
crossover = pd.DataFrame({'symbol': pd.Series(dtype='str'),
                   'lastWindowTime': pd.Series(dtype='datetime64[ns]'),
                   'EMA38': pd.Series(dtype='float'),
                   'EMA100': pd.Series(dtype='float'),
                   'Sectype': pd.Series(dtype='str'),
                   'BuyOrSell': pd.Series(dtype='str'),
                   'seconds': pd.Series(dtype='int'),
                   'nanos': pd.Series(dtype='int')})


In [27]:
prevCrossover = pd.DataFrame({'symbol': pd.Series(dtype='str'),
                   'Diff': pd.Series(dtype='float')})

# display only: set_index returns a new DataFrame and prevCrossover is left unchanged
prevCrossover.set_index('symbol')



In [28]:
crossover = crossover.iloc[0:0]
crossover



### 2. Transform streaming data and define the output sink and mode (queries 1 and 2)

In [29]:
calcEMA = transform_calcEMA(stream)

writer1 = calcEMA.writeStream.format("memory") \
    .outputMode("complete") \
    .option("truncate", False) \
    .queryName("EMA")  # name of the in-memory table, read with spark.sql("select * from EMA")

### 3. Start the streaming query

In [30]:
streamingQuery1 = writer1.start()

In [31]:
def execute_streamingQuery1(df_a):
  """Update prevEMA with each window's last trade price in df_a and append the new
  EMA38/EMA100 values to the crossover table (rows are labelled by symbol)."""
  global prevEMA, crossover
  for _, row in df_a.iterrows():
    symbol = row['symbol']
    known = symbol in prevEMA.index
    if known:
      existing_row = prevEMA.loc[symbol]
      prev_EMA38, prev_EMA100 = existing_row['EMA38'], existing_row['EMA100']
    else:  # first window of this symbol: both EMAs start from 0
      prev_EMA38, prev_EMA100 = 0.0, 0.0
    new_EMA38 = calcEMA_func(symbol, row['lasttradeprice'], prev_EMA38, 38)
    new_EMA100 = calcEMA_func(symbol, row['lasttradeprice'], prev_EMA100, 100)

    # DataFrame.append was removed in pandas 2.0, so new rows are added with pd.concat
    if known:
      prevEMA.loc[symbol, ['EMA38', 'EMA100']] = [new_EMA38, new_EMA100]
    else:
      new_row = {'symbol': symbol, 'EMA38': new_EMA38, 'EMA100': new_EMA100}
      prevEMA = pd.concat([prevEMA, pd.DataFrame([new_row], index=[symbol])])

    new_row = {'symbol': symbol, 'lastWindowTime': row['lastWindowTime'],
               'EMA38': new_EMA38, 'EMA100': new_EMA100, 'Sectype': row['Sectype'],
               'BuyOrSell': 'Stay', 'seconds': row['seconds'], 'nanos': row['nanos']}
    crossover = pd.concat([crossover, pd.DataFrame([new_row], columns=crossover.columns, index=[symbol])])
      

In [32]:
def execute_streamingQuery2():
  """Label each crossover row Buy/Sell/Stay by comparing its EMA38 - EMA100 with the
  symbol's previous difference, then store the new difference."""
  global crossover, prevCrossover
  for _, row in crossover.iterrows():
    symbol = row['symbol']
    diff = row['EMA38'] - row['EMA100']
    if symbol not in prevCrossover.index:  # first row of this symbol: nothing to compare with yet
      new_row = {'symbol': symbol, 'Diff': diff}
      prevCrossover = pd.concat([prevCrossover, pd.DataFrame([new_row], index=[symbol])])

    BuyOrSell = EMA_Comp(diff, prevCrossover.loc[symbol, 'Diff'])
    cond = (crossover.symbol == symbol) & (crossover.lastWindowTime == row['lastWindowTime'])
    crossover.loc[cond, 'BuyOrSell'] = BuyOrSell
    prevCrossover.loc[symbol, 'Diff'] = diff
   

In [33]:
def reset_dataframe(lookup_symbols):
  """Drop the rows of the batch's lookup symbols from prevEMA, crossover and prevCrossover.

  All three tables use the symbol as their row label, so rows are dropped by label
  (errors='ignore' skips symbols that have no rows yet)."""
  global prevEMA, crossover, prevCrossover
  prevEMA = prevEMA.drop(index=lookup_symbols, errors='ignore')
  crossover = crossover.drop(index=lookup_symbols, errors='ignore')
  prevCrossover = prevCrossover.drop(index=lookup_symbols, errors='ignore')

## Challenger system

In [38]:
# %%timeit
# gRPC channel options: raise the maximum message sizes (send 100 MiB, receive 1000 MiB)
op = [('grpc.max_send_message_length', 100 * 1024 * 1024),
      ('grpc.max_receive_message_length', 1000 * 1024 * 1024)]

# connect to the challenger system, run the benchmark, and submit the results
with grpc.insecure_channel('challenge.msrg.in.tum.de:5023', options=op) as channel:
    stub = api.ChallengerStub(channel)

    # Create a new Benchmark
    benchmarkconfiguration = ch.BenchmarkConfiguration(token= TOKEN_KEY,
                                                        benchmark_name="Dept. Computer Engineering, DongA Univ.",
                                                        # This name is used here: https://challenge.msrg.in.tum.de/benchmarks/
                                                        benchmark_type="test",
                                                        # Test or Evaluation, Evaluation will be available end of January. Test can be used to start implementing
                                                        queries=[ch.Query.Q1, ch.Query.Q2])
    benchmark = stub.createNewBenchmark(benchmarkconfiguration)

    stub.startBenchmark(benchmark)
    event_count = 0

    while True:
        batch = stub.nextBatch(benchmark)
        event_count = event_count + len(batch.events)

        lookup_symbols = list(batch.lookup_symbols)
        print("batch seq_id", batch.seq_id)
        
        # 1. every batch data is stored as csv files in the specific folder
        make_CSV_file(batch.seq_id, batch.events)
        
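        # 2. wait until Spark has processed the new CSV file, then read the windowed
        #    last trades of the lookup symbols from the in-memory "EMA" table
        streamingQuery1.processAllAvailable()
        lastTrades = spark.sql("select * from EMA").filter(col('symbol').isin(lookup_symbols)).toPandas()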
        
        execute_streamingQuery1(lastTrades)
        
        # 3. send the result of query 1 to the challenger system
        resultQ1 = ch.ResultQ1(
            benchmark_id=benchmark.id,  # The id of the benchmark
            batch_seq_id=batch.seq_id,  # The sequence id of the batch
            indicators=send_to_message_query1(lookup_symbols))  # compute Query 1 and build the message for the challenge server
        stub.resultQ1(resultQ1)  # send the result of query 1 back
        
        # 4. update the previous crossover table
        execute_streamingQuery2()

        # 5. send the result of query 2 to the challenger system
        resultQ2 = ch.ResultQ2(
            benchmark_id=benchmark.id,  # The id of the benchmark
            batch_seq_id=batch.seq_id,  # The sequence id of the batch
            crossover_events=send_to_message_query2(lookup_symbols))  # compute the last 3 crossovers and build the message for the challenge server
        
        stub.resultQ2(resultQ2)  # submit the results of Q2

        # reset the prevEMA and crossover tables
        reset_dataframe(lookup_symbols)
        
        # 6. once the last batch is received, stop the clock
        # See the statistics within ~5min here: https://challenge.msrg.in.tum.de/benchmarks/
        if batch.last:
            print(f"received last batch, total events: {event_count}")
            stub.endBenchmark(benchmark)
            break
    

batch seq_id 0
batch seq_id 1
batch seq_id 2
batch seq_id 3
batch seq_id 4
batch seq_id 5


KeyboardInterrupt: ignored
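The loop above follows the challenger's request sequence: create and start a benchmark, repeatedly fetch a batch, answer Q1 and Q2 for that batch's `seq_id`, and end the benchmark after the batch flagged `last`. The sketch below replays only that control flow against an in-memory stand-in; `FakeStub`, `Batch`, and their return values are hypothetical, reuse the method names called in this notebook, and are not the real gRPC stub or message types. It also counts batches and events separately.

```python
from dataclasses import dataclass, field


@dataclass
class Batch:
    seq_id: int
    events: list
    lookup_symbols: list
    last: bool = False


@dataclass
class FakeStub:
    """Hypothetical stand-in for api.ChallengerStub that records the calls it receives."""
    batches: list
    calls: list = field(default_factory=list)

    def createNewBenchmark(self, config):
        self.calls.append("createNewBenchmark")
        return "benchmark-1"

    def startBenchmark(self, benchmark):
        self.calls.append("startBenchmark")

    def nextBatch(self, benchmark):
        self.calls.append("nextBatch")
        return self.batches.pop(0)

    def resultQ1(self, result):
        self.calls.append(("resultQ1", result["batch_seq_id"]))

    def resultQ2(self, result):
        self.calls.append(("resultQ2", result["batch_seq_id"]))

    def endBenchmark(self, benchmark):
        self.calls.append("endBenchmark")


def run_benchmark(stub):
    """Return (number of batches, number of events) processed."""
    benchmark = stub.createNewBenchmark(config=None)
    stub.startBenchmark(benchmark)
    batch_count = event_count = 0
    while True:
        batch = stub.nextBatch(benchmark)
        batch_count += 1
        event_count += len(batch.events)
        # (event processing would happen here) then answer both queries for this batch
        stub.resultQ1({"benchmark_id": benchmark, "batch_seq_id": batch.seq_id})
        stub.resultQ2({"benchmark_id": benchmark, "batch_seq_id": batch.seq_id})
        if batch.last:
            stub.endBenchmark(benchmark)
            return batch_count, event_count


stub = FakeStub([Batch(0, ["e1", "e2"], ["ABC.FR"]), Batch(1, ["e3"], ["XYZ.NL"], last=True)])
print(run_benchmark(stub))
```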

In [None]:
print(f"Completed.")
spark.stop()

## Spark UI

In [None]:
!pip install pyngrok
from pyngrok import ngrok

In [None]:
NGROK_AUTH_TOKEN = "20Ayd0ezBx9EMWrfVGhPdif1skf_2h2MfcootBnk2WTBJf6GQ"
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

In [None]:
public_url = ngrok.connect(4050, "http")  # expose the Spark UI (spark.ui.port = 4050) through an ngrok tunnel
public_url

Await termination

In [None]:
# block until the streaming query stops (streamingQuery1 is the only query started above)
streamingQuery1.awaitTermination()

spark.stop()