In [3]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Bidirectional

# Function to fetch stock data and preprocess it
def fetch_data():
    # Loading the entire stock data
    ticker = input("Enter the stock ticker (e.g., AAPL): ")
    data = yf.download(ticker)

    if data.empty:
        print("No data fetched. Please check the ticker or internet connection.")
        return None

    # Allow user to analyze only the last 'n' days
    try:
        n = int(input("Enter the number of recent days you want for analysis: "))
        if n <= 0:
            print("Number of days should be positive. Using full data instead.")
            return data
        
        filtered_data = data.tail(n)
        if filtered_data.empty:
            print("No data available for the selected range. Returning full dataset.")
            return data
        return filtered_data
    except ValueError:
        print("Invalid input. Please enter a valid number of days.")
        return data

def split_train_test(data, time_step, split_ratio=0.8):
    train_size = int(len(data) * split_ratio)
    train_data = data[:train_size]
    test_data = data[train_size - time_step:]  # Include overlap for time steps

    # Create time-step datasets
    X_train, y_train = create_time_step_data(train_data, time_step)
    X_test, y_test = create_time_step_data(test_data, time_step)

    # Reshape for models
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
    return X_train, y_train, X_test, y_test


# Function to prepare the dataset for training
def create_dataset(data, time_step):
    X, Y = [], []
    for i in range(len(data) - time_step):
        X.append(data[i:(i + time_step), 0])
        Y.append(data[i + time_step, 0])
    return np.array(X), np.array(Y)
    
#-------------------------------------------------------------------------------------------------------


# Bayesian Neural Network Model
class BayesianNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_prob=0.3):
        super(BayesianNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Train and predict with Bayesian Neural Network
def train_bnn(X_train, y_train, X_test):
    model = BayesianNN(input_dim=X_train.shape[1], hidden_dim=64, output_dim=1)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0005)

    # Convert to PyTorch tensors and ensure the correct shape
    X_train = torch.tensor(X_train, dtype=torch.float32)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)

    # Ensure the input has the correct shape
    X_train = X_train.view(X_train.size(0), -1)  # Flatten the input
    X_test = X_test.view(X_test.size(0), -1)    # Flatten the input similarly

    # Training loop
    epochs = 500
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output.squeeze(), y_train)
        loss.backward()
        optimizer.step()

    # Predictions
    model.eval()
    with torch.no_grad():
        predictions = model(X_test).numpy()

    return predictions, model

# Build and train LSTM model
def train_lstm(X_train, y_train, X_test):
    model = Sequential()
    model.add(LSTM(50, return_sequences=True, input_shape=(X_train.shape[1], 1)))
    model.add(Dropout(0.2))
    model.add(LSTM(50, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(1))

    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=1)
    predictions = model.predict(X_test)

    return predictions, model



#--------------------------------------------------------------

# Bayesian LSTM with MC Dropout
class BayesianLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_prob=0.3):
        super(BayesianLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        lstm_out = self.dropout(lstm_out[:, -1, :])  # Use the last hidden state
        output = self.fc(lstm_out)
        return output

def train_bayesian_lstm(X_train, y_train, X_test, y_test):
    model = BayesianLSTM(input_dim=1, hidden_dim=64, output_dim=1, dropout_prob=0.2) #Corrected instantiation
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Convert to PyTorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.float32)

    # Training loop
    epochs = 50
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output.squeeze(), y_train)
        loss.backward()
        optimizer.step()

    # Predictions
    model.eval()
    with torch.no_grad():
        predictions = model(X_test).squeeze().numpy()

    rmse = np.sqrt(mean_squared_error(y_test.numpy(), predictions))

    # Plot Predictions vs Actual
    plt.figure(figsize=(10, 6))
    plt.plot(y_test.numpy(), label='Actual Price')
    plt.plot(predictions, label='Predicted Price')
    plt.title('Bayesian LSTM: Predictions vs Actual')
    plt.xlabel('Time')
    plt.ylabel('Close Price')
    plt.legend()
    plt.show()

    return rmse, predictions


# Bidirectional LSTM Model
class BidirectionalLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=1, dropout_prob=0.2): #Added num_layers
        super(BidirectionalLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout_prob, bidirectional=True) #Corrected instantiation
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # Multiply by 2 for bidirectional

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])  # Get the last output of the LSTM
        return out

