In [6]:
# Importa características de compatibilidad del futuro para asegurar
# que el código se ejecute correctamente en versiones antiguas de Python
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import backtrader as bt 
import datetime
import os
import pandas as pd
import numpy as np
import json
from backtesting.run_cerebro_ny_liquid_strategy import plotly_candlestick_with_pdh_pdl
from backtesting.get_fmp_financial_data import download_data, add_wick_info_to_csv, add_pdh_pdl_to_csv
from backtesting.financial_data_traitment import add_wick_info_to_csv, add_pdh_pdl_to_csv, obtener_datos_pdh_pdl_unicos
import matplotlib
matplotlib.use('widget')
import io
import sys
from backtesting.financial_data_traitment import save_log_to_file

download_data_flag = False # Set to True to (re)download the data in the next cell.

In [7]:
if download_data_flag:
    # Official Financial Modeling Prep API documentation; all endpoints and
    # parameters are listed at:
    # https://site.financialmodelingprep.com/developer/docs/dashboard

    # The API key is read from the FMP_API_KEY environment variable so the
    # credential is never stored in the notebook or its version history.
    # NOTE: a key that was previously hard-coded here should be treated as
    # compromised and rotated.
    API_KEY = os.environ.get('FMP_API_KEY')
    if not API_KEY:
        raise RuntimeError(
            "Set the FMP_API_KEY environment variable to your Financial "
            "Modeling Prep API key before downloading data."
        )

    # Directory (relative to the notebook location) where downloads are saved.
    OUTPUT_PATH = '../Data'

    # download_data arguments, in order:
    #   1. API_KEY: API access key
    #   2. "2025-01-01": start date of the data range
    #   3. "2025-08-14": end date of the data range
    #   4. "5min": candle interval (5 minutes)
    #   5. "EURUSD": instrument symbol (Euro / US Dollar)
    #   6. OUTPUT_PATH: folder where the downloaded file is written
    download_data(API_KEY, "2025-01-01", "2025-08-14", "5min", "EURUSD", OUTPUT_PATH)

In [8]:
# CSV with the 5-minute EURUSD candles; presumably the file written by the
# download cell (the name encodes symbol, interval and date range) - verify.
path_csv = '../Data/EURUSD_5min_20250101_20250814.csv'
# Modifies the CSV in place: per the helper's printed output it adds the
# 'upper_wick', 'lower_wick' and 'total_wick' columns to the file.
add_wick_info_to_csv(path_csv)

Columnas 'upper_wick', 'lower_wick' y 'total_wick' agregadas a ../Data/EURUSD_5min_20250101_20250814.csv. Total de registros: 2219


In [9]:
# Module-level path to the price CSV. NewYorkLiquidityStrategy and
# run_and_plot_with_pdh_pdl read this global directly instead of taking it
# as an argument.
path_csv = '../Data/EURUSD_5min_20250101_20250814.csv'

