In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import yfinance as yf
import matplotlib.pyplot as plt

def create_sequences(data, seq_length):
    """Slice a series into sliding input windows and next-step targets.

    For every valid start index ``i`` the input window is
    ``data[i:i + seq_length]`` and its target is ``data[i + seq_length]``.

    Args:
        data: Indexable sequence of observations (e.g. a NumPy array).
        seq_length: Number of consecutive observations per input window.

    Returns:
        A tuple ``(xs, ys)`` of NumPy arrays holding the windows and their
        targets. Both are empty when ``data`` has no more than ``seq_length``
        observations.
    """
    # The last usable start index is len(data) - seq_length - 1, so the range
    # has to stop at len(data) - seq_length. Stopping one earlier silently
    # drops the most recent sample.
    n_samples = max(len(data) - seq_length, 0)
    xs = [data[i:i + seq_length] for i in range(n_samples)]
    ys = [data[i + seq_length] for i in range(n_samples)]
    return np.array(xs), np.array(ys)

class RNNModel(nn.Module):
    """Stacked ``nn.RNN`` followed by a linear read-out of the last time step."""

    def __init__(self, input_size, hidden_layer_size, num_layers, output_size):
        """Create the recurrent stack and the output projection.

        Args:
            input_size: Number of features per time step.
            hidden_layer_size: Width of each recurrent layer.
            num_layers: Number of stacked recurrent layers.
            output_size: Number of values produced per input sequence.
        """
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_layer_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.linear = nn.Linear(in_features=hidden_layer_size, out_features=output_size)

    def forward(self, input_seq):
        """Return the linear projection of the RNN output at the final step.

        ``input_seq`` is batch-first because the RNN is built with
        ``batch_first=True``.
        """
        step_outputs, _final_hidden = self.rnn(input_seq)
        final_step = step_outputs[:, -1]
        return self.linear(final_step)
    
def train_model(model, train_loader, criterion, optimizer, num_epochs=100):
    """Fit ``model`` with ``num_epochs`` passes over ``train_loader``.

    One optimizer step is taken per batch. On every tenth epoch (starting
    with epoch 0) the loss of that epoch's final batch is printed.

    Args:
        model: Callable module mapping a batch of inputs to predictions.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of passes over ``train_loader``.
    """
    for epoch in range(num_epochs):
        for batch_inputs, batch_targets in train_loader:
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_targets)
            batch_loss.backward()
            optimizer.step()

        if epoch % 10:
            continue
        # Only the loss of the epoch's last batch is reported.
        print(f'Epoch {epoch} Loss: {batch_loss.item()}')

def evaluate_model(model, test_loader):
    """Run ``model`` over ``test_loader`` and gather outputs next to the labels.

    Gradient tracking is disabled while predicting. The model's train/eval
    mode is left as the caller set it.

    Args:
        model: Callable module mapping a batch of inputs to predictions.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        A tuple ``(predictions, actuals)`` of 1-D NumPy arrays, flattened in
        loader order.
    """
    predicted_values = []
    true_values = []
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            predicted_values += model(batch_inputs).view(-1).tolist()
            true_values += batch_labels.view(-1).tolist()
    return np.array(predicted_values), np.array(true_values)

# Function to perform the training process
def train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
    """Train ``model`` and measure its validation loss after every epoch.

    Each epoch runs one optimizer step per training batch, then evaluates the
    model on ``val_loader`` with gradients disabled. Every tenth epoch
    (starting with epoch 0) the loss of the last training batch and the mean
    validation batch loss are printed.

    Args:
        model: ``nn.Module`` to optimise; it is left in eval mode on return
            when at least one epoch ran.
        train_loader: Iterable yielding ``(inputs, targets)`` training batches.
        val_loader: Iterable yielding ``(inputs, targets)`` validation batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of training epochs.
    """
    for epoch in range(num_epochs):
        # Training step
        model.train()
        last_train_loss = None
        for seqs, labels in train_loader:
            optimizer.zero_grad()
            y_pred = model(seqs)
            loss = criterion(y_pred, labels)
            loss.backward()
            optimizer.step()
            last_train_loss = loss

        # Validation step
        model.eval()
        val_losses = []
        with torch.no_grad():
            for seqs, labels in val_loader:
                # Kept in its own variable: reusing `loss` here would make the
                # report below print a validation loss as the training loss.
                val_batch_loss = criterion(model(seqs), labels)
                val_losses.append(val_batch_loss.item())
        # NaN stands in for "no batches" instead of a NameError / warning.
        val_loss = np.mean(val_losses) if val_losses else float('nan')
        train_loss = last_train_loss.item() if last_train_loss is not None else float('nan')

        if epoch % 10 == 0:
            print(f'Epoch {epoch} | Training Loss: {train_loss} | Validation Loss: {val_loss}')

