In [54]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from sklearn.preprocessing import MinMaxScaler
from tensorflow import keras
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential

In [55]:
# Load the raw stock-price parquet from this workspace's datalake.
# Datalake account name = "datalake" + the workspace name with its first 7 characters dropped.
datalake_nm = f"datalake{mssparkutils.env.getWorkspaceName()[7:]}"
file_path = f"abfss://files@{datalake_nm}.dfs.core.windows.net/synapse/workspaces/data/stock_price"
df = spark.read.parquet(file_path)

## 01 Data preprocessing

In [56]:
symbol_list = ['NVDA', 'AMD', 'INTC', 'QCOM', 'GOOG', 'MSFT', 'AMZN', 'AAPL']
LOOKBACK_DAYS = 720  # number of most recent rows kept per ticker

# Normalise column types: close price as float, date as a proper date.
df = df.withColumn('close', col('close').cast('float')).withColumn('date', to_date(col('date')))

# Select the most recent LOOKBACK_DAYS usable rows per ticker (lazy Spark plans).
# Rows whose close/date is null (e.g. a value that failed the float cast) are
# dropped first: MinMaxScaler passes NaN through unchanged, and a single NaN in
# a training window turns the LSTM loss (and then every weight) into NaN.
df_list = {
    symbol: (
        df.filter((col('symbol') == symbol) & col('close').isNotNull() & col('date').isNotNull())
        .select('date', 'close')
        .orderBy(col('date').desc())
        .limit(LOOKBACK_DAYS)
    )
    for symbol in symbol_list
}

# Collect each ticker's prices into a NumPy array ordered oldest -> newest.
# Sorting explicitly in pandas keeps the chronology correct without depending
# on the collected row order.
price_list = {}
for symbol, sdf in df_list.items():
    pdf = sdf.toPandas().sort_values('date').reset_index(drop=True)
    if pdf.empty:
        raise ValueError(f"No usable close prices found for {symbol}")
    price_list[symbol] = pdf['close'].to_numpy()

# One MinMaxScaler per ticker, so each series is scaled to [0, 1] independently
# and can later be inverse-transformed with its own scaler.
scaler_dict = {symbol: MinMaxScaler() for symbol in symbol_list}

# Scaled values as column vectors of shape (n_rows, 1).
scaled_value_dict = {
    symbol: scaler_dict[symbol].fit_transform(price_list[symbol].reshape(-1, 1))
    for symbol in symbol_list
}

## 02 Training model

In [57]:
# Create sequences for LSTM
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

# Define LSTM model function
def build_lstm_model(input_shape, units=50, dropout=0.2, dense_units=25):
    """Build and compile a two-layer stacked LSTM regressor.

    Args:
        input_shape: shape of one input sample, excluding the batch dimension
            (the training cell passes ``(timesteps, features)``).
        units: hidden size of each LSTM layer (default 50).
        dropout: dropout rate applied after each LSTM layer (default 0.2).
        dense_units: width of the intermediate Dense layer (default 25).

    Returns:
        A ``Sequential`` model compiled with the Adam optimizer and MSE loss,
        producing one output value per sample.
    """
    model = Sequential([
        # Declare the input with keras.Input instead of passing input_shape=
        # to the first layer; current Keras warns about the latter.
        keras.Input(shape=input_shape),
        # The first LSTM returns the full sequence so the second LSTM can consume it.
        LSTM(units, return_sequences=True),
        Dropout(dropout),
        LSTM(units, return_sequences=False),
        Dropout(dropout),
        Dense(dense_units),
        Dense(1),
    ])
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

seq_length = 30  # look-back window length fed to the LSTM; adjust as needed
EPOCHS = 50
BATCH_SIZE = 32
SEED = 42

# Seed Python, NumPy and TensorFlow RNGs so weight initialisation and batch
# shuffling are repeatable across Restart & Run All (GPU kernels may still
# introduce some nondeterminism).
keras.utils.set_random_seed(SEED)

