<a href="https://colab.research.google.com/github/iamchetry/Stock-Price-Prediction/blob/main/Stock_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
!pip install elephas

import os

import pandas as pd
from matplotlib.pyplot import *

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import StandardScaler, VectorAssembler, QuantileDiscretizer
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql.functions import rand, lead, mean, stddev, col, udf, lit
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, LSTM
from tensorflow.keras import optimizers, regularizers
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.callbacks import EarlyStopping, Callback

from elephas.ml_model import ElephasEstimator
from google.cloud import bigquery

# Mount Google Drive at /content/drive so files under 'My Drive' are reachable as local paths.
from google.colab import drive
drive.mount('/content/drive')

# Location of the service-account key file kept on Drive.
# NOTE(review): presumably read by bigquery.Client() through Application Default Credentials - confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/drive/My Drive/stock-data-analysis-cse560-b3fc65360aa2.json'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Spark Session
conf = SparkConf().setAppName('Stock Price Prediction').setMaster('local[2]')
# getOrCreate reuses the already-running context when this cell is executed again;
# calling SparkContext(conf=conf) a second time in the same kernel raises an error.
sc = SparkContext.getOrCreate(conf=conf)
sql_context = SQLContext(sc)

In [None]:
# Connect to BigQuery
bqclient = bigquery.Client()

def fetch_from_bquery(name=None, output_path='/content/drive/My Drive/df_stock_hist.csv'):
  """Download the stock_history rows of one ticker from BigQuery and save them as CSV.

  Args:
    name: Ticker symbol matched against the ``Symbol`` column, e.g. ``'GOOGL'``.
    output_path: Destination CSV file. Defaults to the Drive path that the
      loading cell reads from.

  Raises:
    ValueError: If ``name`` is empty or ``None``.
  """
  if not name:
    raise ValueError('fetch_from_bquery requires a ticker symbol, e.g. name="GOOGL"')

  # Bind the symbol as a query parameter. Formatting it straight into the SQL
  # produced `Symbol=GOOGL` (an unquoted identifier, not a string literal) and
  # would also allow SQL injection through `name`.
  # NOTE(review): assumes the Symbol column is of type STRING - confirm against the table schema.
  query_string = ''' select * from stock-data-analysis-cse560.StockInfoDatabase.stock_history where Symbol = @symbol;'''
  job_config = bigquery.QueryJobConfig(
      query_parameters=[bigquery.ScalarQueryParameter('symbol', 'STRING', name)]
  )

  # Download query results.
  df_hist = (
      bqclient.query(query_string, job_config=job_config)
      .result()
      .to_dataframe(
          # Optionally, explicitly request to use the BigQuery Storage API. As of
          # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage
          # API is used by default.
          create_bqstorage_client=True,
      )
  )

  # Write directly to the destination instead of writing locally and shelling out to `mv`.
  df_hist.to_csv(output_path, index=False)

In [None]:
# Load Spark df
fetch_from_bquery(name='GOOGL')

# Spark DataFrames are immutable: orderBy returns a new DataFrame and has no
# `inplace` option, so the sorted result has to be assigned back.
df_hist = (
    sql_context.read
    .csv('/content/drive/My Drive/df_stock_hist.csv', header=True, inferSchema=True)
    .select('date_', 'Open')
    .orderBy('date_', ascending=True)
)

DataFrame[date_: string, Open: double]

In [None]:
# Creating Lagged Features
# lead('Open', k) reads the Open value k rows ahead in date_ order, so every row
# ends up holding a run of 61 consecutive opening prices.
date_window = Window.orderBy('date_')
shifted_cols = [
    lead('Open', step).over(date_window).alias('Open_{}'.format(step))
    for step in range(1, 61)
]

df_hist = (
    df_hist
    .select('*', *shifted_cols)
    .drop('date_')
    # The price furthest ahead becomes the target; the current price becomes feature 0.
    .withColumnRenamed('Open_60', 'target_price')
    .withColumnRenamed('Open', 'Open_0')
)
# Rows near the end of the series have no full look-ahead window and contain nulls.
df_hist = df_hist.na.drop()

In [None]:
# Train Test Split
# Bucket the target into 10 quantile bins so the 80/20 split can be stratified on them.
discretizer = QuantileDiscretizer(numBuckets=10, inputCol='target_price', outputCol='bins')

df_hist = discretizer.fit(df_hist).transform(df_hist)
# Cast with withColumn: assigning to `df_hist.bins` only sets a Python attribute on
# the DataFrame object and leaves the actual column untouched, so the integer keys
# passed to sampleBy would not match the column's type.
df_hist = df_hist.withColumn('bins', col('bins').cast('int'))

train = df_hist.sampleBy('bins', fractions={bucket: 0.8 for bucket in range(10)}, seed=10)
# Everything not sampled into train becomes the test set.
test = df_hist.subtract(train)

df_hist = df_hist.drop('bins')
train = train.drop('bins')
test = test.drop('bins')

In [None]:
# Calculate Mean and STD for each column in Train data
stat_columns = list(train.columns)