def plot_predictions(model, loader, scaler):
    """Plot model predictions against the labels for every batch in ``loader``.

    The model is switched to eval mode. Each batch's outputs and labels are
    passed through ``scaler.inverse_transform`` before being plotted.

    Args:
        model: ``nn.Module`` producing one prediction per input sequence.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        scaler: Object exposing ``inverse_transform`` for 2-D arrays
            (presumably the scaler used to normalise the data).
    """
    model.eval()  # inference mode for prediction
    predicted_prices, actual_prices = [], []
    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_outputs = model(batch_inputs)
            # Undo the scaling batch by batch, predictions first.
            restored_outputs = scaler.inverse_transform(batch_outputs.cpu().numpy())
            restored_labels = scaler.inverse_transform(batch_labels.cpu().numpy())
            predicted_prices += restored_outputs.flatten().tolist()
            actual_prices += restored_labels.flatten().tolist()

    # Draw both series on one figure.
    plt.figure(figsize=(10, 5))
    plt.plot(actual_prices, label='Actual Price', color='blue', marker='o')
    plt.plot(predicted_prices, label='Predicted Price', color='red', linestyle='--')
    plt.title('Apple Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel('Stock Price')
    plt.legend()
    plt.show()

# Parameters
seq_length = 5
input_size = 1
hidden_layer_size = 50
num_layers = 2
output_size = 1
batch_size = 64
learning_rate = 0.001
num_epochs = 100

aapl_df = yf.download('AAPL', start='2018-01-01', end='2024-01-15')
# Keep only the closing price. The selection has to be assigned back: a bare
# `aapl_df[['Close']]` expression is discarded, and the scaler below would
# then be fit on every column of the download flattened into one series.
aapl_df = aapl_df[['Close']]
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(aapl_df.values.reshape(-1, 1))


# NOTE: The triple-quoted string below holds a disabled copy of the pipeline
# (sequence creation, split, training, forecasting, plotting). It is a plain
# string expression, so none of the code inside it is executed.
'''

# Create sequences
seq_length = 5
X, y = create_sequences(data_normalized, seq_length)

# Split the data into training, validation, and testing sets
X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.25, random_state=42)

# Convert to tensors and create DataLoader for batch processing
train_data = torch.utils.data.TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=64, shuffle=True)
val_data = torch.utils.data.TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
val_loader = torch.utils.data.DataLoader(dataset=val_data, batch_size=64, shuffle=False)

# Model initialization
model = RNNModel(input_size, hidden_layer_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train and validate the model
train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100)

# Forecasting future stock prices
# For a simple demonstration, let's predict the next day's price using the latest data from X_test
model.eval()
with torch.no_grad():
    last_sequence = torch.FloatTensor(X_test[-1:]).to(torch.float32)  # Last sequence from the test set
    predicted_normalized_price = model(last_sequence).item()  # Model's prediction
    predicted_price = scaler.inverse_transform([[predicted_normalized_price]])[0][0]  # Inverse transform to get the actual price
    print(f'Predicted Price: {predicted_price}')

plot_predictions(model, val_loader, scaler)
'''

In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import yfinance as yf
import matplotlib.pyplot as plt


def create_sequences(data, seq_length):
    """Build sliding windows of ``seq_length`` steps and their next-step targets.

    Window ``k`` is ``data[k:k + seq_length]`` and its target is
    ``data[k + seq_length]``.

    Args:
        data: Indexable sequence of observations (e.g. a NumPy array).
        seq_length: Number of consecutive observations per window.

    Returns:
        A tuple ``(xs, ys)`` of NumPy arrays with the windows and targets;
        both are empty if ``data`` is not longer than ``seq_length``.
    """
    # Every start index whose target still lies inside `data` is used. The
    # previous bound (len(data) - seq_length - 1) skipped the newest sample.
    window_starts = range(len(data) - seq_length)
    xs = [data[start:start + seq_length] for start in window_starts]
    ys = [data[start + seq_length] for start in window_starts]
    return np.array(xs), np.array(ys)

class RNNModel(nn.Module):
    """Multi-layer ``nn.RNN`` whose last time-step output feeds a linear layer."""

    def __init__(self, input_size, hidden_layer_size, num_layers, output_size):
        """Set up the recurrent layers and the linear head.

        Args:
            input_size: Number of features per time step.
            hidden_layer_size: Hidden width of every recurrent layer.
            num_layers: How many recurrent layers are stacked.
            output_size: Size of the prediction for each sequence.
        """
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        recurrent_kwargs = {'num_layers': num_layers, 'batch_first': True}
        self.rnn = nn.RNN(input_size, hidden_layer_size, **recurrent_kwargs)
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        """Map a batch-first sequence tensor to one prediction per sequence."""
        sequence_output = self.rnn(input_seq)[0]
        # Only the output at the final time step is projected.
        return self.linear(sequence_output[:, -1])
    
def train_model(model, train_loader, criterion, optimizer, num_epochs=100):
    """Optimise ``model`` on ``train_loader`` for ``num_epochs`` epochs.

    Each batch triggers a gradient reset, a forward/backward pass and an
    optimizer step. The final batch loss is printed for epochs 0, 10, 20, ...

    Args:
        model: Callable module mapping a batch of inputs to predictions.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of epochs to run.
    """
    for epoch in range(num_epochs):
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            current_loss = criterion(outputs, targets)
            current_loss.backward()
            optimizer.step()

        should_report = (epoch % 10 == 0)
        if should_report:
            # `current_loss` still refers to the epoch's last batch here.
            print(f'Epoch {epoch} Loss: {current_loss.item()}')

def evaluate_model(model, test_loader):
    """Predict every batch of ``test_loader`` without tracking gradients.

    The model's train/eval mode is not modified by this function.

    Args:
        model: Callable module mapping a batch of inputs to predictions.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(predictions, actuals)`` as flat NumPy arrays in loader order.
    """
    collected_outputs, collected_labels = [], []
    with torch.no_grad():
        for inputs, targets in test_loader:
            flat_output = model(inputs).view(-1)
            flat_target = targets.view(-1)
            collected_outputs.extend(flat_output.tolist())
            collected_labels.extend(flat_target.tolist())
    return np.array(collected_outputs), np.array(collected_labels)

def train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
    """Alternate one training pass and one validation pass per epoch.

    On every tenth epoch (starting with epoch 0) the loss of the last
    training batch and the mean validation batch loss are printed.

    Args:
        model: ``nn.Module`` to optimise; left in eval mode after each epoch.
        train_loader: Iterable yielding ``(inputs, targets)`` training batches.
        val_loader: Iterable yielding ``(inputs, targets)`` validation batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of training epochs.
    """
    for epoch in range(num_epochs):
        # Training step
        model.train()
        train_loss = float('nan')  # reported as NaN if no batch was seen
        for seqs, labels in train_loader:
            optimizer.zero_grad()
            y_pred = model(seqs)
            loss = criterion(y_pred, labels)
            loss.backward()
            optimizer.step()
            train_loss = loss

        # Validation step
        model.eval()
        val_losses = []
        with torch.no_grad():
            for seqs, labels in val_loader:
                y_pred = model(seqs)
                # Do not rebind `train_loss`/`loss` semantics here: the
                # validation loss gets its own name so the training loss
                # printed below is not overwritten.
                val_batch_loss = criterion(y_pred, labels)
                val_losses.append(val_batch_loss.item())
        val_loss = np.mean(val_losses) if val_losses else float('nan')

        if epoch % 10 == 0:
            shown_train_loss = train_loss.item() if torch.is_tensor(train_loss) else train_loss
            print(f'Epoch {epoch} | Training Loss: {shown_train_loss} | Validation Loss: {val_loss}')

# NOTE: The triple-quoted string below holds a disabled earlier version of
# plot_predictions. It is a plain string expression and is never executed.
'''
def plot_predictions(model, loader, scaler, y_test):
    model.eval()  # Set the model to evaluation mode
    predictions = []
    actuals = []
    with torch.no_grad():
        for seqs, labels in loader:
            preds = model(seqs)  # Predict
            # Inverse transform predictions and actual labels
            preds = scaler.inverse_transform(preds.cpu().numpy()).flatten().tolist()
            labels = scaler.inverse_transform(labels.cpu().numpy()).flatten().tolist()
            predictions.extend(preds)
            actuals.extend(labels)

    print(y_test)
    
    dates = aapl_df.index[-len(y_test):]  # This assumes y_test is not shuffled and is in order

    def plot_predictions_with_dates(predictions, actuals, dates):
        plt.figure(figsize=(15, 7))
        plt.plot(dates, actuals, label='Actual Price', color='blue', marker='o')
        plt.plot(dates, predictions, label='Predicted Price', color='red', linestyle='--')
        plt.title('Apple Stock Price Prediction')
        plt.xlabel('Date')
        plt.xticks(rotation=45)  # Rotate dates for better readability
        plt.ylabel('Stock Price')
        plt.legend()
        plt.tight_layout()  # Adjust layout to fit date labels
        plt.show()

    plot_predictions_with_dates(predictions, actuals, dates)
'''
    
def plot_predictions_with_dates(predictions, actuals, dates):
    """Plot actual and predicted series against ``dates`` in ascending order.

    Args:
        predictions: Sequence of predicted values, aligned with ``dates``.
        actuals: Sequence of observed values, aligned with ``dates``.
        dates: Index-like collection supporting ``np.argsort`` and fancy
            indexing (presumably a pandas ``DatetimeIndex``).
    """
    # A single permutation is applied to all three series so they stay aligned.
    order = np.argsort(dates)
    dates_in_order = dates[order]
    predictions_in_order = np.array(predictions)[order]
    actuals_in_order = np.array(actuals)[order]

    plt.figure(figsize=(15, 7))
    plt.plot(dates_in_order, actuals_in_order, label='Actual Price', color='blue', marker='o')
    plt.plot(dates_in_order, predictions_in_order, label='Predicted Price', color='red', linestyle='--')
    plt.title('Apple Stock Price Prediction')
    plt.xlabel('Date')
    plt.xticks(rotation=45)
    plt.ylabel('Stock Price')
    plt.legend()
    plt.tight_layout()
    plt.show()

# Hyperparameters
seq_length = 5
input_size = 1
hidden_layer_size = 50
num_layers = 2
output_size = 1
batch_size = 64
learning_rate = 0.001
num_epochs = 100

aapl_df = yf.download('AAPL', start='2018-01-01', end='2024-01-15')
aapl_df = aapl_df[['Close']]

# NOTE(review): the scaler is fit on the full series, so the value range of
# the validation/test periods influences the training inputs.
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(aapl_df.values.reshape(-1, 1))

X, y = create_sequences(data_normalized, seq_length)

# Chronological 60/20/20 split into training, validation and test sets.
# The loaders below must be built from THIS split: building them from a
# shuffled split while testing on the chronological tail would put test
# windows into the training data.
X_temp, X_test, y_temp, y_test, idx_temp, idx_test = train_test_split(
    X, y, range(len(X)), test_size=0.2, shuffle=False
)
X_train, X_val, y_train, y_val, idx_train, idx_val = train_test_split(
    X_temp, y_temp, idx_temp, test_size=0.25, shuffle=False
)

# Convert to tensors and create DataLoader for batch processing
train_data = torch.utils.data.TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
val_data = torch.utils.data.TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
val_loader = torch.utils.data.DataLoader(dataset=val_data, batch_size=batch_size, shuffle=False)

# Window i covers rows i .. i + seq_length - 1 and its target is row
# i + seq_length, so the date of each test target is offset by seq_length.
test_dates = aapl_df.index[np.asarray(idx_test) + seq_length]
test_data = torch.utils.data.TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test))
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

