In [1]:
import torch
import torch.nn as nn
import pandas as pd
import ta
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from torch.ao.nn.quantized.functional import conv2d
from torch.multiprocessing import start_processes
from torch.utils.data import Dataset, DataLoader
import random

TARGET_COL = 3
LOOKBACK = 120
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 8
HIDDEN_SIZE = 256
CONV1_SIZE = 32
NUM_LAYERS = 4
OUTPUT_SIZE = 1
EPOCHS = 10
NUM_ROWS = 98000

df = pd.read_csv("EURH1.csv")
df = df.iloc[:,2:-2]

df['LOG'] = np.log(df['close'] / df['close'].shift(1))
df['PCTC'] = df['close'].pct_change()
df['HLD'] = df['high'] - df['low']
df['OCD'] = df['close'] - df['open']

df = df.bfill()

features = df.columns.tolist()

scaler = StandardScaler()
main_scaled_df = scaler.fit_transform(df)
scaled_df = main_scaled_df[:-999, :]
scaled_df = scaled_df[-NUM_ROWS:]



In [None]:

def create_dataset(data):
    X,y = [],[]
    for i in range(len(data) - LOOKBACK - 1):
        X.append(data[i:(i + LOOKBACK), :])  # Take all columns
        y.append(data[i + LOOKBACK, TARGET_COL])

    return np.array(X), np.array(y)

X,y = create_dataset(scaled_df)

X_train, X_test = train_test_split(X, test_size=0.2, shuffle=False)  # shuffle=False for time series
y_train, y_test = train_test_split(y, test_size=0.2, shuffle=False)  # shuffle=False for time series

X_train = torch.tensor(X_train).to(device)
y_train = torch.tensor(y_train).to(device)
X_test = torch.tensor(X_test).to(device)
y_test = torch.tensor(y_test).to(device)

class StockDataset(Dataset):
    def __init__(self, features, targets):
        self.features = features
        self.targets = targets

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.targets[idx]

train_dataset = StockDataset(X_train, y_train)
test_dataset = StockDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size,conv_channels, output_size, num_layers, dropout = 0.0, kernel_size = 3):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # CNN layer for feature extraction
        self.conv1d = nn.Conv1d(in_channels=input_size, out_channels=conv_channels, kernel_size=3, padding=1)

        # LSTM layer for temporal modeling
        self.lstm = nn.LSTM(conv_channels, hidden_size, num_layers, batch_first=True)

        # Fully connected layer for output
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Apply CNN to the input (batch_size, sequence_length, features)
        x = x.permute(0, 2, 1) # Reshape for Conv1d
        x = self.conv1d(x)
        x = x.permute(0, 2, 1) # Reshape back for LSTM

        # Pass through LSTM
        h_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h_0, c_0))

        # Take the output from the last time step and pass to the linear layer
        out = self.fc(out[:, -1, :])
        return out

num_features = scaled_df.shape[1]
model = LSTM(input_size=num_features, hidden_size=HIDDEN_SIZE, conv_channels=CONV1_SIZE,output_size=OUTPUT_SIZE,num_layers=NUM_LAYERS).to(device)
from torch import optim

optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
criterion = nn.MSELoss()
running_loss = 0.0
avg_train_loss = 0.0

for epoch in range(EPOCHS):
    model.train()
    running_loss = 0.0
    for x_batch, y_batch in train_loader:

        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()

        y_pred = model(x_batch.float())
        loss = criterion(y_pred.flatten(), y_batch.float())

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_train_loss:.5f}")

model.eval()

predictions = []
actuals = []

running_loss = 0.0
avg_test_loss = 0.0
with torch.no_grad():
    for x_batch, y_batch in test_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = model(x_batch.float())
        predictions.extend(y_pred.flatten().cpu().numpy())
        actuals.extend(y_batch.flatten().cpu().numpy())
        loss = criterion(y_pred.flatten(), y_batch.float())
        running_loss += loss.item()
    avg_test_loss = running_loss / len(test_loader)
    print(f"Loss: {avg_test_loss:.5f}")

    predictions = np.array(predictions)

actuals = np.array(actuals)

def inverse_close(scaled_close):
    dummy = np.zeros((scaled_close.shape[0], len(features)))
    dummy[:, TARGET_COL] = scaled_close
    return scaler.inverse_transform(dummy)[:, TARGET_COL]

actual_prices = inverse_close(actuals.flatten())
pred_prices_test = inverse_close(predictions.flatten())


# Plot predictions vs real values
plt.figure(figsize=(10,5))
plt.plot(actual_prices[-10:], label="Real")
plt.plot(pred_prices_test[-10:], label="Predicted")
plt.legend()
plt.title("Test Results: Real vs Predicted")
plt.show()

model.eval()

Epoch 1/20, Loss: 0.00814
Epoch 2/20, Loss: 0.00021
Epoch 3/20, Loss: 0.00020
Epoch 4/20, Loss: 0.00019
Epoch 5/20, Loss: 0.00018
Epoch 6/20, Loss: 0.00018
Epoch 7/20, Loss: 0.00018
Epoch 8/20, Loss: 0.00018
Epoch 9/20, Loss: 0.00017
Epoch 10/20, Loss: 0.00017
Epoch 11/20, Loss: 0.00017
Epoch 12/20, Loss: 0.00017
Epoch 13/20, Loss: 0.00017
Epoch 14/20, Loss: 0.00017
Epoch 15/20, Loss: 0.00017
Epoch 16/20, Loss: 0.00016
Epoch 17/20, Loss: 0.00016


In [None]:


for n in range(0, 10):
    num = random.randint(99120, 99998)

    # Get the real target value
    target_value = df.iloc[num + 1, TARGET_COL]  # adjust if needed

    # Get the last sequence
    last_seq = main_scaled_df[num - 120:num]

    # Scale using the same scaler (fit earlier)
    input_seq = torch.tensor(last_seq, dtype=torch.float32).unsqueeze(0).to(device)

    # Predict next scaled value
    with torch.no_grad():
        next_scaled = model(input_seq.float())
        next_scaled = next_scaled.item()

    # Convert scaled prediction to real value
    dummy = np.zeros((1, main_scaled_df.shape[1]))
    dummy[0, TARGET_COL] = next_scaled

    dummy_df = pd.DataFrame(dummy, columns=features)
    next_pred = scaler.inverse_transform(dummy_df)[0, TARGET_COL]

    # Last close value
    last_close = df.iloc[num, TARGET_COL]

    print("Last Close:", last_close)
    print("Next predicted value:", next_pred)
    print("Next real value:", target_value)
    print("Residual:", next_pred - target_value)

    # Compare directions
    if (next_pred > last_close and target_value > last_close) or \
       (next_pred < last_close and target_value < last_close):
        res = True
    else:
        res = False

    print("Result:", res)
    print("________________________________________________________")