class NewYorkLiquidityStrategy(bt.Strategy):
    """Liquidity-sweep / fair-value-gap (FVG) strategy around PDH and PDL.

    On the first bar of each calendar day the previous trading day's high
    (PDH) and low (PDL) are looked up from values precomputed from the
    module-level ``path_csv`` file. Inside ``active_trading_hours`` the
    strategy logs PDH/PDL rejections and liquidity sweeps, and opens a
    bracket order (market entry + stop loss + take profit) when the close is
    inside one of the day's precomputed FVG zones and the current candle
    confirms the direction of that FVG.

    Params:
        rr_ratio: take-profit distance as a multiple of the stop distance.
        stop_loss_pips: buffer, in pips, placed beyond PDL (longs) or PDH
            (shorts) to obtain the stop-loss price.
        max_trades_per_day: number of entries allowed per calendar day.
        max_weekly_loss: weekly loss limit in percent of account value
            (negative number).
        risk_per_trade: percent of account value risked on the first trade
            of the day; halved for later trades on the same day.
        new_york_session_start, new_york_session_end: declared but not read
            by any method of this class.
        active_trading_hours: ``(start_hour, end_hour)`` window, end
            exclusive, compared against the hour of the current bar.
    """

    params = (
        ('rr_ratio', 2.0),
        ('stop_loss_pips', 20),
        ('max_trades_per_day', 2),
        ('max_weekly_loss', -3.0),
        ('risk_per_trade', 1.0),
        ('new_york_session_start', 8),
        ('new_york_session_end', 17),
        ('active_trading_hours', (8, 11)),
    )

    # FVG types produced by _precalculate_fvg mapped to the trade direction
    # names used by the sweep / confirmation / entry logic.
    _DIRECTION_BY_FVG_TYPE = {'bull': 'long', 'bear': 'short'}

    def __init__(self):
        """Initialise counters, precompute PDH/PDL/FVG tables and pip size."""
        self.current_date = None
        self.current_week = None
        self.trades_today = 0
        # Weekly profit and loss, accumulated in percent of account value so
        # it is comparable with the max_weekly_loss parameter.
        self.weekly_pnl = 0.0
        self.trade_count = 0
        # Pending entry order. next() and notify_order() read and reset this
        # attribute, so it has to exist before the first bar is processed.
        self.order = None
        self.pdh_values = self._precalculate_pdh_pdl('high')
        self.pdl_values = self._precalculate_pdh_pdl('low')
        # All FVGs in the file, grouped by calendar date.
        self.fvg_values = self._precalculate_fvg()
        self.pdh = None
        self.pdl = None
        # FVGs belonging to the current day (list of {'zone', 'type'} dicts).
        self.fvg_zone_list = []
        self.fvg_direction = None
        self.pip_value = 0.0001
        if hasattr(self.data, '_name') and 'XAUUSD' in str(self.data._name):
            self.pip_value = 0.1
        # (datetime, price) pairs recorded once per day for later plotting.
        self.pdh_points = []
        self.pdl_points = []
        self.log("ESTRATEGIA INICIALIZADA", f"Parámetros: RR={self.p.rr_ratio}, SL={self.p.stop_loss_pips}pips")

    def _precalculate_pdh_pdl(self, column):
        """Return a ``{date: price}`` dict with each calendar day's extreme.

        Args:
            column: ``'high'`` selects the daily maximum of the ``high``
                column; any other value selects the daily minimum of the
                ``low`` column.

        Returns:
            dict keyed by ``datetime.date``.
        """
        data = pd.read_csv(path_csv)
        data['date'] = pd.to_datetime(data['date'])
        data = data.set_index('date')
        daily_data = data.groupby(data.index.date)
        if column == 'high':
            daily_values = daily_data['high'].max()
        else:
            daily_values = daily_data['low'].min()
        return daily_values.to_dict()

    def _precalculate_fvg(self):
        """Find every three-candle fair value gap in the CSV.

        A bullish FVG exists at row ``i`` when ``low[i] > high[i-2]``; a
        bearish one when ``high[i] < low[i-2]``.

        Returns:
            dict mapping ``datetime.date`` to a list of
            ``{'zone': (lower, upper), 'type': 'bull' | 'bear'}`` dicts.
            Every date seen from the third row onwards gets a key, even when
            its list is empty.

        NOTE(review): zones are grouped by the date of the bar that completes
        the gap, and _start_new_day loads the whole day's list on the first
        bar, so gaps that form later in the day are visible to earlier bars
        (look-ahead). Consider storing the formation time and filtering.
        """
        df = pd.read_csv(path_csv)
        df['date'] = pd.to_datetime(df['date'])

        # Pull the columns out once; per-cell df.loc lookups inside the loop
        # are very slow on intraday data.
        dates = [ts.date() for ts in df['date']]
        highs = df['high'].to_numpy()
        lows = df['low'].to_numpy()

        fvg_dict = {}
        for i in range(2, len(df)):
            day_fvgs = fvg_dict.setdefault(dates[i], [])

            # Bullish FVG: current low is above the high of two candles ago.
            if lows[i] > highs[i - 2]:
                day_fvgs.append({'zone': (highs[i - 2], lows[i]), 'type': 'bull'})

            # Bearish FVG: current high is below the low of two candles ago.
            if highs[i] < lows[i - 2]:
                day_fvgs.append({'zone': (highs[i], lows[i - 2]), 'type': 'bear'})

        return fvg_dict

    def next(self):
        """Per-bar entry point: roll day/week state, then look for setups."""
        dt = self.data.datetime.datetime(0)

        if self.current_date is None or self.current_date != dt.date():
            self._start_new_day(dt)

        # isocalendar()[1] is the ISO week number; a change resets weekly PnL.
        if self.current_week != dt.isocalendar()[1]:
            self.current_week = dt.isocalendar()[1]
            self.weekly_pnl = 0.0
            self.log("NUEVA SEMANA", f"Reinicio PnL semanal. Semana #{self.current_week}")

        if self._check_trading_limits():
            return

        if not (self.p.active_trading_hours[0] <= dt.hour < self.p.active_trading_hours[1]):
            return

        # An entry order is still pending; wait for notify_order to clear it.
        if self.order:
            return

        if self.pdh is not None and self.pdl is not None:
            self._check_pdh_pdl_rejection()
            self._check_liquidity_sweep()
            self._check_fvg_formation()
            self._check_fvg_fill_confirmation()

    def _start_new_day(self, dt):
        """Reset daily counters and load PDH/PDL and FVGs for ``dt``'s date."""
        self.current_date = dt.date()
        self.trades_today = 0
        prev_date = self._get_previous_trading_day(self.current_date)

        if prev_date in self.pdh_values:
            self.pdh = self.pdh_values[prev_date]
            self.pdl = self.pdl_values[prev_date]
            self.pdh_points.append((dt, self.pdh))
            self.pdl_points.append((dt, self.pdl))
            self.log("PDH/PDL ACTUALIZADOS",
                f"Fecha de referencia: {prev_date} | "
                f"PDH: {self.pdh:.5f} | PDL: {self.pdl:.5f}")
            # NOTE(review): this writes to path_csv, the same file the data
            # feed is built from - confirm rewriting it mid-run is safe.
            try:
                add_pdh_pdl_to_csv(path_csv, prev_date.strftime('%Y-%m-%d'))
                self.log("ACTUALIZACIÓN CSV", f"PDH/PDL para {prev_date} escritos en el CSV.")
            except Exception as e:
                # Best effort: a failed CSV annotation must not stop the run.
                self.log("ERROR CSV", f"No se pudo escribir PDH/PDL en el CSV: {e}")
        else:
            self.pdh = None
            self.pdl = None
            self.log("ADVERTENCIA", f"No se encontraron datos para la fecha de referencia: {prev_date}")

        self.fvg_zone_list = self.fvg_values.get(self.current_date, [])
        self.log("NUEVO DÍA", f"Iniciando {dt.date()} - Precio apertura: {self.data.open[0]:.5f}")
        for fvg in self.fvg_zone_list:
            self.log("FVG DEL DÍA", f"Encontrado FVG {fvg['type']} en {fvg['zone'][0]:.5f}-{fvg['zone'][1]:.5f}")

    def _get_previous_trading_day(self, current_date):
        """Return the closest earlier date that is not Saturday or Sunday.

        Holidays are not considered; only weekends are skipped.
        """
        prev_date = current_date - datetime.timedelta(days=1)
        while prev_date.weekday() >= 5:
            prev_date -= datetime.timedelta(days=1)
        return prev_date

    def _check_pdh_pdl_rejection(self):
        """Log candles that pierce PDH/PDL but close back inside the range."""
        if (self.data.high[0] > self.pdh and
            self.data.close[0] < self.pdh and
            self.data.open[0] < self.data.close[0]):
            self.log("POTENCIAL TRADE", f"Rechazo bajista tras ruptura PDH: Ruptura en {self.data.high[0]:.5f}, Cierre {self.data.close[0]:.5f}")
        if (self.data.low[0] < self.pdl and
            self.data.close[0] > self.pdl and
            self.data.open[0] > self.data.close[0]):
            self.log("POTENCIAL TRADE", f"Rechazo alcista tras ruptura PDL: Ruptura en {self.data.low[0]:.5f}, Cierre {self.data.close[0]:.5f}")

    def _check_trading_limits(self):
        """Return True when the daily trade cap or weekly loss cap is hit."""
        if self.trades_today >= self.p.max_trades_per_day:
            return True
        if self.weekly_pnl <= self.p.max_weekly_loss:
            self.log("LÍMITE PÉRDIDAS", f"Pérdida semanal máxima alcanzada ({self.weekly_pnl:.2f}%)")
            return True
        return False

    def _check_liquidity_sweep(self):
        """Record a close beyond PDH ('short' bias) or PDL ('long' bias)."""
        if self.pdh is None or self.pdl is None:
            return
        price = self.data.close[0]
        if price > self.pdh:
            # Log only on a change of bias to avoid repeating the message.
            if self.fvg_direction != 'short':
                self.log("BARRIDO LIQUIDEZ", f"Ruptura PDH {self.pdh:.5f} @ {price:.5f} - Esperando reversión")
            self.fvg_direction = 'short'
        elif price < self.pdl:
            if self.fvg_direction != 'long':
                self.log("BARRIDO LIQUIDEZ", f"Ruptura PDL {self.pdl:.5f} @ {price:.5f} - Esperando reversión")
            self.fvg_direction = 'long'

    def _check_fvg_formation(self):
        """Log every precomputed FVG of the current day (no trading logic)."""
        if not self.fvg_zone_list:
            return

        for fvg in self.fvg_zone_list:
            self.log("FVG DETECTADO",
                f"FVG {fvg['type']} precalculado: "
                f"{fvg['zone'][0]:.5f}-{fvg['zone'][1]:.5f}")

    def _check_fvg_fill_confirmation(self):
        """Enter a trade when the close sits inside an FVG and is confirmed.

        The confirmation candle is evaluated for the direction implied by
        the FVG itself (bull -> long, bear -> short), and at most one entry
        is opened per bar.
        """
        if not self.fvg_zone_list:
            return

        price = self.data.close[0]
        for fvg in self.fvg_zone_list:
            fvg_low, fvg_high = min(fvg['zone']), max(fvg['zone'])
            fvg_type = fvg['type']
            direction = self._DIRECTION_BY_FVG_TYPE.get(fvg_type)
            if direction is None or not (fvg_low <= price <= fvg_high):
                continue

            self.log("FVG RELLENO", f"Precio {price:.5f} dentro FVG {fvg_low:.5f}-{fvg_high:.5f}")
            if self._has_confirmation_candle(direction):
                self._enter_trade(fvg_type)
                # Overlapping FVGs must not stack several entries on one bar.
                if self.order is not None:
                    return

    def _has_confirmation_candle(self, direction=None):
        """Return True when the current candle confirms ``direction``.

        The candle body must be at least 10 pips. For ``'long'`` the candle
        must close above its open and above the previous close; for any
        other value the mirror-image bearish condition is required.

        Args:
            direction: ``'long'`` or ``'short'``. Defaults to the current
                ``self.fvg_direction`` when omitted.
        """
        if direction is None:
            direction = self.fvg_direction
        body_size = abs(self.data.close[0] - self.data.open[0])
        min_body = 10 * self.pip_value
        if body_size < min_body:
            return False
        if direction == 'long':
            confirm = self.data.close[0] > self.data.open[0] and self.data.close[0] > self.data.close[-1]
            self.log("CONFIRMACIÓN", f"Vela alcista: {confirm} (Cierre: {self.data.close[0]:.5f} > Anterior: {self.data.close[-1]:.5f})")
            return confirm
        else:
            confirm = self.data.close[0] < self.data.open[0] and self.data.close[0] < self.data.close[-1]
            self.log("CONFIRMACIÓN", f"Vela bajista: {confirm} (Cierre: {self.data.close[0]:.5f} < Anterior: {self.data.close[-1]:.5f})")
            return confirm

    def _enter_trade(self, direction):
        """Size and submit a bracket order (entry, stop loss, take profit).

        Args:
            direction: ``'bull'``/``'long'`` for a buy, ``'bear'``/``'short'``
                for a sell. FVG type names are normalised to long/short.
        """
        # Callers pass the FVG type ('bull'/'bear'); the branches below work
        # with 'long'/'short', so normalise first.
        self.fvg_direction = self._DIRECTION_BY_FVG_TYPE.get(direction, direction)
        is_long = self.fvg_direction == 'long'

        # Second and later trades of the day risk half as much.
        risk_pct = self.p.risk_per_trade / (2 if self.trades_today >= 1 else 1)
        risk_amount = self.broker.getvalue() * risk_pct / 100

        entry_price = self.data.close[0]
        if is_long:
            sl_price = self.pdl - self.p.stop_loss_pips * self.pip_value
            sl_distance = entry_price - sl_price
        else:
            sl_price = self.pdh + self.p.stop_loss_pips * self.pip_value
            sl_distance = sl_price - entry_price
        # The stop must lie on the losing side of the entry, otherwise the
        # position size and the take-profit level are meaningless.
        if sl_distance <= 0:
            self.log("ERROR", "Distancia de Stop Loss no es positiva. No se puede calcular el tamaño de la posición.")
            return

        # Money gained or lost per unit of size for a one-point price move is
        # the commission scheme's multiplier (1.0 for stock-like assets).
        comminfo = self.broker.getcommissioninfo(self.data)
        value_per_point = comminfo.p.mult or 1.0
        position_size = risk_amount / (sl_distance * value_per_point)
        tp_price = entry_price + (sl_distance * self.p.rr_ratio * (1 if is_long else -1))

        # Bracket order: the parent and the first child are held back with
        # transmit=False; the last child transmits the whole group. Children
        # carry the same size as the entry so they fully close the position.
        if is_long:
            self.order = self.buy(size=position_size, exectype=bt.Order.Market, transmit=False)
            self.sell(size=position_size, exectype=bt.Order.Stop, price=sl_price,
                      parent=self.order, transmit=False)
            self.sell(size=position_size, exectype=bt.Order.Limit, price=tp_price,
                      parent=self.order, transmit=True)
        else:
            self.order = self.sell(size=position_size, exectype=bt.Order.Market, transmit=False)
            self.buy(size=position_size, exectype=bt.Order.Stop, price=sl_price,
                     parent=self.order, transmit=False)
            self.buy(size=position_size, exectype=bt.Order.Limit, price=tp_price,
                     parent=self.order, transmit=True)
        self.trades_today += 1
        self.trade_count += 1
        self.log("OPERACIÓN EJECUTADA",
                 f"{self.fvg_direction.upper()} @ {self.data.close[0]:.5f} | "
                 f"SL: {sl_price:.5f} | TP: {tp_price:.5f} | "
                 f"Size: {position_size:.2f} | Risk: {risk_pct:.1f}%")

    def notify_order(self, order):
        """Log order completions/rejections and clear the pending order."""
        if order.status in [order.Submitted, order.Accepted]:
            return
        if order.status in [order.Completed]:
            side = 'BUY' if order.isbuy() else 'SELL'
            self.log("ORDEN EJECUTADA",
                     f"{side} @ {order.executed.price:.5f} | "
                     f"Cost: {order.executed.value:.2f} | Comm: {order.executed.comm:.2f}")
            self.order = None
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log("ORDEN RECHAZADA", f"Estado: {order.getstatusname()}")
            self.order = None

    def notify_trade(self, trade):
        """Add a closed trade's net result, in percent, to the weekly PnL."""
        if trade.isclosed:
            # max_weekly_loss is a percentage, so convert the cash result
            # into percent of the current account value before accumulating.
            account_value = self.broker.getvalue()
            pnl_pct = trade.pnlcomm / account_value * 100 if account_value else 0.0
            self.weekly_pnl += pnl_pct
            self.log("OPERACIÓN CERRADA",
                     f"Resultado: {'GANANCIA' if trade.pnlcomm >=0 else 'PÉRDIDA'} | "
                     f"Net: {trade.pnlcomm:.2f} | PnL semanal: {self.weekly_pnl:.2f}%")

    def log(self, action, message):
        """Print ``message`` prefixed with the current bar time and a tag."""
        dt = self.data.datetime.datetime(0)
        print(f"[{dt.strftime('%Y-%m-%d %H:%M:%S')}] [{action}] {message}")