# Date-sorted views of the test set (already in order after an unshuffled
# split; kept so the existing names remain available).
sorted_indices = np.argsort(test_dates)
sorted_dates = test_dates[sorted_indices]

X_test_sorted = X_test[sorted_indices]
y_test_sorted = y_test[sorted_indices]

test_data_sorted = torch.utils.data.TensorDataset(torch.FloatTensor(X_test_sorted), torch.FloatTensor(y_test_sorted))
test_loader_sorted = torch.utils.data.DataLoader(dataset=test_data_sorted, batch_size=batch_size, shuffle=False)


model = RNNModel(input_size, hidden_layer_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs)

# Metrics are computed on the normalized (-1, 1) scale.
predictions, actuals = evaluate_model(model, test_loader)
mse = mean_squared_error(actuals, predictions)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actuals, predictions)
r2 = r2_score(actuals, predictions)
print(f'MSE: {mse}')
print(f'RMSE: {rmse}')
print(f'MAE: {mae}')
print(f'R-squared: {r2}')

# Prediction for the target of the last test window, converted back to a price.
model.eval()
with torch.no_grad():
    last_sequence = torch.FloatTensor(X_test[-1:]).to(torch.float32)
    predicted_normalized_price = model(last_sequence).item()
    predicted_price = scaler.inverse_transform([[predicted_normalized_price]])[0][0]
    print(f'Predicted Price: {predicted_price}')