# Train and predict with FFNN
def train_ffnn(X_train, y_train, X_test):
    model = FFNN(input_dim=X_train.shape[1], hidden_dim=64, output_dim=1)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0005)

    X_train = torch.tensor(X_train, dtype=torch.float32)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)

    X_train = X_train.view(X_train.size(0), -1)
    X_test = X_test.view(X_test.size(0), -1)

    epochs = 500
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output.squeeze(), y_train)
        loss.backward()
        optimizer.step()

    model.eval()
    with torch.no_grad():
        predictions = model(X_test).numpy()

    return predictions, model

# Baysian lstm
def train_bayesian_lstm(X_train, y_train, X_test, y_test):
    model = BayesianLSTM(input_dim=1, hidden_dim=64, output_dim=1, dropout_prob=0.2)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Convert to PyTorch tensors and reshape for LSTM
    X_train = torch.tensor(X_train, dtype=torch.float32).reshape(-1, X_train.shape[1], 1) #Corrected Reshape
    y_train = torch.tensor(y_train, dtype=torch.float32).reshape(-1, 1) #Corrected Reshape
    X_test = torch.tensor(X_test, dtype=torch.float32).reshape(-1, X_test.shape[1], 1)  #Corrected Reshape
    y_test = torch.tensor(y_test, dtype=torch.float32).reshape(-1, 1) #Corrected Reshape


    # Training loop
    epochs = 50
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output.squeeze(), y_train)
        loss.backward()
        optimizer.step()

    # Predictions
    model.eval()
    with torch.no_grad():
        predictions = model(X_test).squeeze().numpy()

    rmse = np.sqrt(mean_squared_error(y_test.numpy(), predictions))

    # Plot Predictions vs Actual
    plt.figure(figsize=(10, 6))
    plt.plot(y_test.numpy(), label='Actual Price')
    plt.plot(predictions, label='Predicted Price')
    plt.title('Bayesian LSTM: Predictions vs Actual')
    plt.xlabel('Time')
    plt.ylabel('Close Price')
    plt.legend()
    plt.show()

    return rmse, predictions


# Train and predict with Bidirectional LSTM
def train_bidirectional_lstm(X_train, y_train, X_test):
    model = BidirectionalLSTM(input_dim=1, hidden_dim=64, output_dim=1, num_layers=2, dropout_prob=0.2) #Corrected instantiation and added num_layers
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0005)

    X_train = torch.tensor(X_train, dtype=torch.float32).unsqueeze(-1)
    X_test = torch.tensor(X_test, dtype=torch.float32).unsqueeze(-1)
    y_train = torch.tensor(y_train, dtype=torch.float32)

    epochs = 500
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output.squeeze(), y_train)
        loss.backward()
        optimizer.step()

    model.eval()
    with torch.no_grad():
        predictions = model(X_test).numpy()

    return predictions, model


# Build and train GRU model
def train_gru(X_train, y_train, X_test):
    model = Sequential()
    model.add(GRU(50, return_sequences=True, input_shape=(X_train.shape[1], 1)))
    model.add(Dropout(0.2))
    model.add(GRU(50, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(1))

    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=1)
    predictions = model.predict(X_test)

    return predictions, model


# Predict future prices
def predict_future_prices(model, scaler, data, time_step, future_days, model_type='LSTM'):
    last_data = data[-time_step:]
    last_data = last_data.reshape(1, time_step, 1)
    
    future_predictions = []
    
    for _ in range(future_days):
        if model_type in ['LSTM', 'GRU', 'Bidirectional LSTM','Bayesian LSTM',' FFNN']:
            future_pred = model.predict(last_data)
        else:
            future_pred = model(torch.tensor(last_data, dtype=torch.float32).view(1, -1)).detach().numpy()

        future_predictions.append(future_pred[0, 0])
        last_data = np.append(last_data[:, 1:, :], future_pred.reshape(1, 1, 1), axis=1)

    future_predictions = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1))

    return future_predictions
    


#-----------------------------------------------------------------------------------------------------------------------------------
# ------------------------------Function to generate various plots----------------------------------------------------------------------
def plot_stock_price(data):
    plt.plot(data['Close'], color='blue', label='Stock Price')
    plt.title('Stock Price Over Time')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.show()


