In [145]:
import warnings
warnings.filterwarnings("ignore")
import os
import ta
import time
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
from sklearn.model_selection import TimeSeriesSplit,train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import seaborn as sns
from statsmodels.nonparametric.smoothers_lowess import lowess
from pykalman import KalmanFilter

In [146]:
pair = 'AUDCAD'

In [147]:
def STK(close, low, high, n):
    
    STK = ((close - low.rolling(n).min()) / (high.rolling(n).max() - low.rolling(n).min())) * 100
    
    return STK

def rolling_std(df, col, window):

    df = df.copy()

    df[f'rolling_Std_{window}'] = df[col].rolling(window=window).std()

    return df 

def rsi_lowess(df, col, n, frac=0.01):
    # RSI
    rsi_period = int(n)
    df['RSI'] = ta.momentum.RSIIndicator(df[col], window=rsi_period).rsi()

    df['RSI'].fillna(0, inplace=True)

    # Convert Timestamp to numbers.
    numerical_index = df.index.map(pd.Timestamp.timestamp)

    # Compute RSI Lowess
    smoothed_rsi = lowess(df['RSI'], numerical_index, frac=frac, it=3)

    # RSI_Lowess
    df['RSI_lowess'] = smoothed_rsi[:, 1]

    return df
    
def cmma(df, close_col, high_col, low_col, lookback, atr_length):

    n = len(df)
    output = np.zeros(n)
    front_bad = max(lookback, atr_length)

    for i in range(front_bad, n):
        # ATR
        tr = np.maximum(
            df[high_col].iloc[i - atr_length + 1:i + 1] - df[low_col].iloc[i - atr_length + 1:i + 1],
            np.abs(df[high_col].iloc[i - atr_length + 1:i + 1] - df[close_col].iloc[i - atr_length:i].shift(1)),
            np.abs(df[low_col].iloc[i - atr_length + 1:i + 1] - df[close_col].iloc[i - atr_length:i].shift(1))
        )
        atr = np.mean(tr)

        # LOG AVERAGE
        log_avg = np.mean(np.log(df[close_col].iloc[i - lookback:i]))

        if atr > 0:
            denom = atr * np.sqrt(lookback + 1)
            z_score = (np.log(df[close_col].iloc[i]) - log_avg) / denom
            output[i] = 100.0 * norm.cdf(z_score) - 50.0
        else:
            output[i] = 0.0

    df[f"cmma_{lookback}"] = output

    return df
    
def kalman_filter(df, close_col, window=None):

    # Extraer los valores de la columna
    valores = df[close_col].values

    # Configurar el filtro de Kalman
    kf = KalmanFilter(transition_matrices=[1],
                      observation_matrices=[1],
                      initial_state_mean=0,
                      initial_state_covariance=1,
                      observation_covariance=1,
                      transition_covariance=0.01)

    # Aplicar el filtro de Kalman
    state_means, _ = kf.filter(valores)

    # Agregar la nueva columna con los valores filtrados al DataFrame
    df[f"kalman_{close_col}"] = state_means

    # Si se proporciona un window, aplicar rolling
    if window:
        df[f"kalman_{close_col}_{window}"] = df[f"kalman_{close_col}"].rolling(window=window).mean()

    return df
    
def create_features(df):
    # Crear las características solicitadas
    df['%K005']      = STK(df['close'], df['low'], df['high'], 5)
    df['%K010']      = STK(df['close'], df['low'], df['high'], 10)
    df['%K015']      = STK(df['close'], df['low'], df['high'], 15)
    df['%K020']      = STK(df['close'], df['low'], df['high'], 20)
    df['sma_5']      = df['close'].rolling(window=5).mean()
    df['sma_10']     = df['close'].rolling(window=10).mean()
    df['sma_15']     = df['close'].rolling(window=15).mean()
    df['sma_20']     = df['close'].rolling(window=20).mean()
    df['std_5']      = df['close'].rolling(window=5).std()
    df['std_10']     = df['close'].rolling(window=10).std()
    df['std_15']     = df['close'].rolling(window=15).std()
    df['std_20']     = df['close'].rolling(window=20).std()
    df['RSI']        = ta.momentum.RSIIndicator(df['close'], int(20)).rsi()
    df['RSI_50']     = (ta.momentum.RSIIndicator(df['close'], int(20)).rsi())-50
    df = rsi_lowess(df, col='close', n=14, frac=0.01) 
    df = cmma(df, 'close', 'high', 'low', lookback=5, atr_length=200)
    df = cmma(df, 'close', 'high', 'low', lookback=10, atr_length=200)
    df = cmma(df, 'close', 'high', 'low', lookback=15, atr_length=200)
    df = cmma(df, 'close', 'high', 'low', lookback=21, atr_length=200)
    df = cmma(df, 'close', 'high', 'low', lookback=25, atr_length=200)
    df = kalman_filter(df, 'close', window=5)