# The plot's y-axis is labelled "Stock Price", so undo the scaling first.
plot_predictions_with_dates(
    scaler.inverse_transform(predictions.reshape(-1, 1)).ravel(),
    scaler.inverse_transform(actuals.reshape(-1, 1)).ravel(),
    test_dates,
)

In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import yfinance as yf
import matplotlib.pyplot as plt

def create_sequences(data, seq_length):
    """Turn a series into ``(window, next value)`` training pairs.

    Args:
        data: Indexable sequence of observations (e.g. a NumPy array).
        seq_length: Number of consecutive observations per input window.

    Returns:
        A tuple ``(xs, ys)`` of NumPy arrays where ``xs[i]`` is
        ``data[i:i + seq_length]`` and ``ys[i]`` is ``data[i + seq_length]``.
        Both are empty when ``data`` has at most ``seq_length`` observations.
    """
    xs, ys = [], []
    # Iterate up to and including the last start index whose target exists;
    # the former `- 1` in the bound discarded the final sample.
    for i in range(len(data) - seq_length):
        xs.append(data[i:(i + seq_length)])
        ys.append(data[i + seq_length])
    return np.array(xs), np.array(ys)

class RNNModel(nn.Module):
    """Stacked ``nn.RNN`` with inter-layer dropout and a linear output head."""

    def __init__(self, input_size, hidden_layer_size, num_layers, output_size, dropout_prob):
        """Create the recurrent stack and the output projection.

        Args:
            input_size: Number of features per time step.
            hidden_layer_size: Width of each recurrent layer.
            num_layers: Number of stacked recurrent layers.
            output_size: Number of values produced per input sequence.
            dropout_prob: Value forwarded to ``nn.RNN``'s ``dropout`` argument.
        """
        super().__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_layer_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout_prob,
        )
        self.linear = nn.Linear(in_features=hidden_layer_size, out_features=output_size)

    def forward(self, input_seq):
        """Project the RNN output at the last time step of a batch-first input."""
        step_outputs, _final_hidden = self.rnn(input_seq)
        final_step = step_outputs[:, -1]
        return self.linear(final_step)

