In [1]:
#import libraries
from pyspark.sql import SQLContext, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import abs, sqrt
from statsmodels.tsa.stattools import adfuller
from numpy import log
import numpy as np, pandas as pd
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import matplotlib.pyplot as plt

In [2]:
# Build the training frame (df1) and the test frame (df2) from the AAPL table.
df = spark.sql("select * from AAPL_final_csv")
df_list = ['Date', 'Close']

# Keep only the wanted columns, in the order they appear in the source table.
wanted_cols = [name for name in df.columns if name in df_list]
date_col = df['Date']

# Training window: 2015-06-01 (inclusive) to 2017-03-01 (exclusive).
df1 = df.select(wanted_cols).filter(
    (date_col >= "2015-06-01 00:00:00") & (date_col < "2017-03-01 00:00:00")
)

# Test window: 2017-03-01 (inclusive) to 2018-12-31 (exclusive).
df2 = df.select(wanted_cols).filter(
    (date_col >= "2017-03-01 00:00:00") & (date_col < "2018-12-31 00:00:00")
)

# Add Lead_True: the Close value `lead_value` rows further on when ordered by
# Date. Rows with no such later row get a null, and dropna() then removes every
# row that contains a null.
lead_value = 5
w = Window.orderBy("Date")
lead_col = lead('Close', lead_value).over(w)
df2 = df2.withColumn('Lead_True', lead_col).dropna()

df2.show()

In [3]:
# Bring the training Close column to the driver as a flat 1-D float64 array.
# collect() yields one single-field Row per record, so the raw array holds one
# value per row; astype() copies it to float64 and reshape(-1) flattens it in a
# single vectorized step instead of copying element by element in a Python loop.
df_close_raw = np.array(df1.select("Close").collect())
df_close = df_close_raw.astype(np.float64).reshape(-1)

In [4]:
# Bring the test Close and Lead_True columns to the driver as flat 1-D float64
# arrays. Each collect() yields one single-field Row per record; astype() copies
# the values to float64 and reshape(-1) flattens them, replacing the per-element
# Python copy loops with one vectorized step per column.
df_close_raw_test = np.array(df2.select("Close").collect())
df_close_test = df_close_raw_test.astype(np.float64).reshape(-1)

df_close_raw_lead_test = np.array(df2.select("Lead_True").collect())
df_close_lead_test = df_close_raw_lead_test.astype(np.float64).reshape(-1)

In [5]:
# Augmented Dickey-Fuller test on the training closes.
# A p-value above 0.05 is treated as non-stationary, i.e. the series needs differencing.
result = adfuller(df_close)
print(
    'ADF Statistic: %f' % result[0],
    'p-value: %f' % result[1],
    sep='\n',
)

In [6]:
# Plot the log close series and its 1st-3rd order differences (left column)
# next to the autocorrelation of each (right column).
df_close_log = log(df_close)

# rcParams are read when a figure is created, so the size/dpi must be set
# BEFORE plt.subplots() for this figure to pick them up.
plt.rcParams.update({'figure.figsize':(28,20), 'figure.dpi':120})
fig, axes = plt.subplots(4, 2, sharex=True)

# NOTE(review): sharex=True ties all eight panels (series and ACF alike) to one
# x-axis, so this xlim applies to every panel, not just axes[1, 1] - confirm
# that is intended.
axes[1, 1].set(ylim=(-0.2,0.4), xlim=(-1,220))

panel_titles = [
    'Original Series',
    '1st Order Differencing',
    '2nd Order Differencing',
    '3rd Order Differencing',
]
for order, title in enumerate(panel_titles):
    # n=0 returns the series unchanged; n=k applies the first difference k times.
    series = np.diff(df_close_log, n=order)
    axes[order, 0].plot(series)
    axes[order, 0].set_title(title)
    plot_acf(series, ax=axes[order, 1])

display(fig)

In [7]:
# Augmented Dickey-Fuller test on the first difference of the log closes,
# to check whether one round of differencing is enough.
result = adfuller(np.diff(df_close_log))
print(
    'ADF Statistic: %f' % result[0],
    'p-value: %f' % result[1],
    sep='\n',
)

In [8]:
# First-differenced log series (left panel) beside its partial
# autocorrelation (right panel).
plt.rcParams.update({'figure.figsize':(12,4), 'figure.dpi':120})
fig, axes = plt.subplots(1, 2, sharex=True)
series_ax, pacf_ax = axes

# Compute the first difference once and reuse it for both panels.
first_diff = np.diff(df_close_log, 1)

series_ax.plot(first_diff)
series_ax.set_title('1st Differencing')

# NOTE(review): with sharex=True this xlim also applies to the series panel - confirm intended.
pacf_ax.set(ylim=(-0.25,1), xlim=(-1,50))
plot_pacf(first_diff, ax=pacf_ax)

display(fig)

In [9]:
#junk