In [1]:
pip install --upgrade websockets

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
pip install nest-asyncio

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


# Sistema Propuesto

Este sistema implementa un monitor de trading automatizado que se conecta a Finnhub mediante WebSocket para recibir datos de mercado en tiempo real. El sistema analiza tendencias utilizando el indicador ADX (Average Directional Index) y puede generar alertas basadas en condiciones espec√≠ficas del mercado.

In [3]:
# Importaci√≥n de bibliotecas necesarias
import pandas as pd                    # Para manipulaci√≥n y an√°lisis de datos
from datetime import datetime, timedelta # Para manejo de fechas y tiempos
import websockets                      # Para conexiones websocket
import asyncio                         # Para programaci√≥n as√≠ncrona
import json                           # Para manejo de datos JSON
import numpy as np                    # Para c√°lculos num√©ricos
import nest_asyncio                   # Para permitir loops asyncio anidados
import time                           # Para funciones relacionadas con tiempo
from IPython.display import clear_output # Para limpiar salida en notebooks
import threading                      # Para manejo de hilos
from concurrent.futures import ThreadPoolExecutor # Para pool de hilos
import pytz                          # Para manejo de zonas horarias
from ta.trend import ADXIndicator    # Para el indicador t√©cnico ADX
import logging                       # Para registro de eventos

# Habilitar loops anidados para Jupyter
nest_asyncio.apply()

class ConnectionStatus:
    """
    Clase que define los estados posibles de la conexi√≥n al websocket.
    Utiliza constantes en espa√±ol para mejor comprensi√≥n en el contexto local.
    """
    DISCONNECTED = "Desconectado"     # Estado cuando no hay conexi√≥n
    CONNECTING = "Conectando"         # Estado durante el intento de conexi√≥n
    CONNECTED = "Conectado"           # Estado cuando la conexi√≥n est√° establecida
    ERROR = "Error"                   # Estado cuando ocurre un error
    RECONNECTING = "Reconectando"     # Estado durante un intento de reconexi√≥n

class TradingConfig:
    """
    Clase principal de configuraci√≥n para el sistema de trading.
    Maneja los par√°metros de configuraci√≥n y constantes necesarias para la operaci√≥n.
    """
    def __init__(self,
                symbol,                    # S√≠mbolo del instrumento financiero
                adx_threshold=25,          # Umbral para considerar tendencia fuerte
                di_threshold=20,           # Umbral para se√±ales de cruce DI
                alert_cooldown=300,        # Tiempo m√≠nimo entre alertas (en segundos)
                data_window=300,           # Ventana de tiempo para datos hist√≥ricos
                update_interval=2,         # Frecuencia de actualizaci√≥n de c√°lculos
                api_key=None):             # Clave API para autenticaci√≥n
        
        # Configuraci√≥n b√°sica
        self.symbol = symbol.upper()        # Convertir s√≠mbolo a may√∫sculas
        self.adx_threshold = adx_threshold  # Umbral ADX para detectar tendencia fuerte
        self.di_threshold = di_threshold    # Umbral para detectar cruces significativos
        self.alert_cooldown = alert_cooldown # Evita m√∫ltiples alertas en poco tiempo
        self.data_window = data_window      # Cantidad de datos hist√≥ricos a mantener
        self.update_interval = update_interval # Frecuencia de actualizaciones
        
        # Configuraci√≥n de la API
        self.ws_url = "wss://ws.finnhub.io"  # URL del websocket de Finnhub
        self.api_key = api_key or "cukll41r01qo08i8jju0cukll41r01qo08i8jjug" # API key con fallback
        
        # Par√°metros de reconexi√≥n
        self.max_retries = 3                # N√∫mero m√°ximo de intentos de reconexi√≥n
        self.reconnect_delay = 5            # Tiempo entre intentos de reconexi√≥n

In [4]:
!python -m pip install ta





