# AI Bubble Predictor

AI is a very trendy topic nowadays. Everyone is talking about it, and we hear about it everywhere. But can it really deliver on its promises?

I don't think so. I think that AI is just a bubble that will burst soon. It's a great technology, but it's not as powerful as people think. Like any tool, it has its limitations — and those have been overlooked by many.

In this project, I will build a model to predict when the AI bubble will burst. I will use historical data about a phenomenon I believe was similar to the AI bubble (the dot-com bubble) to train an LSTM model. Then, I will use this model to generate a hypothesis about when the AI bubble will burst.

I hope this project will help people understand the limitations of AI and avoid the consequences of the AI bubble bursting.

## Getting the data

In [None]:
# First, get stock market data about the dot-com bubble

import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import datetime

# Get the stock data for a window that starts before the dot-com bubble and ends after it
full_timeframe_start = datetime.datetime(1996, 1, 1)
full_timeframe_end = datetime.datetime(2006, 1, 1)
companies = ['MSFT', 'AAPL', 'AMZN', 'INTL', 'ORCL', 'INTC', 'QCOM', 'HPQ', 'EBAY', 'VZ', 'T', 'GOOGL', 'FB', 'TWTR', 'SNAP', 'TSLA', 'NFLX', 'DIS', 'CMCSA', 'FOXA', 'DISCA', 'VIAC', 'NWSA', 'NWS', 'CBS']
# auto_adjust=False keeps the separate 'Adj Close' column that the rest of this notebook reads.
# Recent yfinance releases default to auto_adjust=True, which folds the adjustment into 'Close'
# and no longer returns 'Adj Close'.
stock_data = yf.download(
    tickers=companies,
    start=full_timeframe_start,
    end=full_timeframe_end,
    auto_adjust=False,
)
# Remove those columns that have any NaN values, i.e. (field, ticker) pairs without complete
# data for the whole window
stock_data = stock_data.dropna(axis=1)
# Take the surviving tickers from the 'Adj Close' columns specifically: those are the columns
# that get scaled and modelled later, so companies[i] then names column i of that data
companies = stock_data['Adj Close'].columns
num_companies = len(companies)
print(f"Number of companies: {num_companies}")

# Save the data to a CSV file
stock_data.to_csv('stock_data.csv')

display(stock_data.head())

## Training the LSTM model

In [None]:
# Here, we'll transform the data to make it easier to work with. We'll calculate the daily returns for each stock.
# We'll also calculate the average daily return for the dot-com bubble period.
# Finally, we'll calculate the average daily return for the period after the dot-com bubble.

adj_close = stock_data['Adj Close']

# Calculate the daily returns. pct_change() returns a new DataFrame instead of modifying
# stock_data, so the result has to be stored to be usable afterwards.
daily_returns = adj_close.pct_change()
print(f'Stock data shape: {stock_data.shape}')

# Calculate the average daily return for the dot-com bubble period
dot_com_bubble_start = full_timeframe_start
dot_com_bubble_end = datetime.datetime(2002, 1, 1)

# The returns are recomputed on the slice, so the first row of each period has no return
# (NaN) and is ignored by mean()
dot_com_bubble_returns = adj_close.loc[dot_com_bubble_start:dot_com_bubble_end].pct_change().mean()

# Calculate the average daily return for the period after the dot-com bubble
post_dot_com_bubble_start = dot_com_bubble_end
post_dot_com_bubble_end = full_timeframe_end

post_dot_com_bubble_returns = adj_close.loc[post_dot_com_bubble_start:post_dot_com_bubble_end].pct_change().mean()

print('Average daily return for the dot-com bubble period:', dot_com_bubble_returns)
print('Average daily return for the period after the dot-com bubble:', post_dot_com_bubble_returns)

In [None]:
# Visualise how the adjusted closing price of every company evolved during the
# dot-com bubble period (one line per company, all on the same axes).

bubble_period_prices = stock_data.loc[dot_com_bubble_start:dot_com_bubble_end]['Adj Close']
bubble_period_prices.plot(figsize=(12, 8))
plt.title('Stock Prices During the Dot-Com Bubble Period')
plt.legend(companies)
plt.show()