# Bloque principal mejorado para graficar PDH/PDL con Plotly
def run_and_plot_with_pdh_pdl(fromdate=datetime.datetime(2025, 8, 1),
                              todate=datetime.datetime(2025, 8, 14),
                              cash=100000.0,
                              commission=0.0002):
    """Run NewYorkLiquidityStrategy on ``path_csv`` and plot PDH/PDL levels.

    Args:
        fromdate: first datetime fed to the strategy.
        todate: last datetime fed to the strategy.
        cash: starting broker cash.
        commission: broker commission passed to ``setcommission``.

    Returns:
        The strategy instance produced by ``cerebro.run()``, so callers can
        inspect ``pdh_points``, ``pdl_points`` and other recorded state.
    """
    cerebro = bt.Cerebro()
    cerebro.addstrategy(NewYorkLiquidityStrategy)
    datapath = path_csv

    # Resolve column positions from the CSV header instead of hard-coding
    # them: the file gains extra columns from helper functions, and a wrong
    # index silently feeds the wrong series (e.g. volume read from 'low').
    header = list(pd.read_csv(datapath, nrows=0).columns)

    def column_index(name, fallback):
        """Return the position of ``name`` in the header, or ``fallback``."""
        return header.index(name) if name in header else fallback

    # Data feed with timezone handling; -1 marks a column as not present.
    data = bt.feeds.GenericCSVData(
        dataname=datapath,
        dtformat='%Y-%m-%d %H:%M:%S%z',  # %z parses the UTC offset suffix
        datetime=column_index('date', 0),
        open=column_index('open', 1),
        high=column_index('high', 2),
        low=column_index('low', 3),
        close=column_index('close', 4),
        volume=column_index('volume', -1),
        openinterest=-1,
        fromdate=fromdate,
        todate=todate,
        reverse=False,
        tz='America/Bogota'  # timezone applied to the feed's datetimes
    )

    cerebro.adddata(data)
    cerebro.broker.setcash(cash)
    cerebro.broker.setcommission(commission=commission)
    cerebro.addsizer(bt.sizers.FixedSize, stake=10)

    # Run and keep the strategy instance to read the recorded PDH/PDL points.
    strategies = cerebro.run()
    strat_instance = strategies[0]

    # Candlestick chart with the PDH/PDL points collected during the run.
    plotly_candlestick_with_pdh_pdl(
        datapath,
        pdh_points=strat_instance.pdh_points,
        pdl_points=strat_instance.pdl_points,
        darkmode=True
    )
    return strat_instance