[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import time
from datetime import datetime

class AlertSystem:
    def __init__(self):
        """
        Constructor de la clase AlertSystem.
        Inicializa las variables para rastrear la √∫ltima alerta y la √∫ltima se√±al generada.
        """
        self.last_alert = None  # Marca de tiempo de la √∫ltima alerta enviada
        self.last_signal = None  # √öltima se√±al de trading generada (BUY, SELL o NEUTRAL)
        
    def check_signals(self, data, config):
        """
        Analiza los datos de entrada y genera se√±ales de trading, incluyendo estado neutral.
        
        Par√°metros:
        - data (dict): Diccionario con los datos del mercado, que debe incluir:
            'ADX': √çndice de Movimiento Direccional Medio (Average Directional Index).
            'DI+': Indicador Direccional Positivo (+DI).
            'DI-': Indicador Direccional Negativo (-DI).
            'timestamp': Marca de tiempo del dato en segundos.
            'close': Precio de cierre del activo en el momento de an√°lisis.
        - config (objeto): Configuraci√≥n con los siguientes atributos:
            - adx_threshold: Umbral m√≠nimo de ADX para considerar una tendencia fuerte.
            - di_threshold: Umbral m√≠nimo de DI+ o DI- para confirmar una tendencia.
            - alert_cooldown: Tiempo m√≠nimo (en segundos) entre alertas repetitivas.

        Retorna:
        - dict: Mensaje de alerta con el estado actual del mercado si se genera una se√±al nueva.
        - None: Si no se genera una nueva alerta (para evitar repetici√≥n de se√±ales).
        """
        current_time = time.time()  # Obtiene la marca de tiempo actual
        
        # Extrae los indicadores t√©cnicos desde el diccionario de datos
        adx = data['ADX']
        di_plus = data['DI+']
        di_minus = data['DI-']
        
        # Determinar el estado del mercado con base en ADX y DI+ / DI-
        if adx > config.adx_threshold:  # ADX por encima del umbral ‚Üí tendencia fuerte
            if di_plus > di_minus and di_plus > config.di_threshold:
                signal_type = 'BUY'  # Se√±al de compra
                market_state = 'Tendencia alcista fuerte'
            elif di_minus > di_plus and di_minus > config.di_threshold:
                signal_type = 'SELL'  # Se√±al de venta
                market_state = 'Tendencia bajista fuerte'
            else:
                signal_type = 'NEUTRAL'  # Tendencia fuerte sin direcci√≥n clara
                market_state = 'Tendencia fuerte sin direcci√≥n clara'
        else:
            signal_type = 'NEUTRAL'  # ADX por debajo del umbral ‚Üí tendencia d√©bil o mercado en rango
            if adx < 20:
                market_state = 'Mercado sin tendencia clara - Posible rango lateral'
            else:
                market_state = 'Tendencia d√©bil - Esperar confirmaci√≥n'

        # Evitar alertas repetitivas para BUY/SELL
        if (signal_type in ['BUY', 'SELL'] and 
            self.last_signal == signal_type and 
            self.last_alert is not None and 
            current_time - self.last_alert < config.alert_cooldown):
            return None  # Se evita enviar una alerta repetitiva en un corto per√≠odo de tiempo

        # Actualizar la √∫ltima alerta y la √∫ltima se√±al registrada
        self.last_alert = current_time
        self.last_signal = signal_type
        
        # Generar y retornar el mensaje de alerta
        return self._generate_alert(data, signal_type, market_state)
        
    def _generate_alert(self, data, signal_type, market_state):
        """
        Genera un mensaje de alerta con los detalles del mercado.
        
        Par√°metros:
        - data (dict): Diccionario con los datos de mercado.
        - signal_type (str): Tipo de se√±al generada ('BUY', 'SELL' o 'NEUTRAL').
        - market_state (str): Descripci√≥n del estado del mercado.

        Retorna:
        - dict: Mensaje de alerta con la informaci√≥n relevante del mercado.
        """
        return {
            'type': signal_type,  # Tipo de se√±al generada
            'state': market_state,  # Estado descriptivo del mercado
            'timestamp': datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S'),  # Fecha y hora formateada
            'price': data['close'],  # Precio de cierre del activo
            'adx': data['ADX'],  # Valor de ADX
            'di_plus': data['DI+'],  # Valor de DI+
            'di_minus': data['DI-']  # Valor de DI-
        }

In [6]:
class StockDataStream:
    def __init__(self, config=None):
        """
        Inicializa la clase StockDataStream, que gestiona la recepci√≥n de datos en tiempo real a trav√©s de WebSockets.

        Par√°metros:
        - config (TradingConfig, opcional): Configuraci√≥n del sistema de trading. Si no se proporciona, se usa una configuraci√≥n predeterminada.
        """
        self.config = config or TradingConfig()  # Carga la configuraci√≥n o usa la predeterminada
        self.data_buffer = []  # Buffer para almacenar los datos de precios en tiempo real
        self.last_update = None  # √öltima actualizaci√≥n de datos
        self.running = True  # Estado de ejecuci√≥n del stream
        self.connection_status = ConnectionStatus.DISCONNECTED  # Estado de conexi√≥n inicial
        self.messages_received = 0  # Contador de mensajes recibidos
        self.alert_system = AlertSystem()  # Sistema de alertas para se√±ales de compra y venta
        self.websocket = None  # Conexi√≥n WebSocket (inicialmente nula)
        self.task = None  # Tarea as√≠ncrona para manejar el WebSocket
        self.loop = None  # Event loop de asyncio
        self.connection_retries = 0  # Contador de intentos de reconexi√≥n

        # Configuraci√≥n del sistema de logging para registrar eventos
        self.setup_logger()

    def setup_logger(self):
        """
        Configura el sistema de logging para registrar eventos del stream de datos en un archivo de log.
        
        - El nombre del archivo de log sigue el formato 'trading_alerts_<symbol>_<YYYYMMDD>.log'.
        - Se usa un nivel de logging INFO para capturar eventos importantes.
        - Se evita agregar m√∫ltiples manejadores al logger si ya existen.
        """
        
        # Obtiene la fecha actual en formato YYYYMMDD para incluirla en el nombre del archivo de log
        fecha_actual = datetime.now().strftime('%Y%m%d')
        
        # Define el nombre del archivo de log con el s√≠mbolo de la acci√≥n y la fecha actual
        log_filename = f'trading_alerts_{self.config.symbol}_{fecha_actual}.log'

        # Crea un logger espec√≠fico para el s√≠mbolo de la acci√≥n
        self.logger = logging.getLogger(f'trading_{self.config.symbol}')
        
        # Establece el nivel de logging en INFO para registrar eventos relevantes
        self.logger.setLevel(logging.INFO)

        # Verifica si el logger ya tiene manejadores para evitar duplicados
        if not self.logger.handlers:
            # Crea un manejador de archivo para guardar los logs
            file_handler = logging.FileHandler(log_filename)
            
            # Define el formato de los logs: timestamp | nivel de log | mensaje
            file_formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
            
            # Asigna el formato al manejador
            file_handler.setFormatter(file_formatter)
            
            # Agrega el manejador de archivo al logger
            self.logger.addHandler(file_handler)

    def log_alert(self, alert):
        """
        Registra una alerta en el sistema de logging.

        Par√°metros:
        alert (dict): Un diccionario que contiene los detalles de la alerta.
            - 'type' (str): Tipo de alerta ('BUY', 'SELL' o estado neutral).
            - 'price' (float): Precio de la acci√≥n en el momento de la alerta.
            - 'adx' (float): Indicador ADX en el momento de la alerta.
            - 'di_plus' (float): Valor del indicador DI+.
            - 'di_minus' (float): Valor del indicador DI-.
            - 'state' (str): Estado de la tendencia.

        Funcionamiento:
        - Si la alerta es de tipo 'BUY' o 'SELL', se registra con nivel INFO.
        - Si la alerta es neutral, se registra con nivel DEBUG.
        """

        # Verifica si la alerta es de compra ('BUY') o venta ('SELL')
        if alert['type'] in ['BUY', 'SELL']:
            # Formatea el mensaje de la alerta con los indicadores t√©cnicos relevantes
            mensaje = (
                f"ALERTA DE {alert['type']} | "
                f"S√≠mbolo: {self.config.symbol} | "
                f"Precio: ${alert['price']:.2f} | "
                f"ADX: {alert['adx']:.2f} | "
                f"DI+: {alert['di_plus']:.2f} | "
                f"DI-: {alert['di_minus']:.2f} | "
                f"Estado: {alert['state']}"
            )
            # Registra la alerta con nivel INFO en el log
            self.logger.info(mensaje)
        
        else:
            # Si la alerta no es de compra ni venta, se considera un estado neutral
            mensaje = (
                f"ESTADO NEUTRAL | "
                f"S√≠mbolo: {self.config.symbol} | "
                f"Precio: ${alert['price']:.2f} | "
                f"Estado: {alert['state']}"
            )
            # Registra la alerta con nivel DEBUG, ya que no requiere acci√≥n inmediata
            self.logger.debug(mensaje)

    def _get_signal_emoji(self, signal_type):
        """
        Retorna el emoji correspondiente a un tipo de se√±al de trading.

        Par√°metros:
        signal_type (str): Tipo de se√±al, puede ser 'BUY', 'SELL' o 'NEUTRAL'.

        Retorna:
        str: Emoji que representa visualmente la se√±al.

        Mapeo de se√±ales:
        - 'BUY' (Compra) ‚Üí üü¢
        - 'SELL' (Venta) ‚Üí üî¥
        - 'NEUTRAL' (Sin tendencia definida) ‚Üí ‚ö™
        - Cualquier otro valor ‚Üí ‚ùì (desconocido)
        """

        # Diccionario que asocia cada tipo de se√±al con su emoji correspondiente
        emojis = {
            'BUY': 'üü¢',     # Se√±al de compra
            'SELL': 'üî¥',    # Se√±al de venta
            'NEUTRAL': '‚ö™'  # Se√±al neutral
        }
        
        # Retorna el emoji correspondiente o '‚ùì' si el tipo de se√±al no est√° definido
        return emojis.get(signal_type, '‚ùì')

    def calculate_adx(self, df, period=14):
        """
        Calcula el indicador ADX (Average Directional Index) y sus componentes DI+ y DI-.

        Par√°metros:
        df (pd.DataFrame): DataFrame con las columnas 'high', 'low' y 'close' de los precios del activo.
        period (int): Per√≠odo de c√°lculo del ADX (por defecto 14).

        Retorna:
        tuple(pd.Series, pd.Series, pd.Series): 
            - ADX (Serie de valores de tendencia)
            - DI+ (Serie de valores positivos de la direcci√≥n del √≠ndice)
            - DI- (Serie de valores negativos de la direcci√≥n del √≠ndice)
        """

        # Verifica que haya suficientes datos para calcular el ADX
        if len(df) < period:
            return pd.Series(np.nan, index=df.index)

        # Crea una instancia del indicador ADX utilizando la librer√≠a ta (technical analysis)
        adx_indicator = ADXIndicator(
            high=df['high'],   # Precios m√°ximos
            low=df['low'],     # Precios m√≠nimos
            close=df['close'], # Precios de cierre
            window=period      # Per√≠odo del indicador
        )

        # Retorna el ADX y sus componentes DI+ y DI-
        return adx_indicator.adx(), adx_indicator.adx_pos(), adx_indicator.adx_neg()

    async def print_summary(self, data):
        """
        Genera y muestra un resumen en la consola con informaci√≥n clave del mercado.

        Par√°metros:
        data (dict): Diccionario con los datos m√°s recientes del mercado, incluyendo precios, indicadores y se√±ales.

        Funcionalidad:
        - Limpia la consola antes de mostrar el resumen.
        - Presenta los datos del activo, incluyendo el precio actual y los indicadores ADX, DI+ y DI-.
        - Eval√∫a si hay una se√±al de trading (BUY, SELL o NEUTRAL).
        - Muestra la se√±al correspondiente con emojis para una mejor visualizaci√≥n.
        - Registra las alertas en el log de la aplicaci√≥n.
        - Imprime informaci√≥n sobre la conexi√≥n y la cantidad de mensajes recibidos.
        - Maneja posibles errores durante la ejecuci√≥n e informa en el log.
        """

        try:
            # Limpia la salida de la consola para mostrar siempre informaci√≥n actualizada
            clear_output(wait=True)
            
            # Encabezado del resumen de mercado
            print("\n" + "="*50)
            print(f"RESUMEN DE MERCADO - {self.config.symbol}")
            print("="*50)

            # Muestra la fecha y hora del √∫ltimo dato recibido
            print(f"\nFecha y Hora: {datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}")

            # Muestra los precios e indicadores t√©cnicos
            print("\nPRECIOS E INDICADORES:")
            print(f"Precio actual: ${data['close']:.2f}")
            print(f"ADX: {data['ADX']:.2f}")
            print(f"DI+: {data['DI+']:.2f}")
            print(f"DI-: {data['DI-']:.2f}")

            # Analiza si hay una se√±al de trading
            alert = self.alert_system.check_signals(data, self.config)

            print("\nAN√ÅLISIS DE MERCADO:")
            print("-" * 30)

            if alert:
                # Obtiene el emoji correspondiente a la se√±al (ejemplo: üü¢ para BUY)
                emoji = self._get_signal_emoji(alert['type'])
                print(f"SE√ëAL ACTUAL: {emoji} {alert['type']}")
                print(f"Estado: {alert['state']}")

                # Si la se√±al es de compra o venta, muestra una alerta destacada
                if alert['type'] in ['BUY', 'SELL']:
                    print("\n" + "!"*50)
                    print(f"üö® ALERTA DE TRADING - {alert['type']}")
                    print(f"Precio: ${alert['price']:.2f}")
                    print(f"ADX: {alert['adx']:.2f}")
                    print(f"DI+: {alert['di_plus']:.2f}")
                    print(f"DI-: {alert['di_minus']:.2f}")
                    print("!"*50)

                # Registra la alerta en el log
                self.log_alert(alert)

            # Muestra el estado de la conexi√≥n y el n√∫mero de mensajes recibidos
            print(f"\nESTADO DE CONEXI√ìN: {self.connection_status}")
            print(f"Mensajes recibidos: {self.messages_received}")

            # Indica la ubicaci√≥n del archivo de log donde se almacenan las alertas
            print(f"Log: trading_alerts_{self.config.symbol}_{datetime.now().strftime('%Y%m%d')}.log")
            print("="*50)

        except Exception as e:
            # Captura errores y los muestra en la consola, adem√°s de registrarlos en el log
            print(f"Error en print_summary: {e}")
            self.logger.error(f"Error en print_summary: {e}")

    async def handle_trade_message(self, message):
        """
        Maneja los mensajes recibidos en tiempo real desde el WebSocket de trading.

        Par√°metros:
        message (str): Mensaje en formato JSON recibido desde el WebSocket.

        Funcionalidad:
        - Incrementa el contador de mensajes recibidos.
        - Procesa los mensajes de tipo 'ping' y responde con 'pong' para mantener la conexi√≥n activa.
        - Extrae los datos de las transacciones si el mensaje es de tipo 'trade'.
        - Agrega los datos al buffer, asegurando que solo se mantengan los datos dentro de la ventana de tiempo configurada.
        - Calcula los indicadores ADX, DI+ y DI- si hay suficientes datos.
        - Llama a `print_summary` para actualizar el resumen si ha pasado el tiempo m√≠nimo de actualizaci√≥n.
        - Maneja posibles errores y los registra en el log.
        """

        try:
            # Incrementa el contador de mensajes recibidos
            self.messages_received += 1

            # Decodifica el mensaje JSON recibido
            data = json.loads(message)

            # Si el mensaje es un 'ping', responde con 'pong' para mantener la conexi√≥n activa
            if data.get('type') == 'ping':
                if self.websocket:
                    await self.websocket.send(json.dumps({'type': 'pong'}))
                return  # Sale de la funci√≥n ya que no hay m√°s procesamiento necesario
            
            # Procesa solo los mensajes de tipo 'trade' que contengan datos
            if data['type'] == 'trade' and 'data' in data:
                trades = data['data']
                
                for trade in trades:
                    timestamp = trade['t'] / 1000  # Convierte el timestamp de milisegundos a segundos
                    price = float(trade['p'])  # Extrae el precio de la transacci√≥n
                    
                    # Agrega los datos de la transacci√≥n al buffer, asignando valores de precio a cierre, m√°ximo y m√≠nimo
                    self.data_buffer.append({
                        'timestamp': timestamp,
                        'close': price,
                        'high': price,
                        'low': price,
                    })
                
                # Filtra el buffer para mantener solo los datos dentro del intervalo configurado
                current_time = time.time()
                self.data_buffer = [d for d in self.data_buffer 
                                    if current_time - d['timestamp'] <= self.config.data_window]

                # Calcula los indicadores ADX, DI+ y DI- solo si hay suficientes datos en el buffer
                if len(self.data_buffer) > 14:
                    df = pd.DataFrame(self.data_buffer)  # Convierte el buffer en un DataFrame
                    adx, di_pos, di_neg = self.calculate_adx(df)  # Calcula los indicadores
                    
                    # Agrega los indicadores calculados al DataFrame
                    df['ADX'] = adx
                    df['DI+'] = di_pos
                    df['DI-'] = di_neg
                    
                    # Verifica si ha pasado el tiempo m√≠nimo de actualizaci√≥n antes de imprimir el resumen
                    if (self.last_update is None or 
                        current_time - self.last_update >= self.config.update_interval):
                        self.last_update = current_time  # Actualiza el tiempo de la √∫ltima actualizaci√≥n
                        await self.print_summary(df.iloc[-1])  # Llama a print_summary con el √∫ltimo dato disponible

        except Exception as e:
            # Maneja cualquier error en la ejecuci√≥n y lo registra en el log
            print(f"Error procesando mensaje: {e}")
            self.logger.error(f"Error procesando mensaje: {e}")

    async def connect_websocket(self):
        """
        Establece y mantiene la conexi√≥n WebSocket con el servidor de datos financieros.

        Funcionalidad:
        - Intenta conectar al WebSocket hasta alcanzar el n√∫mero m√°ximo de reintentos.
        - Maneja errores de conexi√≥n y reintenta en caso de fallos.
        - Se suscribe a los datos de la acci√≥n configurada.
        - Procesa los mensajes recibidos y los env√≠a a `handle_trade_message`.
        - Controla el estado de la conexi√≥n e imprime informaci√≥n relevante.

        Par√°metros:
        - No recibe par√°metros directamente, pero usa `self.config` para obtener la configuraci√≥n.

        Estado de conexi√≥n:
        - CONNECTING: Intentando conectar al WebSocket.
        - CONNECTED: Conexi√≥n establecida exitosamente.
        - ERROR: Ocurri√≥ un error y se intentar√° reconectar.
        """

        while self.running:  # Mientras el stream est√© activo, intenta mantener la conexi√≥n
            try:
                # Si se supera el n√∫mero m√°ximo de reintentos, detiene la conexi√≥n
                if self.connection_retries >= self.config.max_retries:
                    error_msg = f"\n‚ùå Se super√≥ el n√∫mero m√°ximo de intentos ({self.config.max_retries})"
                    print(error_msg)
                    self.logger.error(error_msg)
                    self.running = False  # Finaliza el bucle
                    break

                # Indica que se est√° intentando conectar
                self.connection_status = ConnectionStatus.CONNECTING
                print(f"\nüîÑ Intento de conexi√≥n {self.connection_retries + 1}/{self.config.max_retries}")

                # Intenta establecer la conexi√≥n con el servidor WebSocket
                async with websockets.connect(f"{self.config.ws_url}?token={self.config.api_key}") as websocket:
                    self.websocket = websocket  # Guarda la conexi√≥n activa
                    self.connection_status = ConnectionStatus.CONNECTED  # Marca la conexi√≥n como establecida
                    
                    # Registra e imprime la conexi√≥n exitosa
                    self.logger.info(f"Conexi√≥n WebSocket establecida para {self.config.symbol}")
                    print(f"\nüü¢ Conexi√≥n WebSocket establecida para {self.config.symbol}")

                    # Env√≠a mensaje de suscripci√≥n para recibir datos de la acci√≥n especificada
                    subscribe_message = {
                        "type": "subscribe",
                        "symbol": self.config.symbol
                    }
                    await websocket.send(json.dumps(subscribe_message))
                    print("‚úÖ Suscripci√≥n enviada")

                    # Loop para recibir y manejar mensajes en tiempo real mientras el WebSocket est√© activo
                    while self.running:
                        try:
                            message = await websocket.recv()  # Recibe un mensaje
                            await self.handle_trade_message(message)  # Procesa el mensaje recibido
                        except websockets.exceptions.ConnectionClosed:
                            # Si la conexi√≥n se cierra inesperadamente, registra y avisa
                            print("\nüî¥ Conexi√≥n cerrada")
                            self.logger.warning("Conexi√≥n WebSocket cerrada")
                            break  # Sale del loop interno para intentar reconectar

            except Exception as e:
                # Captura cualquier error de conexi√≥n y actualiza el estado
                self.connection_status = ConnectionStatus.ERROR
                self.connection_retries += 1  # Aumenta el contador de intentos fallidos
                
                error_msg = f"\n‚ùå Error de conexi√≥n: {e}"
                print(error_msg)
                self.logger.error(error_msg)

                if self.running:
                    # Espera antes de reintentar la conexi√≥n
                    print(f"Reintentando en {self.config.reconnect_delay} segundos...")
                    await asyncio.sleep(self.config.reconnect_delay)

    def is_market_open(self):
        """
        Determina si el mercado de valores de EE.UU. (NYSE/NASDAQ) est√° abierto en tiempo real.

        Retorna:
        - (bool) True si el mercado est√° abierto, False si est√° cerrado.
        - (str) Mensaje indicando el estado actual del mercado.
        - (datetime) Pr√≥xima apertura del mercado si est√° cerrado.

        Consideraciones:
        - El horario del mercado es de 9:30 AM a 4:00 PM (hora del este, US/Eastern).
        - Los fines de semana (s√°bado y domingo), el mercado est√° cerrado.
        - Se maneja la apertura del siguiente d√≠a h√°bil si el mercado est√° cerrado.
        """
        
        # Definir la zona horaria del mercado (Hora del Este, EE.UU.)
        est = pytz.timezone('US/Eastern')
        now = datetime.now(est)  # Obtener la hora actual en la zona horaria de NY
        
        # Verificar si hoy es fin de semana (s√°bado=5, domingo=6)
        if now.weekday() in [5, 6]:  # Si es s√°bado o domingo
            next_open = now
            while next_open.weekday() in [5, 6]:  # Buscar el siguiente d√≠a h√°bil (lunes)
                next_open += timedelta(days=1)
            next_open = next_open.replace(hour=9, minute=30, second=0, microsecond=0)  # Fijar la pr√≥xima apertura
            return False, "Mercado cerrado (fin de semana)", next_open  # Retorna estado y pr√≥xima apertura

        # Definir horarios de apertura y cierre del mercado
        market_open = now.replace(hour=9, minute=30, second=0, microsecond=0)
        market_close = now.replace(hour=16, minute=0, second=0, microsecond=0)

        # Si la hora actual es antes de la apertura, indicar pre-market
        if now < market_open:
            return False, "Mercado cerrado (pre-market)", market_open  # Pr√≥xima apertura a las 9:30 AM

        # Si la hora actual es despu√©s del cierre, indicar post-market y calcular la pr√≥xima apertura
        if now >= market_close:
            next_day = now + timedelta(days=1)  # Ir al siguiente d√≠a
            while next_day.weekday() in [5, 6]:  # Saltar fines de semana
                next_day += timedelta(days=1)
            next_open = next_day.replace(hour=9, minute=30, second=0, microsecond=0)  # Fijar apertura del pr√≥ximo d√≠a h√°bil
            return False, "Mercado cerrado (post-market)", next_open

        # Si ninguna de las condiciones anteriores se cumple, el mercado est√° abierto
        return True, "Mercado abierto", market_close  # Indica el estado abierto y la pr√≥xima hora de cierre

    def start(self):
        """
        Inicia el stream de datos verificando primero si el mercado est√° abierto.

        - Si el mercado est√° cerrado, muestra un mensaje de alerta con la pr√≥xima apertura y detiene el programa.
        - Si el mercado est√° abierto, inicia el loop de eventos as√≠ncrono en un hilo separado para manejar la conexi√≥n WebSocket.

        Retorna:
        - (bool) True si el stream inicia correctamente.
        - (bool) False si el mercado est√° cerrado y no se inicia el stream.
        """
        
        # Verificar si el mercado est√° abierto
        market_open, status, next_open = self.is_market_open()
        
        # Si el mercado est√° cerrado, mostrar advertencia y detener el proceso
        if not market_open:
            print("\n" + "=" * 50)
            print("‚ö†Ô∏è ALERTA DE MERCADO CERRADO")
            print("=" * 50)
            print(f"\nEstado: {status}")
            
            # Obtener la zona horaria local y convertir la pr√≥xima apertura a la hora local del usuario
            local_tz = datetime.now().astimezone().tzinfo
            next_open_local = next_open.astimezone(local_tz)
            
            print(f"\nPr√≥xima apertura del mercado:")
            print(f"Fecha: {next_open_local.strftime('%Y-%m-%d')}")
            print(f"Hora: {next_open_local.strftime('%H:%M:%S')} (hora local)")
            print("\nEl programa se detendr√°.")
            
            # Registrar advertencia en el log
            self.logger.warning(f"Intento de inicio con mercado cerrado: {status}")
            return False  # No se inicia el stream

        # Configurar la ejecuci√≥n en un hilo separado
        self.running = True
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.loop = asyncio.new_event_loop()
        
        def run_async_loop():
            """Ejecuta el loop as√≠ncrono en un hilo separado para manejar WebSockets."""
            asyncio.set_event_loop(self.loop)
            self.task = self.loop.create_task(self.connect_websocket())
            try:
                self.loop.run_until_complete(self.task)
            except asyncio.CancelledError:
                pass  # Manejar cancelaciones seguras del loop

        # Crear y lanzar un hilo para ejecutar el loop de eventos as√≠ncrono
        self.thread = threading.Thread(target=run_async_loop)
        self.thread.start()
        
        # Registrar en logs y mostrar mensaje de inicio
        self.logger.info("Stream iniciado")
        print("Stream iniciado. Para detener, usa stream.stop()")
        
        return True  # Stream iniciado exitosamente

    def stop(self):
        """ 
        M√©todo para detener el stream de datos de manera segura.

        - Cambia la bandera `self.running` a False para indicar que el proceso debe detenerse.
        - Cancela la tarea as√≠ncrona `self.task` si est√° en ejecuci√≥n.
        - Cierra la conexi√≥n WebSocket si est√° activa.
        - Espera a que el hilo de ejecuci√≥n principal termine de forma ordenada.
        - Apaga el `ThreadPoolExecutor` sin esperar a que finalicen todas las tareas.
        - Registra en logs y muestra un mensaje en consola indicando que el stream ha sido detenido.
        """
        
        print("\nüõë Deteniendo el stream de datos...")
        self.running = False  # Se√±al para detener la ejecuci√≥n

        # Si la tarea as√≠ncrona `self.task` est√° en ejecuci√≥n, se cancela de forma segura
        if hasattr(self, 'task') and self.task:
            self.loop.call_soon_threadsafe(self.task.cancel)

        # Si la conexi√≥n WebSocket est√° activa, se cierra de forma segura
        if hasattr(self, 'websocket') and self.websocket:
            self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self.websocket.close()))

        # Espera a que el hilo de ejecuci√≥n termine, con un tiempo l√≠mite de 5 segundos
        if hasattr(self, 'thread'):
            self.thread.join(timeout=5)

        # Cierra el `ThreadPoolExecutor`, deteniendo todas las tareas en segundo plano
        if hasattr(self, 'executor'):
            self.executor.shutdown(wait=False)

        # Registrar en el log que el stream ha sido detenido
        self.logger.info("Stream detenido")
        print("Stream detenido exitosamente")


