In [5]:
!pip install yfinance

  pid, fd = os.forkpty()




In [6]:
# Data Imports
import yfinance as yf
import pandas as pd 
import numpy as np 
import random

from sklearn.preprocessing import MinMaxScaler
from pandas.tseries.offsets import BDay

# Visualization Imports
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio
import scipy.stats as stats

# Neural Network Imports
import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras import regularizers
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.callbacks import ModelCheckpoint

# Setting seed
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Visualization Configurations
pio.templates.default = "plotly_dark"
%config InlineBackend.figure_format = 'retina'

In [7]:
def show(data: pd.DataFrame):
    df = data.copy()
    df = df.style.format(precision=3)
    df = df.background_gradient(cmap='Reds', axis=0)
    display(df)

def highlight_half(data: pd.DataFrame, axis=1, precision=3):
    
    s = data.shape[1] if axis else data.shape[0]
    data_style = data.style.format(precision=precision)

    def apply_style(val):
        style1 = 'background-color: red; color: white'
        style2 = 'background-color: blue; color: white'
        return [style1 if x < s//2 else style2 for x in range(s)]

    display(data_style.apply(apply_style, axis=axis))

In [8]:
# Target stock & columns for modeling
SYMBOL = "TSLA"
columns = ['open', 'high', 'low', 'close', 'volume']

# Getting Tesla (TSLA) stock data
ticker = yf.Ticker(SYMBOL)

# End stock dates
end_date = "2023-12-01"

# Pulling stock data 
df = ticker.history(start="2017-01-01", end=end_date)
df.columns = df.columns.str.lower()

# Showing data
show(df.tail())

Unnamed: 0_level_0,open,high,low,close,volume,dividends,stock splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-11-24 00:00:00-05:00,233.75,238.75,232.33,235.45,65125200,0.0,0.0
2023-11-27 00:00:00-05:00,236.89,238.33,232.1,236.08,112031800,0.0,0.0
2023-11-28 00:00:00-05:00,236.68,247.0,234.01,246.72,148549900,0.0,0.0
2023-11-29 00:00:00-05:00,249.21,252.75,242.76,244.14,135401300,0.0,0.0
2023-11-30 00:00:00-05:00,245.14,245.22,236.91,240.08,132353200,0.0,0.0


In [9]:
# Data info
print('Data Info:')
df.info()

Data Info:
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1740 entries, 2017-01-03 00:00:00-05:00 to 2023-11-30 00:00:00-05:00
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   open          1740 non-null   float64
 1   high          1740 non-null   float64
 2   low           1740 non-null   float64
 3   close         1740 non-null   float64
 4   volume        1740 non-null   int64  
 5   dividends     1740 non-null   float64
 6   stock splits  1740 non-null   float64
dtypes: float64(6), int64(1)
memory usage: 108.8 KB


In [10]:
def plot_candlestick(stock_df, name='', rolling_avg=None, fig_size=(1100, 700)):
    """
    Plot a candlestick chart for the given stock dataframe with optional rolling averages.
    
    Args:
        stock_df (pd.DataFrame): The stock data as a pandas DataFrame.
        name (str): The name of the stock, defaults to 'Tesla'.
        rolling_avg (list of int, optional): A list of integers for rolling average window sizes.
        fig_size (tuple): The figure size, defaults to (1100, 700).
    """
    # Copy df to avoid modifying the original data
    stock_data = stock_df.copy()
    
    # Creating plot
    fig = go.Figure(data=[go.Candlestick(x=stock_data.index,
        close=stock_data['close'], open=stock_data['open'], high=stock_data['high'], low=stock_data['low'], 
        name="Candlesticks", increasing_line_color='green', decreasing_line_color='red', line=dict(width=1)
                                        )])
    # Rolling averages if specified
    if rolling_avg:
        colors = ['rgba(0, 255, 255, 0.5)',   # cyan
                  'rgba(255, 255, 0, 0.5)',   # yellow
                  'rgba(255, 165, 0, 0.5)',   # orange
                  'rgba(255, 105, 180, 0.5)', # pink
                  'rgba(165, 42, 42, 0.5)',   # brown
                  'rgba(128, 128, 128, 0.5)', # gray
                  'rgba(128, 128, 0, 0.5)',   # olive
                  'rgba(0, 0, 255, 0.5)']     # blue
        
        for i, avg in enumerate(rolling_avg):
            color = colors[i % len(colors)]
            ma_column = f'{avg}-day MA'
            stock_data[ma_column] = stock_data['close'].rolling(window=avg).mean()

            # Moving average trace
            fig.add_trace(go.Scatter(x=stock_data.index, y=stock_data[ma_column],
                    mode='lines', name=f'{avg}-day Moving Average', line=dict(color=color)))

    # Layout updates
    fig.update_layout(title=f"{name} Stock Price - Candlestick Chart",
                      xaxis_title="Date", yaxis_title="Price",
                      width=fig_size[0], height=fig_size[1],
                      xaxis=dict(
                          rangeselector=dict(
                              buttons=list([
                                  dict(count=14, label="2w", step="day", stepmode="backward"),
                                  dict(count=1, label="1m", step="month", stepmode="backward"),
                                  dict(count=3, label="3m", step="month", stepmode="backward"),
                                  dict(count=6, label="6m", step="month", stepmode="backward"),
                                  dict(count=1, label="YTD", step="year", stepmode="todate"),
                                  dict(count=1, label="1y", step="year", stepmode="backward"),
                                  dict(count=2, label="2y", step="year", stepmode="backward"),
                                  dict(count=3, label="3y", step="year", stepmode="backward"),
                                  dict(count=5, label="5y", step="year", stepmode="backward"),
                                  dict(step="all")]),
                              bgcolor='pink',
                              font=dict(color='black'),
                              activecolor='lightgreen'))
                     )
    fig.show()

In [11]:
# General Tesla stocks
plot_candlestick(df, name=SYMBOL)

# With Moving averages
plot_candlestick(df, name=SYMBOL, rolling_avg=[20, 50, 200])


In [12]:
# Plotting stock splits
fig = px.line(x=df.index, y=df['stock splits'], title=f'{SYMBOL} Stock Splits Over Time')
fig.update_layout(width=1100, height=500)
fig.update_traces(line=dict(color='cyan', width=3))
fig.update_xaxes(title_text='Date')
fig.update_yaxes(title_text='Stock Splits')
fig.show()

In [13]:
# Creating subplot
fig = make_subplots(rows=2, cols=2, column_widths=[0.7, 0.3],
                    vertical_spacing=0.1, horizontal_spacing=0.05,
                    subplot_titles=(f"{SYMBOL} - Percent Change over Time", f"{SYMBOL} Percent Change - Histogram",
                                    f"{SYMBOL} - Stock Volume over Time", f"{SYMBOL} Stock Volume - Histogram",))
# Percent Change Plot
percent_change = df['close'].pct_change() * 100
fig.add_trace(go.Scatter(x=df.index, y=percent_change, name='Percent Change', marker_color='darkorchid'), row=1, col=1)
fig.add_trace(go.Histogram(x=percent_change, nbinsx=50, name='Percent Change', marker_color='darkorchid'),  row=1, col=2)
fig.add_annotation(text=f"Mean: {percent_change.mean():.2f}%<br>Std Dev: {percent_change.std():.2f}%",
                   xref='x2', yref='y2', x=percent_change.mean(), y=5, showarrow=True)
# Volume Plot
fig.add_trace(go.Scatter(x=df.index, y=df['volume'], name='Volume', marker_color='darkcyan'), row=2, col=1)
fig.add_trace(go.Histogram(x=df['volume'], nbinsx=50, name='Daily Volume', marker_color='darkcyan'),  row=2, col=2)
fig.add_annotation(text=f"Mean: {df['volume'].mean():.2f}<br>Std Dev: {df['volume'].std():.2f}",
                   xref='x4', yref='y4', x=df['volume'].mean(), y=5, showarrow=True)

fig.update_layout(height=700, width=1100)
fig.show()

In [14]:
class RNNFormater:
    
    def __init__(self, data: pd.DataFrame, mapping_steps=10):
        """
        Initialize the RNNFormater with a DataFrame and steps to map for data.
        
        Args:
            data (pd.DataFrame): Input DataFrame containing time series data.
            mapping_steps (int): Number of time steps for each input sequence to be mapped to output.
        """
        # Storing data
        self.df = data.copy()
        self.data = self.df.values

        # Scaler stored for usage later
        self.scaler = MinMaxScaler()
        self.normalized_data = self.scaler.fit_transform(self.data)
        
        self.time_steps = data.shape[0]
        self.n_columns = data.shape[1]

        # Number of mapping steps
        self.mapping_steps = mapping_steps

    def data_mapping(self):
        """
        Maps a 2D array into a 3D array for RNNs input, with each sequence having mapping_steps time steps.
    
        Args:
            mapping_steps (int): Number of time steps for each sequence.
    
        Returns:
            np.array: A 3D array suitable for RNN inputs.
        """
        mapping_steps = self.mapping_steps + 1
        
        mapping_iterations = self.time_steps - mapping_steps + 1
        self.normalized_data_mapped = np.empty((mapping_iterations, mapping_steps, self.n_columns))
        
        for i in range(mapping_iterations):
            self.normalized_data_mapped[i, :, :] = self.normalized_data[i:i + mapping_steps, :]
        
        return self.normalized_data_mapped
    
    def rnn_train_test_split(self, test_percent=0.1):
        """
        Splits the 3D mapped data into training and testing sets for an RNN.
        
        Args:
            test_percent (float): The fraction of data to be used for testing.
        
        Returns:
            tuple: X_train, X_test, y_train, y_test
        """
        self.test_size = int(np.round(self.normalized_data_mapped.shape[0] * test_percent))
        self.train_size = self.normalized_data_mapped.shape[0] - self.test_size
        
        X_train = self.normalized_data_mapped[:self.train_size, :-1, :]
        y_train = self.normalized_data_mapped[:self.train_size, -1, :]
        
        X_test = self.normalized_data_mapped[self.train_size:, :-1, :]
        y_test = self.normalized_data_mapped[self.train_size:, -1, :]
        
        return X_train, X_test, y_train, y_test  

    def forecast_n_steps(self, model, data: pd.DataFrame, n_forecast_steps=30):
        """
        Forecast multiple steps ahead using the LSTM model.
    
        Args:
            model (tf.keras.Model): Trained LSTM model for prediction.
            data (pd.DataFrame): Input DataFrame containing the latest time series data.
            n_forecast_steps (int): Number of future steps to forecast.
    
        Returns:
            np.array: Forecasted values for n_forecast_steps.
        """
        # Scaling the latest 'mapping_steps' data for mapping
        last_steps = self.scaler.transform(data.values)[-self.mapping_steps:]
    
        # Initialize normalized_data_mapped array
        normalized_data_mapped = np.empty((n_forecast_steps, self.mapping_steps, self.n_columns))
    
        # Initialize predictions array
        predictions = np.empty((n_forecast_steps, self.n_columns))
    
        # Predict the first step
        normalized_data_mapped[0, :, :] = last_steps
        predictions[0, :] = model.predict(
            normalized_data_mapped[0, :, :].reshape(1, self.mapping_steps, self.n_columns),
            verbose=False
        )
        # Generate predictions and update normalized_data_mapped for each subsequent step
        for i in range(1, n_forecast_steps):
            # Shift the window and insert new prediction at end
            normalized_data_mapped[i, :-1, :] = normalized_data_mapped[i - 1, 1:, :]
            normalized_data_mapped[i, -1, :] = predictions[i - 1, :]
    
            # Predicting next step
            norm_data = normalized_data_mapped[i, :, :].reshape(1, self.mapping_steps, self.n_columns)
            predictions[i, :] = model.predict(norm_data, verbose=False)
    
        # Inverse transform the predictions to original scale
        predictions = self.scaler.inverse_transform(predictions)
        return predictions

In [15]:
# Initializing class
mapping_steps = 32 # ~ 1 months in buisness days
rnn_formater = RNNFormater(df[columns], mapping_steps=mapping_steps)

# Mapping steps
norm_data_mapped = rnn_formater.data_mapping() # n_steps -> y
# print(f'Mapped Normalized data step 0:\n{norm_data_mapped[0]}')
print(f'Normalized data shape: {norm_data_mapped[0].shape}')

Normalized data shape: (33, 5)


In [16]:
# Train Test Split
X_train, X_test, y_train, y_test = rnn_formater.rnn_train_test_split(test_percent=0.05)
print(f'Number of time steps for test set: {rnn_formater.test_size}')

print(f'X shape: {X_train.shape}')
# print(X_train[0])

print(f'y shape: {y_train.shape}')
# print(y_train[0])

Number of time steps for test set: 85
X shape: (1623, 32, 5)
y shape: (1623, 5)


In [17]:
# For consistant results
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

# Vanilla LSTM
model = models.Sequential([
    LSTM(units=80, input_shape=(mapping_steps, len(columns))),
    Dropout(0.05),
    Dense(units=len(columns))   
])

# Compiling model
model.compile(optimizer='adam', loss='mae')
model.summary()


Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.



In [18]:

# Callback to save model weights
model_checkpoint = ModelCheckpoint('LSTM_Tesla_model.keras', monitor='val_loss', save_best_only=True)

# Fitting the model
history = model.fit(X_train, y_train, 
                    batch_size=256, 
                    epochs=1_000, 
                    validation_data=(X_test, y_test), 
                    callbacks=[model_checkpoint],
                    shuffle=False,
                    verbose=False)

In [19]:
def plot_training_history(history, plot_title='Training Performance', plot_legends=None, color0=0):
    """
    Plots the training history of a model using Plotly.

    Args:
        history (dict): A dictionary containing the training history metrics.
        plot_title (str): Title of the plot.
        plot_legends (list): List of legends for the plot. If None, it uses the keys from the history dictionary.

    Returns:
        None: Displays the plot.
    """
    # Extracting metrics from the history object
    epochs = np.arange(1, len(next(iter(history.values()))) + 1)
    colors = ['blue', 'gold', 'violet', 'lime', 'blue', 'pink', 'yellow']
    data = []

    # If no legends are provided, use keys from the history
    if not plot_legends:
        plot_legends = list(history.keys())

    # Prepare data for each metric in the history
    for i, (key, legend) in enumerate(zip(history.keys(), plot_legends)):
        color_index = i % len(colors) + color0
        data.append(go.Scatter(x=epochs, y=history[key], mode='lines+markers', name=legend, line=dict(color=colors[color_index])))

    # Add error for minimum epoch value
    min_epoch = np.argmin(history['val_loss']) + 1 
    loss_str = f"Train Loss: {history['loss'][min_epoch-1]:.3e}<br>Test Loss: {history['val_loss'][min_epoch - 1]:.3e}"

    # Creating the layout
    layout = go.Layout(title=plot_title, xaxis=dict(title='Epochs'), yaxis=dict(title='Value'), width=1100, height=600)
    fig = go.Figure(data=data, layout=layout)

    # Annotate the minimum loss with an arrow
    fig.add_annotation(
        go.layout.Annotation(
            x=min_epoch,
            y=history['loss'][min_epoch - 1],
            xref="x",
            yref="y",
            text=loss_str,
            showarrow=True,
            arrowhead=7,
            arrowcolor='green',
            arrowsize=2,
            bordercolor='green',
            borderwidth=2,
            ax=0,
            ay=-40
        )
    )
    fig.show()

In [20]:
# Plotting LSTM model loss
plot_training_history(history.history, plot_title='LSTM Model Loss')

In [21]:
# Loading best wieghts during training
model = models.load_model(f'LSTM_Tesla_model.keras')

# Predicting
predictions = model.predict(X_test, verbose=False)
predictions = rnn_formater.scaler.inverse_transform(predictions)

# Showing predictions and data
index_1 = y_test.shape[0]
df_y_test = df[columns].iloc[-index_1:]
df_predictions = pd.DataFrame(predictions, index=df_y_test.index, columns=[f'pred_{col}' for col in columns])
df_test_pred = pd.concat([df_y_test, df_predictions], axis=1)
                         
# Shwoing outputs
highlight_half(df_test_pred.tail(), axis=1)

Unnamed: 0_level_0,open,high,low,close,volume,pred_open,pred_high,pred_low,pred_close,pred_volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2023-11-24 00:00:00-05:00,233.75,238.75,232.33,235.45,65125200,234.643,239.33,230.395,235.133,122747320.0
2023-11-27 00:00:00-05:00,236.89,238.33,232.1,236.08,112031800,236.236,240.532,232.231,236.503,99483784.0
2023-11-28 00:00:00-05:00,236.68,247.0,234.01,246.72,148549900,235.313,239.99,231.088,235.573,111623032.0
2023-11-29 00:00:00-05:00,249.21,252.75,242.76,244.14,135401300,244.507,249.501,239.532,244.522,126951072.0
2023-11-30 00:00:00-05:00,245.14,245.22,236.91,240.08,132353200,245.018,249.896,240.36,245.015,125653304.0


In [22]:
# Error dataframe
df_error = pd.DataFrame(df_predictions.values - df_y_test.values, index=df.index[-index_1:], columns=[f'error_{col}' for col in columns])
print('RMSE Per Column')
print((df_error**2).mean()**(1/2))

RMSE Per Column
error_open      4.182534e+00
error_high      5.479255e+00
error_low       5.381378e+00
error_close     7.431314e+00
error_volume    1.711884e+07
dtype: float64


In [23]:
def plotly_residual_analysis(df, title_add=''):
    """
    Perform residual analysis for multiple features in a DataFrame.
    The DataFrame should contain actual and predicted columns for each feature.
    
    Args:
        df (pd.DataFrame): DataFrame containing actual and predicted columns.
        title_add (str, optional): Additional title for the subplots.
    """
    # Number of columns
    columns = [col for col in df.columns if not col.startswith('pred_')]
    num_features = len(columns)

    # Color per column
    colors = ['blue', 'green', 'red', 'purple', 'orange', 'yellow']

    # Subplots per columns
    fig = make_subplots(rows=num_features, cols=4, vertical_spacing=0.035, horizontal_spacing=0.035,
                        subplot_titles=("Histogram", "QQ-Normal Plot", "Residuals vs. Predicted Values", "Residuals vs Index"))

    for i, col in enumerate(columns):
        actual = df[col]
        predicted = df[f'pred_{col}']
        residuals = actual - predicted
        mean_residuals = np.mean(residuals)
        sd_residuals = np.std(residuals)
        rmse = np.sqrt(np.mean(residuals**2))
        index = df.index

        # Assign color for each feature
        color = colors[i % len(colors)]

        # Histogram of residuals
        fig.add_trace(go.Histogram(x=residuals, nbinsx=30, name=f'{col.title()} Residuals', marker_color=color), row=i+1, col=1)
        # Add lines for mean and standard deviation
        fig.add_vline(x=mean_residuals, line=dict(color='black', width=2), row=i+1, col=1)
        fig.add_vline(x=mean_residuals + sd_residuals, line=dict(color='grey', width=2, dash='dash'), row=i+1, col=1)
        fig.add_vline(x=mean_residuals - sd_residuals, line=dict(color='grey', width=2, dash='dash'), row=i+1, col=1)
        fig.add_annotation(x=mean_residuals, y=5, text=f"Mean: {mean_residuals:.2f}", showarrow=True, row=i+1, col=1)
        fig.add_annotation(x=sd_residuals + mean_residuals, y=5, text=f"SD: {sd_residuals:.2f}", showarrow=False, row=i+1, col=1)
        
        # QQ-Normal of residuals
        qq = stats.probplot(residuals, dist="norm", plot=None)
        fig.add_trace(go.Scatter(x=qq[0][0], y=qq[1][1] + qq[1][0]*qq[0][0], mode='lines',  showlegend=False), row=i+1, col=2)
        fig.add_trace(go.Scatter(x=qq[0][0], y=qq[0][1], mode='markers', marker_color=color, name=f'{col.title()} QQ'), row=i+1, col=2)

        # Residuals vs. predicted values
        fig.add_trace(go.Scatter(x=predicted, y=residuals, mode='markers', marker_color=color, name=f'{col.title()} Resid Pred'), row=i+1, col=3)
        fig.add_hline(y=0, line=dict(color='red'), row=i+1, col=3)
        fig.add_hline(y=2 * rmse, line=dict(color='red', dash='dash'), row=i+1, col=3)
        fig.add_hline(y=-2 * rmse, line=dict(color='red', dash='dash'), row=i+1, col=3)

        # Residuals vs. index
        fig.add_trace(go.Scatter(x=index, y=residuals, mode='markers', marker_color=color, name=f'{col.title()} Resid Index'), row=i+1, col=4)
        fig.add_hline(y=0, line=dict(color='red'), row=i+1, col=4)
        fig.add_hline(y=2 * rmse, line=dict(color='red', dash='dash'), row=i+1, col=4)
        fig.add_hline(y=-2 * rmse, line=dict(color='red', dash='dash'), row=i+1, col=4)

    # Update layout
    fig.update_layout(height=250*num_features, width=1400, title_text="Residual Analysis " + title_add)
    fig.show()

In [24]:
# Residual Analysis Plot
plotly_residual_analysis(df_test_pred, 
                         title_add=f'- {SYMBOL} Vanilla LSTM')

In [25]:
def plot_predictions(y_values_df, predictions_df, title_add=''):
    """
    Plots actual values and predictions for each feature in separate subplots.
    
    Args:
        y_values_df (pd.DataFrame): DataFrame containing actual values.
        predictions_df (pd.DataFrame): DataFrame containing predicted values.
        title_add (str, optional): Additional title for the subplots.
    """
    # Number/color per features 
    columns = [col for col in y_values_df.columns]
    num_features = len(columns)
    actual_colors = ['cyan', 'lime', 'yellow', 'violet', 'gold', 'pink']

    # Creating subplots
    fig = make_subplots(rows=num_features, cols=1, vertical_spacing=0.03, subplot_titles=[col.title() for col in columns])

    for i, col in enumerate(columns):
        # Actual values trace
        fig.add_trace(go.Scatter(x=y_values_df.index, y=y_values_df[col], mode='lines', name=col.title(),
                                 line=dict(color=actual_colors[i % len(actual_colors)])), row=i+1, col=1)
        
        # Predicted values trace
        pred_col = f'pred_{col}'
        if pred_col in predictions_df.columns:
            fig.add_trace(go.Scatter(x=predictions_df.index, y=predictions_df[pred_col], 
                                     mode='lines', name=f'Predicted {col.title()}', line=dict(color='red')), row=i+1, col=1)
            
            # Calculate RMSE and add as an annotation
            rmse = np.sqrt(np.mean((y_values_df[col] - predictions_df[pred_col]) ** 2))
            fig.add_annotation(xref='x domain', yref='y domain', x=1, y=0.05, showarrow=False,
                               text=f'RMSE: {rmse:.2f}', row=i+1, col=1, font=dict(color='red'))
    fig.update_layout(height=350*num_features, width=1100, title_text="Data & Predictions " + title_add)
    fig.show()

In [26]:
# Plotting prediction and data
plot_predictions(df_y_test, df_predictions, title_add=f'- {SYMBOL} Vanilla LSTM')

In [27]:
# Getting last steps for LSTM forecast
last_steps = df[columns].iloc[-mapping_steps:, :]

# Number of steps to forecast
n_forecast_steps = 10 # ~ 2 weeks in business days

# Forming date index
date_index = pd.date_range(start=end_date, periods=n_forecast_steps, freq=BDay())

# Forecasting n-steps
forecast_array = rnn_formater.forecast_n_steps(model, last_steps, n_forecast_steps)
forecast = pd.DataFrame(forecast_array, index=date_index, columns=[f'forecast_{col}' for col in columns])
show(forecast)

Unnamed: 0,forecast_open,forecast_high,forecast_low,forecast_close,forecast_volume
2023-12-01 00:00:00,239.336,244.213,234.711,239.611,126224759.43
2023-12-04 00:00:00,239.378,244.325,234.727,239.713,124082712.163
2023-12-05 00:00:00,239.433,244.37,234.764,239.776,122825360.073
2023-12-06 00:00:00,239.375,244.283,234.686,239.7,121931410.598
2023-12-07 00:00:00,239.191,244.067,234.491,239.491,121185668.328
2023-12-08 00:00:00,238.898,243.744,234.196,239.174,120504244.755
2023-12-11 00:00:00,238.523,243.341,233.827,238.778,119859159.467
2023-12-12 00:00:00,238.09,242.885,233.405,238.328,119242489.622
2023-12-13 00:00:00,237.619,242.392,232.948,237.843,118654400.006
2023-12-14 00:00:00,237.124,241.878,232.47,237.339,118098990.457


In [28]:
# Plotting function
def plot_stock_data(df, previous_data=None, test_data=None, title_add=''):
    colors = ['blue', 'green', 'purple', 'orange', 'cyan']

    fig = make_subplots(rows=df.shape[1], cols=1, shared_xaxes=True, vertical_spacing=0.02,
                        subplot_titles=df.columns)

    for i, col in enumerate(df.columns):
        fig.add_trace(
            go.Scatter(x=df.index, y=df[col], mode='lines', name=col, line=dict(dash='dashdot', color='red'),
                      ), row=i+1, col=1)

        if previous_data is not None:
            column = list(previous_data.columns)[i]
            fig.add_trace(go.Scatter(x=previous_data.index, y=previous_data[column], mode='lines', name=column,
                                     line=dict(color=colors[i % len(colors)])), row=i+1, col=1)

        if test_data is not None:
            column = list(test_data.columns)[i]
            fig.add_trace(
                go.Scatter(x=test_data.index, y=test_data[column], mode='lines', name=f'Unseen {col.title()}',
                           line=dict(color='floralwhite')), row=i+1, col=1)
            

    fig.update_layout(height=1200, width=1100, title_text=f"Stock Data Over Time {title_add}")
    fig.show()

In [29]:
# Actual stock values to test forecast
df_test = ticker.history(start=end_date, end='2023-12-15').iloc[-n_forecast_steps:, :]
df_test.columns = df_test.columns.str.lower()

# Plotting forecast
plot_stock_data(forecast, last_steps, df_test, title_add=f'- {SYMBOL} Vanilla LSTM')

In [30]:
# Error per column on New Data
error_array = forecast.values - df_test[columns].values
errors = pd.DataFrame(error_array, index=df_test.index, columns=[f'error_{col}' for col in columns])
show(errors)

Unnamed: 0_level_0,error_open,error_high,error_low,error_close,error_volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-12-01 00:00:00-05:00,6.196,4.023,2.811,0.781,5051259.43
2023-12-04 00:00:00-05:00,3.628,4.955,1.437,4.133,19982912.163
2023-12-05 00:00:00-05:00,5.563,-2.29,1.064,1.056,-15145739.927
2023-12-06 00:00:00-05:00,-3.545,-2.287,-4.484,0.33,-4504789.402
2023-12-07 00:00:00-05:00,-2.359,-0.013,-2.489,-3.149,14043368.328
2023-12-08 00:00:00-05:00,-1.372,-1.526,-5.074,-4.666,17524144.755
2023-12-11 00:00:00-05:00,-4.217,-0.099,-3.623,-0.962,21945259.467
2023-12-12 00:00:00-05:00,-0.46,3.895,-0.465,1.318,23914189.622
2023-12-13 00:00:00-05:00,3.429,2.092,4.748,-1.447,-27631899.994
2023-12-14 00:00:00-05:00,-4.096,-12.002,-8.32,-13.711,-42730209.543


In [31]:
# RMSE per column on New Data
root_mean_squared_error = np.sqrt((errors**2).mean())
root_mean_squared_error = pd.DataFrame(root_mean_squared_error, columns=['RMSE New TLSA Data'])
show(root_mean_squared_error.T)

Unnamed: 0,error_open,error_high,error_low,error_close,error_volume
RMSE New TLSA Data,3.86,4.66,4.102,4.933,21958383.348


In [34]:
class RNNFormaterMultiStep:
    def __init__(self, data: pd.DataFrame, n_steps_in, n_steps_out):
        """
        Initialize the RNNFormater with a DataFrame, number of input steps, and number of output steps.
        
        Args:
            data (pd.DataFrame): Input DataFrame containing time series data.
            n_steps_in (int): Number of time steps for each input sequence.
            n_steps_out (int): Number of time steps for each output sequence.
        """
        self.df = data.copy()
        self.n_steps_in = n_steps_in
        self.n_steps_out = n_steps_out
        self.n_columns = self.df.shape[1]
        
        self.scaler = MinMaxScaler()
        self.normalized_data = self.scaler.fit_transform(self.df.values)

    def data_mapping(self):
        """
        Maps a 2D array into a 3D array for RNN input, with each sequence having n_steps_in time steps
        and each target having n_steps_out time steps.
        
        Returns:
            X (np.array): A 3D array of input sequences.
            y (np.array): A 3D array of target sequences.
        """
        num_samples = len(self.normalized_data) - self.n_steps_in - self.n_steps_out + 1

        X = np.empty((num_samples, self.n_steps_in, self.n_columns))
        y = np.empty((num_samples, self.n_steps_out, self.n_columns))

        for i in range(num_samples):
            X[i, :, :] = self.normalized_data[i:i + self.n_steps_in, :]
            y[i, :, :] = self.normalized_data[i + self.n_steps_in:i + self.n_steps_in + self.n_steps_out, :]

        return X, y

    def rnn_train_test_split(self, X, y, test_percent=0.1):
        """
        Splits the 3D mapped data into training and testing sets for an RNN.
        
        Args:
            X (np.array): The input data sequences.
            y (np.array): The target data sequences.
            test_percent (float): The fraction of data to be used for testing.
        
        Returns:
            X_train, X_test, y_train, y_test (tuple): Split data into training and testing sets.
        """
        test_size = int(len(X) * test_percent)
        X_train, y_train = X[:-test_size], y[:-test_size]
        X_test, y_test = X[-test_size:], y[-test_size:]

        return X_train, X_test, y_train, y_test

    def multi_step_forecast(self, model, data: np.array):
        """
        Forecast multiple steps ahead using the LSTM model.
        
        Args:
            model (tf.keras.Model): Trained LSTM model for prediction.
            data (pd.DataFrame): Input DataFrame containing the latest time series data.
        
        Returns:
            pd.DataFrame: Forecasted values for n_steps_out steps.
        """    
        # Normalizing latest data
        last_steps_normalized = self.scaler.transform(data)
        last_steps_normalized = last_steps_normalized.reshape(1, self.n_steps_in, self.n_columns)
        
        # Predicting using model
        forecast = model.predict(last_steps_normalized)
        forecast = forecast.reshape(self.n_steps_out, self.n_columns)

        # Inverse transforming to original scale
        forecast = self.scaler.inverse_transform(forecast)
        forecast = pd.DataFrame(forecast, columns=[f'forecast_{col}' for col in self.df.columns])
        
        return forecast
        

In [36]:
n_steps_in = 252  # Number of time steps for each input sequence
n_steps_out = 21  # Number of time steps for each output sequence
columns = ['open', 'high', 'low', 'close', 'volume']  # Columns to be used for modeling


In [37]:
# Multi-Step LSTM
ms_model = models.Sequential([
    LSTM(units=40, return_sequences=True, input_shape=(n_steps_in, len(columns)), kernel_initializer='orthogonal'),
    Bidirectional(LSTM(units=20, return_sequences=False)),
    Dense(units=n_steps_out * len(columns))   
])


Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.



In [38]:
n_steps_in = 252  # ~ 1 year in business days mapping
n_steps_out = 21  # ~ 1 month forecast in business days

# Intializing created RNN class
ms_rnn_formatter = RNNFormaterMultiStep(df[columns], n_steps_in, n_steps_out)

# Normalizing and data mappings for input & output
X, y = ms_rnn_formatter.data_mapping()

# Train Test split
X_train, X_test, y_train, y_test = ms_rnn_formatter.rnn_train_test_split(X, y)

print(f'X_train shape: {X_train.shape}\ny_train shape: {y_train.shape}') # (time mapes, time steps, columns)
print(f'\nX_test shape: {X_test.shape}\ny_test shape: {y_test.shape}')

X_train shape: (1322, 252, 5)
y_train shape: (1322, 21, 5)

X_test shape: (146, 252, 5)
y_test shape: (146, 21, 5)


In [39]:
# For consistant results
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

# Multi-Step LSTM
ms_model = models.Sequential([
    
    # Layer 1
    LSTM(units=40, return_sequences=True, input_shape=(n_steps_in, len(columns)), kernel_initializer='orthogonal'),

    # Layer 2
    LSTM(units=20, return_sequences=False),
    # Bidirectional(LSTM(units=20, return_sequences=False)),

    # Output layer
    Dense(units=n_steps_out * len(columns))   
])

ms_model.compile(optimizer='adam', loss='mse')
ms_model.summary()


Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.



In [41]:
# Callback to save model weights
model_checkpoint = ModelCheckpoint('LSTM_Tesla_model.keras', monitor='val_loss', save_best_only=True)

# Fitting the model
ms_history = ms_model.fit(X_train, y_train.reshape(-1, n_steps_out * len(columns)), 
                          batch_size=1024,
                          epochs=150, 
                          validation_data=(X_test, y_test.reshape(-1, n_steps_out * len(columns))), 
                          callbacks=[model_checkpoint],
                          shuffle=False, 
                          verbose=False)

In [42]:
# Plotting stakced multi-step model loss
plot_training_history(ms_history.history,
                      plot_title='Stacked Multi-Step LSTM Model Loss')

In [43]:
# Getting stock data for forecasting
data_to_test = df[columns].iloc[-n_steps_in:]

# Showing data
print(f'Test Data Shape: {data_to_test.shape}')
show(data_to_test.tail())

Test Data Shape: (252, 5)


Unnamed: 0_level_0,open,high,low,close,volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-11-24 00:00:00-05:00,233.75,238.75,232.33,235.45,65125200
2023-11-27 00:00:00-05:00,236.89,238.33,232.1,236.08,112031800
2023-11-28 00:00:00-05:00,236.68,247.0,234.01,246.72,148549900
2023-11-29 00:00:00-05:00,249.21,252.75,242.76,244.14,135401300
2023-11-30 00:00:00-05:00,245.14,245.22,236.91,240.08,132353200


In [45]:
# Loading best wieghts during training
ms_model = models.load_model('LSTM_Tesla_model.keras')

# Forecasting using rnn
forecast = ms_rnn_formatter.multi_step_forecast(ms_model, data_to_test.values)

# Setting index as datetime corresponding to forecast period
forecast.index = pd.date_range(start=end_date, periods=n_steps_out, freq=BDay())
print(f'Forecast shape: {forecast.shape}')
show(forecast)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 241ms/step
Forecast shape: (21, 5)


Unnamed: 0,forecast_open,forecast_high,forecast_low,forecast_close,forecast_volume
2023-12-01 00:00:00,249.977,248.37,243.742,255.988,116179960.0
2023-12-04 00:00:00,245.934,253.03,238.462,258.812,106340232.0
2023-12-05 00:00:00,257.251,254.425,244.048,235.612,93655752.0
2023-12-06 00:00:00,246.798,258.98,238.378,251.076,92761640.0
2023-12-07 00:00:00,254.512,254.36,252.86,249.988,99586208.0
2023-12-08 00:00:00,247.561,249.554,237.245,246.061,131233776.0
2023-12-11 00:00:00,253.341,259.321,252.054,256.429,100428600.0
2023-12-12 00:00:00,250.699,259.46,243.388,255.238,79712984.0
2023-12-13 00:00:00,238.23,251.562,239.051,242.432,107761216.0
2023-12-14 00:00:00,254.463,260.617,240.161,254.976,107146024.0


In [46]:
def plot_forecast(previous_data, forecast, test_data=None, title_add=''):
    """
    Plots actual values and predictions for each feature in separate subplots.
    
    Args:
        previous_data (pd.DataFrame): DataFrame containing actual values.
        forecast (pd.DataFrame): DataFrame containing predicted values.
        title_add (str, optional): Additional title for the subplots.
    """
    # Number/color per features 
    columns = [col for col in previous_data.columns]
    num_features = len(columns)
    actual_colors = ['cyan', 'gold', 'violet', 'lime', 'blue', 'pink', 'yellow']

    # Creating subplots
    fig = make_subplots(rows=num_features, cols=1, vertical_spacing=0.03, subplot_titles=[col.title() for col in columns])

    for i, col in enumerate(columns):
        # Actual values trace
        fig.add_trace(
            go.Scatter(x=previous_data.index, y=previous_data[col], mode='lines', name=col.title(),
                       line=dict(color=actual_colors[i % len(actual_colors)])), row=i+1, col=1)
        
        # Predicted values trace
        pred_col = f'forecast_{col}'
        if pred_col in forecast.columns:
            fig.add_trace(
                go.Scatter(x=forecast.index, y=forecast[pred_col], 
                           mode='lines', name=f'Forecast {col.title()}', line=dict(color='red')), row=i+1, col=1)

        if test_data is not None:
            fig.add_trace(
                go.Scatter(x=test_data.index, y=test_data[col], mode='lines', name=f'Unseen {col.title()}',
                           line=dict(color='floralwhite')), row=i+1, col=1)
            
            
    fig.update_layout(height=350*num_features, width=1100, title_text="Data and Forecast " + title_add)
    fig.show()

In [47]:
# Actual stock values to test forecast
df_test = ticker.history(start=end_date).iloc[-n_steps_out:, :]
df_test.columns = df_test.columns.str.lower()

# Plotting stacked mutli-step LSTM forecast
plot_forecast(data_to_test[columns], forecast, df_test, title_add='- TSLA Stock')

**If you like my notebook, please upvote it : )**