{[Click aquí para ver este documento en Google Colab](https://colab.research.google.com/drive/1UoFLlek8Yvpyo0CPs4vpCRFFndgAFnBm)}

<head><link rel = "stylesheet" href = "https://drive.google.com/uc?id=1zYOH-_Mb9jOjRbQmghdhsmZ2g6xAwakk"></head>

<table class = "header" width = 100%><tr>
    <th align = "left">The S<code>AI</code>X Guys | AI Trading, Septiembre 2020</th>
    <th align = "right">Escrito por: Gaston Solari Loudet</th>
</tr></table>

## Clase "ZMQL"

In [1]:
import zmq, time, datetime, pandas, threading, IPython
ZMQ = zmq.Context()

### Inicialización

In [2]:
class ZMQL(object):
    #| Nombres de puertos; deben llevar al principio y en mayusculas, el TIPO (ej.: "PUSH Data", "SUB Trading", etc.)
    _PortsDef = {"PUSH": 32768, "PULL": 32769, "SUB": 32770} #| IDs para "Dirección" de cada puerto. Deben ser ints.
    #| Lista de códigos de error de MT4 con sus descripciones, en formato "pandas.Series".
    _MQErrors = pandas.read_csv(".\MQL4ErrorCodes.csv").set_index("#")["Error"]
    def __init__(self, context, ports = _PortsDef, verbose = False):
        assert isinstance(verbose, bool), "((INIT)) ERROR! \"verbose\" may either be \"True\", \"False\" or \"None\"."
        assert isinstance(context, zmq.sugar.context.Context), "((INIT)) ERROR! Use a valid (zmq.) \"context\" input."
        warn = "((INIT)) ERROR! \"ports\" dict may be something like: {\"PUSH ...\": (int), \"PULL ...\": (int), ...}."
        assert isinstance(ports, dict) and all([isinstance(port, int) for port in ports.values()]), warn
        assert (list(ports.values()) == list(set(ports.values()))), warn + " Each key/value must be unique."
        #| Dataframe con objetos de comunicación: con todas las variables de acceso al protocolo.
        self.Comm = pandas.DataFrame(columns = ["Port"], index = ports.keys(), data = ports.values())
        self.Enable = {"comm": True, "verbose": verbose} #| Diccionario con parámetros de control.
        self.Poller = zmq.Poller() #| Inicializar "poll" para empezar a detectar llegada de mensajes.
        print("---------------------------------------"*2)
        for label in self.Comm.index: #| Ir creando cada uno de los puertos de comunicación con MetaTrader.
            assert isinstance(ports[label], int), "((INIT)) ERROR! Ports must be integers and respond to MT4."
            if (verbose != None): print(f">>{label}<< Connecting to port {ports[label]}. Await for response...")
            enum = eval("zmq." + label.split(' ')[0]) #| "Enum" de ZMQ que distingue que tipo de puerto es.
            self.Comm.loc[label, "Enum"] = enum #| Guardar este "Enum" en el DataFrame de objetos de comunicación.
            Socket = context.socket(enum) #| Crear socket: portal de envío, recepción e interpretación de mensajes.
            self.Comm.loc[label, "Socket"] = Socket #| Guardar socket en DataFrame de objetos de comunicación.
            Socket.connect(f"tcp://localhost:{ports[label]}") #| Conectar cada dirección al protocolo común (ethernet).
            if (enum == zmq.PUSH): Socket.setsockopt(zmq.SNDHWM, 1) #| Limitar memoria para cache de puertos PUSH.
            if (enum == zmq.PULL): Socket.setsockopt(zmq.RCVHWM, 1) #| Limitar memoria para cache de puertos PULL.
            if (enum == zmq.SUB): Socket.setsockopt(zmq.SUBSCRIBE, b"") #| Limitar memoria para cache de puertos SUB.
            if enum in (zmq.PULL, zmq.SUB): self.Poller.register(Socket, zmq.POLLIN) #| Crear "listeners" para recepción.
            self.Comm.loc[label, "Cache"] = "" #| Crear columna en DataFrame como "inbox de últimos mensajes".
            #| Initialize poll set for message parsing.
        print("---------------------------------------"*2)
        self.Comm[["Port", "Enum"]] = self.Comm[["Port", "Enum"]].astype(int) #| Guardar "Enums" de ZMQ como ints.

### Deinicialización

In [3]:
class ZMQL(ZMQL):
    def __init__(self, context, ports = ZMQL._PortsDef, verbose = False):
        super().__init__(context, ports = ports, verbose = verbose)
    def shutdown(self):
        print("---------------------------------------"*2)
        for label in self.Comm.index: #| Por cada puerto...
            #| Dar el "listener" de baja, si es un puerto de recepción.
            if ("PULL" in label) or ("SUB" in label):
                self.Poller.unregister(self.Comm["Socket"][label])
            port = self.Comm["Port"][label] #| Adquirir ID de cada puerto.
            address = f"tcp://localhost:{port}" #| Armar dirección en protocolo.
            self.Comm["Socket"][label].disconnect(address) #| Disasociar de dirección.
            print(f">>{label}<< Disconnected from port {port}.") #| Informar en consola.
        print("---------------------------------------"*2)

#### Prueba

In [4]:
INST = ZMQL(context = ZMQ, verbose = True)   ;   INST.shutdown()

------------------------------------------------------------------------------
>>PUSH<< Connecting to port 32768. Await for response...
>>PULL<< Connecting to port 32769. Await for response...
>>SUB<< Connecting to port 32770. Await for response...
------------------------------------------------------------------------------
------------------------------------------------------------------------------
>>PUSH<< Disconnected from port 32768.
>>PULL<< Disconnected from port 32769.
>>SUB<< Disconnected from port 32770.
------------------------------------------------------------------------------


### Transmisión

In [5]:
class ZMQL(ZMQL):
    def __init__(self, context, ports = ZMQL._PortsDef, verbose = False):
        super().__init__(context, ports = ports, verbose = verbose)
        #| Enviar un primer mensaje, como prueba.
        self._send(socket = "PUSH", message = "Check; ; ; ; ; ; ; ; ; ")
    def _send(self, socket, message):
        if not isinstance(socket, str) or not isinstance(message, str):
            print(f">>{socket}<< ERROR! Message to be sent, must be string") ; return
        if "PUSH" not in socket: return #| Los puertos no PUSH no estan hechos para enviar mensajes.
        self.Comm.loc[socket, "Cache"] = message #| Antes que nada, conservar copia del mensaje.
        try: #| Poner mensaje en "fila de espera" del socket. Enviar de inmediato apenas disponible.
            self.Comm["Socket"][socket].send_string(message, zmq.DONTWAIT) 
            if (self.Enable["verbose"] != None): print(f"[{socket}] Command sent: [{message}]")
            time.sleep(0.02) #| Esperar un poco para darle tiempo a la llegada de la respuesta.
        except zmq.error.Again: #| Limitar la espera del mensaje. Informar cuando esperó demasiado.
            print(f">>{socket}<< Warning! Timeout with no response... try again.")

#### Prueba

In [6]:
INST = ZMQL(context = ZMQ)   ;   INST.shutdown()

------------------------------------------------------------------------------
>>PUSH<< Connecting to port 32768. Await for response...
>>PULL<< Connecting to port 32769. Await for response...
>>SUB<< Connecting to port 32770. Await for response...
------------------------------------------------------------------------------
[PUSH] Command sent: [Check; ; ; ; ; ; ; ; ; ]
------------------------------------------------------------------------------
>>PUSH<< Disconnected from port 32768.
>>PULL<< Disconnected from port 32769.
>>SUB<< Disconnected from port 32770.
------------------------------------------------------------------------------


### Recepción

In [7]:
class ZMQL(ZMQL):
    def __init__(self, context, ID, ports = ZMQL._PortsDef, verbose = False):
        super().__init__(context, ports = ports, verbose = verbose)
        self.Thread = threading.Thread(name = ID, target = self._receive)
        self.Thread.daemon = True
        self.Thread.start()
    def _receive(self):
        while self.Enable["comm"]: #| Dentro del hilo paralelo, siempre y cuando este parámetro de control sea "True"...
            sockets_polled = dict(self.Poller.poll()) #| Chequear cuales son los puertos que han recibido algo.
            for label in self.Comm.index: #| Tomar a cada uno de los puertos que tengo registrados.
                Socket = self.Comm["Socket"][label] #| De ellos, tomar a cada uno de los sockets.
                if (Socket not in sockets_polled.keys()): continue #| Saltear los que no son de recepción. 
                if (sockets_polled[Socket] != zmq.POLLIN): continue #| Saltear los que no hayan recibido nada.
                try: message = self.Comm["Socket"][label].recv_string(zmq.DONTWAIT) #| Formular respuesta como string.
                except: message = None #| Si no se pudo formular el string de respuesta, es que no hubo respuesta.
                if message: #| Si se formuló una respuesta, parsearla literalmente como una "linea de Python".
                    try: self._process(eval(message)) #| Por ejemplo: "eval('x = 2')" hace que "x" sea "2".
                    #| "self._process" va a ser una función que va a procesar la variable implicada.
                    except Exception as ex: #| Cuando hubo un error al procesarla...
                        if (self.Enable["verbose"] != None): #| Si se activó el verbose de errores "no graves"...
                            warning = f"Type: {type(ex)}. Args: {ex.args}" #| Mostrar mensaje de error.
                            print(f">>{label}<< Warning! Reception error --> {warning}")
                        continue #| No bloquear el bucle paralelo de recepción luego de un error de recepción.
                    self.Comm.loc[label, "Cache"] = message #| Guardar string en Cache de puerto tal y como llegó.
                    #| Verbose de 2º grado (False/True): Mostrar mensajes PULL. En general, son respuestas de sends.
                    if (self.Enable["verbose"] != None) and (label == "PULL"):
                        print(f">>{label}<< Received --> {message}", flush = True)
                    #| Verbose de 3º grado (solo True): Mostrar mensajes SUB. En general, datos asincrónicos (ticks).
                    if (self.Enable["verbose"] == True) and (label == "SUB"):
                        print(f">>{label}<< Received --> {message}", flush = True)
                    time.sleep(0.001) #| Apenas pausar al sistema para impedir saturación de punto de recepción.

In [8]:
class ZMQL(ZMQL):
    _IDError = "((INIT)) ERROR! \"ID\" must be a different string from other block IDs."
    def __init__(self, context, ID, ports = ZMQL._PortsDef, verbose = False):
        assert isinstance(ID, str), ZMQL._IDError 
        super().__init__(context, ID = ID, ports = ports, verbose = verbose)
        self._DebugLog = pandas.Series({datetime.datetime.now(): "Start"})
    def _process(self, message):
        self._DebugLog[datetime.datetime.now()] = str(message) #| Añadir al final del log.
        excess = max(0, len(self._DebugLog) - 100) #| Medir si hay un exceso de mensajes.
        self._DebugLog = self._DebugLog[excess :]  #| Conservar siempre los últimos 100.

#### Prueba

In [26]:
INST = ZMQL(ID = "SAIX", context = ZMQ, verbose = True)

In [47]:
INST._send("PUSH", f"Ticks;EURUSD;-20;3; ; ; ; ; ; ")

[PUSH] Command sent: [Ticks;EURUSD;-20;3; ; ; ; ; ; ]


In [43]:
#symbol = "ETHUSD"
INST._send("PUSH", f"Ticks;EURUSD;1;0; ; ; ; ; ; ")
INST._send("PUSH", f"Ticks;BTCUSD;-5;1; ; ; ; ; ; ")
INST._send("PUSH", f"Ticks;US500;10;2; ; ; ; ; ; ")
INST._send("PUSH", f"Ticks;ETHUSD;60;3; ; ; ; ; ; ")
INST._send("PUSH", f"Ticks;XAUUSD;-20;4; ; ; ; ; ; ")
# x = eval(INST.Comm["Cache"]["PULL"])
INST.Comm

[PUSH] Command sent: [Ticks;EURUSD;1;0; ; ; ; ; ; ]
>>PULL<< Received --> {'Ticks': ['EURUSD', 1]}
[PUSH] Command sent: [Ticks;BTCUSD;-5;1; ; ; ; ; ; ]
>>PULL<< Received --> {'Ticks': ['BTCUSD', -5]}
[PUSH] Command sent: [Ticks;US500;10;2; ; ; ; ; ; ]
>>PULL<< Received --> {'Ticks': ['US500', 10]}
[PUSH] Command sent: [Ticks;ETHUSD;60;3; ; ; ; ; ; ]
>>PULL<< Received --> {'Ticks': ['ETHUSD', 60]}
[PUSH] Command sent: [Ticks;XAUUSD;-20;4; ; ; ; ; ; ]


Unnamed: 0,Port,Enum,Socket,Cache
PUSH,32768,8,<zmq.sugar.socket.Socket object at 0x000001E3B...,Ticks;XAUUSD;-20;4; ; ; ; ; ;
PULL,32769,7,<zmq.sugar.socket.Socket object at 0x000001E3B...,"{'Ticks': ['ETHUSD', 60]}"
SUB,32770,2,<zmq.sugar.socket.Socket object at 0x000001E3B...,"{'US500': [1602805504.565525,1.17,11545.38,1.1..."


In [48]:
INST._DebugLog[-50:]

2020-10-15 17:44:07.010284    {'EURUSD': [1602805446.025144, 1.17087, 1.1708...
2020-10-15 17:44:08.011793    {'EURUSD': [1602805447.028248, 1.17087, 1.1708...
2020-10-15 17:44:09.002516    {'EURUSD': [1602805448.024322, 1.17087, 1.1708...
2020-10-15 17:44:10.011538    {'EURUSD': [1602805449.009824, 1.17087, 1.1708...
2020-10-15 17:44:11.009286    {'EURUSD': [1602805450.027448, 1.17087, 1.1708...
2020-10-15 17:44:12.011681    {'EURUSD': [1602805451.028715, 1.17087, 1.1708...
2020-10-15 17:44:13.009341    {'EURUSD': [1602805452.024697, 1.17087, 1.1708...
2020-10-15 17:44:14.012707    {'EURUSD': [1602805453.02645, 1.17087, 1.17087...
2020-10-15 17:44:15.009542    {'EURUSD': [1602805454.02689, 1.17087, 1.17087...
2020-10-15 17:44:16.010090    {'EURUSD': [1602805455.024517, 1.17087, 1.1708...
2020-10-15 17:44:17.013340    {'EURUSD': [1602805456.027671, 1.17088, 1.1708...
2020-10-15 17:44:18.015180    {'EURUSD': [1602805457.025441, 1.17088, 1.1708...
2020-10-15 17:44:19.005785    {'EURUSD':

### Datos OHLCVS

#### Descarga

#### Prueba

#### Procesamiento

#### Prueba

In [None]:
INST.MsgDebugLog

### "SUB" para suscripción.

Antes que nada, una aclaración: cuando alguien en el mundo envia una orden a mi broker (ya sea de compra, venta, modificación o cierre), los números del Order Book son obviamente alterados. A esta alteración se le llama "tick". Un "dato de tick" es la obtención de únicamente la primera fila del Order Book (comunmente llamada "nivel 1") cuando sucede un tick. Por definición, dicha fila contiene aquellos precios de "Bid" y "Ask" que están mas cercanos entre sí, y que dan lugar al spread.

Un dato de tick puede ser:
- "<u>bilateral</u>": incluye ambos valores de volúmen. Uno para "Bid" y otro para "Ask".
- "<u>unilateral</u>": comprende solamente un único valor de volúmen, dado por la suma de ambos lados.

La mayoría de los brokers no proveen de datos a nivel Order Book, ni tampoco datos de tick bilaterales. Sin embargo, tener en cuenta que los Expert Advisors de MetaTrader tienen la capacidad de ejecutar su rutina de trabajo cada vez que sucede un tick. De eso mismo se encarga la función "``OnTick()``" en MQL. De hecho, lo que el conector de DWX para MetaTrader contiene dentro de su "``OnTick()``" es la rutina de guardar (en un array) y luego enviar a mi Python (via ZMQ) los últimos valores de ``B``id, ``A``sk y (único) ``V``olumen.

#### Prueba

#### Procesamiento

#### Prueba

In [None]:
datetime.datetime.now().timestamp()

In [None]:
ZMQL._MQErrors.loc["4073"]

In [None]:
ZMQL._MQErrors[4073]