In [7]:
def create_stock_stream(symbol, adx_threshold=25, di_threshold=20, api_key=None):
    """
    Crea y retorna una instancia del stream con configuraci√≥n personalizada.
    
    Par√°metros:
    - symbol (str): S√≠mbolo de la acci√≥n a monitorear (ej: 'AAPL', 'MSFT', 'GOOGL').
    - adx_threshold (float): Umbral m√≠nimo del ADX para considerar una tendencia fuerte (default: 25).
    - di_threshold (float): Umbral m√≠nimo de los indicadores DI+ y DI- para confirmar la direcci√≥n de la tendencia (default: 20).
    - api_key (str): Clave de la API de Finnhub para la conexi√≥n con WebSockets (opcional).
    
    Retorna:
    - StockDataStream: Instancia del stream configurado con los par√°metros especificados.
    """
    
    # Se crea una instancia de TradingConfig con los par√°metros proporcionados.
    config = TradingConfig(
        symbol=symbol,  # S√≠mbolo de la acci√≥n
        adx_threshold=adx_threshold,  # Umbral para identificar tendencias fuertes
        di_threshold=di_threshold,  # Umbral para confirmar direcci√≥n de la tendencia
        api_key=api_key  # Clave de la API para la conexi√≥n con Finnhub
    )
    
    # Mensajes informativos sobre la configuraci√≥n del sistema
    print("\nIniciando sistema de trading en tiempo real...")
    print("\nConfiguraci√≥n:")
    print(f"S√≠mbolo: {config.symbol}")  # Muestra el s√≠mbolo de la acci√≥n
    print(f"Umbral ADX: {config.adx_threshold}")  # Muestra el umbral de ADX configurado
    print(f"Umbral DI: {config.di_threshold}")  # Muestra el umbral de DI configurado
    print(f"Tiempo entre alertas: {config.alert_cooldown}s")  # Muestra el tiempo m√≠nimo entre alertas
    
    # Se crea una instancia del flujo de datos burs√°tiles con la configuraci√≥n establecida
    stream = StockDataStream(config)
    
    # Retorna la instancia del stream para su ejecuci√≥n y monitoreo
    return stream