In [None]:
# Now, use the transformed data to train an LSTM model to predict the stock prices for the next day.

# First, we'll normalize the data: every 'Adj Close' column is rescaled independently to [0, 1]
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range=(0, 1))
# NOTE(review): the scaler is fitted on the whole date range, which includes the rows that
# later end up in the test split, so the test period's min/max influence the training inputs.
# Confirm whether fitting on the training rows only is wanted.
scaled_data = scaler.fit_transform(stock_data['Adj Close'])
print(f'Scaled data shape: {scaled_data.shape}')

import torch
# Now, we'll create the input and output sequences for the LSTM model
def create_sequences(data, seq_length):
    """Build sliding-window input/target pairs for next-step prediction.

    Args:
        data: Time-ordered array-like (NumPy array, nested list or tensor) whose
            first axis is the time step.
        seq_length: Number of consecutive time steps in every window.

    Returns:
        A tuple ``(X, y)`` of float32 tensors with one entry per window.
        ``X[i]`` is ``data[i:i + seq_length]`` and ``y[i]`` is the same window
        shifted one step ahead, ``data[i + 1:i + seq_length + 1]``. Both are
        empty tensors when ``data`` has no more than ``seq_length`` rows.
    """
    # Convert once up front: building a tensor from a Python list of NumPy
    # arrays is very slow, whereas stacking tensor slices is cheap.
    series = torch.as_tensor(data, dtype=torch.float32)
    num_sequences = len(series) - seq_length
    if num_sequences <= 0:
        # Not enough rows for a single window plus its shifted target
        return torch.FloatTensor([]), torch.FloatTensor([])

    # Windows look like [0, 1, ..., seq_length-1], [1, 2, ..., seq_length], ...
    X = torch.stack([series[i:i + seq_length] for i in range(num_sequences)])
    y = torch.stack([series[i + 1:i + seq_length + 1] for i in range(num_sequences)])
    return X, y

seq_length = 60 # i.e. the model will look at the previous 60 rows (trading days) to predict the stock price for the next day

X, y = create_sequences(scaled_data, seq_length)

# We need to create the sequences before splitting the data into training and testing sets, as the sequences need to be continuous
# If we split the data first, the sequences would be broken up and the model wouldn't be able to learn from them
# Now, we'll split the data into training and testing sets
from sklearn.model_selection import train_test_split

# With shuffle=False the split is chronological: the first 80% of the sequences are used for
# training and the last 20% for testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # This keeps the sequences continuous

print(f'Type of X_train: {type(X_train)}')
print(f'Type of y_train: {type(y_train)}')
print('X_train shape:', X_train.shape)
print('y_train shape:', y_train.shape)
print('X_test shape:', X_test.shape)
print('y_test shape:', y_test.shape)

In [None]:
# Visual sanity check of the chronological split: draw the first time step of every training
# sequence in blue and, right after it on the x-axis, the first time step of every testing
# sequence in red. The red curves should continue where the blue ones stop.
num_train_sequences = len(X_train)
num_test_sequences = len(X_test)
train_positions = range(num_train_sequences)
test_positions = range(num_train_sequences, num_train_sequences + num_test_sequences)

plt.plot(train_positions, X_train[:, 0], label='Training set', color='blue', alpha=0.5)
plt.plot(test_positions, X_test[:, 0], label='Testing set', color='red', alpha=0.5)
plt.title('First Element of Each Sequence in the Training Set')
plt.show()

In [None]:
# Now, we'll build the LSTM model with PyTorch
import torch
import torch.nn as nn
from torch.autograd import Variable

# Ensure reproducibility: seed Python's, PyTorch's and NumPy's random number generators
# with the same fixed value
import random

random.seed(42)
torch.manual_seed(42)
np.random.seed(42)


