# Stock Market Advanced Indicator Engineering And Learning

We're going to perform cleansing, profiling and exploratory data analysis on real stock market data, using the Huge Stock Market Dataset covering [NYSE, NASDAQ, and NYSE MKT](https://www.kaggle.com/datasets/borismarjanovic/price-volume-data-for-all-us-stocks-etfs?datasetId=4538&sortBy=voteCount).

<img src="https://storage.googleapis.com/kaggle-datasets-images/4538/7213/0ef205a10621870d2d873557864474ff/dataset-cover.jpg?t=2017-11-17-03-48-42"/>

## Reading Preprocessed Data From HDFS

In this step, we read all of the preprocessed data stored as Parquet on HDFS. Parquet files embed their schema, so Spark reads it from the file metadata and no separate schema inference pass is needed.

In [2]:
val input_path = "loudacre/Project/hist_stock_datapre.parquet"
val input_data = spark.read.format("parquet").load(input_path)

In [3]:
z.show(input_data)

In [4]:
input_data.columns.length

In [5]:
input_data.printSchema

## Machine Learning

In this step, we train baseline linear regression models with Spark ML as a first look at potential models for predicting the closing price (`Close`).

In [7]:
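import org.apache.spark.sql.functions.udf
// First character of the ticker symbol (Label); null or empty tickers yield null instead of throwing
val charr = udf((st: String) => Option(st).filter(_.nonEmpty).map(_.substring(0, 1)).orNull)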

In [8]:
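import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
var df = input_data
// Percentile rank of each row by date across the whole dataset, used for a chronological split.
// Note: a window without partitionBy moves all rows into a single partition.
df = df.withColumn("rank", percent_rank().over(Window.orderBy("Date")))
df = df.withColumn("label_new", charr(col("Label")))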

In [9]:
val train_data = df.where("rank <= .8").drop("rank")
val test_data = df.where("rank > .8").drop("rank")
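The split above relies on `percent_rank`, which Spark defines as `(rank - 1) / (n - 1)` over the ordered window. The plain-Scala sketch below (no Spark required; the `PercentRankSplit` object and the sample dates are hypothetical) mirrors that computation to show why every row sharing a date lands on the same side of the 0.8 cutoff, which keeps the train/test split strictly chronological.

```scala
// Plain-Scala sketch of percent_rank() over a window ordered by date,
// plus the <= 0.8 / > 0.8 split. Names and data are illustrative only.
object PercentRankSplit {
  // percent_rank = (rank - 1) / (n - 1), where rank is 1-based with gaps for ties
  def percentRanks(dates: Seq[String]): Seq[Double] = {
    val n = dates.length
    if (n <= 1) dates.map(_ => 0.0)
    else {
      val sorted = dates.sorted
      dates.map { d =>
        // first position of d in sorted order + 1 == rank with gaps for ties
        val rank = sorted.indexWhere(_ == d) + 1
        (rank - 1).toDouble / (n - 1)
      }
    }
  }

  // Rows with percent rank <= cutoff go to training, the rest to test
  def split(dates: Seq[String], cutoff: Double = 0.8): (Seq[String], Seq[String]) = {
    val (train, test) = dates.zip(percentRanks(dates)).partition { case (_, r) => r <= cutoff }
    (train.map(_._1), test.map(_._1))
  }
}
```

With six distinct dates the ranks are 0.0, 0.2, 0.4, 0.6, 0.8 and 1.0, so the first five rows go to training and only the last date goes to test.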

In [10]:
println(s"train size = ${train_data.count}")
println(s"test size = ${test_data.count}")

In [11]:
z.show(train_data)

In [12]:
import org.apache.spark.ml.feature.VectorAssembler

val vecAssembler = new VectorAssembler()
  .setInputCols(Array("Open", "Volume"))
  .setOutputCol("features")
  
val vecTrainDF = vecAssembler.transform(train_data)
z.show(vecTrainDF.select("features", "Close"))

In [13]:
import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("Close")

In [14]:
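val lrModel = lr.fit(vecTrainDF)
// Coefficients follow the assembler's input order: (Open, Volume)
val m = lrModel.coefficients(0)  // weight on Open
val b = lrModel.intercept
println(s"coefficients = ${lrModel.coefficients}, intercept = $b")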
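The fitted model predicts `Close` as a weighted sum of the assembled features plus the intercept; `m` above is only the weight on `Open`, the first assembler input. A minimal plain-Scala sketch of that prediction rule follows (the `LinearPrediction` object is hypothetical and not part of Spark); the feature order must match the `VectorAssembler` input order.

```scala
// Plain-Scala sketch of how a fitted linear regression turns features into a prediction.
object LinearPrediction {
  // prediction = dot(coefficients, features) + intercept
  def predict(coefficients: Seq[Double], intercept: Double, features: Seq[Double]): Double = {
    require(coefficients.length == features.length, "one coefficient per feature")
    coefficients.zip(features).map { case (w, x) => w * x }.sum + intercept
  }
}
```

For example, with hypothetical weights `(2.0, 0.5)` on `(Open, Volume)` and intercept `1.0`, the features `(3.0, 4.0)` give `2.0 * 3.0 + 0.5 * 4.0 + 1.0 = 9.0`.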

In [15]:
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(vecAssembler, lr))
val pipelineModel = pipeline.fit(train_data)

In [16]:
val predDF = pipelineModel.transform(test_data)
predDF.select("features", "Close", "prediction").show(10)

In [17]:
z.show(predDF.limit(100).toDF())

In [18]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
val regressionEvaluator = new RegressionEvaluator()
 .setPredictionCol("prediction")
 .setLabelCol("Close")
 .setMetricName("rmse")
val rmse = regressionEvaluator.evaluate(predDF)
println(f"RMSE is $rmse%.1f")

val r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
println(s"R2 is $r2")
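The two metrics reported above can be written out directly: RMSE is the square root of the mean squared error, and R² is `1 - SS_res / SS_tot`. The plain-Scala sketch below (the `RegressionMetrics` object and sample pairs are hypothetical) makes those definitions concrete. Note that `setMetricName` changes the evaluator in place, so a shared `RegressionEvaluator` keeps whichever metric was set last.

```scala
// Plain-Scala sketch of RMSE and R^2 over (label, prediction) pairs.
object RegressionMetrics {
  // Root mean squared error
  def rmse(pairs: Seq[(Double, Double)]): Double =
    math.sqrt(pairs.map { case (y, p) => (y - p) * (y - p) }.sum / pairs.length)

  // Coefficient of determination: 1 - SS_res / SS_tot
  def r2(pairs: Seq[(Double, Double)]): Double = {
    val mean = pairs.map(_._1).sum / pairs.length
    val ssRes = pairs.map { case (y, p) => (y - p) * (y - p) }.sum
    val ssTot = pairs.map { case (y, _) => (y - mean) * (y - mean) }.sum
    1.0 - ssRes / ssTot
  }
}
```

For labels `(1, 2, 3)` and predictions `(1, 2, 4)`, RMSE is `sqrt(1/3) ≈ 0.577` and R² is `1 - 1/2 = 0.5`.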

In [19]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
val categoricalCols = train_data.dtypes.filter{ case (field, dataType) =>
 field == "label_new"}.map(_._1)
val indexOutputCols = categoricalCols.map(_ + "_index")
val oheOutputCols = categoricalCols.map(_ + "_OHE")

val stringIndexer = new StringIndexer()
 .setInputCols(categoricalCols)
 .setOutputCols(indexOutputCols)
 .setHandleInvalid("skip")
 
val oheEncoder = new OneHotEncoder()
 .setInputCols(indexOutputCols)
 .setOutputCols(oheOutputCols)

val numericCols = train_data.dtypes.filter{ case (field, dataType) =>
 field == "Open" || field == "Volume"}.map(_._1)

val assemblerInputs = oheOutputCols ++ numericCols
val vecAssembler = new VectorAssembler()
 .setInputCols(assemblerInputs)
 .setOutputCol("features")
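The categorical pipeline above first maps each `label_new` value to an index and then one-hot encodes that index. The plain-Scala sketch below approximates the defaults of the two stages under stated assumptions: `StringIndexer` ordering by descending frequency (ties broken alphabetically) and `OneHotEncoder` with `dropLast = true`, where `k` categories become vectors of length `k - 1` and the last category is all zeros. The `CategoricalEncoding` object is hypothetical and only illustrates the idea.

```scala
// Plain-Scala sketch of frequency-ordered string indexing followed by one-hot encoding.
object CategoricalEncoding {
  // Most frequent value gets index 0; equal counts are ordered alphabetically
  def fitIndex(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .map { case (v, occurrences) => (v, occurrences.length) }
      .sortBy { case (v, count) => (-count, v) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // One-hot vector with the last category dropped (all zeros for the last index)
  def oneHot(index: Int, numCategories: Int): Vector[Double] =
    Vector.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
}
```

For the values `a, b, b, c, c, c`, the indices are `c -> 0, b -> 1, a -> 2`; `c` encodes as `(1.0, 0.0)` and `a` as `(0.0, 0.0)`.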

In [20]:
val lr = new LinearRegression()
 .setLabelCol("Close")
 .setFeaturesCol("features")
val pipeline = new Pipeline()
 .setStages(Array(stringIndexer, oheEncoder, vecAssembler, lr))

In [21]:
val pipelineModel = pipeline.fit(train_data)
 
val predDF = pipelineModel.transform(test_data)
predDF.select("features", "Close", "prediction").show(5)

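// setMetricName mutates the evaluator (it was left on "r2" above), so reset it to "rmse"
val rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)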
println(f"RMSE is $rmse%.1f")

val r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
println(s"R2 is $r2")

In [22]:
z.show(predDF.limit(100).toDF())

## Advanced Technical Indicator Engineering

In this step, we engineer technical indicators and explore which of them are worth showing on a dashboard.

In [24]:
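// Keep Label in the projection so the ticker filter can reference it, then drop it
val exp_data = input_data.select("Label", "Date", "Close")
val fil = exp_data.where($"Label" === "bgr").drop("Label")
z.show(fil)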

## Technical Indicators

In this step, we compute technical indicators (one-day momentum, RSI and Bollinger Bands) for a single ticker (`odp`) as candidate features for future model training.

In [26]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

In [27]:
val w = org.apache.spark.sql.expressions.Window.orderBy("Date")  
import org.apache.spark.sql.functions.lag
val xy = input_data.where($"Label" === "odp").select("Date", "Close")
xy.show()

In [28]:
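// Previous day's close; the first row has no predecessor and stays null
// (a default of 0 would make the first momentum equal to the full closing price)
val mm = xy.withColumn("Close_new", lag("Close", 1).over(w))
mm.show()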

In [29]:
val momentum = mm.withColumn("momentum_1d",
    (col("Close") - col("Close_new"))
    )
momentum.show()
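One-day momentum is simply `Close(t) - Close(t-1)`. The plain-Scala sketch below (the `Momentum` object and the sample prices are hypothetical) shows the same calculation on a list of closes, with the first day left undefined rather than compared against zero.

```scala
// Plain-Scala sketch of one-day momentum: close(t) - close(t - 1).
object Momentum {
  // The first day has no previous close, so its momentum is None
  def oneDay(closes: Seq[Double]): Seq[Option[Double]] =
    if (closes.isEmpty) Seq.empty
    else None +: closes.zip(closes.tail).map { case (prev, cur) => Some(cur - prev) }
}
```

For closes `10.0, 11.0, 10.5` this yields `None, Some(1.0), Some(-0.5)`.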

In [30]:

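// RSI over one window of daily price changes, using simple averages (Cutler's RSI):
// avgGain = sum of positive changes / n, avgLoss = sum of |negative changes| / n,
// RSI = 100 * avgGain / (avgGain + avgLoss); the common divisor n cancels out.
// Array columns arrive in Scala UDFs as Seq, so Seq[Double] works on Scala 2.12 and 2.13.
val rsi = udf((values: Seq[Double]) => {
    val gain = values.filter(_ > 0).sum
    val loss = -values.filter(_ < 0).sum
    // Flat or empty window: treat as neutral 50 instead of returning NaN (0 / 0)
    if (gain + loss == 0.0) 50.0 else 100.0 * gain / (gain + loss)
})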
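The RSI formula can be checked without Spark. The plain-Scala sketch below (the `Rsi` object is hypothetical) applies the simple-average variant to one window and then rolls it over a series the way `rowsBetween(-(period - 1), 0)` does. Wilder's original RSI uses exponential smoothing instead, so values from charting tools may differ slightly.

```scala
// Plain-Scala sketch of simple-average (Cutler) RSI and a rolling version.
object Rsi {
  // RSI = 100 * gains / (gains + losses) over one window of price changes
  def rsi(changes: Seq[Double]): Double = {
    val gain = changes.filter(_ > 0).sum
    val loss = -changes.filter(_ < 0).sum
    // Convention chosen here: a window with no movement is neutral (50)
    if (gain + loss == 0.0) 50.0 else 100.0 * gain / (gain + loss)
  }

  // Rolling RSI over the current change plus the (period - 1) preceding ones
  def rolling(changes: Seq[Double], period: Int = 14): Seq[Double] =
    changes.indices.map(i => rsi(changes.slice(math.max(0, i - period + 1), i + 1)))
}
```

For changes `1.0, -1.0, 2.0`, gains total 3 and losses total 1, so the RSI is `100 * 3 / 4 = 75`.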

In [31]:
import org.apache.spark.sql.expressions.Window
// Collect the last 14 daily changes (current row plus 13 preceding) for a 14-period RSI
val df_rsi_temp = momentum.withColumn("RSI_IM",
    collect_list("momentum_1d").over(Window.orderBy(col("Date")).rowsBetween(-13, 0)))
z.show(df_rsi_temp)

In [32]:
val df_rsi = df_rsi_temp.withColumn("RSI", 
                rsi(col("RSI_IM")))
z.show(df_rsi)

In [33]:
df_rsi.limit(100).toDF().select("Close", "Close_new", "momentum_1d", "RSI").show()

In [34]:
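def bbands() = {
    // Bollinger Bands are built from closing prices (not momentum) over a 20-day window:
    // the current row plus the 19 preceding rows
    val df_bb_temp = df_rsi.withColumn("BB_IM",
        collect_list("Close").over(org.apache.spark.sql.expressions.Window.orderBy(col("Date")).rowsBetween(-19, 0)))
    df_bb_temp
}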

In [35]:
val bb_df = bbands()
z.show(bb_df)

In [36]:
// Mean and population standard deviation of the closing prices in each window.
// Seq[Double] accepts Spark array columns on both Scala 2.12 and 2.13.
val bband_avg = udf((values: Seq[Double]) => {
    values.sum / values.length
})
val bband_std = udf((a: Seq[Double]) => {
    val mean = a.sum / a.length
    val squareErrors = a.map(x => x - mean).map(x => x * x)
    math.sqrt(squareErrors.sum / a.length)
})

In [37]:
val df_bb_avg = bb_df.withColumn("BB_AVG", 
                bband_avg(col("BB_IM")))
val df_bb_std = df_bb_avg.withColumn("BB_STD", 
                bband_std(col("BB_IM")))
z.show(df_bb_std)

In [38]:
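// Lower band: moving average minus 2 standard deviations (20 is the window length, not the multiplier)
val df_bb_lower = df_bb_std.withColumn("BB_Lower_Band",
                (col("BB_AVG") - (col("BB_STD") * 2))).drop("RSI_IM").drop("BB_IM")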
z.show(df_bb_lower)

In [39]:
// Upper band: moving average plus 2 standard deviations
val df_bb_upper = df_bb_lower.withColumn("BB_Upper_Band",
                (col("BB_AVG") + (col("BB_STD") * 2)))
z.show(df_bb_upper)

In [40]:
val df_bb = df_bb_upper.withColumn("BB_Middle_Band", 
                (col("BB_AVG"))).orderBy(desc("Date"))
z.show(df_bb)
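Putting the Bollinger Band steps together: the middle band is the rolling mean of the close, and the outer bands sit `k` population standard deviations away, with the common choice of a 20-day window and `k = 2`. The plain-Scala sketch below (the `BollingerBands` object and sample prices are hypothetical) reproduces the same arithmetic as `bband_avg`, `bband_std` and the band columns.

```scala
// Plain-Scala sketch of Bollinger Bands: mean +/- k * population standard deviation.
object BollingerBands {
  final case class Bands(lower: Double, middle: Double, upper: Double)

  // Bands for one window of closing prices
  def bands(closes: Seq[Double], k: Double = 2.0): Bands = {
    require(closes.nonEmpty, "window must contain at least one price")
    val mean = closes.sum / closes.length
    val std = math.sqrt(closes.map(c => (c - mean) * (c - mean)).sum / closes.length)
    Bands(mean - k * std, mean, mean + k * std)
  }

  // Rolling bands over the current close plus the (period - 1) preceding closes
  def rolling(closes: Seq[Double], period: Int = 20, k: Double = 2.0): Seq[Bands] =
    closes.indices.map(i => bands(closes.slice(math.max(0, i - period + 1), i + 1), k))
}
```

For the closes `2, 4, 4, 4, 5, 5, 7, 9` the mean is 5 and the population standard deviation is 2, so the bands are `(1.0, 5.0, 9.0)`.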