In [8]:
# Crear una instancia del stream para AAPL (Apple)
stream = create_stock_stream(
    symbol="NVDA",         # S√≠mbolo de la acci√≥n
    adx_threshold=25,      # Umbral para considerar una tendencia fuerte
    di_threshold=20        # Umbral para confirmar direcci√≥n de tendencia
)

# Iniciar el stream
stream.start()

# El stream comenzar√° a mostrar actualizaciones en tiempo real
# Para detenerlo cuando quieras, ejecuta:
#stream.stop()


Iniciando sistema de trading en tiempo real...

Configuraci√≥n:
S√≠mbolo: NVDA
Umbral ADX: 25
Umbral DI: 20
Tiempo entre alertas: 300s

‚ö†Ô∏è ALERTA DE MERCADO CERRADO

Estado: Mercado cerrado (fin de semana)

Pr√≥xima apertura del mercado:
Fecha: 2025-03-03
Hora: 09:30:00 (hora local)

El programa se detendr√°.


False

# Prueba del sistema

In [9]:
import pandas as pd
from datetime import datetime, timedelta
import websockets
import asyncio
import json
import numpy as np
import nest_asyncio
import time
from IPython.display import clear_output
import threading
from concurrent.futures import ThreadPoolExecutor
import pytz
from ta.trend import ADXIndicator
import logging