def plot_moving_averages(data):
    default_ma_windows = [50, 200]
    
    # Ask the user how many additional moving averages they want to plot
    additional_lines = int(input("How many additional moving averages would you like to plot (enter 0 for none)? "))
    
    # Initialize the list with the default moving averages
    ma_windows = default_ma_windows.copy()
    
    # Collect the additional moving averages from the user
    for i in range(additional_lines):
        days = int(input(f"Enter the number of days for moving average line {i+1}: "))
        ma_windows.append(days)
    
    # Plot the stock price
    plt.plot(data['Close'], color='blue', label='Stock Price')

    # Loop through the list of moving average windows and plot each
    for days in ma_windows:
        ma_column_name = f'{days}_MA'
        data[ma_column_name] = data['Close'].rolling(window=days).mean()
        plt.plot(data[ma_column_name], label=f'{days}-Day MA')

    # Customize the plot
    plt.title('Stock Price and Moving Averages')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.show()


def plot_rsi(data):
    delta = data['Close'].diff()
    gain = (delta.where(delta > 0, 0)).fillna(0)
    loss = (-delta.where(delta < 0, 0)).fillna(0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    
    plt.plot(data.index, rsi, label='RSI', color='purple')
    plt.title('Relative Strength Index (RSI)')
    plt.xlabel('Date')
    plt.ylabel('RSI')
    plt.axhline(70, color='red', linestyle='--')
    plt.axhline(30, color='green', linestyle='--')
    plt.legend()
    plt.show()

def plot_macd(data):
    short_ema = data['Close'].ewm(span=12, adjust=False).mean()
    long_ema = data['Close'].ewm(span=26, adjust=False).mean()
    macd = short_ema - long_ema
    signal = macd.ewm(span=9, adjust=False).mean()

    plt.plot(data.index, macd, label='MACD', color='blue')
    plt.plot(data.index, signal, label='Signal Line', color='red')
    plt.title('MACD (Moving Average Convergence Divergence)')
    plt.xlabel('Date')
    plt.ylabel('MACD Value')
    plt.legend()
    plt.show()

def plot_bollinger_bands(data):
    moving_avg = data['Close'].rolling(window=20).mean()
    rolling_std = data['Close'].rolling(window=20).std()
    
    upper_band = moving_avg + (rolling_std * 2)
    lower_band = moving_avg - (rolling_std * 2)

    plt.plot(data['Close'], label='Stock Price')
    plt.plot(moving_avg, label='20-Day Moving Average', color='orange')
    plt.plot(upper_band, label='Upper Band', color='red')
    plt.plot(lower_band, label='Lower Band', color='green')
    plt.title('Bollinger Bands')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.show()

def plot_all_analysis(data):
    plot_stock_price(data)
    plot_moving_averages(data)
    plot_rsi(data)
    plot_macd(data)
    plot_bollinger_bands(data)


#----------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
def choose_plot(data):# Plot analysis options
    print("\nChoose the plots for analysis (multi-select by entering numbers separated by commas):")
    print("1. Stock Price Line Plot (Time Series Plot)")
    print("2. Moving Averages (50-Day, 200-Day)")
    print("3. Relative Strength Index (RSI)")
    print("4. MACD (Moving Average Convergence Divergence)")
    print("5. Bollinger Bands")
    print("6. Select All")
    print("7. Done")

    plot_choices = input("Enter the numbers corresponding to your choices (e.g., 1,3,5): ").strip()
    plot_choices = [int(x) for x in plot_choices.split(',')] if plot_choices != '8' else list(range(1, 8))

    if data is None:
        print("No data available for plotting. Exiting plot selection.")
        return

    if 6 in plot_choices:  # Select All option
        plot_all_analysis(data)
        return

    for choice in plot_choices:
        if choice == 1:
            plot_stock_price(data)
        elif choice == 2:
            plot_moving_averages(data)
        elif choice == 3:
            plot_rsi(data)
        elif choice == 4:
            plot_macd(data)
        elif choice == 5:
            plot_bollinger_bands(data)
        elif choice == 7:
            print("Exiting plot selection.")
            break
        else:
            print(f"Invalid choice: {choice}. Please select a valid option.")

#---------------------------------------------------------------------------------------------------------------
def choose_model(data):# Model prediction options
    print("\nChoose the model for prediction:")
    print("1. Bayesian Neural Network (BNN)")
    print("2. LSTM")
    print("3. GRU")
    print("4. Feedforward Neural Network (FFNN)")
    print("5. Bayesian LSTM")
    print("6. Bidirectional LSTM")

    model_choice = int(input("Enter the number corresponding to your choice: "))
    future_days = int(input("Enter the number of future days to predict: "))
    time_step = int(input("Enter the time step (e.g., 50): ")) 

    if data is None:
        return

    close_prices = data['Close'].values.reshape(-1, 1)

    # Normalize data
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(close_prices)

    # Split data into training and testing datasets
    train_size = int(len(scaled_data) * 0.8)
    train_data = scaled_data[:train_size]
    test_data = scaled_data[train_size - time_step:]



    # Reshape input for models
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    X_train, y_train, X_test, y_test = split_train_test(scaled_data, time_step) #Corrected line


    # Train the model based on user choice
    if model_choice == 1:  # BNN
        predictions, model = train_bnn(X_train, y_train, X_test)
        model_type = 'BNN'
    elif model_choice == 2:  # LSTM
        predictions, model = train_lstm(X_train, y_train, X_test)
        model_type = 'LSTM'
    elif model_choice == 3:  # GRU
        predictions, model = train_gru(X_train, y_train, X_test)
        model_type = 'GRU'
    elif model_choice == 4:  # FFNN
        predictions, model=train_ffnn(X_train, y_train, X_test)
        model_type = 'FFNN'
    elif model_choice == 5:  # Bayesian LSTM
        predictions, model = train_bayesian_lstm(X_train, y_train, X_test, y_test) #Corrected function call
        model_type = 'Bayesian LSTM '
    elif model_choice == 6:  # Bidirectional LSTM
        predictions, model= train_bidirectional_lstm(X_train, y_train, X_test)
        model_type = 'Bidirectional LSTM'
    else:
        print("Invalid choice. Please select a valid option.")
        return

    # Inverse transform the predictions and calculate RMSE
    actual_prices = scaler.inverse_transform(y_test.reshape(-1, 1))
    predictions = scaler.inverse_transform(predictions)

    # Calculate RMSE
    rmse = np.sqrt(mean_squared_error(actual_prices, predictions))
    print('Root Mean Squared Error (RMSE): ',rmse)

    # Plot predictions vs actual data
    plt.plot(actual_prices, label='Actual Price')
    plt.plot(predictions, label='Predicted Price')
    plt.title(' Model Prediction vs Actual')
    plt.xlabel('Time')
    plt.ylabel('Price')
    plt.legend()
    plt.show()

    # Predict future prices if needed
    future_predictions = predict_future_prices(model, scaler, scaled_data, time_step, future_days, model_type=model_type)
    print("Predicted future prices for",future_days, "days")
    plt.plot(range(len(scaled_data)), scaler.inverse_transform(scaled_data), label='Historical Price')
    plt.plot(range(len(scaled_data), len(scaled_data) + future_days), future_predictions, label='Future Predictions', color='red')
    plt.title('Future Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel('Price')
    plt.legend()
    plt.show()


def choose_analysis_or_model():
    
    # Fetch data
    data = fetch_data()
    
    if data is not None:
        # Count the number of observations
        num_observations = data.shape[0]
        print("------#####[",num_observations,"]Days are selected#-------")
    else:
        print("Failed to load data.")
        
    print("Choose the analysis type:")
    print("1. Analysis using Plots")
    print("2. Prediction using Models")
    print("3. Done")
    choice = int(input("Enter the number corresponding to your choice: "))

    if choice == 1:
        choose_plot(data)
    elif choice == 2:
        choose_model(data)
    elif choice == 3:
        print("Thank you for using the tool. Goodbye!")

        
    else:
        print("Invalid choice. Please select either '1' for analysis or '2' for models.")



#Main execution
choose_analysis_or_model()



Enter the stock ticker (e.g., AAPL):  aapl


[*********************100%***********************]  1 of 1 completed


Enter the number of recent days you want for analysis:  1000


------#####[ 1000 ]Days are selected#-------
Choose the analysis type:
1. Analysis using Plots
2. Prediction using Models
3. Done


Enter the number corresponding to your choice:  2



Choose the model for prediction:
1. Bayesian Neural Network (BNN)
2. LSTM
3. GRU
4. Feedforward Neural Network (FFNN)
5. Bayesian LSTM
6. Bidirectional LSTM


Enter the number corresponding to your choice:  5
Enter the number of future days to predict:  50
Enter the time step (e.g., 50):  50


UnboundLocalError: cannot access local variable 'X_train' where it is not associated with a value