def train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
    """Train ``model`` and compute the validation loss after each epoch.

    Every tenth epoch (starting with epoch 0) the loss of the last training
    batch and the mean of the validation batch losses are printed.

    Args:
        model: ``nn.Module`` to optimise; switched between train and eval mode.
        train_loader: Iterable yielding ``(inputs, targets)`` training batches.
        val_loader: Iterable yielding ``(inputs, targets)`` validation batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of training epochs.
    """
    for epoch in range(num_epochs):
        # --- training pass ---
        model.train()
        for batch_inputs, batch_targets in train_loader:
            optimizer.zero_grad()
            train_loss = criterion(model(batch_inputs), batch_targets)
            train_loss.backward()
            optimizer.step()

        # --- validation pass (no gradient tracking) ---
        model.eval()
        with torch.no_grad():
            val_losses = [
                criterion(model(val_inputs), val_targets).item()
                for val_inputs, val_targets in val_loader
            ]
        val_loss = np.mean(val_losses)

        if epoch % 10:
            continue
        print(f'Epoch {epoch} | Training Loss: {train_loss.item()} | Validation Loss: {val_loss}')

def evaluate_model(model, test_loader):
    """Collect deterministic predictions and labels over ``test_loader``.

    The model is put into eval mode for the duration of the call so that
    dropout is inactive, then returned to whatever mode it was in before.
    Gradient tracking is disabled while predicting.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to predictions.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        A tuple ``(predictions, actuals)`` of 1-D NumPy arrays, flattened in
        loader order.
    """
    predictions, actuals = [], []
    was_training = model.training
    model.eval()  # without this, dropout would randomise the predictions
    try:
        with torch.no_grad():
            for seqs, labels in test_loader:
                output = model(seqs)
                predictions.extend(output.view(-1).tolist())
                actuals.extend(labels.view(-1).tolist())
    finally:
        # Leave the caller's train/eval mode untouched.
        model.train(was_training)
    return np.array(predictions), np.array(actuals)