# Enable nested loops for Jupyter
nest_asyncio.apply()

class ConnectionStatus:
    DISCONNECTED = "Desconectado"
    CONNECTING = "Conectando"
    CONNECTED = "Conectado"
    ERROR = "Error"
    RECONNECTING = "Reconectando"

class TradingConfig:
    def __init__(self, 
                 symbol,                    # S√≠mbolo obligatorio
                 adx_threshold=25,          # Umbral ADX para tendencia fuerte
                 di_threshold=20,           # Umbral para cruces DI
                 alert_cooldown=300,        # Tiempo entre alertas (segundos)
                 data_window=300,           # Ventana de datos (segundos)
                 update_interval=2,         # Intervalo de actualizaci√≥n (segundos)
                 api_key=None):             # API key opcional
        
        self.symbol = symbol.upper()        # Convertir a may√∫sculas
        self.adx_threshold = adx_threshold
        self.di_threshold = di_threshold
        self.alert_cooldown = alert_cooldown
        self.data_window = data_window
        self.update_interval = update_interval
        
        # Configuraci√≥n de API
        self.ws_url = "wss://ws.finnhub.io"
        self.api_key = api_key or "cukll41r01qo08i8jju0cukll41r01qo08i8jjug"
        self.max_retries = 3
        self.reconnect_delay = 5

