In [None]:
from utils import load_ticker, split_data, min_max_scale, exp_mov_avg_smooth
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import torch.nn as nn
import numpy as np
import torch

In [None]:
# Fetch the series from the project helper. The three returned values are
# unpacked as dates, adjusted-close prices and a data frame (presumably the raw
# table — confirm against utils.load_ticker).
date, adj_close, data_df = load_ticker()

# Draw the adjusted close against time on an explicit Figure/Axes pair.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(date, adj_close)
ax.set_title('Adj Close Price vs. Time')
ax.set_xlabel('Time')
ax.set_ylabel('Adj Close')
plt.show()

In [None]:
# Split dates and prices into train and test parts (the split rule lives in
# utils.split_data).
x_train, y_train, x_test, y_test = split_data(date, adj_close)

# Scale the training targets. min_max_scale also hands back the scaler object
# so the very same transform can be applied to the test targets below.
y_train, scaler = min_max_scale(y_train)

# The scaler is fed a single-column 2-D array; the result is flattened to 1-D.
y_test = (
    scaler
    .transform(y_test.reshape(-1, 1))
    .reshape(-1)
)

# Smooth only the (already scaled) training targets; y_test stays unsmoothed.
# Presumably an exponential moving average, judging by the helper's name —
# confirm in utils.
y_train = exp_mov_avg_smooth(y_train)

In [None]:
# Define the LSTM model
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per input sequence.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, seq_len, input_size).
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map a batch of sequences to one prediction per sequence.

        Args:
            x: Tensor laid out as (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        sequence_out, _ = self.lstm(x)
        # Only the output at the final time step feeds the linear head.
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

# Prepare data for the LSTM
# Window size handed to create_sequences below: each sample is this many
# consecutive values, and the value right after the window is its label.
sequence_length = 30

def create_sequences(data, seq_length):
    """Slice ``data`` into overlapping (window, next value) pairs.

    Args:
        data: Indexable, sliceable sequence of values.
        seq_length: Number of consecutive items in each window.

    Returns:
        A list of ``(window, label)`` tuples where ``window`` is
        ``data[i:i + seq_length]`` and ``label`` is ``data[i + seq_length]``,
        for every ``i`` in ``range(len(data) - seq_length)``. The list is empty
        when ``data`` has no more than ``seq_length`` items.
    """
    num_windows = len(data) - seq_length
    return [
        (data[start:start + seq_length], data[start + seq_length])
        for start in range(num_windows)
    ]

def _sequences_to_tensors(sequences, name):
    """Stack ``(window, label)`` pairs into float32 tensors for the LSTM.

    The windows are first stacked into a single NumPy array. Handing
    ``torch.tensor`` a Python list of NumPy arrays works but is very slow and
    makes PyTorch emit a warning recommending exactly this conversion.

    Args:
        sequences: List of ``(window, label)`` pairs as produced by
            ``create_sequences``.
        name: Name of the split (e.g. ``"train"``); used only in the error text.

    Returns:
        ``(x, y)`` tensors with a trailing axis of size 1 appended to each.
        For a 1-D input series that is ``(num_windows, window_len, 1)`` for
        ``x`` and ``(num_windows, 1)`` for ``y``.

    Raises:
        ValueError: If ``sequences`` is empty, i.e. the series was not longer
            than the window size.
    """
    if not sequences:
        raise ValueError(
            f"No {name} sequences were created; the {name} series must be "
            "longer than the sequence length."
        )
    windows = np.array([window for window, _ in sequences])
    labels = np.array([label for _, label in sequences])
    x = torch.tensor(windows, dtype=torch.float32).unsqueeze(-1)
    y = torch.tensor(labels, dtype=torch.float32).unsqueeze(-1)
    return x, y


train_sequences = create_sequences(y_train, sequence_length)
test_sequences = create_sequences(y_test, sequence_length)

# Convert to tensors
train_x, train_y = _sequences_to_tensors(train_sequences, "train")
test_x, test_y = _sequences_to_tensors(test_sequences, "test")

# Create data loaders
batch_size = 64
# Training windows are shuffled every epoch; test windows keep their order so
# predictions can be plotted as a time series afterwards.
train_loader = DataLoader(TensorDataset(train_x, train_y), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(TensorDataset(test_x, test_y), batch_size=batch_size, shuffle=False)

# Seed PyTorch's global RNG before the model is built so that weight
# initialisation and the training DataLoader's shuffling repeat from run to run
# (Restart Kernel -> Run All) on the same machine and library versions.
seed = 42
torch.manual_seed(seed)

# Initialize the model, loss function, and optimizer
model = LSTMModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Fail early with a clear message instead of dividing by zero below.
if len(train_loader) == 0:
    raise ValueError("train_loader yields no batches; there is nothing to train on.")

# Train the model
epochs = 50
for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    samples_seen = 0
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        # MSELoss returns the mean over the batch. Weight it by the batch size
        # so a short final batch does not count as much as a full one in the
        # reported epoch average.
        batch_len = batch_x.size(0)
        epoch_loss += loss.item() * batch_len
        samples_seen += batch_len
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / samples_seen:.4f}")

# Evaluate on test data
# Put the module in evaluation mode and run the forward passes without
# gradient tracking.
model.eval()

predicted_batches, actual_batches = [], []
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        outputs = model(batch_x)
        predicted_batches.append(outputs.numpy())
        actual_batches.append(batch_y.numpy())

# Join the per-batch arrays and flatten them to 1-D series.
predictions = np.concatenate(predicted_batches, axis=0).reshape(-1)
actuals = np.concatenate(actual_batches, axis=0).reshape(-1)

# Inverse transform the predictions
# The scaler is given a single-column 2-D array; results are flattened again.
predictions_column = predictions[:, np.newaxis]
actuals_column = actuals[:, np.newaxis]
predictions_actual = scaler.inverse_transform(predictions_column).reshape(-1)
actuals_actual = scaler.inverse_transform(actuals_column).reshape(-1)

# Plot the results
# Both series are drawn against their positional index on one Axes.
fig, ax = plt.subplots(figsize=(12, 6))
series_to_draw = (
    (actuals_actual, "Actual Prices"),
    (predictions_actual, "Predicted Prices"),
)
for values, legend_label in series_to_draw:
    ax.plot(range(len(values)), values, label=legend_label)
ax.set_xlabel("Time")
ax.set_ylabel("Price")
ax.set_title("Actual vs Predicted Prices")
ax.legend()
plt.show()