def plot_predictions_with_dates(predictions, actuals, dates):
    """Draw the actual and predicted series over ``dates`` on one figure.

    Args:
        predictions: Predicted values, one per entry of ``dates``.
        actuals: Observed values, one per entry of ``dates``.
        dates: X-axis values shared by both series.
    """
    plt.figure(figsize=(15, 7))

    # (values, line style) for each curve, drawn in this order.
    series_specs = (
        (actuals, {'label': 'Actual Price', 'color': 'blue', 'marker': 'o'}),
        (predictions, {'label': 'Predicted Price', 'color': 'red', 'linestyle': '--'}),
    )
    for values, line_style in series_specs:
        plt.plot(dates, values, **line_style)

    plt.title('Stock Price Prediction')
    plt.xlabel('Date')
    plt.xticks(rotation=45)
    plt.ylabel('Stock Price')
    plt.legend()
    plt.tight_layout()
    plt.show()


# Load data and split into training, validation and test set
seq_length = 21
aapl_df = yf.download('AAPL', start='2005-01-01', end='2024-01-01')
# Keep the fitted scaler so results can be mapped back to prices for plotting.
# NOTE(review): it is fit on the full series, so the value range of the
# validation/test periods influences the training inputs.
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(aapl_df[['Close']].values.reshape(-1, 1))
X, y = create_sequences(data_normalized, seq_length=seq_length)
# Chronological 80/10/10 split (no shuffling).
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, shuffle=False)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, shuffle=False)

# Convert to tensors and create DataLoader for batch processing
train_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)), batch_size=64, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)), batch_size=64, shuffle=False)
test_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)), batch_size=64, shuffle=False)

# Model initialization and training
model = RNNModel(input_size=1, hidden_layer_size=100, num_layers=2, output_size=1, dropout_prob=0.25)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=200)

# Evaluation (metrics are on the normalized (-1, 1) scale)
predictions, actuals = evaluate_model(model, test_loader)
mse = mean_squared_error(actuals, predictions)
print(f'MSE: {mse}')
print(f'RMSE: {np.sqrt(mse)}')
print(f'MAE: {mean_absolute_error(actuals, predictions)}')
print(f'R-squared: {r2_score(actuals, predictions)}')