class LSTM(nn.Module):
    """LSTM followed by a linear layer that is applied to every time step.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Size of the LSTM hidden state.
        output_size: Number of values predicted per time step.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the input and to the LSTM
            output (and between LSTM layers when ``num_layers > 1``).
        device: Device the sub-modules and the initial states are placed on.
    """

    def __init__(
        self,
        input_size=1,
        hidden_layer_size=100,
        output_size=1,
        num_layers=1,
        dropout=0.0,
        device="cpu",
    ):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers
        self.input_size = input_size

        # Dropout module shared by the input and the LSTM output in forward()
        self.dropout = nn.Dropout(dropout).to(device)

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_layer_size,
            num_layers=num_layers,
            # nn.LSTM only applies dropout between stacked layers, so it is
            # disabled when there is a single layer
            dropout=(dropout if num_layers > 1 else 0),
            batch_first=True,  # Inputs are (batch, seq_length, input_size)
        ).to(device)

        self.linear = nn.Linear(hidden_layer_size, output_size).to(device)

        self.device = device

    def _initial_state(self, batch_size: int):
        """Return zero-filled ``(h0, c0)`` states for ``batch_size`` sequences."""
        shape = (self.num_layers, batch_size, self.hidden_layer_size)
        h0 = torch.zeros(*shape).to(self.device)
        c0 = torch.zeros(*shape).to(self.device)
        return h0, c0

    def forward(self, input_seq: torch.Tensor) -> torch.Tensor:
        """Predict one output per time step.

        Args:
            input_seq: Tensor of shape (batch, seq_length, input_size).

        Returns:
            Tensor of shape (batch, seq_length, output_size).
        """
        # Every call starts from a fresh zero state, so nothing is carried
        # over (or backpropagated) between batches
        h0, c0 = self._initial_state(input_seq.size(0))

        # Apply dropout to the input layer
        input_seq = self.dropout(input_seq)

        lstm_out, _ = self.lstm(input_seq, (h0, c0))

        # Apply dropout to the output of the LSTM layer
        lstm_out = self.dropout(lstm_out)

        return self.linear(lstm_out)

    def generate_predictions(
        self, input_seq: torch.Tensor, num_predictions: int
    ) -> torch.Tensor:
        """Autoregressively extend ``input_seq`` by ``num_predictions`` steps.

        At each step the model is run on the current window, the prediction
        for the last time step is appended to the output, and the window is
        shifted by one step so that it ends with that prediction. This
        requires ``output_size`` to equal the feature size of ``input_seq``.

        Args:
            input_seq: Prompt of shape (batch, seq_length, features). It is
                not modified.
            num_predictions: Number of time steps to generate.

        Returns:
            Tensor of shape (batch, seq_length + num_predictions, features):
            the prompt followed by the generated steps.
        """
        generated_items = input_seq
        current_window = input_seq

        # Prepare the initial hidden state
        h0, c0 = self._initial_state(input_seq.size(0))

        for _ in range(num_predictions):
            lstm_output, (hn, cn) = self.lstm(current_window, (h0, c0))
            # NOTE(review): the state reached after a window is reused as the
            # initial state for the next, overlapping window, whereas
            # forward() always starts from zeros. Confirm this is intended.
            h0, c0 = hn.detach(), cn.detach()
            output = self.linear(lstm_output)

            # Prediction for the step right after the window: (batch, 1, features)
            next_item = output[:, -1:, :]

            # torch.roll returns a new tensor, so the caller's prompt is left
            # untouched by the in-place write below
            current_window = torch.roll(current_window, -1, 1)
            current_window[:, -1, :] = next_item[:, 0, :]
            generated_items = torch.cat([generated_items, next_item], dim=1)

        return generated_items

    def forward2(self, input_seq: torch.Tensor) -> torch.Tensor:
        """Variant of ``forward`` without dropout.

        No initial state is passed, so ``nn.LSTM`` uses its default one.

        Args:
            input_seq: Tensor of shape (batch, seq_length, input_size).

        Returns:
            Tensor of shape (batch, seq_length, output_size).
        """
        lstm_out, _ = self.lstm(input_seq)
        return self.linear(lstm_out)

In [None]:
# Pick the device to train on: a CUDA GPU if there is one, otherwise Apple's MPS backend,
# otherwise the CPU
if torch.cuda.is_available():
    print("Using CUDA")
    device = torch.device("cuda")
# is_available() checks that MPS can actually be used on this machine; is_built() would only
# tell us that this PyTorch build was compiled with MPS support
elif torch.backends.mps.is_available():
    print("Using MPS")
    device = torch.device("mps")
