**Data Collection and CSV File Creation**


In [118]:
import yfinance as yf
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt

def download_stock_data(tickers, start_date, end_date=None):
    """Download price history for each ticker and save it as a CSV file.

    One file named ``{ticker}_10yrs.csv`` is written to the current working
    directory per ticker, with the index turned into a ``Date`` column.

    Args:
        tickers: Iterable of ticker symbols passed one by one to
            ``yf.download``.
        start_date: Start of the requested range, as a ``YYYY-MM-DD`` string.
        end_date: End of the requested range (forwarded as ``end``). When
            omitted, today's date is used, resolved on every call.

    Returns:
        None. The data is only written to disk.
    """
    # Resolve the default at call time. A ``datetime.today()`` expression in
    # the signature is evaluated once, when the defining cell runs, and goes
    # stale if the kernel stays alive across days.
    if end_date is None:
        end_date = datetime.today().strftime('%Y-%m-%d')

    for ticker in tickers:
        data = yf.download(ticker, start=start_date, end=end_date)

        if data.empty:
            # Nothing to relabel or save; make the gap visible instead of
            # failing later on a missing 'Date' column.
            print(f'No data returned for {ticker}; skipping CSV export.')
            continue

        # yfinance may return two-level (field, ticker) columns. Keep the
        # field names only, rather than overwriting the header with a fixed
        # list that silently mislabels columns when their order or count
        # differs between yfinance versions.
        if isinstance(data.columns, pd.MultiIndex):
            data.columns = list(data.columns.get_level_values(0))

        # Move the date index into a regular "Date" column.
        data = data.reset_index()
        # Drop any time-of-day component but keep a datetime dtype.
        data['Date'] = pd.to_datetime(data['Date'].dt.date)

        # index=False so that Date is written as an ordinary column.
        data.to_csv(f'{ticker}_10yrs.csv', index=False)

**LSTM Model Implementation**

**Data Preparation and Scaling**

In [119]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

def scale_stock_data(ticker):
    """Load a ticker's saved CSV and standardize its closing prices.

    Reads ``{ticker}_10yrs.csv``, converts the ``Date`` column to datetimes
    and fits a new ``StandardScaler`` on the ``Close`` column.

    Args:
        ticker: Ticker symbol used to build the CSV file name.

    Returns:
        A tuple ``(data, close_prices_scaled, scaler)``: the loaded frame,
        the scaler's output for the ``Close`` column, and the fitted scaler.
    """
    csv_path = f'{ticker}_10yrs.csv'
    frame = pd.read_csv(csv_path)
    frame['Date'] = pd.to_datetime(frame['Date'])

    standardizer = StandardScaler()
    # Double brackets select a one-column DataFrame rather than a Series,
    # so the scaler receives 2-D input.
    scaled_close = standardizer.fit_transform(frame[['Close']])

    return frame, scaled_close, standardizer

**Sequence Creation**

In [120]:
from sklearn.model_selection import train_test_split

def prepare_data_for_lstm(close_prices_scaled, sequence_length=60):
    """Turn a scaled price series into supervised LSTM sequences and split them.

    Each sample is the previous ``sequence_length`` rows and its target is
    the value in column 0 of the row that follows them. The last 20% of the
    samples (in order, no shuffling) become the test set.

    Args:
        close_prices_scaled: 2-D array indexed as ``[row, 0]``.
        sequence_length: Number of consecutive rows per input sequence.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` where the ``X`` arrays have
        shape ``(samples, sequence_length, 1)``.

    Raises:
        ValueError: If there are not more than ``sequence_length`` rows, in
            which case no sequence can be built.
    """
    n_rows = len(close_prices_scaled)
    if n_rows <= sequence_length:
        # Without this guard the empty array fails later in reshape with an
        # unhelpful "tuple index out of range".
        raise ValueError(
            f"Need more than {sequence_length} rows to build sequences, got {n_rows}."
        )

    # Create sequences and targets for the model
    X = []
    y = []
    for i in range(sequence_length, n_rows):
        X.append(close_prices_scaled[i - sequence_length:i])  # Previous `sequence_length` rows
        y.append(close_prices_scaled[i, 0])  # Value that follows the window

    # Convert to numpy arrays
    X, y = np.array(X), np.array(y)

    # Reshape X to (samples, time steps, features) for LSTM input
    X = X.reshape(X.shape[0], X.shape[1], 1)

    # shuffle=False keeps chronological order: train on the past, test on the most recent part
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

    # Show input data details
    print(f"X shape and y shape: {X.shape, y.shape}")
    print(f"X_train shape and y_train shape: {X_train.shape, y_train.shape}")
    print(f"X_test shape and y_test shape: {X_test.shape, y_test.shape}")

    return X_train, X_test, y_train, y_test

