# In [None]:
import MetaTrader5 as mt5
import tensorflow as tf
import numpy as np
import pandas as pd
import tf2onnx
import keras
from datetime import timedelta, datetime
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense, Activation, Conv1D, MaxPooling1D, Dropout, Flatten, LSTM
from keras.metrics import RootMeanSquaredError as rmse
from tensorflow.keras import callbacks
from sys import argv

# Constants
inp_history_size = 120  # length of one input window (number of bars per sample)
sample_size = inp_history_size * 3 * 20  # number of bars requested from the terminal
symbol = "EURUSD"
optional = "D1_2024"  # free-form tag appended to the model file name
inp_model_name = f"{symbol}_{optional}.onnx"

# Initialize MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    # quit() is an interactive helper injected by the `site` module and exits
    # with status 0; raise SystemExit directly so the failure is visible to
    # the calling process.
    raise SystemExit(1)

# Data paths: directory of the running script, derived from argv[0].
# A Windows-style "\" separator is assumed; if none is present, rfind()
# returns -1 and data_path becomes "" (i.e. the current working directory).
data_path = argv[0]
last_index = data_path.rfind("\\") + 1
data_path = data_path[0:last_index]
print("Data path to save ONNX model:", data_path)

# File path for saving: the terminal's MQL5\Files folder.
terminal_info = mt5.terminal_info()
if terminal_info is None:
    # terminal_info() returns None on error; fail here with a clear message
    # instead of an AttributeError on the next line.
    print("terminal_info() failed, error code =", mt5.last_error())
    mt5.shutdown()
    raise SystemExit(1)
file_path = terminal_info.data_path + "\\MQL5\\Files\\"
print("File path to save ONNX model:", file_path)

# Set start and end dates for history data.
# The span in days equals sample_size, so reuse it rather than repeating the
# formula. NOTE(review): in the code visible here start_date is only printed;
# the rates request below is driven by end_date and a bar count.
end_date = datetime(2024, 1, 1, 0)
start_date = end_date - timedelta(days=sample_size)
print("Data start date =", start_date)
print("Data end date =", end_date)

# Get rates: the last `sample_size` daily bars up to `end_date`.
eurusd_rates = mt5.copy_rates_from(symbol, mt5.TIMEFRAME_D1, end_date, sample_size)
if eurusd_rates is None or len(eurusd_rates) == 0:
    # copy_rates_from() returns None on error; stop before the scaler fails
    # with a far less helpful message.
    print("copy_rates_from() failed, error code =", mt5.last_error())
    mt5.shutdown()
    raise SystemExit(1)

# The rates come back as a NumPy structured array (one record per bar), which
# MinMaxScaler cannot consume and which cannot be sliced as a 2-D matrix.
# The model below is univariate, so extract a single price column as an
# (n_bars, 1) float array.
# NOTE(review): the close price is assumed to be the intended series - confirm.
close_prices = eurusd_rates["close"].reshape(-1, 1)

# Scale data
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(close_prices)

# Split data into training and testing (first 80% / last 20%, in time order)
training_size = int(len(scaled_data) * 0.80)
print("Training size:", training_size)
train_data_initial = scaled_data[:training_size, :]
test_data_initial = scaled_data[training_size:, :1]


# Function to split a univariate sequence into samples
def split_sequence(sequence, n_steps):
    """Cut a sequence into sliding windows and their next-step targets.

    Each sample pairs ``sequence[i:i + n_steps]`` with the element that
    immediately follows it, ``sequence[i + n_steps]``.

    Args:
        sequence: Any object supporting ``len()``, integer indexing and
            slicing (for example a list or a NumPy array).
        n_steps: Number of consecutive elements in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays built from the windows and the
        targets respectively. When the sequence is too short to yield a
        single sample, both are empty 1-D arrays.
    """
    total = len(sequence)
    # A window is usable only if the element right after it still exists.
    starts = [start for start in range(total) if start + n_steps < total]
    windows = [sequence[start:start + n_steps] for start in starts]
    targets = [sequence[start + n_steps] for start in starts]
    return np.array(windows), np.array(targets)