if __name__ == '__main__':
    # Redirect standard output into an in-memory buffer so everything the
    # strategy prints during the run can be written to a log file.
    log_buffer = io.StringIO()
    sys_stdout_original = sys.stdout
    sys.stdout = log_buffer

    try:
        run_and_plot_with_pdh_pdl()
    finally:
        # Always restore stdout and persist whatever was captured, even if
        # the backtest raised.
        sys.stdout = sys_stdout_original
        log_text = log_buffer.getvalue()
        save_log_to_file(log_text)
        log_buffer.close()

Log guardado en: logs/output.log


In [10]:
# Same CSV used by the strategy above.
file_name = '../Data/EURUSD_5min_20250101_20250814.csv'
data_completa = obtener_datos_pdh_pdl_unicos(file_name)

# Only print when the helper returned a non-empty result.
if data_completa:
    # json.dumps with indent=4 pretty-prints the date -> {pdh, pdl} mapping.
    json_output = json.dumps(data_completa, indent=4)
    print("Diccionario con fechas, PDH y PDL únicos (formato JSON indentado):")
    print(json_output)

Diccionario con fechas, PDH y PDL únicos (formato JSON indentado):
{
    "2025-08-04": {
        "pdh": 1.15883,
        "pdl": 1.15312
    },
    "2025-08-05": {
        "pdh": 1.15881,
        "pdl": 1.15278
    },
    "2025-08-06": {
        "pdh": 1.16988,
        "pdl": 1.15743
    },
    "2025-08-07": {
        "pdh": 1.16848,
        "pdl": 1.1611
    },
    "2025-08-11": {
        "pdh": 1.16582,
        "pdl": 1.15902
    },
    "2025-08-12": {
        "pdh": 1.17207,
        "pdl": 1.15988
    },
    "2025-08-13": {
        "pdh": 1.17303,
        "pdl": 1.1668
    }
}
