In [8]:
# YYYY.MM.DD;HH:MM;Open;High;Low;Close;Volume
import pandas as pd
import os
from functions.pre_processing import *
import ta

In [None]:
# Merge the per-period M1 exports into one headerless CSV.
# `glob` is imported here explicitly: no visible cell imports it, so the
# original code only worked if the star import happened to expose it.
import glob

# List all CSVs (adjust path); sorted so rows are concatenated in filename order
file_list = sorted(glob.glob("data/DAT_ASCII_EURUSD_M1_*.csv"))
if not file_list:
    # pd.concat([]) would fail with an opaque "No objects to concatenate"
    raise FileNotFoundError("No files match data/DAT_ASCII_EURUSD_M1_*.csv")

# Load and concatenate (source files are ';'-separated and have no header row)
df_all = pd.concat(
    [pd.read_csv(path, sep=';', header=None) for path in file_list],
    ignore_index=True,
)

# Save as one big file. to_csv uses ',' by default, so the combined file is
# comma-separated even though the inputs are ';'-separated.
# NOTE(review): later cells read "training_data/EURUSD_M1_combined.csv" while
# this writes to the working directory — confirm the file is moved/copied.
df_all.to_csv("EURUSD_M1_combined.csv", index=False, header=False)

KeyboardInterrupt: 

Test other look-ahead horizons (the `hours_ahead` argument)

In [4]:
# Build the dataset from the combined M1 file with hours_ahead=1.
# build_eurusd_dataset is not defined in this notebook — presumably it is
# supplied by the star import from functions.pre_processing (TODO confirm).
df = build_eurusd_dataset("training_data/EURUSD_M1_combined.csv", hours_ahead=1)
# Persist the result with its index as the first CSV column.
# NOTE(review): the final cell of this notebook writes to this same path and
# will overwrite this file.
df.to_csv("eurusd_final_dataset.csv", index=True)

In [9]:
import numpy as np

def simulate_trade(entry, highs, lows, direction, sl, tp):
    """Walk forward bar by bar and report how a single trade ends.

    Price moves are converted to pips by multiplying the price difference
    by 10_000 and compared against ``tp`` / ``sl``.

    Args:
        entry: Entry price of the trade.
        highs: Per-bar high prices following the entry, in time order.
        lows: Per-bar low prices, paired element-wise with ``highs``.
        direction: ``1`` for a long trade, ``-1`` for a short trade. Any
            other value never triggers an exit.
        sl: Stop-loss distance in pips.
        tp: Take-profit distance in pips.

    Returns:
        ``(outcome, bars)`` where ``outcome`` is ``'TP'``, ``'SL'`` or
        ``'NONE'``. For ``'TP'``/``'SL'``, ``bars`` is the 1-based number of
        the bar that triggered the exit; for ``'NONE'`` it is ``len(highs)``.
    """
    for bar_number, (bar_high, bar_low) in enumerate(zip(highs, lows), start=1):
        if direction == 1:
            # Long: profit on the upside, loss on the downside.
            # Take-profit is tested first, so a bar that touches both levels
            # is counted as a win.
            if (bar_high - entry) * 10_000 >= tp:
                return 'TP', bar_number
            if (entry - bar_low) * 10_000 >= sl:
                return 'SL', bar_number
        elif direction == -1:
            # Short: mirror image of the long case, same TP-before-SL order.
            if (entry - bar_low) * 10_000 >= tp:
                return 'TP', bar_number
            if (bar_high - entry) * 10_000 >= sl:
                return 'SL', bar_number
    # Neither level was reached within the supplied bars.
    return 'NONE', len(highs)

