## AAPL Stock Price Prediction by Rolling ARIMA Model on Spark
#### Author: ixihao
#### Environment: Databricks
#### Language: Python (pyspark) & Scala
#### Package: sparkts_0.4.1
#### Data: Stock price of Apple from Yahoo finance.
#### Description: In this project, I try to build a ARIMA model to predict the AAPL stock price and evaluate the predict result. First, I will load and preprocess the data download from Yahoo Finance. Then, initialize the ARIMA model in scala.

In [2]:
#import math
#import matplotlib
#import numpy as np
import pandas as pd

#import seaborn as sns
#import time
#import statsmodels.api as sm

#from pyspark.sql.functions import isnull
#from pyspark.sql import functions as F
#from datetime import date, datetime, time, timedelta
from pyspark.sql.types import DateType
from pyspark.sql.functions import lit

# File location and type
file_location = "/FileStore/tables/AAPL__1_-bf1ee.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
aapl_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(aapl_df.take(5))

Date,Open,High,Low,Close,Adj Close,Volume
1980-12-12,0.513393,0.515625,0.513393,0.513393,0.023007,117258400
1980-12-15,0.488839,0.488839,0.486607,0.486607,0.021807,43971200
1980-12-16,0.453125,0.453125,0.450893,0.450893,0.020206,26432000
1980-12-17,0.462054,0.464286,0.462054,0.462054,0.020706,21610400
1980-12-18,0.475446,0.477679,0.475446,0.475446,0.021307,18362400


In [3]:
# Check the datatype of each column
aapl_df.dtypes

In [4]:
# Transform columns to the appropriate datatype and remove columns we don't need
price_df = aapl_df.select(aapl_df.Date.cast(DateType()).alias('Date'),
               aapl_df.Close.cast('float').alias('Close'))

# Check the datatype of each column
price_df.dtypes

In [5]:
# Remove rows that contain Null value (1 Null record)
price_df = price_df.filter("Date is not null and Close is not null")

# Check the size of dataframe
price_df.count()

In [6]:
# Add a column 'Stock' which is a string key of the stock price in order to prepare for the TimeSeriesRDD in Spark_TimeSeries library
price_df = price_df.withColumn("Stock",lit("AAPL"))

# Store the dataframe into a csv file in order to share with Scala REPL
price_df.write.option("header", "true").csv("/FileStore/tables/aapl_price4.csv")

display(price_df.take(5))

Date,Close,Stock
1980-12-12,0.513392984867096,AAPL
1980-12-15,0.4866069853305816,AAPL
1980-12-16,0.4508930146694183,AAPL
1980-12-17,0.4620540142059326,AAPL
1980-12-18,0.4754459857940674,AAPL


In [7]:
%scala
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.types.DoubleType
import com.cloudera.sparkts.models.{ARIMA, ARIMAModel}
import com.cloudera.sparkts.{DateTimeIndex, DayFrequency, TimeSeriesRDD}
import java.time.{ZoneId, ZonedDateTime}
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.linalg.DenseVector
import org.joda.time.DateTime

// Load data from file in HDFS and format the data
val path = "/FileStore/tables/aapl_price4.csv"
val formattedData = spark.read.option("header", "true").csv(path)
val priceDf = formattedData
      .withColumn("Date", to_timestamp(formattedData("date"), "yyyy-MM-dd"))
      .withColumn("Close", formattedData("close").cast(DoubleType))
      .sort("Date")

//val dd = priceDf.filter("Date > '2010-10-22 00:00:00'")
//val dd = priceDf.filter((unix_timestamp(current_timestamp()) - $"timestamp")/60 < 10)
/*
val minDate = priceDf.select(min("Date")).collect()(0).getTimestamp(0)
val maxDate = priceDf.select(max("Date")).collect()(0).getTimestamp(0)
val zone = ZoneId.systemDefault()
val dtIndex = DateTimeIndex.uniformFromInterval(
      ZonedDateTime.of(minDate.toLocalDateTime, zone),
      ZonedDateTime.of(maxDate.toLocalDateTime, zone),
      new DayFrequency(1))
*/

// Extract date from priceDF
val dateList = priceDf.select("Date").collect()
val dateListLength = dateList.size

// Initialize Array[Double] to store the predicted price
var day: Array[Double] = Array()
var week1: Array[Double] = Array()
var week2 : Array[Double] = Array()
var month1: Array[Double] = Array()
var month4 : Array[Double] = Array()


