In [None]:
import time
from datetime import timedelta, datetime

import matplotlib.pyplot as plt
import MetaTrader5 as mt5
import numpy as np
import pandas as pd
import tensorflow as tf
import tf2onnx
from keras.wrappers.scikit_learn import KerasClassifier
from keras.wrappers.scikit_learn import KerasRegressor
from sklearn import metrics
from sklearn.metrics import r2_score
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.python.keras.layers import Dense, Activation, Conv1D, MaxPooling1D, Dropout
from tensorflow.python.keras.layers.recurrent import LSTM
from tensorflow.python.keras.metrics import RootMeanSquaredError as rmse
from tensorflow.python.keras.models import Sequential

In [None]:
# Check TensorFlow version
print(tf.__version__)

In [None]:
# %%
# Check GPU support
print(len(tf.config.list_physical_devices('GPU')) > 0)

In [None]:
# Initialize MetaTrader5 for history data
if not mt5.initialize():
    # Raise instead of calling quit(): inside a notebook quit() shuts the kernel
    # down, whereas an exception stops "Run All" at this cell and keeps the
    # error code visible in the cell output.
    raise RuntimeError("initialize() failed, error code = {}".format(mt5.last_error()))

In [None]:
# Show terminal info
terminal_info = mt5.terminal_info()
print(terminal_info)

In [None]:
# Show file path
file_path = terminal_info.data_path + "\\MQL5\\Files\\"
print(file_path)

In [None]:
# History window: the 120 days that end at the current time
history_span = timedelta(days=120)
end_date = datetime.now()
start_date = end_date - history_span

In [None]:
# Display both boundaries of the requested history window
for label, moment in (("data start date=", start_date), ("data end date=", end_date)):
    print(label, moment)


In [None]:
# Get XAUUSD rates (H1) from start_date to end_date
xauusd_rates = mt5.copy_rates_range("XAUUSDm", mt5.TIMEFRAME_H1, start_date, end_date)

# copy_rates_range returns None on error. Stop here with the terminal's error
# code rather than letting an empty DataFrame fail later in the scaler.
if xauusd_rates is None or len(xauusd_rates) == 0:
    raise RuntimeError(
        "copy_rates_range() returned no data, error code = {}".format(mt5.last_error())
    )

In [None]:
# Check the data
print(xauusd_rates)

In [None]:
# Create DataFrame
df = pd.DataFrame(xauusd_rates)


In [None]:
# Show DataFrame head
df.head()

In [None]:
# Show DataFrame tail
df.tail()


In [None]:
# Show DataFrame shape (the number of rows and columns in the data set)
df.shape


In [None]:
# Prepare close prices only
data = df.filter(['close']).values

In [None]:
# Show close prices
plt.figure(figsize=(18,10))
plt.plot(data, 'b', label='Original')
plt.xlabel("Hours")
plt.ylabel("Price")
plt.title("XAUUSD_H1")
plt.legend()

In [None]:
# Scale data using MinMaxScaler
scaler = MinMaxScaler(feature_range=(0,1))
scaled_data = scaler.fit_transform(data)

In [None]:
# Training size is 80% of the data
training_size = int(len(scaled_data) * 0.80) 
print("training size:", training_size)

In [None]:
# Create train data and check size
train_data_initial = scaled_data[0:training_size, :]
print(len(train_data_initial))

In [None]:
# Create test data and check size
test_data_initial = scaled_data[training_size:, :1]
print(len(test_data_initial))