else:
    print("Using CPU")
    device = torch.device("cpu")

# device = torch.device('cpu')

# Only keep three companies for now: the ones at column positions 1, 2 and 3
X_train = X_train[:, :, 1:4]
y_train = y_train[:, :, 1:4]
X_test = X_test[:, :, 1:4]
y_test = y_test[:, :, 1:4]

# Also move the training and testing data to the selected device
X_train = X_train.to(device)
y_train = y_train.to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)
print(f"Shape of X_train: {X_train.shape}")
print(f"Device of X_train: {X_train.device}")
print(f"Type of X_train: {type(X_train)}")

# Hyperparameters shared by the cells that build the model
num_layers = 2
hidden_layer_size = 32

In [None]:
import copy

import tqdm
from torch.utils.tensorboard import SummaryWriter

model = LSTM(
    input_size=1,
    hidden_layer_size=hidden_layer_size,
    output_size=1,
    num_layers=num_layers,
    device=device,
    # dropout=0.25,
)
print(model)

# The training loss is summed (not averaged) over all elements of a batch
loss_function = nn.MSELoss(reduction="sum")
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-2)

tensor_dataset = torch.utils.data.TensorDataset(X_train, y_train)
loader = torch.utils.data.DataLoader(tensor_dataset, batch_size=128, shuffle=True)

# Train the model
epochs = 100
log_every = 1

writer = SummaryWriter()

INDEX_OF_COMPANY_TO_PLOT = 0


# Monitor the training and testing RMSE and stop training when the testing RMSE starts to increase, as this is a sign of overfitting
best_test_rmse = float("inf")
best_model = None
best_epoch = 0
patience = 15

# Evaluation setup. It does not change between epochs, so it is prepared once.
# The prompt is the first test window of one company; the model then generates the
# following steps on its own.
starting_sequence = X_test[0, :, INDEX_OF_COMPANY_TO_PLOT].view(
    1, -1, 1
)  # First sequence of the chosen company
num_to_generate = X_test.shape[0] - seq_length
# The k-th generated value is the step right after window k, which is exactly the last
# target of window k. Shaped like the generated output: (1, num_to_generate, 1).
y_test_target = y_test[:num_to_generate, -1, INDEX_OF_COMPANY_TO_PLOT].reshape(
    1, -1, 1
)

# Series used for the per-epoch plot (last target of every window for the chosen company)
train_series = y_train[:, -1, INDEX_OF_COMPANY_TO_PLOT].cpu()
test_series = y_test[:, -1, INDEX_OF_COMPANY_TO_PLOT].cpu()

remaining_patience = patience
global_step = 0  # Counts batches so that every logged training loss gets its own step
for epoch in tqdm.tqdm(range(epochs)):
    total_loss = 0
    model.train()
    for X_batch, y_batch in loader:
        # X_batch has shape (batch_size, seq_length, num_companies). The model takes one
        # feature per time step, so every company becomes its own sequence:
        # (batch_size * num_companies, seq_length, 1). The permute is required first;
        # a plain view would interleave companies and time steps within a sequence.
        num_companies_in_batch = X_batch.shape[-1]
        X_batch_all_companies_together = X_batch.permute(0, 2, 1).reshape(
            -1, seq_length, 1
        )

        y_pred = model(X_batch_all_companies_together)
        # Back to (batch_size, seq_length, num_companies) to line up with y_batch
        y_pred = y_pred.reshape(-1, num_companies_in_batch, seq_length).permute(
            0, 2, 1
        )
        loss = loss_function(y_pred, y_batch)
        writer.add_scalar("loss", loss.item(), global_step)
        global_step += 1
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Monitor the testing RMSE of the autoregressive generation
    model.eval()
    with torch.no_grad():
        # Drop the prompt and keep only the generated steps: (1, num_to_generate, 1)
        y_pred = model.generate_predictions(starting_sequence, num_to_generate)[
            :, seq_length:, :
        ]
        test_rmse = torch.sqrt(torch.mean((y_pred - y_test_target) ** 2)).item()
    writer.add_scalar("test_rmse", test_rmse, epoch)
    print(f"Epoch {epoch}: test RMSE {test_rmse:.4f}, average loss {total_loss / len(loader):.4f}")
    if test_rmse < best_test_rmse:
        best_test_rmse = test_rmse
        # state_dict() returns references to the live parameters, which keep changing as
        # training continues, so a deep copy is needed to really keep the best weights
        best_model = copy.deepcopy(model.state_dict())
        best_epoch = epoch
        print("====> New best!")

    if epoch % log_every == 0:
        # Plot the training data in blue, the testing data in red (one goes after the other)
        # and the generated values in green, starting where the testing data starts
        plt.plot(
            range(len(train_series)),
            train_series,
            label="Training data",
            color="blue",
            alpha=0.5,
        )
        plt.plot(
            range(len(train_series), len(train_series) + len(test_series)),
            test_series,
            label="Testing data",
            color="red",
            alpha=0.5,
        )
        plt.plot(
            range(len(train_series), len(train_series) + num_to_generate),
            y_pred.reshape(-1).cpu(),
            label="Predictions",
            color="green",
            alpha=0.5,
        )
        plt.title("Predictions")
        plt.legend()
        plt.show()

    # Early stopping: give up after `patience` consecutive epochs worse than the best one
    if test_rmse > best_test_rmse:
        remaining_patience -= 1
        if remaining_patience == 0:
            print("Early stopping")
            break
    else:
        remaining_patience = patience

