# In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Load datasets
train_data = pd.read_csv('Google_Stock_Price_Train.csv')
test_data = pd.read_csv('Google_Stock_Price_Test.csv')

# Only the 'Open' column is modelled. It is reshaped to a single-feature
# column because MinMaxScaler works on 2-D (n_samples, n_features) input.
train_prices = train_data['Open'].values.reshape(-1, 1)
test_prices = test_data['Open'].values.reshape(-1, 1)

# Fit the scaler on the TRAINING prices only. Fitting on train + test would
# leak the test set's min/max into preprocessing and make the reported test
# metrics optimistic. The same fitted scaler is then applied to both splits,
# so scaled test values may fall slightly outside [0, 1]; that is expected.
all_prices = np.vstack([train_prices, test_prices])
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(train_prices)
all_prices_scaled = scaler.transform(all_prices)

# Split the scaled series back into its train / test parts.
train_prices_scaled = all_prices_scaled[:len(train_prices)]
test_prices_scaled = all_prices_scaled[len(train_prices):]

# Number of consecutive past prices in each input window.
seq_length = 5

# Create sequences for RNN input
def create_sequences(data, seq_length):
    """Slice ``data`` into overlapping windows and their next-step targets.

    For every start index ``s`` in ``range(len(data) - seq_length)`` the
    window is ``data[s:s + seq_length]`` and the target is the element that
    immediately follows it, ``data[s + seq_length]``.

    Args:
        data: Sliceable sequence of observations (e.g. a NumPy array).
        seq_length: Number of consecutive observations per window.

    Returns:
        A ``(sequences, targets)`` tuple of NumPy arrays. Both are empty when
        ``data`` has no more than ``seq_length`` elements.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    next_values = [data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(next_values)

# Window the scaled training series, then hold out 20% of the windows for
# validation.
# NOTE(review): train_test_split shuffles by default and the windows overlap,
# so validation windows share prices with training windows. Consider
# shuffle=False if a chronological hold-out is wanted.
X_train_full, y_train_full = create_sequences(train_prices_scaled, seq_length)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full,
    y_train_full,
    test_size=0.2,
    random_state=42,
)

# The test series is windowed the same way.
X_test, y_test = create_sequences(test_prices_scaled, seq_length)

print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

# Convert every split to float32 tensors in one pass.
(
    X_train_tensor,
    y_train_tensor,
    X_val_tensor,
    y_val_tensor,
    X_test_tensor,
    y_test_tensor,
) = (
    torch.tensor(split, dtype=torch.float32)
    for split in (X_train, y_train, X_val, y_val, X_test, y_test)
)


class StockDataset(Dataset):
    """Map-style dataset that pairs each input with its target.

    ``X`` and ``y`` are stored exactly as given and indexed in parallel, so
    they must support ``len()`` / ``[]``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The sample count is taken from the inputs alone.
        return len(self.X)

    def __getitem__(self, idx):
        features = self.X[idx]
        target = self.y[idx]
        return features, target

# Training split: mini-batches of 64, reshuffled every epoch.
train_dataset = StockDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

# Validation split: same batch size, fixed order.
val_dataset = StockDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

# Test split: one window at a time, in order.
test_dataset = StockDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

# LSTM Model Definition
class LSTMStockPredictor(nn.Module):
    """Stacked LSTM followed by a linear layer on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values predicted per sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        # Keep the configuration on the instance so forward() does not depend
        # on module-level variables that happen to share these names.
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Predict from a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``; the LSTM is
                built with ``batch_first=True``.

        Returns:
            Tensor of shape ``(batch, output_size)`` computed from the LSTM
            output at the final time step.
        """
        batch_size = x.size(0)
        # Zero initial hidden / cell state, created with the input's dtype
        # and device so the model also works after .double() or .to(device).
        h_0 = torch.zeros(
            self.num_layers, batch_size, self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        c_0 = torch.zeros(
            self.num_layers, batch_size, self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.lstm(x, (h_0, c_0))
        out = self.fc(out[:, -1, :])  # Use the last time step
        return out

# Model Hyperparameters
input_size = 1    # one feature per time step
hidden_size = 50  # width of the LSTM hidden state
num_layers = 2    # stacked LSTM layers
output_size = 1   # one predicted value per sequence

# Optimisation settings
learning_rate = 0.01
num_epochs = 20

# Mean-squared-error objective, optimised with Adam.
criterion = nn.MSELoss()
model = LSTMStockPredictor(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)


# Per-epoch mean batch losses, used later for the loss curves.
train_losses = []
val_losses = []

# Training the model
for epoch in range(num_epochs):
    # ---- optimisation pass over the training batches ----
    model.train()
    total_train_loss = 0.0
    for X_batch, y_batch in train_loader:
        # Clear gradients from the previous step before computing new ones.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    # Average of the per-batch losses for this epoch.
    train_losses.append(total_train_loss / len(train_loader))

    # ---- validation pass (no gradient tracking) ----
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            val_loss = criterion(outputs, y_batch)
            total_val_loss += val_loss.item()

    val_losses.append(total_val_loss / len(val_loader))

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}")

# Test Predictions
model.eval()
test_predictions, test_actuals = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        # Collect one row per sample from the prediction and its target.
        test_predictions.extend(outputs.numpy())
        test_actuals.extend(y_batch.numpy())

# Stack the collected rows into single-column arrays and map them from the
# scaled space back to price units.
test_predictions = np.array(test_predictions).reshape(-1, 1)
test_predictions = scaler.inverse_transform(test_predictions)
test_actuals = np.array(test_actuals).reshape(-1, 1)
test_actuals = scaler.inverse_transform(test_actuals)

# Compute Metrics (in price units, since both arrays were inverse-transformed)
mse = mean_squared_error(y_true=test_actuals, y_pred=test_predictions)
mae = mean_absolute_error(y_true=test_actuals, y_pred=test_predictions)
rmse = np.sqrt(mse)  # root of the MSE, same units as the prices

print(f"Test MSE: {mse:.4f}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")

# Visualization: Actual vs Predicted Prices
x_axis = np.arange(len(test_actuals))
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(x_axis, test_actuals, label="Actual Prices", color="blue", linewidth=2)
ax.plot(x_axis, test_predictions, label="Predicted Prices", color="orange", linewidth=2)
ax.set_title("Stock Price Prediction")
ax.set_xlabel("Time (Test Data Points)")
ax.set_ylabel("Stock Price")
ax.grid(True)
ax.legend()
plt.show()

# Visualization: Training and Validation Loss
fig, ax = plt.subplots(figsize=(10, 6))
# Epochs are numbered from 1 on the x-axis.
ax.plot(range(1, len(train_losses) + 1), train_losses, label="Training Loss")
ax.plot(range(1, len(val_losses) + 1), val_losses, label="Validation Loss")
ax.set_title("Training and Validation Loss Curve")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss (MSE)")
ax.legend()
ax.grid()
plt.show()

# Visualization: Residuals Distribution
# A positive residual means the model under-predicted the actual price.
residuals = test_actuals.flatten() - test_predictions.flatten()
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(residuals, bins=20, color='gray', edgecolor='black')
ax.set_title("Residuals Distribution")
ax.set_xlabel("Residual (Actual - Predicted)")
ax.set_ylabel("Frequency")
ax.grid()
plt.show()
