# Libraries

In [None]:
from __future__ import annotations

import asyncio
import datetime as dt
import json
import logging
import os
import random
import re
import sys
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple

import matplotlib.pyplot as plt
import nest_asyncio
import numpy as np
import pandas as pd
import requests
import talib as ta
import warnings

# The MetaApi SDK is treated as optional at import time: if importing it
# fails for any reason, MetaApi is set to None and a local TimeoutException
# stand-in is defined so `except TimeoutException` clauses still resolve.
try:
    from metaapi_cloud_sdk import MetaApi
    from metaapi_cloud_sdk.clients.timeout_exception import TimeoutException
except Exception:  # pragma: no cover - fallback when SDK is unavailable
    MetaApi = None  # type: ignore[assignment]

    class TimeoutException(Exception):
        """Fallback TimeoutException when metaapi_cloud_sdk is unavailable."""
        pass

# Process-wide setup executed on import:
#   - nest_asyncio.apply() patches asyncio so an already-running event loop
#     (e.g. inside a notebook) can be re-entered;
#   - every warning is silenced;
#   - the matplotlib style is switched globally.
nest_asyncio.apply()
warnings.filterwarnings("ignore")
plt.style.use("seaborn-v0_8-darkgrid")



# Set_Up

In [None]:
@dataclass(frozen=True)
class StrategyConfig:
    """Immutable bundle of strategy and MetaApi connection settings.

    The module builds a single instance (``CONFIG``) from environment
    variables; the variable names are noted next to each field.
    """

    symbol: str  # STRATEGY_SYMBOL
    timeframe: str  # STRATEGY_TIMEFRAME, e.g. "1m"
    candles: int  # STRATEGY_CANDLE_NUMBER; used as the candle limit for the initial fetch
    lot: float  # STRATEGY_LOT; forwarded to open_trade
    comment: str  # STRATEGY_COMMENT; forwarded to open_trade
    lengths: Tuple[int, int, int, int]  # STRATEGY_LENGTH_1..4; forwarded to generate_trade_signals
    smooths: Tuple[int, int, int, int]  # STRATEGY_SMOOTH_1..4; forwarded to generate_trade_signals
    initial_sl: float  # STRATEGY_INITIAL_SL; units not visible here (TODO confirm)
    first_step_atr: float  # STRATEGY_FIRST_STEP_ATR; presumably an ATR multiple (TODO confirm)
    gap_first_step_atr: float  # STRATEGY_GAP_FIRST_STEP_ATR; presumably an ATR multiple (TODO confirm)
    magic: int  # STRATEGY_MAGIC; used to identify this strategy's positions
    region: str  # META_API_REGION
    data_path: str  # resolved path of the candle CSV
    meta_api_token: str  # META_API_TOKEN; empty string when not provided
    account_id: str  # META_API_ACCOUNT_ID; empty string when not provided


def _int_env(name: str, default: int) -> int:
    """Read the integer setting *name* from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        The parsed integer, or *default* when nothing usable is set.

    Raises:
        ValueError: If the variable holds text that is not an integer; the
            message names the offending variable.
    """
    raw = os.environ.get(name)
    # A variable exported as "" (common with .env / compose files) means
    # "not configured" and must not crash the module at import time.
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name}={raw!r} is not a valid integer"
        ) from err


def _float_env(name: str, default: float) -> float:
    """Read the floating-point setting *name* from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        The parsed float, or *default* when nothing usable is set.

    Raises:
        ValueError: If the variable holds text that is not a number; the
            message names the offending variable.
    """
    raw = os.environ.get(name)
    # Treat an empty/whitespace value as "not configured" instead of letting
    # float("") abort the import with an anonymous error.
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name}={raw!r} is not a valid number"
        ) from err