# Plotting predictions
# y[i] is the observation at row i + seq_length, so the target dates are
# derived from that offset instead of assuming the targets end on the last
# row of the frame; the test set is the tail of those targets.
target_dates = aapl_df.index[seq_length:seq_length + len(y)]
test_dates = target_dates[-len(predictions):]
# The y-axis is labelled "Stock Price", so undo the scaling before plotting.
plot_predictions_with_dates(
    scaler.inverse_transform(predictions.reshape(-1, 1)).ravel(),
    scaler.inverse_transform(actuals.reshape(-1, 1)).ravel(),
    test_dates,
)


In [None]:
# RNN model pipeline rewritten to be device-independent (uses the GPU when available, otherwise the CPU)
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import yfinance as yf
import matplotlib.pyplot as plt

def create_sequences(data, seq_length):
    """Pair each ``seq_length``-step window of ``data`` with the value after it.

    Args:
        data: Indexable sequence of observations (e.g. a NumPy array).
        seq_length: Number of consecutive observations per input window.

    Returns:
        A tuple ``(xs, ys)`` of NumPy arrays: ``xs[i]`` is
        ``data[i:i + seq_length]`` and ``ys[i]`` is ``data[i + seq_length]``.
        Both are empty when ``data`` has at most ``seq_length`` observations.
    """
    # One sample per start index whose target index is still in range. The
    # earlier bound of len(data) - seq_length - 1 lost the newest sample.
    sample_count = max(len(data) - seq_length, 0)
    xs = [data[i:(i + seq_length)] for i in range(sample_count)]
    ys = [data[i + seq_length] for i in range(sample_count)]
    return np.array(xs), np.array(ys)

class RNNModel(nn.Module):
    """Recurrent regressor: ``nn.RNN`` layers with dropout plus a linear head."""

    def __init__(self, input_size, hidden_layer_size, num_layers, output_size, dropout_prob):
        """Instantiate the RNN stack and the final linear layer.

        Args:
            input_size: Number of features per time step.
            hidden_layer_size: Hidden width of every recurrent layer.
            num_layers: How many recurrent layers are stacked.
            output_size: Size of the prediction for each sequence.
            dropout_prob: Value forwarded to ``nn.RNN``'s ``dropout`` argument.
        """
        super().__init__()
        recurrent_kwargs = {'batch_first': True, 'dropout': dropout_prob}
        self.rnn = nn.RNN(input_size, hidden_layer_size, num_layers, **recurrent_kwargs)
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        """Return one prediction per sequence from the last time step's output."""
        sequence_output = self.rnn(input_seq)[0]
        last_step_output = sequence_output[:, -1]
        return self.linear(last_step_output)

def train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
    """Train ``model`` and compute the validation loss after each epoch.

    Batches are moved to the device that holds the model's parameters, so the
    function does not depend on a module-level ``device`` variable. Every
    tenth epoch (starting with epoch 0) the loss of the last training batch
    and the mean of the validation batch losses are printed.

    Args:
        model: ``nn.Module`` to optimise; switched between train and eval mode.
        train_loader: Iterable yielding ``(inputs, targets)`` training batches.
        val_loader: Iterable yielding ``(inputs, targets)`` validation batches.
        criterion: Loss function called as ``criterion(predictions, targets)``.
        optimizer: Optimizer exposing ``zero_grad()`` and ``step()``.
        num_epochs: Number of training epochs.
    """
    # Follow the model's own placement; fall back to the CPU for a model
    # without parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(num_epochs):
        model.train()
        last_train_loss = None
        for seqs, labels in train_loader:
            seqs, labels = seqs.to(target_device), labels.to(target_device)
            optimizer.zero_grad()
            y_pred = model(seqs)
            loss = criterion(y_pred, labels)
            loss.backward()
            optimizer.step()
            last_train_loss = loss

        model.eval()
        val_losses = []
        with torch.no_grad():
            for seqs, labels in val_loader:
                seqs, labels = seqs.to(target_device), labels.to(target_device)
                y_pred = model(seqs)
                val_losses.append(criterion(y_pred, labels).item())
        # NaN stands in for "no batches" instead of a NameError / warning.
        val_loss = np.mean(val_losses) if val_losses else float('nan')
        train_loss = last_train_loss.item() if last_train_loss is not None else float('nan')

        if epoch % 10 == 0:
            print(f'Epoch {epoch} | Training Loss: {train_loss} | Validation Loss: {val_loss}')

