In [None]:
import findspark
# Let findspark locate the Spark installation and make pyspark importable;
# it is called before the pyspark imports below.
findspark.init()

import os
# Submit arguments read when the Spark JVM is launched: pull in the Kafka
# source for Structured Streaming (spark-sql-kafka-0-10, Scala 2.12, 3.0.3).
# Set before the SparkSession is created further down.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" pyspark-shell'

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Session shared by the rest of the script; getOrCreate() reuses an already
# running session if there is one.
spark = (SparkSession.builder
    .appName("Binance - Hyper price increase detection")
    .getOrCreate())

#PySpark
#Import a few specific pyspark.sql functions directly; the rest are available through F
from pyspark.sql.functions import udf, split, col, from_json
import pyspark.sql.functions as F 


from pyspark.sql.types import DoubleType, StringType, IntegerType, DateType, TimestampType, StructType, StructField, FloatType

#PySpark ML


#From Python
import pandas as pd

#UDF
from trade import create_orders #This will connect the outcome of the consumer to the Alpaca API
#UDF that converts a millisecond epoch timestamp into a datetime, so that
#calendar fields (minute, hour, day, ...) can be extracted from it afterwards.
@udf("Timestamp")
def datetime(col):
    """Convert an epoch timestamp in milliseconds to a pandas Timestamp.

    Args:
        col: The per-row value supplied by Spark. The streaming query in this
            file passes the timestamp as a string; numeric values are
            accepted as well.

    Returns:
        The corresponding ``pandas.Timestamp``, or ``None`` for a null input.

    Raises:
        ValueError: If a string input is not a valid number.
    """
    if col is None:
        return None
    if isinstance(col, str):
        # Parsing strings together with `unit` is deprecated in recent pandas
        # releases, which ask for an explicit numeric cast instead. Try int
        # first to keep millisecond precision exact; fall back to float for
        # values such as "1.6e12".
        try:
            col = int(col)
        except ValueError:
            col = float(col)
    return pd.to_datetime(col, unit="ms")

from pyspark.ml import PipelineModel

# HDFS location of the persisted BTC pipeline in the model registry.
modelPath = "hdfs://localhost:9000/model-registry/BTC"
# Load the pipeline once at start-up; it is applied to the stream further down.
savedModel = PipelineModel.load(modelPath)



# Streaming feature frame: reads kline messages from the "klines" Kafka topic,
# parses the pipe-delimited payload into OHLCV columns and derives the calendar
# and candlestick features kept by the final select.
ohlcvDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "klines")
    .option("startingOffsets", "latest")
    .option("kafka.group.id", "IE")
    .load()
    # Decode the Kafka value and split it on "|". split() takes a regular
    # expression, so the pipe is escaped; the raw string keeps the backslash
    # without relying on an invalid Python escape sequence.
    .selectExpr("CAST(value AS STRING)")
    .select(split("value", r"\|").alias("fields"))
    # Positional fields of the message (index 2 is not used here).
    .withColumn("timestamp_str", col("fields").getItem(0).cast(StringType()))
    .withColumn("symbol", col("fields").getItem(1))
    .withColumn("Open", col("fields").getItem(3).cast(DoubleType()))
    .withColumn("Close", col("fields").getItem(4).cast(DoubleType()))
    .withColumn("High", col("fields").getItem(5).cast(DoubleType()))
    .withColumn("Low", col("fields").getItem(6).cast(DoubleType()))
    .withColumn("Volume", col("fields").getItem(7).cast(DoubleType()))
    # Millisecond epoch string -> timestamp, then calendar features as doubles.
    .withColumn("timestamp", datetime(col("timestamp_str")))
    .withColumn("Minute", F.minute("timestamp").cast("double"))
    .withColumn("Hour", F.hour("timestamp").cast("double"))
    .withColumn("DayofMonth", F.dayofmonth("timestamp").cast("double"))
    .withColumn("DayofYear", F.dayofyear("timestamp").cast("double"))
    .withColumn("DayofWeek", F.dayofweek("timestamp").cast("double"))
    .withColumn("Year", F.year("timestamp").cast("double"))
    .withColumn("WeekofYear", F.weekofyear("timestamp").cast("double"))
    .withColumn("Quarter", F.quarter("timestamp").cast("double"))
    # Candlestick features: body top/bottom, upper and lower shadow, and the
    # high/low ratio. The column names (including the "Postive" spelling) are
    # kept unchanged; presumably the saved pipeline looks them up by name --
    # confirm before renaming.
    .withColumn("Postive_Candlestick", F.greatest(col("Open"), col("Close")).cast("double"))
    .withColumn("Negative_Candlestick", F.least(col("Open"), col("Close")).cast("double"))
    # The casts are applied to the whole difference / ratio.
    .withColumn("Upper_shade", (col("High") - col("Postive_Candlestick")).cast("double"))
    .withColumn("Lower_shade", (col("Negative_Candlestick") - col("Low")).cast("double"))
    .withColumn("High2Low", (col("High") / col("Low")).cast("double"))
    # Keep only the feature columns; the raw fields, symbol and timestamp
    # columns are dropped here.
    .select(["Open", "Close", "High", "Low", "Volume", "Hour", "Minute",
             "DayofMonth", "DayofYear", "DayofWeek", "Year", "WeekofYear", "Quarter",
             "Postive_Candlestick", "Negative_Candlestick",
             "Upper_shade", "Lower_shade", "High2Low"])
)

# Apply the saved pipeline to the streaming features; the order expressions
# below read the "prediction" column of the result.
predicts = savedModel.transform(ohlcvDF)

# Order side: a negative prediction means sell; anything else (including a
# null prediction, which fails the comparison) means buy.
side = F.when(col("prediction") < 0, "sell").otherwise("buy")

# Symbol, order type and time-in-force do not depend on the prediction, so
# they are plain literal columns.
symbol = F.lit("BTCUSD")

# Order quantity: 0.001 for sell signals, 0 otherwise.
# NOTE(review): this gives "buy" orders a quantity of 0 -- confirm that is intended.
qty = F.when(col("prediction") < 0, 0.001).otherwise(0)

type_ = F.lit("market")

time_in_force = F.lit("gtc")

# Build one order row per scored candle and pass every row to create_orders
# through the foreach sink. awaitTermination() blocks until the streaming
# query stops.
(
    predicts
    .select(
        symbol.alias("symbol"),
        qty.alias("qty"),
        side.alias("side"),
        type_.alias("type"),
        time_in_force.alias("time_in_force"),
        "prediction",
        "minute",
        "hour",
    )
    .writeStream
    .foreach(create_orders)
    .start()
    .awaitTermination()
)