In [None]:
# Split a univariate sequence into samples
def split_sequence(sequence, n_steps):
    """Split a sequence into sliding-window samples and next-step targets.

    Sample ``i`` is ``sequence[i:i + n_steps]`` and its target is
    ``sequence[i + n_steps]``.

    Args:
        sequence: Array-like whose first axis is the time axis.
        n_steps: Number of consecutive steps in each input window (>= 1).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays. ``X`` has shape
        ``(n_samples, n_steps, *sequence.shape[1:])`` and ``y`` has shape
        ``(n_samples, *sequence.shape[1:])``, where
        ``n_samples = max(len(sequence) - n_steps, 0)``.

    Raises:
        ValueError: If ``n_steps`` is smaller than 1.
    """
    if n_steps < 1:
        raise ValueError("n_steps must be a positive integer")

    sequence = np.asarray(sequence)
    trailing_shape = sequence.shape[1:]
    n_samples = len(sequence) - n_steps

    if n_samples <= 0:
        # Too short for a single window: return empty arrays that still carry
        # the window/feature axes, so callers can index .shape[1] safely.
        empty_x = np.empty((0, n_steps) + trailing_shape, dtype=sequence.dtype)
        empty_y = np.empty((0,) + trailing_shape, dtype=sequence.dtype)
        return empty_x, empty_y

    X = np.stack([sequence[start:start + n_steps] for start in range(n_samples)])
    # Copy so the targets do not alias the caller's array.
    y = sequence[n_steps:].copy()
    return X, y

In [None]:
# Split into samples
time_step = 120
x_train, y_train = split_sequence(train_data_initial, time_step)
x_test, y_test = split_sequence(test_data_initial, time_step)

In [None]:
# Reshape input to be [samples, time steps, features] which is required for LSTM
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)

In [None]:
# Show shape of train data
x_train.shape

In [None]:
# %%
# Show shape of test data
x_test.shape