def optimize_sl_tp_per_class(y, close_prices, highs, lows, sl_values, tp_values, window_size, val_size, step, class_to_direction, cost_per_trade=1.5, horizon=12):
    """Grid-search the (stop-loss, take-profit) pair with the best net pips per class.

    Validation windows of ``val_size`` bars are taken after every
    ``window_size``-bar training window, advancing by ``step``. Every bar in
    a validation window whose value in ``y`` equals one of the trade classes
    (0, 1, 3, 4) is simulated with ``simulate_trade`` for every
    ``(sl, tp)`` combination.

    Args:
        y: Per-bar class values (sliceable sequence).
        close_prices: Per-bar close prices; the close at a bar is used as the
            entry price of a trade opened on that bar.
        highs: Per-bar high prices.
        lows: Per-bar low prices.
        sl_values: Candidate stop-loss distances in pips.
        tp_values: Candidate take-profit distances in pips.
        window_size: Number of bars skipped before each validation window.
        val_size: Number of bars in each validation window.
        step: Offset between consecutive window starts.
        class_to_direction: Mapping from class to trade direction as expected
            by ``simulate_trade`` (``1`` long, ``-1`` short).
        cost_per_trade: Pips subtracted from every simulated trade.
        horizon: Number of bars handed to ``simulate_trade`` per trade
            (previously hard-coded to 12).

    Returns:
        ``{cls: {'sl': ..., 'tp': ...}}`` for classes 0, 1, 3 and 4, plus
        ``{2: {'sl': None, 'tp': None}}`` for the no-trade class. A class
        with no candidate trades keeps the default pair ``(12, 20)``.
    """
    trade_classes = (0, 1, 3, 4)

    # The set of candidate trades does not depend on (sl, tp), so collect it
    # once instead of re-scanning every window for each grid point.
    candidates_by_class = {cls: [] for cls in trade_classes}
    for start in range(0, len(y) - window_size - val_size, step):
        val_start = start + window_size
        max_len = min(val_size, len(y) - val_start)
        val_preds = y[val_start:val_start + val_size][:max_len]
        entry_prices = close_prices[val_start:val_start + max_len]
        for offset, (pred, entry) in enumerate(zip(val_preds, entry_prices)):
            for cls in trade_classes:
                if pred == cls:
                    candidates_by_class[cls].append((pred, entry, val_start + offset))
                    break

    sl_tp_map = {}
    for cls in trade_classes:
        candidates = candidates_by_class[cls]
        best_profit = -np.inf
        best_pair = (12, 20)
        for sl in sl_values:
            for tp in tp_values:
                net_pips = 0
                for pred, entry, bar in candidates:
                    # NOTE(review): the slice starts at the entry bar itself,
                    # so that bar's own high/low can trigger the exit —
                    # confirm this is intended.
                    outcome, _ = simulate_trade(
                        entry,
                        highs[bar:bar + horizon],
                        lows[bar:bar + horizon],
                        class_to_direction[pred],
                        sl,
                        tp,
                    )
                    if outcome == 'TP':
                        net_pips += tp
                    elif outcome == 'SL':
                        net_pips -= sl
                    # The cost applies to every trade, including 'NONE' outcomes.
                    net_pips -= cost_per_trade
                # Only grid points that produced at least one trade can win.
                if candidates and net_pips > best_profit:
                    best_profit = net_pips
                    best_pair = (sl, tp)
        sl_tp_map[cls] = {'sl': best_pair[0], 'tp': best_pair[1]}
    sl_tp_map[2] = {'sl': None, 'tp': None}  # no-trade
    return sl_tp_map

def estimate_avg_duration_per_class(y, close_prices, highs, lows, sl_tp_map, window_size, val_size, step, class_to_direction, horizon=12):
    """Estimate the average number of bars a trade of each class stays open.

    Uses the same rolling validation windows as the SL/TP search: every bar
    in a validation window whose class maps to a non-zero direction and has
    both ``sl`` and ``tp`` set in ``sl_tp_map`` is simulated with
    ``simulate_trade`` and the returned bar count is recorded.

    Args:
        y: Per-bar class values (sliceable sequence).
        close_prices: Per-bar close prices, used as entry prices.
        highs: Per-bar high prices.
        lows: Per-bar low prices.
        sl_tp_map: ``{cls: {'sl': ..., 'tp': ...}}``; entries with ``None``
            for either value are skipped.
        window_size: Number of bars skipped before each validation window.
        val_size: Number of bars in each validation window.
        step: Offset between consecutive window starts.
        class_to_direction: Mapping from class to trade direction; classes
            missing from it, or mapped to ``0``, are skipped.
        horizon: Number of bars handed to ``simulate_trade`` per trade and
            the fallback duration for a class with no simulated trades
            (previously hard-coded to 12).

    Returns:
        ``{cls: rounded mean duration in bars}``. Classes 0, 1, 3 and 4 are
        always present; any other class that was actually traded is added
        instead of raising ``KeyError``.
    """
    duration_by_class = {0: [], 1: [], 3: [], 4: []}
    for start in range(0, len(y) - window_size - val_size, step):
        val_start = start + window_size
        max_len = min(val_size, len(y) - val_start)
        val_preds = y[val_start:val_start + val_size][:max_len]
        entry_prices = close_prices[val_start:val_start + max_len]

        for offset, (pred, entry) in enumerate(zip(val_preds, entry_prices)):
            direction = class_to_direction.get(pred, 0)
            if direction == 0:
                continue
            sltp = sl_tp_map.get(pred, {'sl': None, 'tp': None})
            if sltp['sl'] is None or sltp['tp'] is None:
                continue
            # Slice the look-ahead bars only for trades that are simulated.
            bar = val_start + offset
            _, duration = simulate_trade(
                entry,
                highs[bar:bar + horizon],
                lows[bar:bar + horizon],
                direction,
                sltp['sl'],
                sltp['tp'],
            )
            duration_by_class.setdefault(pred, []).append(duration)

    return {
        cls: round(np.mean(durations)) if durations else horizon
        for cls, durations in duration_by_class.items()
    }