In [10]:
class AlertSystem:
    def __init__(self):
        self.last_alert = None
        self.last_signal = None
        
    def check_signals(self, data, config):
        """
        Analiza los datos y genera se√±ales de trading incluyendo estado neutral
        Retorna: (dict) mensaje de alerta con el estado actual del mercado
        """
        current_time = time.time()
        adx = data['ADX']
        di_plus = data['DI+']
        di_minus = data['DI-']
        
        # Determinar el estado del mercado
        if adx > config.adx_threshold:
            if di_plus > di_minus and di_plus > config.di_threshold:
                signal_type = 'BUY'
                market_state = 'Tendencia alcista fuerte'
            elif di_minus > di_plus and di_minus > config.di_threshold:
                signal_type = 'SELL'
                market_state = 'Tendencia bajista fuerte'
            else:
                signal_type = 'NEUTRAL'
                market_state = 'Tendencia fuerte sin direcci√≥n clara'
        else:
            signal_type = 'NEUTRAL'
            if adx < 20:
                market_state = 'Mercado sin tendencia clara - Posible rango lateral'
            else:
                market_state = 'Tendencia d√©bil - Esperar confirmaci√≥n'

        # Evitar alertas repetitivas para BUY/SELL
        if (signal_type in ['BUY', 'SELL'] and 
            self.last_signal == signal_type and 
            self.last_alert is not None and 
            current_time - self.last_alert < config.alert_cooldown):
            return None

        self.last_alert = current_time
        self.last_signal = signal_type
        
        return self._generate_alert(data, signal_type, market_state)
        
    def _generate_alert(self, data, signal_type, market_state):
        return {
            'type': signal_type,
            'state': market_state,
            'timestamp': datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S'),
            'price': data['close'],
            'adx': data['ADX'],
            'di_plus': data['DI+'],
            'di_minus': data['DI-']
        }