def _str_env(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is not set.

    A variable that is set to the empty string is returned as-is.
    """
    value = os.environ.get(name)
    return default if value is None else value


# Symbol traded by the strategy (overridable through STRATEGY_SYMBOL).
_DEFAULT_SYMBOL = "XAUUSD"
_SYMBOL = _str_env("STRATEGY_SYMBOL", _DEFAULT_SYMBOL)

# Data directory; it is created on import if it does not exist yet.
_DATA_DIR = Path(_str_env("STRATEGY_DATA_DIR", "."))
_DATA_DIR.mkdir(parents=True, exist_ok=True)

# CSV location: an absolute STRATEGY_FILE_PATH is used verbatim, a relative
# one is resolved inside the data directory.
_file_name = _str_env("STRATEGY_FILE_PATH", _SYMBOL.lower() + "_data.csv")
_file_path = Path(_file_name)
_file_path = _file_path if _file_path.is_absolute() else _DATA_DIR / _file_path

# Four window lengths, read from STRATEGY_LENGTH_1 .. STRATEGY_LENGTH_4.
_LENGTHS = tuple(
    _int_env(f"STRATEGY_LENGTH_{slot}", fallback)
    for slot, fallback in enumerate((600, 520, 710, 1130), start=1)
)
# Four smoothing values, read from STRATEGY_SMOOTH_1 .. STRATEGY_SMOOTH_4.
_SMOOTHS = tuple(
    _int_env(f"STRATEGY_SMOOTH_{slot}", fallback)
    for slot, fallback in enumerate((3, 3, 3, 7), start=1)
)

# Single configuration object for the module. Every value can be overridden
# through the environment variable named in the corresponding call; the second
# argument is the default used when the variable is not set.
CONFIG = StrategyConfig(
    symbol=_SYMBOL,
    timeframe=_str_env("STRATEGY_TIMEFRAME", "1m"),
    candles=_int_env("STRATEGY_CANDLE_NUMBER", 900),
    lot=_float_env("STRATEGY_LOT", 0.5),
    comment=_str_env("STRATEGY_COMMENT", "Kalman"),
    lengths=_LENGTHS,
    smooths=_SMOOTHS,
    initial_sl=_float_env("STRATEGY_INITIAL_SL", -2.0),
    first_step_atr=_float_env("STRATEGY_FIRST_STEP_ATR", 0.5),
    gap_first_step_atr=_float_env("STRATEGY_GAP_FIRST_STEP_ATR", 2.0),
    magic=_int_env("STRATEGY_MAGIC", 900001),
    region=_str_env("META_API_REGION", "london"),
    data_path=str(_file_path),
    # Credentials default to "" so the module can be imported without them;
    # code that needs them checks for emptiness before connecting.
    meta_api_token=_str_env("META_API_TOKEN", ""),
    account_id=_str_env("META_API_ACCOUNT_ID", ""),
)

# Module-level aliases of the CONFIG fields. They are plain copies taken once
# at import time, so they do not follow later changes to the environment.
SYMBOL = CONFIG.symbol
FILE_PATH = CONFIG.data_path
CANDEL_NUMBER = CONFIG.candles  # (sic) spelling kept; the name is referenced elsewhere
LOT = CONFIG.lot
LOT_ = CONFIG.lot  # same value as LOT
COMMENT = CONFIG.comment
COMMENT_ = CONFIG.comment  # same value as COMMENT
time_frame_data = CONFIG.timeframe
length_1, length_2, length_3, length_4 = CONFIG.lengths
smooth_1, smooth_2, smooth_3, smooth_4 = CONFIG.smooths
INITIAL_SL = CONFIG.initial_sl
FIRST_STEP_ATR = CONFIG.first_step_atr
GAP_FIRST_STEP_ATR = CONFIG.gap_first_step_atr
MAGIC = CONFIG.magic
META_API_TOKEN = CONFIG.meta_api_token
ACCOUNT_ID = CONFIG.account_id
REGION = CONFIG.region





In [None]:
# Simple module-level cache, written by check_connection_once():
#   __CONNECTION_CHECKED becomes True after the first successful validation;
#   __ACCOUNT_CONN keeps the objects returned by that validation.
__CONNECTION_CHECKED = False
__ACCOUNT_CONN: Optional[Tuple[object, object]] = None  # (account, rpc_conn)

async def _connect_and_validate_async(token: str, account_id: str) -> Tuple[object, object]:
    """Connect to the account over RPC and wait until it is synchronized.

    Args:
        token: MetaApi API token.
        account_id: Identifier of the MetaApi account to connect to.

    Returns:
        A ``(account, rpc_conn)`` tuple; ``rpc_conn`` is already connected and
        synchronized.

    Raises:
        ImportError: If the MetaApi SDK could not be imported (``MetaApi`` is
            ``None``). ImportError is used rather than RuntimeError so the
            error cannot be mistaken for an event-loop problem by callers.
        Exception: Whatever the SDK raises while fetching the account,
            connecting or waiting for synchronization.
    """
    if MetaApi is None:
        # Without this guard the call below fails with the unhelpful
        # "'NoneType' object is not callable".
        raise ImportError(
            "metaapi_cloud_sdk is not installed or failed to import; "
            "cannot connect to MetaApi."
        )

    api = MetaApi(token)
    account = await api.metatrader_account_api.get_account(account_id)

    # Best-effort refresh of the account state; a failure here is not fatal
    # because the RPC synchronization below is the real readiness check.
    try:
        await account.reload()
    except Exception as err:
        logging.debug("MetaApi account.reload() failed (ignored): %s", err)

    # RPC connection + terminal synchronization.
    rpc_conn = account.get_rpc_connection()
    await rpc_conn.connect()
    await rpc_conn.wait_synchronized()

    # Quick probe to confirm real connectivity with the terminal. The
    # connection is already synchronized, so a failing probe is reported but
    # does not invalidate the connection.
    try:
        await rpc_conn.get_account_information()
    except Exception as err:
        logging.warning(
            "MetaApi connectivity probe failed after synchronization: %s", err
        )

    return account, rpc_conn

def _run(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Works both in a plain script (no event loop running) and in a notebook
    (a loop is already running).

    The running-loop case is detected up front instead of catching
    ``RuntimeError`` around ``asyncio.run``: that ``except`` also caught a
    ``RuntimeError`` raised by the coroutine itself and then tried to run the
    already-consumed coroutine a second time, hiding the real error.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Exception: Any exception raised by the coroutine propagates unchanged.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: ordinary script execution.
        return asyncio.run(coro)

    # A loop is already running (Jupyter). Re-entering it only works when the
    # loop has been patched for nested use (e.g. by nest_asyncio).
    return loop.run_until_complete(coro)

def check_connection_once(token: str, account_id: str) -> bool:
    """Validate the MetaApi connection, at most once per session.

    The first successful call stores the (account, rpc_conn) pair in the
    module cache and marks the connection as checked; later calls return True
    immediately without connecting again. A failed attempt is reported on
    stdout and leaves the cache untouched, so the next call retries.

    Returns:
        True when the connection is (or already was) validated, else False.
    """
    global __CONNECTION_CHECKED, __ACCOUNT_CONN

    if not __CONNECTION_CHECKED:
        try:
            acct, conn = _run(_connect_and_validate_async(token, account_id))
            __ACCOUNT_CONN = (acct, conn)
            __CONNECTION_CHECKED = True
            print(f"✅ Conectado y sincronizado con MetaApi. account_id={account_id}")
            validated = True
        except TimeoutException as e:
            print(f"❌ Timeout esperando sincronización. ¿La cuenta está CONNECTED al broker? Detalle: {e}")
            validated = False
        except Exception as e:
            print(f"❌ No fue posible validar la conexión. Error: {e}")
            validated = False
        return validated

    # Already validated earlier in this session.
    print("ℹ️ Conexión ya validada en esta sesión; no se repite.")
    return True

def _safe_json_dump(value):
    """Render *value* as indented JSON, falling back to ``str(value)``.

    Objects json cannot encode are stringified via ``default=str``; if
    serialization still fails the plain ``str()`` form is returned.
    """
    try:
        return json.dumps(value, indent=2, default=str)
    except Exception:
        return str(value)

def print_order_error_details(ctx: dict, err: Exception):
    """Pretty-print as much structured info as we can from MetaApi errors.

    The report goes to stdout; the traceback of *err* goes to stderr.

    Args:
        ctx: Context of the failed attempt (dumped as JSON at the end).
        err: The exception that made the order fail.
    """
    banner = "✘" * 70
    msg = str(err)

    print("\n" + banner)
    print("❌ Order failed")
    print("• Exception type:", type(err).__name__)
    print("• Message       :", msg)

    # Known useful attributes often present on MetaApi exceptions
    for attr in ("details", "error", "status", "code", "description", "response", "body"):
        val = getattr(err, attr, None)
        if val:
            print(f"• {attr:12}: {_safe_json_dump(val)}")

    # Try to parse a JSON object embedded in the message (common in SDKs).
    # DOTALL lets the match span newlines, so pretty-printed payloads are
    # found too.
    m = re.search(r"\{.*\}", msg, re.DOTALL)
    if m:
        try:
            payload = json.loads(m.group(0))
        except (ValueError, RecursionError):
            # Not valid JSON after all; the raw message was printed above.
            pass
        else:
            print("• parsed_json  :", _safe_json_dump(payload))

    # Stack (useful while debugging). Print the traceback of *err* itself:
    # traceback.print_exc() reports whatever exception is currently being
    # handled, which is "NoneType: None" when called outside an except block.
    print("• traceback    :")
    traceback.print_exception(type(err), err, err.__traceback__)

    # Context of the attempt
    print("• context      :", _safe_json_dump(ctx))
    print(banner + "\n")


In [None]:
# Validate the MetaApi connection up front, but only when both credentials
# were supplied; otherwise just leave a log entry.
if not (META_API_TOKEN and ACCOUNT_ID):
    logging.info("MetaApi credentials not provided; connection check skipped.")
else:
    check_connection_once(META_API_TOKEN, ACCOUNT_ID)



# Real_Life

In [None]:
async def main():
    """
    Main trading loop.

      • Creates the initial CSV when FILE_PATH does not exist (ATR computed
        locally, signals from generate_trade_signals); otherwise calls
        migrate_csv_if_needed on the existing file.
      • Then loops forever: sleeps for seconds_until_next_tf(time_frame_data,
        offset_sec=3), fetches the latest closed candle, merges it into the
        CSV data, recomputes ATR and signals, calls open_trade, syncs the
        position type and the stop loss, and saves the CSV.
      • Any exception inside a cycle (other than cancellation) is logged,
        followed by a backoff sleep and a reconnection attempt.

    Raises:
        RuntimeError: if META_API_TOKEN or ACCOUNT_ID is empty.
    """

    if not META_API_TOKEN or not ACCOUNT_ID:
        raise RuntimeError("MetaApi credentials are not configured.")

    # ───────────────────────────────────────────────────────────────────
    # 0) MetaApi / RPC connection
    # ───────────────────────────────────────────────────────────────────
    account  = await connect_metaapi(META_API_TOKEN, ACCOUNT_ID)
    rpc_conn = account.get_rpc_connection()
    await rpc_conn.connect()

    # ───────────────────────────────────────────────────────────────────
    # Parameters: local copies read from CONFIG. Inside this function they
    # shadow the module-level names with the same spelling.
    # ───────────────────────────────────────────────────────────────────
    MAGIC = CONFIG.magic
    LENGTHS = CONFIG.lengths
    SMOOTHS = CONFIG.smooths
    LOT_ = CONFIG.lot
    COMMENT_ = CONFIG.comment

    # ───────────────────────────────────────────────────────────────────
    # Local helpers
    # ───────────────────────────────────────────────────────────────────

    def _parse_tf_to_delta(tf: str) -> dt.timedelta:
        """Convert a timeframe string ('1m','5m','15m','1h','4h','1d') to a timedelta.

        A trailing 'mn' is treated as 'm'. The numeric part is clamped to at
        least 1. Empty or unparseable input falls back to one minute.
        """
        tf = (tf or "1m").strip().lower()
        if tf.endswith("mn"):
            tf = tf[:-2] + "m"
        try:
            if tf.endswith("m"):
                return dt.timedelta(minutes=max(int(tf[:-1]), 1))
            if tf.endswith("h"):
                return dt.timedelta(hours=max(int(tf[:-1]), 1))
            if tf.endswith("d"):
                return dt.timedelta(days=max(int(tf[:-1]), 1))
        except Exception:
            pass
        return dt.timedelta(minutes=1)

    def _floor_to_frame(ts: dt.datetime, delta: dt.timedelta) -> dt.datetime:
        """Floor ts to an exact multiple of the timeframe, counted from the Unix epoch (UTC).

        A naive ts is tagged as UTC. A zero-length delta falls back to a
        60-second step.
        """
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=dt.timezone.utc)
        epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
        secs  = int((ts - epoch).total_seconds())
        step  = int(delta.total_seconds()) or 60
        return epoch + dt.timedelta(seconds=(secs // step) * step)

    def _calc_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
        """
        Wilder-style ATR: TR = max(H-L, |H-C1|, |L-C1|), ATR = RMA(TR, period).
        Uses ewm(alpha=1/period) as an approximation of the RMA.
        Reads the 'high', 'low' and 'close' columns; the result is rounded to
        4 decimals and shares df's index.
        """
        h, l, c = df["high"].astype(float), df["low"].astype(float), df["close"].astype(float)
        c1 = c.shift(1)
        tr = np.maximum.reduce([
            (h - l).to_numpy(),
            (h - c1).abs().to_numpy(),
            (l - c1).abs().to_numpy()
        ])
        atr = pd.Series(tr, index=df.index).ewm(alpha=1/period, adjust=False).mean()
        return atr.round(4)

    async def _has_open_position_magic(rpc_conn, symbol: str, magic: int) -> bool:
        """Return True if a position with that magic exists.

        Tries the global helper get_pos_with_magic first. If that raises, the
        positions are read from rpc_conn.get_positions and, when that yields
        nothing, from _rest_get_positions; a position matches when its
        'magic' field equals magic or its comment contains 'magic=<magic>'.

        NOTE(review): this helper is not called anywhere in the visible body
        of main.
        """
        try:
            pos = await get_pos_with_magic(rpc_conn, symbol=symbol, magic=magic)
            return pos is not None
        except Exception:
            positions = []
            try:
                positions = await rpc_conn.get_positions(symbol=symbol) or []
            except Exception:
                positions = []
            if not positions:
                r = _rest_get_positions(META_API_TOKEN, ACCOUNT_ID, REGION, symbol)
                if getattr(r, "status_code", 0) == 200:
                    try:
                        positions = r.json() or []
                    except Exception:
                        positions = []
            if not positions:
                return False
            for p in positions:
                pm = p.get("magic")
                cmt = str(p.get("comment") or "")
                ok = False
                if pm is not None:
                    try: ok = int(pm) == int(magic)
                    except Exception: ok = False
                if (not ok) and f"magic={magic}" in cmt:
                    ok = True
                if ok:
                    return True
            return False

    async def _get_api_type(rpc_conn, symbol: str, magic: int):
        """Return 'Long', 'Short' or None for the position carrying this magic.

        Uses get_pos_with_magic when it works. If it raises, scans
        rpc_conn.get_positions for the first position whose 'magic' field
        equals magic or whose comment contains 'magic=<magic>'. The position
        'type' may be a string containing BUY/SELL or the integers 0 (buy) /
        1 (sell).
        """
        side = None
        try:
            pos = await get_pos_with_magic(rpc_conn, symbol=symbol, magic=magic)
            if not pos:
                return None
            t = pos.get("type")
            if isinstance(t, str):
                tu = t.upper()
                side = "BUY" if "BUY" in tu else ("SELL" if "SELL" in tu else None)
            elif t == 0:
                side = "BUY"
            elif t == 1:
                side = "SELL"
        except Exception:
            try:
                positions = await rpc_conn.get_positions(symbol=symbol) or []
            except Exception:
                positions = []
            for p in positions:
                pm = p.get("magic")
                cmt = str(p.get("comment") or "")
                ok = False
                if pm is not None:
                    try: ok = int(pm) == int(magic)
                    except Exception: ok = False
                if (not ok) and f"magic={magic}" in cmt:
                    ok = True
                if not ok:
                    continue
                t = p.get("type")
                if isinstance(t, str):
                    tu = t.upper()
                    side = "BUY" if "BUY" in tu else ("SELL" if "SELL" in tu else None)
                elif t == 0:
                    side = "BUY"
                elif t == 1:
                    side = "SELL"
                # Only the first matching position is considered.
                break
        if side is None:
            return None
        return "Long" if side == "BUY" else "Short"

    def _sync_type_in_df(df_all: pd.DataFrame, api_type: str | None) -> None:
        """Write 'Type' on the active block, or on the last row if no block is detected.

        The active block is every row whose 'orderId' equals the last
        non-null orderId; without any orderId it is every row from the last
        index where 'Entry_Date' is set and differs from the previous row.
        Does nothing when api_type is falsy or the frame is empty. Mutates
        df_all in place.

        NOTE(review): if 'Entry_Date' is missing, pd.to_datetime receives
        None; confirm the column is always present at this point
        (presumably guaranteed by _ensure_order_cols).
        """
        if not api_type or df_all.empty:
            return
        last_oid = df_all.get("orderId")
        if last_oid is not None and last_oid.notna().any():
            last_oid_val = last_oid.dropna().iloc[-1]
            mask = (df_all["orderId"] == last_oid_val)
        else:
            ed = pd.to_datetime(df_all.get("Entry_Date"), errors="coerce", utc=True)
            starts = ed.notna() & (ed != ed.shift(1))
            if starts.any():
                start_idx = df_all.index[starts].max()
                mask = (df_all.index >= start_idx)
            else:
                mask = pd.Series(False, index=df_all.index)
        if mask.any():
            df_all.loc[mask, "Type"] = api_type
        else:
            df_all.at[df_all.index[-1], "Type"] = api_type

    # ───────────────────────────────────────────────────────────────────
    # 1) Create / migrate the initial file
    # ───────────────────────────────────────────────────────────────────
    if not os.path.exists(FILE_PATH):
        df = await get_candles_5m(account, start=None, limit=CANDEL_NUMBER)
        # ATR is only computed once at least 14 rows are available.
        if len(df) >= 14:
            df["ATR"] = _calc_atr(df, 14)
        l1, l2, l3, l4 = LENGTHS
        s1, s2, s3, s4 = SMOOTHS
        generate_trade_signals(df, l1, l2, l3, l4, s1, s2, s3, s4)
        _ensure_order_cols(df)
        stamp_system_time(df, "last")
        save_csv(df)
        print(f"✔ Archivo inicial creado con {len(df)} velas")
    else:
        migrate_csv_if_needed(FILE_PATH)

    # ───────────────────────────────────────────────────────────────────
    # 2) Main loop
    # ───────────────────────────────────────────────────────────────────
    async def _wait_for_closed_candle(prev_bar: dt.datetime,
                                      last_known_time: pd.Timestamp | None,
                                      delta: dt.timedelta) -> pd.DataFrame:
        """Fetch the latest closed candle with time <= prev_bar, retrying within a safety margin.

        Polls get_candles_5m until a candle newer than last_known_time shows
        up or the deadline passes. The deadline is max(45 s, delta) capped at
        5 minutes; the poll interval is delta/6 clamped to 3..20 seconds.

        Returns a one-row frame with the latest candidate, or the last
        (possibly empty) filtered snapshot when no candidate was ever seen.
        """
        wait_limit = max(dt.timedelta(seconds=45), delta)
        wait_limit = min(wait_limit, dt.timedelta(minutes=5))
        deadline = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc) + wait_limit
        poll_sleep = max(3, min(20, int(delta.total_seconds() // 6) or 1))
        warned_at: dt.datetime | None = None
        latest_candidate: pd.DataFrame | None = None
        last_snapshot = pd.DataFrame()
        while True:
            fresh = await get_candles_5m(account, start=None, limit=50)
            fresh = (fresh[fresh["time"] <= prev_bar]
                     .drop_duplicates("time")
                     .sort_values("time"))
            last_snapshot = fresh.copy()
            if not fresh.empty:
                latest_candidate = fresh.iloc[[-1]].copy()
                latest_time = pd.to_datetime(latest_candidate["time"], utc=True, errors="coerce")
                latest_time = latest_time.iloc[-1] if not latest_time.empty else None
                # A candle counts as new when nothing is known yet or it is
                # strictly later than the last known time.
                if last_known_time is None or (latest_time is not None and latest_time > last_known_time):
                    return latest_candidate
                if (last_known_time is not None
                        and latest_time is not None and latest_time <= last_known_time):
                    now_warn = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
                    # Warn at most once every 60 seconds while waiting.
                    if warned_at is None or (now_warn - warned_at).total_seconds() >= 60:
                        ts_txt = prev_bar.strftime("%Y-%m-%d %H:%M:%S")
                        print(f"⚠️ Vela cerrada {ts_txt} aún no publicada; reintentando...", flush=True)
                        warned_at = now_warn
            if dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc) >= deadline:
                if latest_candidate is not None:
                    return latest_candidate
                return last_snapshot
            await asyncio.sleep(poll_sleep)

    # Delay before a reconnection attempt; doubled after each failed cycle up
    # to the maximum and reset to 5 s after a successful cycle or reconnection.
    reconnect_backoff = 5.0
    reconnect_backoff_max = 60.0

    # State used to avoid repeating the "no new candle" warning for the same
    # bar within 60 seconds.
    last_no_new_warning_at: dt.datetime | None = None
    last_no_new_warning_bar: dt.datetime | None = None

    while True:
        try:
            await asyncio.sleep(seconds_until_next_tf(time_frame_data, offset_sec=3))

            now_utc  = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
            delta    = _parse_tf_to_delta(time_frame_data)
            this_bar = _floor_to_frame(now_utc, delta)
            prev_bar = this_bar - delta  # last CLOSED candle

            # Most recent candle time already stored in the CSV (None if the
            # CSV is empty or has no parseable 'time' values).
            df_all = _load_csv()
            last_known_time = None
            if not df_all.empty and "time" in df_all.columns:
                known_times = pd.to_datetime(df_all["time"], utc=True, errors="coerce").dropna()
                if not known_times.empty:
                    last_known_time = known_times.iloc[-1]

            df_new = await _wait_for_closed_candle(prev_bar, last_known_time, delta)

            prev_bar_txt = prev_bar.strftime("%Y-%m-%d %H:%M:%S")
            latest_new_time = None
            if not df_new.empty and "time" in df_new.columns:
                latest_times = pd.to_datetime(df_new["time"], utc=True, errors="coerce").dropna()
                if not latest_times.empty:
                    latest_new_time = latest_times.iloc[-1]
            # Nothing newer than the stored data arrived: warn (rate-limited
            # per bar) and keep going; the cycle below still runs.
            if last_known_time is not None and (latest_new_time is None or latest_new_time <= last_known_time):
                now_warn = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
                should_warn = True
                if last_no_new_warning_at is not None:
                    elapsed = (now_warn - last_no_new_warning_at).total_seconds()
                    same_bar = (last_no_new_warning_bar == prev_bar)
                    if same_bar and elapsed < 60:
                        should_warn = False
                if should_warn:
                    print(f"⚠️ Sin nueva vela cerrada para {prev_bar_txt} UTC; se reintentará en el siguiente ciclo.", flush=True)
                    last_no_new_warning_at = now_warn
                    last_no_new_warning_bar = prev_bar

            # Candle times present before merging; used further down to tell
            # which rows are new.
            existing_times = (
                set(pd.to_datetime(df_all["time"], utc=True))
                if (not df_all.empty and "time" in df_all.columns)
                else set()
            )

            if df_all.empty:
                df_all = df_new.copy()
            else:
                df_all = (pd.concat([df_all, df_new], ignore_index=True)
                          .drop_duplicates("time")
                          .sort_values("time")
                          .reset_index(drop=True))

            # ATR is recomputed over the whole frame once 14 rows exist.
            if len(df_all) >= 14:
                df_all["ATR"] = _calc_atr(df_all, 14)

            l1, l2, l3, l4 = LENGTHS
            s1, s2, s3, s4 = SMOOTHS
            generate_trade_signals(df_all, l1, l2, l3, l4, s1, s2, s3, s4)
            _ensure_order_cols(df_all)

            # Flag the rows that were added in this cycle with source = 1.
            if not df_new.empty:
                new_times = set(pd.to_datetime(df_new["time"], utc=True)) - existing_times
                if new_times:
                    df_all.loc[pd.to_datetime(df_all["time"], utc=True).isin(new_times), "source"] = 1

            await open_trade(df_all, rpc_conn, symbol=SYMBOL, lot=LOT_, comment=COMMENT_, magic=MAGIC)

            api_type = await _get_api_type(rpc_conn, SYMBOL, MAGIC)
            _sync_type_in_df(df_all, api_type)
            await sync_stop_loss_from_df(df_all, rpc_conn, symbol=SYMBOL, magic=MAGIC)

            stamp_system_time(df_all, "last")
            save_csv(df_all)
            print(dt.datetime.utcnow().strftime("%H:%M:%S"), "| actualización (ciclo "+time_frame_data+")")

            # Successful cycle: reset the backoff.
            reconnect_backoff = 5.0
        except asyncio.CancelledError:
            # Let task cancellation propagate instead of treating it as an error.
            raise
        except Exception as loop_err:
            logging.exception("Error en ciclo principal MetaApi: %s", loop_err)
            print(f"⚠️ Error en ciclo principal: {loop_err}", flush=True)
            await asyncio.sleep(reconnect_backoff)
            reconnect_backoff = min(reconnect_backoff * 2, reconnect_backoff_max)
            try:
                account = await connect_metaapi(META_API_TOKEN, ACCOUNT_ID)
                rpc_conn = account.get_rpc_connection()
                # connect() and wait_synchronized() are best-effort here:
                # their failures are ignored and the loop simply tries again.
                try:
                    await rpc_conn.connect()
                except Exception:
                    pass
                try:
                    await asyncio.wait_for(rpc_conn.wait_synchronized(), timeout=30)
                except Exception:
                    pass
                reconnect_backoff = 5.0
                print("🔄 Reconexión MetaApi completada.", flush=True)
            except Exception as recon_err:
                logging.exception("Fallo reconectando a MetaApi: %s", recon_err)
                print(f"❌ Falló la reconexión a MetaApi: {recon_err}", flush=True)
            continue





# Run

In [None]:
# Plot the last 250 rows of a module-level DataFrame named `df`, if one exists:
# close price, the three Kalman lines and a vertical marker per trade signal.
df_plot_source = globals().get("df")
if not isinstance(df_plot_source, pd.DataFrame) or df_plot_source.empty:
    print("No data available to plot Kalman signals.")
else:
    df_plot = df_plot_source.tail(250).copy()

    plt.figure(figsize=(14, 7))
    plt.plot(df_plot['time'], df_plot['close'], label='Close', color='black', linewidth=1)
    plt.plot(df_plot['time'], df_plot['kal_1'], label='Kal_1', color='blue', linestyle='--')
    plt.plot(df_plot['time'], df_plot['kal_2'], label='Kal_2', color='red', linestyle='--')
    plt.plot(df_plot['time'], df_plot['kal_3'], label='Kal_3', color='green', linestyle='--')

    # Blue marker for Open_Trade == 1, red marker for Open_Trade == -1.
    for _, row in df_plot.iterrows():
        if row.get('Open_Trade') == 1:
            plt.axvline(row['time'], color='blue', linestyle='-', linewidth=1, alpha=0.7)
            continue
        if row.get('Open_Trade') == -1:
            plt.axvline(row['time'], color='red', linestyle='-', linewidth=1, alpha=0.7)

    plt.title('Close Price and Kalman Lines with Trade Signals')
    plt.xlabel('Time')
    plt.ylabel('Price')
    plt.legend()
    plt.show()