In [10]:
# Fixed stop-loss / take-profit distances per trade class (keys 0, 1, 3, 4).
# Presumably in pips, matching the thresholds used by simulate_trade — confirm.
sl_tp_map = {
    0: dict(sl=14, tp=20),
    1: dict(sl=14, tp=10),
    3: dict(sl=14, tp=10),
    4: dict(sl=14, tp=20),
}

In [11]:
# Look-ahead length, in bars, used when labelling each trade class (same value for all four).
avg_duration_by_class = dict.fromkeys((0, 1, 3, 4), 7)

In [12]:
def _forward_window_extreme(values, steps, use_max):
    """Max (or min) of ``values[i+1 : i+1+steps]`` for every position ``i``.

    Windows that run past the end of the array are truncated and an empty
    window yields NaN, which is what positional slicing followed by
    ``.max()`` / ``.min()`` produces.
    """
    n = len(values)
    result = np.full(n, np.nan)
    if steps < 1 or n < 2:
        return result
    # A trailing rolling window over the reversed array is a forward-looking
    # window (starting at the current bar) over the original array.
    rolled = pd.Series(values[::-1]).rolling(steps, min_periods=1)
    extreme = (rolled.max() if use_max else rolled.min()).to_numpy()[::-1]
    # Shift by one bar so the window starts on the bar after the entry bar.
    result[:-1] = extreme[1:]
    return result