In [11]:
class StockDataStream:
    def __init__(self, config=None):
        self.config = config or TradingConfig()
        self.data_buffer = []
        self.last_update = None
        self.running = True
        self.connection_status = ConnectionStatus.DISCONNECTED
        self.messages_received = 0
        self.alert_system = AlertSystem()
        self.websocket = None
        self.task = None
        self.loop = None
        self.connection_retries = 0
        # Variables para simulaci√≥n
        self.base_price = 150.0
        self.trend = 0
        self.volatility = 0.02
        # Configuraci√≥n del logger
        self.setup_logger()

    def setup_logger(self):
        """Configura el sistema de logging"""
        fecha_actual = datetime.now().strftime('%Y%m%d')
        log_filename = f'trading_alerts_{self.config.symbol}_{fecha_actual}.log'
        
        self.logger = logging.getLogger(f'trading_{self.config.symbol}')
        self.logger.setLevel(logging.INFO)
        
        if not self.logger.handlers:
            file_handler = logging.FileHandler(log_filename)
            file_formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
            file_handler.setFormatter(file_formatter)
            self.logger.addHandler(file_handler)

    def log_alert(self, alert):
        """Registra una alerta en el log"""
        if alert['type'] in ['BUY', 'SELL']:
            mensaje = (
                f"ALERTA DE {alert['type']} | "
                f"S√≠mbolo: {self.config.symbol} | "
                f"Precio: ${alert['price']:.2f} | "
                f"ADX: {alert['adx']:.2f} | "
                f"DI+: {alert['di_plus']:.2f} | "
                f"DI-: {alert['di_minus']:.2f} | "
                f"Estado: {alert['state']}"
            )
            self.logger.info(mensaje)
        else:
            mensaje = (
                f"ESTADO NEUTRAL | "
                f"S√≠mbolo: {self.config.symbol} | "
                f"Precio: ${alert['price']:.2f} | "
                f"Estado: {alert['state']}"
            )
            self.logger.debug(mensaje)

    def _get_signal_emoji(self, signal_type):
        """Retorna el emoji apropiado para cada tipo de se√±al"""
        emojis = {
            'BUY': 'üü¢',
            'SELL': 'üî¥',
            'NEUTRAL': '‚ö™'
        }
        return emojis.get(signal_type, '‚ùì')

    def calculate_adx(self, df, period=14):
        if len(df) < period:
            return pd.Series(np.nan, index=df.index)
        
        adx_indicator = ADXIndicator(
            high=df['high'],
            low=df['low'],
            close=df['close'],
            window=period
        )
        
        return adx_indicator.adx(), adx_indicator.adx_pos(), adx_indicator.adx_neg()

    def _simulate_price(self):
        """Simula movimientos de precio m√°s realistas"""
        # Cambiar tendencia ocasionalmente
        if random.random() < 0.05:  # 5% de probabilidad de cambio de tendencia
            self.trend = random.uniform(-0.1, 0.1)
        
        # Simular movimiento de precio
        price_change = (
            self.trend +  # Componente de tendencia
            random.normalvariate(0, self.volatility)  # Ruido aleatorio
        )
        
        self.base_price = max(self.base_price * (1 + price_change), 1.0)
        current_price = self.base_price
        
        # Simular high y low
        volatility_range = self.volatility * 0.5
        high = current_price * (1 + random.uniform(0, volatility_range))
        low = current_price * (1 - random.uniform(0, volatility_range))
        
        # Asegurar que high > low
        high, low = max(high, low), min(high, low)
        
        return {
            'timestamp': time.time(),
            'close': current_price,
            'high': high,
            'low': low
        }

    async def print_summary(self, data):
        try:
            clear_output(wait=True)
            print("\n" + "="*50)
            print(f"RESUMEN DE MERCADO - {self.config.symbol} (SIMULACI√ìN)")
            print("="*50)
            
            print(f"\nFecha y Hora: {datetime.fromtimestamp(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}")
            
            print("\nPRECIOS E INDICADORES:")
            print(f"Precio actual: ${data['close']:.2f}")
            print(f"ADX: {data['ADX']:.2f}")
            print(f"DI+: {data['DI+']:.2f}")
            print(f"DI-: {data['DI-']:.2f}")
            
            alert = self.alert_system.check_signals(data, self.config)
            
            print("\nAN√ÅLISIS DE MERCADO:")
            print("-" * 30)
            
            if alert:
                emoji = self._get_signal_emoji(alert['type'])
                print(f"SE√ëAL ACTUAL: {emoji} {alert['type']}")
                print(f"Estado: {alert['state']}")
                
                if alert['type'] in ['BUY', 'SELL']:
                    print("\n" + "!"*50)
                    print(f"üö® ALERTA DE TRADING - {alert['type']}")
                    print(f"Precio: ${alert['price']:.2f}")
                    print(f"ADX: {alert['adx']:.2f}")
                    print(f"DI+: {alert['di_plus']:.2f}")
                    print(f"DI-: {alert['di_minus']:.2f}")
                    print("!"*50)
                
                self.log_alert(alert)
            
            print(f"\nESTADO: Simulaci√≥n en ejecuci√≥n")
            print(f"Actualizaciones: {self.messages_received}")
            print(f"Log: trading_alerts_{self.config.symbol}_{datetime.now().strftime('%Y%m%d')}.log")
            print("="*50)
        except Exception as e:
            print(f"Error en print_summary: {e}")
            self.logger.error(f"Error en print_summary: {e}")

    async def simulate_trading(self):
        """Simula datos de trading"""
        self.connection_status = ConnectionStatus.CONNECTED
        print("Inicializando datos hist√≥ricos...")
        
        # Inicializar el buffer con datos hist√≥ricos
        for _ in range(20):
            trade_data = self._simulate_price()
            self.data_buffer.append(trade_data)
            await asyncio.sleep(0.1)
        
        print("Comenzando simulaci√≥n en tiempo real...")
        
        while self.running:
            try:
                self.messages_received += 1
                
                trade_data = self._simulate_price()
                self.data_buffer.append(trade_data)
                
                current_time = time.time()
                self.data_buffer = [d for d in self.data_buffer 
                                  if current_time - d['timestamp'] <= self.config.data_window]
                
                if len(self.data_buffer) > 14:
                    df = pd.DataFrame(self.data_buffer)
                    adx, di_pos, di_neg = self.calculate_adx(df)
                    
                    latest_data = {
                        'timestamp': trade_data['timestamp'],
                        'close': trade_data['close'],
                        'high': trade_data['high'],
                        'low': trade_data['low'],
                        'ADX': float(adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 0),
                        'DI+': float(di_pos.iloc[-1] if not pd.isna(di_pos.iloc[-1]) else 0),
                        'DI-': float(di_neg.iloc[-1] if not pd.isna(di_neg.iloc[-1]) else 0)
                    }
                    
                    await self.print_summary(latest_data)
                
                await asyncio.sleep(1)
                
            except Exception as e:
                print(f"Error en simulaci√≥n: {e}")
                self.logger.error(f"Error en simulaci√≥n: {e}")
                await asyncio.sleep(1)

    def is_market_open(self):
        return True, "Simulaci√≥n activa", datetime.now()

    def start(self):
        """Inicia la simulaci√≥n"""
        self.running = True
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.loop = asyncio.new_event_loop()
        
        def run_simulation():
            asyncio.set_event_loop(self.loop)
            self.task = self.loop.create_task(self.simulate_trading())
            try:
                self.loop.run_until_complete(self.task)
            except asyncio.CancelledError:
                pass
            
        self.thread = threading.Thread(target=run_simulation)
        self.thread.start()
        print("Simulaci√≥n iniciada. Para detener, usa stream.stop()")
        return True

    def stop(self):
        """Detiene la simulaci√≥n"""
        print("\nüõë Deteniendo simulaci√≥n...")
        self.running = False
        
        if hasattr(self, 'task') and self.task:
            self.loop.call_soon_threadsafe(self.task.cancel)
        
        if hasattr(self, 'thread'):
            self.thread.join(timeout=5)
        
        if hasattr(self, 'executor'):
            self.executor.shutdown(wait=False)
        
        self.logger.info("Simulaci√≥n detenida")
        print("Simulaci√≥n detenida exitosamente")