**Build and Train Model**

In [121]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler

def show_model_info(model):
    """Display the summary of ``model``.

    ``model.summary()`` is called directly: it prints the summary itself and
    returns ``None``, so wrapping it in ``print`` would add a stray "None"
    line after the summary.

    Args:
        model: Object exposing a ``summary()`` method (a Keras model here).

    Returns:
        None.
    """
    model.summary()

def build_and_train_model(X_train, y_train, X_test, y_test, lstm_units, dropout_rate, learning_rate, epochs, batch_size, input_shape):
    """Standardize the data, train a stacked LSTM regressor and predict on the test set.

    Inputs and targets are standardized with separate scalers, each fitted
    on the training split only. The network is two LSTM layers, each
    followed by dropout, and a single-unit dense output, trained with Adam
    on mean squared error. The test split is passed as validation data.

    Args:
        X_train, X_test: Arrays of shape ``(samples, time steps, features)``.
        y_train, y_test: 1-D target arrays.
        lstm_units: Number of units in each LSTM layer.
        dropout_rate: Dropout rate applied after each LSTM layer.
        learning_rate: Learning rate for the Adam optimizer.
        epochs: Number of training epochs.
        batch_size: Training batch size.
        input_shape: ``(time steps, features)`` shape of one input sequence.

    Returns:
        ``(model, history, y_pred_unscaled, y_test_unscaled)``. The two
        arrays are flattened and expressed in the same scale as the
        ``y_train``/``y_test`` that were passed in.
    """
    # Inputs and targets need their own scalers. Re-fitting a single scaler
    # on y would overwrite the statistics learned from X, so X_test would be
    # transformed differently from X_train.
    x_scaler = StandardScaler()
    y_scaler = StandardScaler()

    # StandardScaler expects 2-D input: flatten to (rows, features), scale,
    # then restore the (samples, time steps, features) shape for the LSTM.
    X_train_scaled = x_scaler.fit_transform(
        X_train.reshape(-1, X_train.shape[-1])
    ).reshape(X_train.shape)
    X_test_scaled = x_scaler.transform(
        X_test.reshape(-1, X_test.shape[-1])
    ).reshape(X_test.shape)

    # Fit on the training targets only; the test targets reuse those statistics.
    y_train_scaled = y_scaler.fit_transform(y_train.reshape(-1, 1))
    y_test_scaled = y_scaler.transform(y_test.reshape(-1, 1))

    model = Sequential([
        # return_sequences=True so the second LSTM receives the full sequence
        LSTM(lstm_units, return_sequences=True, input_shape=input_shape),
        Dropout(dropout_rate),
        LSTM(lstm_units),
        Dropout(dropout_rate),
        Dense(1)
    ])

    # Compile the model
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error')

    # verbose=0 suppresses the per-epoch output
    history = model.fit(
        X_train_scaled, y_train_scaled,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test_scaled, y_test_scaled),
        verbose=0
    )

    # Predict on test data
    y_pred_scaled = model.predict(X_test_scaled)

    # Undo the target standardization applied in this function
    y_test_unscaled = y_scaler.inverse_transform(y_test_scaled).flatten()
    y_pred_unscaled = y_scaler.inverse_transform(y_pred_scaled).flatten()

    return model, history, y_pred_unscaled, y_test_unscaled


**Prepare for Incremental Learning**

In [122]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam

def get_new_data(ticker, scaler, sequence_length=60, lookback_days=90):
    """Download recent prices for ``ticker`` and build scaled LSTM sequences.

    Args:
        ticker: Ticker symbol passed to ``yf.download``.
        scaler: An already fitted scaler. It is applied with ``transform``
            and is not refitted or otherwise modified.
        sequence_length: Number of consecutive rows per input sequence.
        lookback_days: Calendar days of history to request, counted back
            from today. The window contains fewer trading rows than calendar
            days, so it must be comfortably larger than ``sequence_length``.

    Returns:
        ``(X_new, y_new)`` with shapes ``(samples, sequence_length, 1)`` and
        ``(samples, 1)``.

    Raises:
        ValueError: If the download yields no more than ``sequence_length``
            rows, so that no sequence can be built.
    """
    today = datetime.today()
    end_date = today.strftime('%Y-%m-%d')
    start_date = (today - timedelta(days=lookback_days)).strftime('%Y-%m-%d')

    data = yf.download(ticker, start=start_date, end=end_date)

    # yfinance may return two-level (field, ticker) columns. Keep the field
    # names only, instead of overwriting the header with a fixed list that
    # breaks when the column order or count differs.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = list(data.columns.get_level_values(0))

    # One-column DataFrame so the scaler sees the 'Close' feature name.
    close_prices_df = data[['Close']]

    # transform, not fit_transform: refitting here would scale the new data
    # with different statistics and overwrite the scaler the caller relies on
    # for inverse_transform.
    close_prices_scaled = scaler.transform(close_prices_df)

    n_rows = len(close_prices_scaled)
    if n_rows <= sequence_length:
        raise ValueError(
            f"Only {n_rows} rows downloaded for {ticker}; need more than "
            f"{sequence_length} to build a sequence. Increase lookback_days."
        )

    window_ends = range(sequence_length, n_rows)
    X_new = np.array([close_prices_scaled[i - sequence_length:i] for i in window_ends])
    # Target is the value that follows each window, kept as a column vector.
    y_new = np.array([close_prices_scaled[i, 0] for i in window_ends]).reshape(-1, 1)

    return X_new, y_new


