In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime
from pytz import timezone
import sys
from lib import sparkStructuredStreaming
import os

### Set-up to stream from Kafka topic + read and write from/to Elasticsearch

In [2]:
# Extra Spark packages: the Kafka source for Structured Streaming and the
# Elasticsearch connector. PySpark reads this variable when it launches the JVM,
# so this cell has to run before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,'
    'org.elasticsearch:elasticsearch-spark-20_2.11:7.6.2 '
    'pyspark-shell'
)

In [3]:
#"127.0.0.1:9092" (local) //"10.0.0.8:9092" (BACC)
# Set the KAFKA_BOOTSTRAP_SERVERS environment variable to switch brokers
# without editing this cell; when it is unset the local broker is used.
bootstrap = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092")

In [4]:
#use this for elasticsearch, otherwise it won't recognize date field
# Both UDFs emit a UTC timestamp formatted as YYYY-MM-DDTHH:MM:SS, or None for null input.
_ES_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"


def _epoch_millis_to_utc_string(millis):
    """Format epoch milliseconds as a UTC date string; None stays None."""
    if millis is None:
        return None
    # Convert with an explicit UTC zone. The previous "local time minus 7,200,000 ms"
    # only produced UTC on a machine whose local offset happened to be +2 hours.
    utc_moment = datetime.datetime.fromtimestamp(millis / 1000.0, tz=timezone('UTC'))
    return utc_moment.strftime(_ES_DATE_FORMAT)


def _berlin_naive_to_utc_string(naive_dt):
    """Read a naive datetime as Europe/Berlin wall time and format it in UTC; None stays None."""
    if naive_dt is None:
        return None
    berlin_moment = timezone('Europe/Berlin').localize(naive_dt)
    return berlin_moment.astimezone(timezone('UTC')).strftime(_ES_DATE_FORMAT)


get_datetime_kafka = udf(_epoch_millis_to_utc_string)
get_datetime_yahoo = udf(_berlin_naive_to_utc_string)

In [5]:
# Local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName("KafkaIEXStructuredStreaming")
    .master("local[*]")
    .getOrCreate()
)

# SQLContext takes the SparkContext as its first argument and the session as
# its second; passing the session in the first slot only worked by accident.
sqlContext = SQLContext(spark.sparkContext, spark)

## 1. Implement Strategy

In [157]:
def dummy_strategy(pe_ratio, buy_below=10, sell_above=15):
    """Map a price/earnings ratio to a trading action.

    Args:
        pe_ratio: P/E ratio to evaluate; may be None when no ratio is available.
        buy_below: ratios strictly below this value yield "buy" (default 10).
        sell_above: ratios strictly above this value yield "sell" (default 15).

    Returns:
        "buy", "sell" or "hold". Ratios inside [buy_below, sell_above], NaN
        and None all yield "hold".
    """
    # A missing ratio used to raise TypeError on the comparison below; with no
    # data to act on, the neutral action is returned instead.
    if pe_ratio is None:
        return "hold"
    if pe_ratio < buy_below:
        return "buy"
    if pe_ratio > sell_above:
        return "sell"
    return "hold"

## 2. Backtesting with static DataFrames using historical data from Yahoo Finance (PySpark)

### 2.1 Read historical data from yahoo finance, write into Elasticsearch

In [14]:
import yfinance as yf
#from elasticsearch import Elasticsearch

def history(symbol,period,interval):
    """Download price history from Yahoo Finance and append it to Elasticsearch.

    Args:
        symbol: ticker symbol passed to ``yf.Ticker`` (e.g. "AAPL").
        period: look-back period passed to ``Ticker.history`` (e.g. "2d").
        interval: bar size passed to ``Ticker.history`` (e.g. "1m"); also used
            as the Elasticsearch index (resource ``<interval>/history``).

    Raises:
        ValueError: if Yahoo Finance returns no rows for the request.
    """
    #read historical data from yahoo finance into a pandas df
    ticker = yf.Ticker(symbol)
    pandas_history = ticker.history(period=period, interval=interval)
    if pandas_history.empty:
        # Spark cannot infer a schema from an empty frame; fail with a clear message instead.
        raise ValueError(
            "no history returned for symbol=%r period=%r interval=%r" % (symbol, period, interval)
        )
    #transform into spark df; reset_index moves the time index into the first column
    pandas_history = pandas_history.reset_index(drop=False)
    # yfinance names the index "Datetime" for intraday bars but "Date" for daily
    # and longer ones, so look the name up instead of hard-coding it.
    time_column = pandas_history.columns[0]
    spark_history = sqlContext.createDataFrame(pandas_history)
    #transform time, add symbol, add unique id
    spark_history = spark_history.select(
        "Open", "High", "Low", "Close", "Volume",
        get_datetime_yahoo(time_column).cast("String").alias("date"),
    )
    spark_history = spark_history.withColumn("symbol", lit(symbol))
    # date + symbol is used as the document id, so re-running the same request
    # writes to the same ids rather than creating new documents
    spark_history = spark_history.withColumn('id',concat(col("date"),col("symbol")))
    #write into elasticsearch
    spark_history.write\
                .format("es")\
                .mode("append")\
                .option("es.resource", interval+"/history")\
                .option("es.mapping.id", "id")\
                .option("es.nodes", "127.0.0.1:9200") \
                .save()
    