val rollingNum = 2000
val trainingNum = 100
// Moving ARIMA start
for(i <- dateListLength-rollingNum to dateListLength-1){
  println("### No." + i)
  // Initialize a TimeSeriesRDD
  val minDate = dateList(i-trainingNum).getTimestamp(0).toLocalDateTime()
  val maxDate = dateList(i).getTimestamp(0).toLocalDateTime()
  val zone = ZoneId.systemDefault()
  val dtIndex = DateTimeIndex.uniformFromInterval(
        ZonedDateTime.of(minDate, zone),
        ZonedDateTime.of(maxDate, zone),
        new DayFrequency(1))
  val tsRdd: TimeSeriesRDD[String] = TimeSeriesRDD.timeSeriesRDDFromObservations(
        targetIndex = dtIndex,
        df = priceDf,
        tsCol = "Date",
        keyCol = "Stock",
        valueCol = "Close")
  // Number of days we want to predict (26 Trading days a month, 4 months)
  val days = 104
  println(minDate)
  println(maxDate)
  // Fill in missing values based on linear interpolation (Not everyday is trading day)
  val tsRdd_filled = tsRdd.fill("linear")
  
  // Find the best parameter in ARIMA
  val arimaAndVectorRdd = tsRdd_filled.map {
        case (key, denseVector) =>
          (ARIMA.autoFit(denseVector), denseVector)
      }
  val coefficients = arimaAndVectorRdd.map {
        case (arimaModel, denseVector) => {
          (arimaModel.coefficients.mkString(","),
            (arimaModel.p, arimaModel.d, arimaModel.q))
        }
      }
  coefficients.collect().foreach {
        case (coefficients, (p, d, q)) =>
          println("coefficients:" + coefficients + "=>" + "p = " + p + ", d = " + d + ", q = " + q)
      }

  // Predict next n days value
  val forecast = arimaAndVectorRdd.map {
    case (arimaModel, denseVector) => {
      arimaModel.forecast(denseVector, days)
    }
  }
  val forecastValue = forecast.map(_.toArray.mkString(","))

  // Extrace predicted value (Remove the past value)
  val preditcedValueRdd = forecastValue.map { parts =>
    val partArray = parts.split(",")
    for (i <- partArray.length - days until partArray.length) yield partArray(i)
  }.map(_.toArray.mkString(","))
  
  /*
  // Print the predicted value
  preditcedValueRdd.collect().foreach { row =>
    println("forecast of next " + days + " observations: " + row)
  }
  */
  // Update the Array[Double] initialized before
  //val forvalue: Array[Double] = preditcedValueRdd.toArray()
  val forvalue: Array[Double] = preditcedValueRdd.collect().mkString("").split(",").map(_.toDouble)
  //val forvalue: Array[Double] = preditcedValueRdd.collect().size
  // 5 trading days a week, 26 trading days a month
  day :+= forvalue(0)
  week1 :+= forvalue(4) 
  week2 :+= forvalue(9)
  month1 :+= forvalue(25) 
  month4 :+= forvalue(103)
  println(forvalue(0))
  println(forvalue(4))
  println(forvalue(9))
  println(forvalue(25))
  println(forvalue(103))
}
/*
//Version 2

//val features = priceDf.select("features")
//val featurestd = features.rdd.map(_.getAs[Vector]("features"))
//val tt = priceDf.head(2000).select("Close").as[Double].collect()
val stock_array = priceDf.select("Close").as[Double].collect()
val stock_array_length = stock_array.size

var day = Array(0.0)
var week1 = Array(0.0)
var week2 = Array(0.0)
var month1 = Array(0.0)
var month4 = Array(0.0)

for(i <- 0 to stock_array_length){
println(i)
      
var train_data_array : Array[Double] = stock_array.slice(i,i+100)
val train_data_vector = new DenseVector(train_data_array)
//println(train_data_vector.size)

val maxP = 20
val maxD = 10
val maxQ = 20
val arima = ARIMA.autoFit(train_data_vector,maxP,maxD,maxQ)
println("is stationary:"+arima.isStationary())
println("AIC value:"+arima.approxAIC(train_data_vector))
print(s"best model：p is:${arima.p},d is:${arima.d},q is ${arima.q}")

//Train the values

val arimaModel = ARIMA.fitModel(arima.p, arima.d, arima.q, train_data_vector)


//Get the forecasted value and convert into the dense vector
val forecasted = arimaModel.forecast(train_data_vector, days)
val forvalue = new DenseVector(forecasted.toArray.slice(forecasted.size - days, forecasted.size))
//println(forvalue.size)
day :+= forvalue(0)
week1 :+= forvalue(4) 
week2 :+= forvalue(9)
month1 :+= forvalue(25) 
month4 :+= forvalue(103) 
}
*/
/*
val arimaModel = ARIMA.fitModel(1, 0, 1, featuresRDD)
println("coefficients: " + arimaModel.coefficients)
val forecast = arimaModel.forecast(ts, 20)
println("forecast of next 20 observations: " + forecast.toArray.mkString(","))
*/

In [8]:
%scala
case class Row(day: Double, week1: Double, week2: Double, month1: Double, month4: Double)

val xs = Array(day, week1, week2, month1, month4).transpose
val rdd = sc.parallelize(xs).map(ys => Row(ys(0), ys(1), ys(2), ys(3), ys(4)))
val dfdd = rdd.toDF("day", "week1", "week2", "month1", "month4")
dfdd.display()

In [9]:
%scala
dfdd.write.csv("/FileStore/tables/predict2.csv")