**Incremental Learning**

In [123]:
def incremental_learning_process(ticker, model, scaler, iterations=10):
    """Fine-tune ``model`` on recently downloaded data for ``ticker``.

    The recent data is fetched once through ``get_new_data``. The model is
    then fitted on it ``iterations`` times (5 epochs each) and, after every
    round, its predictions on that data are printed after
    ``scaler.inverse_transform``.

    Args:
        ticker: Ticker symbol to fetch recent data for.
        model: Model exposing ``fit`` and ``predict``; updated in place.
        scaler: Scaler passed to ``get_new_data`` and used to invert the
            predictions.
        iterations: Number of update rounds.

    Returns:
        The same ``model`` object. It is returned unchanged when no new
        sequences are available.
    """
    # Fetch once: every round trains on the same window, so downloading
    # inside the loop only repeats an identical network request.
    X_new, y_new = get_new_data(ticker, scaler)

    # An empty result has no (samples, time steps) axes to reshape.
    if X_new.ndim < 2 or X_new.shape[0] == 0:
        print(f'No new sequences available for {ticker}; skipping incremental updates.')
        return model

    # Reshape for model input: (samples, time steps, features)
    X_new_reshaped = X_new.reshape(X_new.shape[0], X_new.shape[1], 1)

    for update in range(1, iterations + 1):
        # verbose=0 silences the per-epoch output
        model.fit(X_new_reshaped, y_new, epochs=5, batch_size=64, verbose=0)

        # Predict on the new data
        y_pred_scaled = model.predict(X_new_reshaped)

        # Map the predictions back through the scaler
        y_pred_unscaled = scaler.inverse_transform(y_pred_scaled).flatten()

        print(f'Update {update}: predicted price:', y_pred_unscaled)

    return model

**Metrics Evaluation**

In [124]:
from sklearn.metrics import mean_squared_error
import numpy as np
from sklearn.metrics import mean_absolute_error

def evaluate_model_performance(ticker, y_test_unscaled, y_pred_unscaled):
    """Print error, direction and simple backtest metrics for one ticker.

    Reports RMSE and MAE between actual and predicted values, the share of
    steps where the predicted move (prediction minus the previous actual
    value) has the same sign as the actual move, and the sum of actual
    moves signed by the predicted direction.

    Args:
        ticker: Label printed in the report header.
        y_test_unscaled: 1-D array of actual values.
        y_pred_unscaled: 1-D array of predicted values, same length.

    Returns:
        None. All results are printed.
    """
    print(f'For {ticker}:')

    # Root mean square error
    rmse = np.sqrt(mean_squared_error(y_test_unscaled, y_pred_unscaled))
    print(f"RMSE: {rmse}")

    # Mean absolute error
    mae = mean_absolute_error(y_test_unscaled, y_pred_unscaled)
    print(f"MAE: {mae}")

    # Both moves are measured from the previous *actual* value.
    previous_actual = y_test_unscaled[:-1]
    actual_move = y_test_unscaled[1:] - previous_actual
    predicted_move = y_pred_unscaled[1:] - previous_actual

    # Direction accuracy: how often the predicted sign matches the actual sign
    same_direction = np.sign(actual_move) == np.sign(predicted_move)
    direction_accuracy = np.mean(same_direction) * 100
    print(f"Direction Accuracy: {direction_accuracy:.2f}%")

    # Backtest: take the actual move with the sign of the predicted move
    # (long when a rise is predicted, short when a fall is predicted).
    profit = np.sum(np.sign(predicted_move) * actual_move)

    print(f"Backtesting Profit: ${profit:.2f}")

**Plot**