# Load the best model (best_model stays None if no epoch produced a comparable RMSE)
if best_model is not None:
    model.load_state_dict(best_model)
print(f"Best epoch: {best_epoch}")

writer.close()

# Store the model
torch.save(model.state_dict(), "lstm_model.pt")
print("Model saved")

In [None]:
# Load the model from the file
model = LSTM(
    input_size=1,
    hidden_layer_size=hidden_layer_size,
    output_size=1,
    num_layers=num_layers,
    device=device,
)
# map_location makes the saved weights load onto the currently selected device
model.load_state_dict(torch.load("lstm_model.pt", map_location=device))

# Evaluate the model
model.eval()

# Make predictions. We'll get one prediction per time step for each company of each test sequence.
print(f"X_test shape: {X_test.shape}")
# X_test: (num_sequences, seq_length, num_companies)
num_test_companies = X_test.shape[-1]
with torch.no_grad():
    # The model takes one feature per time step, so every company becomes its own sequence.
    # The permute is required first; a plain view would interleave companies and time steps.
    per_company_windows = X_test.permute(0, 2, 1).reshape(-1, seq_length, 1)
    y_pred = model(per_company_windows)
    # Back to (num_sequences, seq_length, num_companies) to line up with y_test
    y_pred = y_pred.reshape(-1, num_test_companies, seq_length).permute(0, 2, 1)
    print(f"y_pred shape: {y_pred.shape}")
    test_loss = loss_function(y_pred, y_test)

# Calculate the loss on the test set
all_predictions = y_pred
print("Loss:", test_loss.item())
# Calculate the average difference between the predicted and actual stock prices
average_diff = torch.mean(torch.abs(all_predictions[seq_length:, ...] - y_test[seq_length:, ...]))
print("Average difference:", average_diff.item())

# Finally, we'll plot the predicted stock prices for the next day.
# We'll also plot the actual stock prices for the test data.

# Choose a color map so that each company has a different color and the colors are the same for the predicted and actual stock prices
cmap = plt.get_cmap("tab20")
colors = [cmap(i) for i in range(num_companies)] # These are rgba colors

# X_test / y_test only hold the company columns 1:4 of the scaled data, so position i in
# these tensors is company FIRST_SELECTED_COMPANY + i. Keep this in sync with that slice.
FIRST_SELECTED_COMPANY = 1

# Plot the next-day value (last time step of every window) for each company
# Use the colors defined above to plot the predicted and actual stock prices for each company
for i in range(1):
    company_name = companies[FIRST_SELECTED_COMPANY + i]
    plt.figure(figsize=(12, 8))
    plt.plot(all_predictions[:, -1, i].cpu().numpy(), label=f"{company_name} (pred)", linestyle="--", color=colors[i])
    plt.plot(y_test[:, -1, i].cpu().numpy(), label=f"{company_name} (actual)", color=colors[i])
    # y range: -1 to 1
    plt.ylim(-1, 1)
    plt.xlabel("Day")
    plt.ylabel("Stock Price")
    plt.title("Predicted vs. Actual Stock Prices")
    plt.legend()
    plt.show()