def evaluate_model(model, test_loader):
    """Collect deterministic predictions and labels over ``test_loader``.

    Batches are moved to the device that holds the model's parameters, so the
    function does not depend on a module-level ``device`` variable. The model
    is put into eval mode for the duration of the call (dropout inactive) and
    then returned to its previous mode. Gradient tracking is disabled.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to predictions.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        A tuple ``(predictions, actuals)`` of 1-D NumPy arrays, flattened in
        loader order.
    """
    # Follow the model's own placement; fall back to the CPU for a model
    # without parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    predictions, actuals = [], []
    was_training = model.training
    model.eval()  # without this, dropout would randomise the predictions
    try:
        with torch.no_grad():
            for seqs, labels in test_loader:
                seqs, labels = seqs.to(target_device), labels.to(target_device)
                output = model(seqs)
                predictions.extend(output.view(-1).tolist())
                actuals.extend(labels.view(-1).tolist())
    finally:
        # Leave the caller's train/eval mode untouched.
        model.train(was_training)
    return np.array(predictions), np.array(actuals)

def plot_predictions_with_dates(predictions, actuals, dates):
    """Plot observed and predicted values against ``dates``.

    Args:
        predictions: Predicted values, one per entry of ``dates``.
        actuals: Observed values, one per entry of ``dates``.
        dates: X-axis values shared by both series.
    """
    actual_style = dict(label='Actual Price', color='blue', marker='o')
    predicted_style = dict(label='Predicted Price', color='red', linestyle='--')

    plt.figure(figsize=(15, 7))
    plt.plot(dates, actuals, **actual_style)
    plt.plot(dates, predictions, **predicted_style)

    # Titles, axis labels and layout.
    plt.title('Stock Price Prediction')
    plt.xlabel('Date')
    plt.xticks(rotation=45)
    plt.ylabel('Stock Price')
    plt.legend()
    plt.tight_layout()
    plt.show()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load data and split into training, validation and test set
seq_length = 5
aapl_df = yf.download('AAPL', start='2005-01-01', end='2024-01-01')
# Keep the fitted scaler so results can be mapped back to prices for plotting.
# NOTE(review): it is fit on the full series, so the value range of the
# validation/test periods influences the training inputs.
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(aapl_df[['Close']].values.reshape(-1, 1))
X, y = create_sequences(data_normalized, seq_length=seq_length)
# Chronological 80/10/10 split (no shuffling).
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, shuffle=False)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, shuffle=False)

# Convert to tensors and create DataLoader for batch processing
train_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)), batch_size=64, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)), batch_size=64, shuffle=False)
test_loader = torch.utils.data.DataLoader(dataset=torch.utils.data.TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)), batch_size=64, shuffle=False)

# Model initialization and training
model = RNNModel(input_size=1, hidden_layer_size=50, num_layers=2, output_size=1, dropout_prob=0.2).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_and_validate_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100)

# Evaluation (metrics are on the normalized (-1, 1) scale)
predictions, actuals = evaluate_model(model, test_loader)
mse = mean_squared_error(actuals, predictions)
print(f'MSE: {mse}')
print(f'RMSE: {np.sqrt(mse)}')
print(f'MAE: {mean_absolute_error(actuals, predictions)}')
print(f'R-squared: {r2_score(actuals, predictions)}')

# Plotting predictions
# y[i] is the observation at row i + seq_length, so the target dates are
# derived from that offset instead of assuming the targets end on the last
# row of the frame; the test set is the tail of those targets.
target_dates = aapl_df.index[seq_length:seq_length + len(y)]
test_dates = target_dates[-len(predictions):]
# The y-axis is labelled "Stock Price", so undo the scaling before plotting.
plot_predictions_with_dates(
    scaler.inverse_transform(predictions.reshape(-1, 1)).ravel(),
    scaler.inverse_transform(actuals.reshape(-1, 1)).ravel(),
    test_dates,
)