stat_exprs = []
for col_ in stat_columns:
  stat_exprs.append(mean(col(col_)).alias('avg_{}'.format(col_)))
  stat_exprs.append(stddev(col(col_)).alias('std_{}'.format(col_)))

# A single aggregation computes every statistic in one Spark job instead of
# launching a separate select + collect per column.
stats_row = train.select(*stat_exprs).first()

# Both lists follow the order of train.columns.
mean_list = [stats_row['avg_{}'.format(col_)] for col_ in stat_columns]
std_list = [stats_row['std_{}'.format(col_)] for col_ in stat_columns]

In [None]:
# Scale Data
def z_score(x, mean_, std_):
  """Return the standard score of ``x``: its offset from ``mean_`` divided by ``std_``."""
  centered = x - mean_
  return centered / std_

# Python UDF form of z_score; kept so existing references to `scale_` keep working.
scale_ = udf(lambda x, mean_, std_: z_score(x, mean_, std_), DoubleType())

# Pair every column with its train-set statistics explicitly rather than by index.
for col_, mean_, std_ in zip(df_hist.columns, mean_list, std_list):
  # z_score is plain arithmetic, so applying it to Column objects builds a native
  # Spark expression: no per-row Python UDF round trip, and the column is resolved
  # by name on train/test instead of being borrowed from df_hist.
  # A zero standard deviation yields null here rather than a Python ZeroDivisionError.
  scaled = z_score(col(col_), lit(mean_), lit(std_))

  train = train.withColumn(col_+'_scaled', scaled)
  test = test.withColumn(col_+'_scaled', scaled)

In [None]:
# Create Feature Vector and Target Variable
# The 60 scaled opening prices Open_0 .. Open_59 are packed into one 'features' vector.
assembler = VectorAssembler(inputCols=['Open_{}_scaled'.format(_) for _ in range(60)], outputCol='features')

# Expose the scaled target as 'label': the estimator's label column is never
# overridden (PySpark's default name is 'label') and the evaluation cell selects
# a 'label' column from the predictions.
train = assembler.transform(train).select('features', col('target_price_scaled').alias('label'))
test = assembler.transform(test).select('features', col('target_price_scaled').alias('label'))

In [None]:
# LSTM Structure
# Number of time steps = length of the assembled feature vector.
input_dim = len(train.select("features").first()[0])
model = Sequential()

# Four stacked 128-unit LSTM layers, each followed by dropout. All but the last
# return the full sequence so that the next LSTM layer receives sequence input.
model.add(LSTM(units = 128, return_sequences = True, input_shape = (input_dim, 1),
               activity_regularizer=regularizers.l2(0.25)))
model.add(Dropout(0.25))
model.add(LSTM(units = 128, return_sequences = True,
               activity_regularizer=regularizers.l2(0.25)))
model.add(Dropout(0.25))
model.add(LSTM(units = 128, return_sequences = True,
               activity_regularizer=regularizers.l2(0.25)))
model.add(Dropout(0.25))
model.add(LSTM(units = 128, activity_regularizer=regularizers.l2(0.25)))
model.add(Dropout(0.25))
# Single linear output unit for the regression target.
model.add(Dense(units = 1))

model.compile(optimizer='adam', loss = 'mean_squared_error')

# No local model.fit() call here: it referenced x_train, y_train and an
# EarlyStoppingByLossVal callback, none of which are defined in this notebook,
# so the cell failed on a fresh kernel. Training is done by the Elephas
# estimator on the Spark DataFrames.

# Optimizer configuration handed to the Elephas estimator in serialized form.
sgd = optimizers.SGD(learning_rate=0.001, momentum=0.9)
sgd_conf = optimizers.serialize(sgd)

In [None]:
# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()

# Serialize the architecture as JSON: Model.to_yaml() has been removed from
# TensorFlow Keras and raises at call time.
# NOTE(review): assumes the installed elephas release deserializes JSON model configs - confirm against its README.
estimator.set_keras_model_config(model.to_json())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
# Loss and metric used for the distributed training run.
estimator.set_loss("mae")
estimator.set_metrics(['mse'])
estimator.set_epochs(1000)
estimator.set_batch_size(128)
estimator.set_validation_split(0.2)
# Regression target: labels are not categorical.
estimator.set_categorical_labels(False)

ElephasEstimator_9e2722c71094

In [None]:
# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(train)

# Evaluate Spark model by evaluating the underlying model
prediction = fitted_pipeline.transform(test)
pnl = prediction.select("label", "prediction")

# RegressionMetrics expects (prediction, observation) pairs. With the order
# swapped, r2 is computed against the wrong series (MAE and RMSE are symmetric
# and unaffected).
prediction_and_label = pnl.rdd.map(lambda row: (row.prediction, row.label))
metrics = RegressionMetrics(prediction_and_label)
# These figures are in the units of the label column as passed to the estimator.
print(metrics.r2)
print(metrics.meanAbsoluteError)
print(metrics.rootMeanSquaredError)

In [None]:
# Evaluate Spark model
# Predictions on the training data, shown next to the true values.
prediction = fitted_pipeline.transform(train)
# Select the same 'label' column the test-set evaluation uses; 'index_category'
# is not a column of this dataset.
pnl = prediction.select("label", "prediction")
pnl.show(100)
