In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, substring, from_json
from decimal import Decimal

spark = SparkSession \
    .builder \
    .appName("StreamingPlot") \
    .getOrCreate()

tickData_schema = StructType([
    StructField('timestamp', StringType(), True),
    StructField('symbol', StringType(), True),
    StructField('side', StringType(), True),
    StructField('size', StringType(), True),
    StructField('price', StringType(), True),
    StructField('tickDirection', StringType(), True),
    StructField('trdMatchID', StringType(), True),
    StructField('grossValue', StringType(), True),
    StructField('homeNotional', StringType(), True),
    StructField('foreignNotional', StringType(), True)
])



In [2]:
df_raw = spark.readStream.option("header","false").schema(tickData_schema).csv("./output/tick_data/destination")

df_raw.createOrReplaceTempView("trade_history")

aggInterval = "5 minutes"

def createSQLStatement(interval):
    switcher = {
        "15 seconds" : ",cast(concat(date(timestamp),' ',hour(timestamp),':',minute(timestamp),':',floor(second(timestamp)/15)*15) as timestamp) as group_column",
        "30 seconds" : ",cast(concat(date(timestamp),' ',hour(timestamp),':',minute(timestamp),':',floor(second(timestamp)/30)*30) as timestamp) as group_column",
        "1 minutes"  : ",cast(concat(date(timestamp),' ',hour(timestamp),':',minute(timestamp)) as timestamp) as group_column",
        "5 minutes"  : ",cast(concat(date(timestamp),' ',hour(timestamp),':',floor(minute(timestamp)/5)*5) as timestamp) as group_column",
        "15 minutes" : ",cast(concat(date(timestamp),' ',hour(timestamp),':',floor(minute(timestamp)/15)*15) as timestamp) as group_column",
        "1 hours"  : ",cast(concat(date(timestamp),' ',hour(timestamp),':00') as timestamp) as group_column",
        "4 hours"  : ",cast(concat(date(timestamp),' ',floor(hour(timestamp)/4)*4,':00') as timestamp) as group_column",
        "6 hours"  : ",cast(concat(date(timestamp),' ',floor(hour(timestamp)/6)*6,':00') as timestamp) as group_column",
        "12 hours" : ",cast(concat(date(timestamp),' ',floor(hour(timestamp)/12)*12,':00') as timestamp) as group_column",
        "1 days"  : ",cast(concat(date(timestamp),' ','00:00') as timestamp) as group_column"
    }
    return switcher.get(interval, "Please input correct interval")

df_group_temp = spark.sql('SELECT timestamp, symbol, price, size '+ \
                             createSQLStatement(aggInterval)+ \
                             ' FROM trade_history')

df_group_temp.createOrReplaceTempView("hist_group")
df_interval = spark.sql('''SELECT group_column as timestamp,
                           symbol,
                           first(price) as open, 
                           max(price) as high, 
                           min(price) as low, 
                           last(price) as close,
                           sum(size) as volumn
                           FROM hist_group
                           group by symbol, group_column
                           ''')




In [3]:
query = df_interval.writeStream.queryName("interval_data").outputMode("complete").format("memory").start()

query = df_raw.groupBy("symbol").count().writeStream.queryName("tick_count").outputMode("complete").format("memory").start()







In [5]:
tick_count = spark.sql( 'Select symbol, count from tick_count' )

tick_count.show()


+------+-----+
|symbol|count|
+------+-----+
|XBTU20| 5237|
|XBTM20| 2589|
+------+-----+



In [8]:
#from pyspark.sql.functions import *

df_XBTU20 = spark.sql('''Select timestamp, cast(timestamp as string) as ts_str, close as XBTU20_Close 
                            from interval_data where symbol = 'XBTU20'
                            ''')

df_XBTM20 = spark.sql('''Select cast(timestamp as string) as ts_str, close as XBTM20_Close 
                            from interval_data where symbol = 'XBTM20'
                            ''')

df_agg = df_XBTU20.join(df_XBTM20, df_XBTU20.ts_str == df_XBTM20.ts_str, 'inner'). \
                        orderBy('timestamp').drop(df_XBTU20.ts_str).drop(df_XBTM20.ts_str)
df_agg.printSchema()
df_agg.show()

root
 |-- timestamp: timestamp (nullable = true)
 |-- XBTU20_Close: string (nullable = true)
 |-- XBTM20_Close: string (nullable = true)

+-------------------+------------+------------+
|          timestamp|XBTU20_Close|XBTM20_Close|
+-------------------+------------+------------+
|2020-06-17 03:35:00|      9511.5|      9481.0|
|2020-06-17 03:40:00|      9510.0|      9473.0|
|2020-06-17 03:45:00|      9514.5|      9476.5|
|2020-06-17 03:50:00|      9509.5|      9472.5|
|2020-06-17 03:55:00|      9507.0|      9475.0|
|2020-06-17 04:00:00|      9502.0|      9469.5|
|2020-06-17 04:05:00|      9516.5|      9477.0|
|2020-06-17 04:10:00|      9517.5|      9481.0|
|2020-06-17 04:15:00|      9516.5|      9480.5|
|2020-06-17 04:20:00|      9521.0|      9483.5|
|2020-06-17 04:25:00|      9515.0|      9477.0|
|2020-06-17 04:30:00|      9510.5|      9474.0|
|2020-06-17 04:35:00|      9513.5|      9487.0|
|2020-06-17 04:40:00|      9521.5|      9485.5|
|2020-06-17 04:45:00|      9526.5|      9486.5

In [None]:
'''import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
# Only works for Jupyter Notebooks!
%matplotlib inline '''

In [None]:
'''count = 0
while count < 10:
    tick_count = spark.sql( 'Select symbol, count from tick_count' )
    pd_tick_count = tick_count.toPandas()
    display.clear_output(wait=True)
    sns.set()
    plt.figure( figsize = ( 10, 8 ) )
    sns.barplot( x="count", y="symbol", data=pd_tick_count)
    plt.show()
    count = count + 1
    time.sleep( 600 )

    
query.awaitTermination()'''