# Build supervised samples: every input is a window of `time_step` scaled
# values and its target is the value that follows the window.
time_step = inp_history_size
x_train, y_train = split_sequence(train_data_initial, time_step)
x_test, y_test = split_sequence(test_data_initial, time_step)

# Give the inputs the [samples, time steps, features] layout with one feature.
train_shape = (x_train.shape[0], x_train.shape[1], 1)
test_shape = (x_test.shape[0], x_test.shape[1], 1)
x_train = np.reshape(x_train, train_shape)
x_test = np.reshape(x_test, test_shape)

# Model: a 1-D convolution followed by pooling, two stacked LSTM layers with
# dropout after each, and a single-unit output layer.
model = Sequential(
    [
        Conv1D(
            filters=256,
            kernel_size=2,
            strides=1,
            padding="same",
            activation="relu",
            input_shape=(inp_history_size, 1),
        ),
        MaxPooling1D(pool_size=2),
        # The first LSTM returns the full sequence so the second can consume it.
        LSTM(100, return_sequences=True),
        Dropout(0.3),
        LSTM(100, return_sequences=False),
        Dropout(0.3),
        # One sigmoid unit: the prediction is bounded to (0, 1).
        Dense(units=1, activation="sigmoid"),
    ]
)

# Mean squared error loss, with RMSE reported as an additional metric.
model.compile(optimizer="adam", loss="mse", metrics=[rmse()])

# Early stopping: watch the validation loss, allow 20 epochs without
# improvement, then restore the best weights seen.
early_stopping = callbacks.EarlyStopping(
    monitor="val_loss", patience=20, restore_best_weights=True
)

# Train for at most 300 epochs; the test split doubles as validation data.
history = model.fit(
    x=x_train,
    y=y_train,
    batch_size=32,
    epochs=300,
    verbose=2,
    callbacks=[early_stopping],
    validation_data=(x_test, y_test),
)

# Loss and RMSE on the training split.
train_loss, train_rmse = model.evaluate(x=x_train, y=y_train, batch_size=32)
print(f"Train loss={train_loss:.3f}", f"Train RMSE={train_rmse:.3f}", sep="\n")

# Loss and RMSE on the testing split.
test_loss, test_rmse = model.evaluate(x=x_test, y=y_test, batch_size=32)
print(f"Test loss={test_loss:.3f}", f"Test RMSE={test_rmse:.3f}", sep="\n")


# Define a function to represent the model
@tf.function(
    input_signature=[
        tf.TensorSpec(shape=[None, inp_history_size, 1], dtype=tf.float32)
    ]
)
def model_function(x):
    """Apply the module-level ``model`` to a batch of input windows.

    The input signature fixes the traced input to float32 tensors of shape
    ``[batch, inp_history_size, 1]`` with a variable batch dimension.

    Args:
        x: Input tensor matching the declared signature.

    Returns:
        Whatever ``model(x)`` returns for that input.
    """
    predictions = model(x)
    return predictions


# Convert the model to ONNX once, through the traced function with an explicit
# input signature and opset, and write it next to the script.
output_path = data_path + inp_model_name
onnx_model, _ = tf2onnx.convert.from_function(
    model_function,
    input_signature=[tf.TensorSpec([None, inp_history_size, 1], tf.float32)],
    opset=13,
    output_path=output_path,
)

print(f"Saved ONNX model to {output_path}")

# Save the same converted model to the terminal's Files folder as well.
# Re-converting through a different entry point (from_keras, default opset,
# no explicit signature) could produce a file that differs from the first one,
# and would rebind `onnx_model` to a (model_proto, external_storage) tuple.
# Serializing the proto already in hand keeps both files identical.
output_path = file_path + inp_model_name
with open(output_path, "wb") as onnx_file:
    onnx_file.write(onnx_model.SerializeToString())
print(f"Saved model to {output_path}")