In [None]:
# Model factory passed as build_fn to the scikit-learn wrapper
def create_model(optimizer='adam', time_steps=120):
    """Build and compile the Conv1D + stacked-LSTM regression network.

    Args:
        optimizer: Optimizer name or instance forwarded to ``model.compile``.
        time_steps: Length of each input window; the network expects input of
            shape ``(time_steps, 1)``. Defaults to 120.

    Returns:
        A compiled ``Sequential`` model with MSE loss and an RMSE metric.
    """
    model = Sequential()
    model.add(Conv1D(filters=256, kernel_size=2, activation='relu', padding='same',
                     input_shape=(time_steps, 1)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(LSTM(100, return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(100, return_sequences=False))
    model.add(Dropout(0.3))
    # Linear output: this is a regression head. A sigmoid would confine every
    # prediction to (0, 1), so the model could never predict a scaled price
    # outside the range the scaler was fitted on.
    model.add(Dense(units=1, activation='linear'))
    model.compile(optimizer=optimizer, loss='mse', metrics=[rmse()])
    return model

In [None]:
# Create the scikit-learn wrapper with fixed epochs.
# Predicting the next price is a regression task, so KerasRegressor is used:
# KerasClassifier treats the targets as class labels and scores by accuracy,
# which this model (MSE loss, RMSE metric) is not compiled to report.
model = KerasRegressor(build_fn=create_model, epochs=300, verbose=0)

In [None]:
# Candidate hyper-parameter values sampled by the randomized search
param_dist = dict(
    batch_size=[32, 64],
    optimizer=['RMSprop', 'Adam'],
)


In [None]:
# Perform Randomized Search over the parameter space.
# n_iter is capped at the number of distinct combinations; asking for more only
# triggers a scikit-learn warning.
n_candidates = int(np.prod([len(values) for values in param_dist.values()]))
random_search = RandomizedSearchCV(
    estimator=model,
    param_distributions=param_dist,
    n_iter=min(5, n_candidates),
    # One fit at a time: every candidate trains a TensorFlow model, and parallel
    # worker processes would compete for the same device and memory.
    n_jobs=1,
    # Time-ordered folds: each validation fold lies after its training fold,
    # so the search never validates on data older than what it trained on.
    cv=TimeSeriesSplit(n_splits=2),
    random_state=42,
)
random_result = random_search.fit(x_train, y_train)

In [None]:
# Report the best cross-validation score and the parameters that produced it
best_score, best_setting = random_result.best_score_, random_result.best_params_
print("Best: %f using %s" % (best_score, best_setting))

In [None]:
# Measure time for model fitting with best parameters
time_calc_start = time.time()

In [None]:
# Define EarlyStopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

In [None]:
# Fit a fresh Keras model with the best parameters found by Randomized Search.
best_params = random_result.best_params_

# Build the network directly instead of refitting the scikit-learn wrapper:
# the wrapper was constructed without the tuned optimizer, and it does not offer
# the Keras evaluate() call used in the cells below. From here on `model` is the
# plain Keras model.
model = create_model(optimizer=best_params['optimizer'])

history = model.fit(x_train, y_train,
                    epochs=300,  # Fixed upper bound on epochs
                    validation_data=(x_test, y_test),
                    batch_size=best_params['batch_size'],
                    verbose=1,
                    callbacks=[early_stopping])  # Use EarlyStopping to shorten training time

In [None]:
# Calculate time
fit_time_seconds = time.time() - time_calc_start
print("fit time =", fit_time_seconds, " seconds.")

In [None]:
# Show iteration-loss graph for training and validation
plt.figure(figsize = (18,10))
plt.plot(history.history['loss'], label='Training Loss', color='b')
plt.plot(history.history['val_loss'], label='Validation-loss', color='g')
plt.xlabel("Iteration")
plt.ylabel("Loss")
plt.title("LOSS")
plt.legend()
plt.show()

In [None]:
# Measure time for model fitting with best parameters
# Show iteration-rmse graph for training and validation
plt.figure(figsize = (18,10))
plt.plot(history.history['root_mean_squared_error'], label='Training RMSE', color='b')
plt.plot(history.history['val_root_mean_squared_error'], label='Validation-RMSE', color='g')
plt.xlabel("Iteration")
plt.ylabel("RMSE")
plt.title("RMSE")
plt.legend()
plt.show()

In [None]:
# Evaluate training data
model.evaluate(x_train, y_train, batch_size=32)

In [None]:
# Evaluate testing data
model.evaluate(x_test, y_test, batch_size=32)

In [None]:
# Prediction using training data
train_predict = model.predict(x_train)
plot_y_train = y_train.reshape(-1,1)

In [None]:
# Show actual vs predicted (training) graph
plt.figure(figsize=(18,10))
plt.plot(scaler.inverse_transform(plot_y_train), color = 'b', label = 'Original')
plt.plot(scaler.inverse_transform(train_predict), color='red', label = 'Predicted')
plt.title("Prediction Graph Using Training Data")
plt.xlabel("Hours")
plt.ylabel("Price")
plt.legend()
plt.show()

In [None]:
# Prediction using testing data
test_predict = model.predict(x_test)
plot_y_test = y_test.reshape(-1,1)

In [None]:
# Calculate metrics
from sklearn import metrics
from sklearn.metrics import r2_score

In [None]:
# Transform data to real values
value1 = scaler.inverse_transform(plot_y_test)
value2 = scaler.inverse_transform(test_predict)

In [None]:
# Calculate score
score = np.sqrt(metrics.mean_squared_error(value1,value2))
print("RMSE         : {}".format(score))
print("MSE          :", metrics.mean_squared_error(value1,value2))
print("R2 score     :",metrics.r2_score(value1,value2))

In [None]:
# Show actual vs predicted (testing) graph
plt.figure(figsize=(18,10))
plt.plot(scaler.inverse_transform(plot_y_test), color = 'b',  label = 'Original')
plt.plot(scaler.inverse_transform(test_predict), color='g', label = 'Predicted')
plt.title("Prediction Graph Using Testing Data")
plt.xlabel("Hours")
plt.ylabel("Price")
plt.legend()
plt.show()

In [None]:
# Calculate metrics
from sklearn import metrics
from sklearn.metrics import r2_score

In [None]:
# Transform data to real values
value1 = scaler.inverse_transform(plot_y_test)
value2 = scaler.inverse_transform(test_predict)

In [None]:
# Calculate score
score = np.sqrt(metrics.mean_squared_error(value1,value2))
print("RMSE         : {}".format(score))
print("MSE          :", metrics.mean_squared_error(value1,value2))
print("R2 score     :",metrics.r2_score(value1,value2))

In [None]:
# Show actual vs predicted (testing) graph
plt.figure(figsize=(18,10))
plt.plot(scaler.inverse_transform(plot_y_test), color = 'b',  label = 'Original')
plt.plot(scaler.inverse_transform(test_predict), color='g', label = 'Predicted')
plt.title("Prediction Graph Using Testing Data")
plt.xlabel("Hours")
plt.ylabel("Price")
plt.legend()
plt.show()

In [None]:
# Finish: shut down the MetaTrader 5 session started with mt5.initialize()
mt5.shutdown()