In [None]:
# Inverse transform the data
# The scaler was fitted on every company column, while the test tensors only hold the company
# columns 1:4, so scaler.inverse_transform cannot be applied to them directly. Instead, undo
# the scaling with the parameters of the matching columns.
# Keep FIRST_SELECTED_COMPANY in sync with the slice applied to X_test / y_test.
FIRST_SELECTED_COMPANY = 1
num_selected_companies = y_test.shape[-1]
selected_columns = slice(FIRST_SELECTED_COMPANY, FIRST_SELECTED_COMPANY + num_selected_companies)
column_scale = scaler.scale_[selected_columns]
column_min = scaler.min_[selected_columns]

# Next-day prediction (last time step) of every test window, one column per selected company
with torch.no_grad():
    per_company_windows = X_test.permute(0, 2, 1).reshape(-1, seq_length, 1)
    next_day_predictions = model(per_company_windows)[:, -1, 0].reshape(-1, num_selected_companies)

# MinMaxScaler computes X_scaled = X * scale_ + min_, hence X = (X_scaled - min_) / scale_
y_test_pred = (next_day_predictions.cpu().numpy() - column_min) / column_scale
y_test_actual = (y_test[:, -1, :].cpu().numpy() - column_min) / column_scale

# Plot the predicted stock prices for the next day, one figure per selected company
cmap = plt.get_cmap("tab20")
for i in range(num_selected_companies):
    company_name = companies[FIRST_SELECTED_COMPANY + i]
    plt.figure(figsize=(12, 8))
    plt.plot(y_test_pred[:, i], label=f"{company_name} (pred)", linestyle="--", color=cmap(i))
    plt.plot(y_test_actual[:, i], label=f"{company_name} (actual)", color=cmap(i))
    plt.xlabel("Day")
    plt.ylabel("Stock Price")
    plt.title("Predicted vs. Actual Stock Prices")
    plt.legend()
    plt.show()

In [None]:
num_to_generate = X_test.shape[0] - seq_length # We want to generate the rest of the test data, i.e. what's remaining after this first sequence
num_selected_companies = X_test.shape[-1]
# X_test / y_test only hold the company columns 1:4 of the scaled data, so position i in
# these tensors is company FIRST_SELECTED_COMPANY + i. Keep this in sync with that slice.
FIRST_SELECTED_COMPANY = 1

# The model takes a single feature per time step (input_size=1), so the continuation is
# generated for one company at a time and the results are joined along the last axis
per_company_generations = []
with torch.no_grad():
    for company_index in range(num_selected_companies):
        starting_sequence = X_test[0, :, company_index].reshape(1, -1, 1) # First test sequence of this company
        per_company_generations.append(
            model.generate_predictions(starting_sequence, num_to_generate)
        )
# Shape: (1, seq_length + num_to_generate, num_selected_companies)
generations = torch.cat(per_company_generations, dim=-1)
display(generations)

# Plot the actual values against the generated continuation for each company
cmap = plt.get_cmap("tab20")
for i in range(1):
    company_name = companies[FIRST_SELECTED_COMPANY + i]
    # Actual series on the same time axis as the generations: the prompt (first test window)
    # followed by the value that comes right after each test window
    actual_series = torch.cat([X_test[0, :, i], y_test[:, -1, i]]).cpu().numpy()
    plt.figure(figsize=(12, 8))
    plt.plot(actual_series, label=f"{company_name} (actual)", color=cmap(i))
    # Draw a vertical line after the ending of the prompt (seq_length)
    plt.axvline(x=seq_length, color='black', linestyle='--')
    plt.plot(range(seq_length, seq_length + num_to_generate), generations[0, seq_length:, i].cpu().numpy(), label=f"{company_name} (pred)", linestyle="--", color=cmap(i))
    plt.ylim(-1, 1)
    plt.xlabel("Day")
    plt.ylabel("Stock Price")
    plt.title("Predicted vs. Actual Stock Prices")
    plt.legend()
    plt.show()