# One independently trained model per ticker.
model_dict = {}

for symbol, scaled_value in scaled_value_dict.items():
    X_, y_ = create_sequences(scaled_value, seq_length)
    if len(X_) == 0:
        raise ValueError(
            f"{symbol}: need more than {seq_length} rows to build training sequences, "
            f"got {len(scaled_value)}"
        )
    # Ensure the (samples, timesteps, features=1) layout the LSTM expects.
    X_ = X_.reshape((X_.shape[0], X_.shape[1], 1))

    model = build_lstm_model(input_shape=(X_.shape[1], X_.shape[2]))
    model.fit(X_, y_, epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1)
    model_dict[symbol] = model


In [61]:
# Function to predict next n days
def predict_next_days(model, data, scaler, seq_length, future_days):
    """Forecast ``future_days`` steps past the end of ``data``, one step at a time.

    Each step feeds the last ``seq_length`` values (observed, or previously
    predicted) to ``model`` and appends the output to the window. Later steps
    are therefore built on earlier predictions.

    Args:
        model: fitted model exposing ``predict(batch, verbose=...)``. It maps a
            ``(1, seq_length, 1)`` batch to an array whose ``[0, 0]`` entry is
            the next scaled value.
        data: scaled history; only its last ``seq_length`` entries are used.
        scaler: fitted scaler whose ``inverse_transform`` maps a column vector
            of scaled values back to original units.
        seq_length: window length the model was trained on (>= 1).
        future_days: number of steps to forecast; <= 0 returns an empty array.

    Returns:
        1-D array of ``future_days`` forecasts in original (unscaled) units.

    Raises:
        ValueError: if ``seq_length`` < 1 or ``data`` has fewer than
            ``seq_length`` entries.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")
    history = np.asarray(data)
    if len(history) < seq_length:
        raise ValueError(
            f"need at least {seq_length} history values to forecast, got {len(history)}"
        )
    if future_days <= 0:
        # MinMaxScaler.inverse_transform rejects zero-sample input, so short-circuit.
        return np.empty(0)

    window = history[-seq_length:].reshape(-1)
    predicted = []
    for _ in range(future_days):
        # verbose=0 silences the progress bar that predict would print on every step.
        next_value = model.predict(window.reshape(1, seq_length, 1), verbose=0)[0, 0]
        predicted.append(next_value)
        # Slide the window: drop the oldest value, append the new prediction.
        window = np.append(window[1:], next_value)
    return scaler.inverse_transform(np.array(predicted).reshape(-1, 1)).flatten()

# Number of future days to predict
# Forecast horizon, in model steps, applied to every ticker.
future_days = 14

# Roll each ticker's own model forward over its own scaled history, then
# map the results back to price units with that ticker's scaler.
prediction_list = {
    ticker: predict_next_days(
        model_dict[ticker],
        scaled_value_dict[ticker],
        scaler_dict[ticker],
        seq_length,
        future_days,
    )
    for ticker in symbol_list
}


In [71]:
# write data into datalake
# Build one column per ticker, plus an explicit 1-based forecast step.
# spark.createDataFrame drops the pandas index, and Spark does not guarantee
# row order when the parquet is read back, so the step must live in a column.
prediction_pdf = pd.DataFrame(prediction_list)
prediction_pdf.insert(0, 'day_ahead', np.arange(1, len(prediction_pdf) + 1))
spdf = spark.createDataFrame(prediction_pdf)

# Write the forecasts back to the datalake.
# Datalake name = "datalake" + the workspace name with its first 7 characters dropped.
datalake_nm = 'datalake' + mssparkutils.env.getWorkspaceName()[7:]
file_path = 'abfss://files@{0}.dfs.core.windows.net/synapse/workspaces/data/Stock_predictions'.format(datalake_nm)
spdf.write.parquet(file_path, mode='overwrite')