In [125]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def plot_actual_vs_predicted(data, y_test_unscaled, y_pred_unscaled, ticker):
    """Plot actual and predicted values over the test-period dates.

    Args:
        data: DataFrame with a ``Date`` column; its last
            ``len(y_test_unscaled)`` dates are used for the x-axis.
        y_test_unscaled: Actual values for the test period.
        y_pred_unscaled: Predicted values for the test period.
        ticker: Symbol shown in the figure title.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    plt.figure(figsize=(10, 6))
    ax = plt.gca()

    # The test samples are the most recent ones, so take dates from the tail.
    test_dates = data['Date'][-len(y_test_unscaled):]
    ax.plot(test_dates, y_test_unscaled, label='Actual Closing Price', color='blue')
    ax.plot(test_dates, y_pred_unscaled, label='Predicted Closing Price', color='green')

    ax.set_xlabel('Date')
    ax.set_ylabel('Closing Price reverse z-score normalization')
    ax.set_title(f"Actual vs Predicted Closing Prices of {ticker} (2 Years)")
    ax.legend()

    # One major tick per year, labelled as 'YYYY'
    ax.xaxis.set_major_locator(mdates.YearLocator(1))
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    plt.xticks(rotation=45)

    # Dashed grid lines on both axes
    for axis_name in ('x', 'y'):
        ax.grid(axis=axis_name, linestyle='--', alpha=0.7)

    plt.tight_layout()
    plt.show()

**Main Function**

In [None]:
# Run configuration: tickers to model and the start of the history window.
# No end date is set here; the downloader's default (today) is used.
# For a quick single-ticker run use: tickers = ['AAPL']
tickers = ['AAPL', 'NVDA', 'MSFT', 'AMZN', 'GOOG', '2222.SR', 'META', 'TSLA', 'TSM', 'AVGO']

start_date = '2014-9-11'

# Model and training hyperparameters
lstm_units = 50
dropout_rate = 0.2
learning_rate = 0.01
epochs = 20
batch_size = 64

# Rows of history per training sequence
sequence_length = 60
# (time steps, features). Derived from sequence_length so the sequence
# builder and the model input cannot drift apart.
input_shape = (sequence_length, 1)

def lstm_model(ticker, start_date, lstm_units, dropout_rate, learning_rate, epochs, batch_size, input_shape):
    """Run the whole pipeline for one ticker: download, train, update, evaluate, plot.

    Args:
        ticker: Ticker symbol to process.
        start_date: Start of the history to download (``YYYY-MM-DD``).
        lstm_units: Number of units in each LSTM layer.
        dropout_rate: Dropout rate after each LSTM layer.
        learning_rate: Learning rate for the optimizer.
        epochs: Number of training epochs.
        batch_size: Training batch size.
        input_shape: ``(time steps, features)`` of one model input; the
            first entry is also used as the sequence length.

    Returns:
        None. Results are printed and plotted.
    """
    # Download only the ticker being processed. Passing the whole global
    # ticker list here would re-download every symbol on every call.
    download_stock_data([ticker], start_date)

    # Load the CSV and scale the closing prices
    data, close_prices_scaled, scaler = scale_stock_data(ticker)

    # Take the window length from the model's input shape, so the sequences
    # always match what the network expects (no dependence on a global).
    sequence_length = input_shape[0]
    X_train, X_test, y_train, y_test = prepare_data_for_lstm(close_prices_scaled, sequence_length)

    # Build and train the model, and get test-set predictions
    model, history, y_pred_unscaled, y_test_unscaled = build_and_train_model(
        X_train, y_train, X_test, y_test,
        lstm_units, dropout_rate, learning_rate, epochs, batch_size, input_shape
    )

    # Show model information
    show_model_info(model)

    # Incremental learning on recent data (updates the model in place)
    incremental_learning_process(ticker, model, scaler)

    # Evaluation and plot use the predictions made before the incremental updates
    evaluate_model_performance(ticker, y_test_unscaled, y_pred_unscaled)
    plot_actual_vs_predicted(data, y_test_unscaled, y_pred_unscaled, ticker)
    return

# Run the full pipeline once per configured ticker.
for ticker in tickers:
    lstm_model(
        ticker=ticker,
        start_date=start_date,
        lstm_units=lstm_units,
        dropout_rate=dropout_rate,
        learning_rate=learning_rate,
        epochs=epochs,
        batch_size=batch_size,
        input_shape=input_shape,
    )


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
  super().__init__(**kwargs)


X shape and y shape: ((2523, 60, 1), (2523,))
X_train shape and y_train shape: ((2018, 60, 1), (2018,))
X_test shape and y_test shape: ((505, 60, 1), (505,))