In [12]:
def create_stock_stream(symbol, adx_threshold=25, di_threshold=20, api_key=None):
    """
    Crea y retorna una instancia del stream con configuraci√≥n personalizada
    
    Par√°metros:
    symbol (str): S√≠mbolo de la acci√≥n (ej: 'AAPL', 'MSFT', 'GOOGL')
    adx_threshold (float): Umbral para considerar tendencia fuerte (default: 25)
    di_threshold (float): Umbral para confirmar direcci√≥n de tendencia (default: 20)
    api_key (str): API key de Finnhub (opcional)
    """
    config = TradingConfig(
        symbol=symbol,
        adx_threshold=adx_threshold,
        di_threshold=di_threshold,
        api_key=api_key
    )
    
    print("\nIniciando sistema de trading en tiempo real...")
    print("\nConfiguraci√≥n:")
    print(f"S√≠mbolo: {config.symbol}")
    print(f"Umbral ADX: {config.adx_threshold}")
    print(f"Umbral DI: {config.di_threshold}")
    print(f"Tiempo entre alertas: {config.alert_cooldown}s")
    
    stream = StockDataStream(config)
    return stream

In [17]:
import random

# Crear el stream de prueba
test_stream = create_stock_stream(
    symbol="NVDA",
    adx_threshold=25,
    di_threshold=20
)

# Iniciar la simulaci√≥n
test_stream.start()

# Para detener cuando quieras:
# test_stream.stop()


RESUMEN DE MERCADO - NVDA (SIMULACI√ìN)

Fecha y Hora: 2025-03-01 13:26:12

PRECIOS E INDICADORES:
Precio actual: $1.01
ADX: 96.37
DI+: 4.70
DI-: 78.80

AN√ÅLISIS DE MERCADO:
------------------------------

ESTADO: Simulaci√≥n en ejecuci√≥n
Actualizaciones: 105
Log: trading_alerts_NVDA_20250301.log


In [18]:
test_stream.stop()


üõë Deteniendo simulaci√≥n...
Simulaci√≥n detenida exitosamente