# Load the last two days of one-minute AAPL bars into Elasticsearch.
history(symbol="AAPL", period="2d", interval="1m")

### 2.2 Read historical data from Elasticsearch into Spark Dataframe and evaluate Strategy with it

## 3. Trading Simulation / Performance Evaluation with realtime Streams (Spark Streaming)

### Stream real time quotes from Kafka topic

In [None]:
sss = sparkStructuredStreaming.kafka_spark_stream(bootstrap)

parsedDF = sss.stream_quotes(spark)

# Wrap quote_data in a one-element array and explode it so that its fields can
# be expanded with "col.*" below.
quotesDF = parsedDF.select(explode(array("quote_data")))

# Variant for Elasticsearch: adds a string "date" column.
selectDF_es = quotesDF \
        .select("col.*",get_datetime_kafka("col.latestUpdate").cast("String").alias("date"))

# Variant for the analysis cells further down, which read `selectDF` and expect
# a "timestamp" column; without this definition they fail with NameError on a
# fresh kernel. latestUpdate is treated as epoch milliseconds, matching the
# division by 1000 in get_datetime_kafka.
# NOTE(review): assumes quote_data has no field already named "timestamp" and
# carries "latestPrice" — confirm against the schema in lib.sparkStructuredStreaming.
selectDF = quotesDF \
        .select("col.*",(col("col.latestUpdate") / 1000).cast("timestamp").alias("timestamp"))

## 4. Visualize results, either here with Plotly or by writing them into Elasticsearch for Kibana

### Some functions for analysis

In [7]:
def time_chart(df,interval):
    """Build per-window high / low / open values from a price DataFrame.

    Args:
        df: DataFrame with "timestamp" and "latestPrice" columns (the original
            note also asks for a watermark).
        interval: duration string used both as the window size and as the
            watermark delay on the window start.

    Returns:
        The windowed aggregates ("start", "end", "high", "low") left-joined
        back onto ``df`` at each window's earliest timestamp, with that row's
        "latestPrice" renamed to "open"; other columns of ``df`` are kept.
    """
    # `max` / `min` here are the Spark column aggregates, not the Python builtins.
    price_window = window(df.timestamp, interval)
    per_window = df.groupBy(price_window).agg(
        max("latestPrice").alias("high"),
        min("latestPrice").alias("low"),
        min("timestamp").alias("open_time"),
    )
    interval_values = per_window \
        .select("window.start", "window.end", "high", "low", "open_time") \
        .withWatermark("start", interval)

    # The row of df whose timestamp equals open_time carries the opening price.
    opening_match = interval_values.open_time == df.timestamp
    joined = interval_values.join(df, opening_match, "left")

    return joined \
        .drop("open_time", "timestamp") \
        .withColumnRenamed("latestPrice", "open")

In [8]:
def moving_average(spark, df, update, interval):
    """Simple moving average of "latestPrice" over sliding time windows.

    Args:
        spark: SparkSession used to run the SQL aggregation.
        df: DataFrame with "timestamp" and "latestPrice" columns.
        update: slide duration, i.e. how often a new window starts.
        interval: length of each window.

    Returns:
        DataFrame with columns "time" (the window) and "average".

    Side effect: registers or replaces the temp view "windowdf_sql".
    """
    sliding_window = window(df.timestamp, interval, update)
    priced_windows = df.select(sliding_window, df.latestPrice)
    priced_windows.createOrReplaceTempView("windowdf_sql")

    average_query = """SELECT windowdf_sql.window AS time, avg(windowdf_sql.latestPrice) AS average
                    FROM windowdf_sql
                    Group BY windowdf_sql.window
                    """
    return spark.sql(average_query)

### Playground

In [9]:
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
%matplotlib inline

In [11]:
def plot_stream(df, epoch_id):
    """foreachBatch sink: show the current micro-batch as a pandas table.

    Args:
        df: micro-batch DataFrame; must contain a "timestamp" column.
        epoch_id: batch id supplied by foreachBatch (unused).
    """
    df_pd = df.orderBy("timestamp").toPandas()
    clear_output(wait=True)
    # A bare `df_pd` expression is only echoed when it is the last line of a
    # cell, never from inside a function, so render it explicitly.
    # `display` is the IPython.display module imported in the cell above.
    display.display(df_pd)

In [None]:
# 8-minute simple moving average that slides forward every minute.
average = moving_average(spark, selectDF, "1 minutes", "8 minutes")

# Hand each micro-batch to plot_stream once a minute; this call blocks until
# the streaming query stops.
(
    selectDF
    .writeStream
    .outputMode("append")
    .trigger(processingTime = "60 seconds")
    .foreachBatch(plot_stream)
    .start()
    .awaitTermination()
)