def preprocess_and_label(
    filename,
    sl_tp_map,
    avg_duration_by_class,
    cost_per_trade=1.5
):
    """Load M1 bars, resample to M5, add features and a 5-class label.

    Args:
        filename: Path, relative to the current working directory, of a
            comma-separated, headerless CSV with the columns Date_Time
            (``%Y%m%d %H%M%S``), Open, High, Low, Close, Volume.
        sl_tp_map: Accepted for interface compatibility; not read by this
            function (the label thresholds are the fixed values below).
        avg_duration_by_class: ``{cls: look-ahead length in M5 bars}`` for
            classes 0, 1, 3, 4. A missing class falls back to 12 bars. The
            largest value also determines how many trailing bars stay
            unlabelled (and are therefore dropped).
        cost_per_trade: Accepted for interface compatibility; not read by
            this function.

    Returns:
        The M5 DataFrame, indexed by bar timestamp, holding OHLCV, the
        indicator and rolling feature columns and a ``label`` column, with
        every row containing a NaN removed.

        Labels are assigned in priority order 0, 1, 4, 3, measuring moves
        from the bar's close over the following bars in pips
        (price difference * 10_000):
        0 = drop of more than 20, 1 = drop of more than 10,
        4 = rise of more than 20, 3 = rise of more than 10, 2 = none of these.
    """
    # ---- 1. Load raw M1 ASCII -----------------------------------------------
    file_path = os.path.join(os.getcwd(), filename)
    df = pd.read_csv(
        file_path,
        sep=',',
        header=None,
        names=['Date_Time', 'Open', 'High', 'Low', 'Close', 'Volume']
    )
    df = df.astype({'Open': float, 'High': float, 'Low': float, 'Close': float, 'Volume': float})
    df.index = pd.to_datetime(df['Date_Time'], format='%Y%m%d %H%M%S')
    df = df.drop(columns=['Date_Time'])

    # ---- 2. Resample to M5 ---------------------------------------------------
    def resample_ohlc(data, rule):
        # dropna removes bins with no source rows (their OHLC values are NaN).
        return data.resample(rule).agg({
            "Open": "first",
            "High": "max",
            "Low":  "min",
            "Close":"last",
            "Volume":"sum"
        }).dropna()

    m5 = resample_ohlc(df, '5min')

    # ---- 3. M5 Technical Indicators -------------------------------------------
    m5["rsi"]  = ta.momentum.RSIIndicator(m5["Close"], 14).rsi()
    macd = ta.trend.MACD(m5["Close"])
    m5["macd"] = macd.macd_diff()  # note: stores macd_diff(), not macd()
    m5["body"] = m5["Close"] - m5["Open"]
    m5["vol_local"] = m5["High"] - m5["Low"]

    # ---- 4. Rolling M15 / H1 Features ----------------------------------------
    bar_position = np.arange(len(m5))

    m5["roll_m15_high"]  = m5["High"].rolling(3).max()
    m5["roll_m15_low"]   = m5["Low"].rolling(3).min()
    # Last close of a full 3-bar window == the bar's own close, NaN until the
    # window is full. Same values as rolling(3).apply(last) without a Python
    # callback per row.
    m5["roll_m15_close"] = m5["Close"].where(bar_position >= 2)
    m5["roll_m15_trend"] = m5["Close"] - m5["Close"].rolling(3).mean()
    m5["roll_m15_body"]  = (m5["Close"] - m5["Open"]).rolling(3).mean()

    m5["roll_h1_high"]  = m5["High"].rolling(12).max()
    m5["roll_h1_low"]   = m5["Low"].rolling(12).min()
    m5["roll_h1_close"] = m5["Close"].where(bar_position >= 11)
    m5["roll_h1_vol"]   = m5["roll_h1_high"] - m5["roll_h1_low"]
    # The 1e-6 term avoids division by zero on a flat 12-bar range.
    m5["roll_h1_relpos"] = (m5["Close"] - m5["roll_h1_low"]) / (m5["roll_h1_high"] - m5["roll_h1_low"] + 1e-6)

    # ---- 5. Label using SL/TP-based durations --------------------------------
    # class -> (side of the move, threshold in pips that must be exceeded)
    label_rules = {0: ('down', 20), 1: ('down', 10), 4: ('up', 20), 3: ('up', 10)}
    class_priority = [0, 1, 4, 3]  # check stronger first

    n_bars = len(m5)
    # Only bars with a full look-ahead for the longest duration get a label;
    # the tail stays NaN and is removed by the dropna below.
    n_labelled = max(n_bars - max(avg_duration_by_class.values()), 0)
    labellable = bar_position < n_labelled

    close = m5['Close'].to_numpy()
    high = m5['High'].to_numpy()
    low = m5['Low'].to_numpy()

    labels = np.full(n_bars, np.nan)
    labels[:n_labelled] = 2  # default when no rule fires
    # Apply rules from lowest to highest priority so that the highest
    # priority class is written last and wins.
    for cls in reversed(class_priority):
        side, threshold = label_rules[cls]
        steps = avg_duration_by_class.get(cls, 12)
        if side == 'down':
            pips = (close - _forward_window_extreme(low, steps, use_max=False)) * 10_000
        else:
            pips = (_forward_window_extreme(high, steps, use_max=True) - close) * 10_000
        with np.errstate(invalid='ignore'):
            fired = pips > threshold  # NaN (empty window) compares False
        labels[fired & labellable] = cls
    m5['label'] = labels

    # ---- 6. Drop NaNs --------------------------------------------------------
    m5 = m5.dropna()
    return m5


In [13]:
# Build the labelled M5 dataset from the combined M1 file using the
# notebook-level SL/TP map and per-class durations defined above.
df = preprocess_and_label(
    filename="training_data/EURUSD_M1_combined.csv",
    sl_tp_map=sl_tp_map,
    avg_duration_by_class=avg_duration_by_class,
    cost_per_trade=1.5,
)

# Persist the result; the datetime index is written as the first column.
df.to_csv("eurusd_final_dataset.csv", index=True)