def future_returns_label(df, n):
    df_copy = df.copy()
    #df_copy["log_close"] = np.log(df_copy["close"])
    df_copy["log_close"] = df_copy["close"]
    df_copy[f"fut_ret_{n}"] = (df_copy["log_close"].shift(-n) - df_copy["log_close"]) / df_copy["log_close"]
    
    # Reemplazar 0 por un valor pequeño y eliminar NaNs
    df_copy = df_copy.replace(0, 1e-10).dropna()
    
    # Añadir la columna con la etiqueta -1 si es negativo, 0 si es positivo
    df_copy[f"label_{n}"] = np.where(df_copy[f"fut_ret_{n}"] > 0, 1, 0)
    
    return df_copy

In [148]:
file_path = f"C:/Users/jlaho/Desktop/Quantreo/Quant/Data/FixTimeBars/{pair}_H4_MT5_READY.csv"
df = pd.read_csv(file_path, index_col="time", parse_dates=True)

In [149]:
tscv = TimeSeriesSplit(n_splits=5)
n = 1
create_features(df)
df_labeled = future_returns_label(df, n)
print(df_labeled.columns)

Index(['open', 'high', 'low', 'close', 'tick_volume', 'spread', 'real_volume',
       'low_time', 'high_time', '%K005', '%K010', '%K015', '%K020', 'sma_5',
       'sma_10', 'sma_15', 'sma_20', 'std_5', 'std_10', 'std_15', 'std_20',
       'RSI', 'RSI_50', 'RSI_lowess', 'cmma_5', 'cmma_10', 'cmma_15',
       'cmma_21', 'cmma_25', 'kalman_close', 'log_close', 'fut_ret_1',
       'label_1'],
      dtype='object')


In [150]:
list_X = ['%K005','%K020',
          'sma_5',
          'std_5','std_20',
          'RSI_50',
          'kalman_close',
          'cmma_5','cmma_21']

col_y = f"label_{n}"
file_path = 'df_labeled.xlsx'
df_labeled.to_excel(file_path, index=True)

X = df_labeled[list_X]
col_y = f"label_{n}"  # Columna objetivo que contiene las etiquetas binarias
# Seleccionar la columna objetivo para y
y = df_labeled[col_y]  # Etiquetas (target)
# División en conjunto de entrenamiento y prueba
train_size = int(len(df_labeled) * 0.8)

# Seleccionar X_train y y_train de manera equivalente
X_train = X.iloc[:train_size, :]  
y_train = y.iloc[:train_size]     
# Seleccionar X_test y y_test
X_test = X.iloc[train_size:, :]  
y_test = y.iloc[train_size:] 

# Guardar X_train en un archivo Excel
y_train.to_excel(train_file_path, index=True)
train_file_path = 'y_train.xlsx'
test_file_path = 'y_test.xlsx'
#print(f"X_train se ha guardado en '{train_file_path}'")

# Guardar X_test en un archivo Excel
y_test.to_excel(test_file_path, index=True)
#print(f"X_test se ha guardado en '{test_file_path}'")

model = RandomForestClassifier(random_state=42)
param_grid = {
    'bootstrap': [True,False],
    'criterion': ["gini","entropy"],
    'max_depth': [6,8],
    'min_samples_leaf': [2,4],
    'min_samples_split': [2,4],
    'n_estimators': [10,100]
  }
grid_search = GridSearchCV(estimator=model, param_grid=param_grid, scoring='accuracy', cv=tscv, verbose=1, n_jobs=-1)
grid_search.fit(X_train, y_train) 
        
# BEST
best_model       = grid_search.best_estimator_
best_params      = grid_search.best_params_
best_valid_score = grid_search.best_score_ 
print("Best params:", grid_search.best_params_)
print("Best accury:", grid_search.best_score_)

Fitting 5 folds for each of 64 candidates, totalling 320 fits
Best params: {'bootstrap': True, 'criterion': 'entropy', 'max_depth': 6, 'min_samples_leaf': 2, 'min_samples_split': 2, 'n_estimators': 100}
Best accury: 0.5201802704056083


In [151]:
model = RandomForestClassifier(
    random_state=42,
    bootstrap=True,
    criterion="gini",
    max_depth=12,
    min_samples_leaf=6,
    min_samples_split=2,
    n_estimators=100
)

# Entrenar el modelo directamente sin validación cruzada
model.fit(X_train, y_train)

# Hacer predicciones en el conjunto de entrenamiento
y_train_pred = model.predict(X_train)

# Calcular la exactitud en el conjunto de entrenamiento
train_accuracy = accuracy_score(y_train, y_train_pred)
print(f"Accuracy en conjunto de entrenamiento: {train_accuracy:.4f}")

# Hacer predicciones en el conjunto de prueba
y_test_pred = model.predict(X_test)

# Calcular la exactitud en el conjunto de prueba
test_accuracy = accuracy_score(y_test, y_test_pred)
print(f"Accuracy en conjunto de prueba: {test_accuracy:.4f}")

Accuracy en conjunto de entrenamiento: 0.8068
Accuracy en conjunto de prueba: 0.5103
