Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Does this library support connecting to mqtt cluster? #308

Closed
tjmcclure0501 opened this issue Jun 30, 2018 · 4 comments
Closed

Does this library support connecting to mqtt cluster? #308

tjmcclure0501 opened this issue Jun 30, 2018 · 4 comments

Comments

@tjmcclure0501
Copy link

tjmcclure0501 commented Jun 30, 2018

Does this library support connecting to a mqtt cluster - it is not mentioned in the documentation - it is a hard requirement for us.

@jamesmyatt
Copy link
Contributor

Is there anything about the interface between your implementation of a cluster of MQTT brokers and the MQTT clients that is different from just a regular interface?

@tjmcclure0501
Copy link
Author

I am trying to leverage an MQTT cluster with 3 nodes. MQTT packages in other lang such as node https://www.npmjs.com/package/mqtt allow for the passing in of a list of MQTT urls and handle reconnecting to another MQTT server if the first one goes down. This library does not support the passing of multiple URLS. Is there a document way to handle failover using this library?

@gustaebel
Copy link

import random
from paho.mqtt.client import *


class ClusterClient(Client):
    """A subclass of paho.mqtt.Client that supports connecting to a cluster of
       mqtt brokers. connect() and connect_async() additionally accept a list
       of hostnames or host/port tuples:

           connect("host1")

           connect(["host1", "host2", "host3"]) # use default port 1883

           connect(["host1", ("host2", 8883), ("host3", 8883)])

       Hosts to connect to are chosen randomly. If a host disappears the client
       automatically connects to another host from the list.
    """

    def __init__(self, client_id="", clean_session=True, userdata=None,
            protocol=MQTTv311, transport="tcp"):
        super().__init__(client_id, clean_session, userdata, protocol, transport)
        self._hosts = []

    def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
        if isinstance(host, (list, tuple)):
            self._hosts = [(t, 1883) if isinstance(t, str) else t for t in host]
        else:
            self._hosts = [(host, port)]

        for host, port in self._hosts:
            if host is None or len(host) == 0:
                raise ValueError('Invalid host.')
            if port <= 0:
                raise ValueError('Invalid port number.')

        host, port = random.choice(self._hosts)

        super().connect_async(host, port, keepalive, bind_address)

    def reconnect(self):
        hosts = self._hosts[:]
        random.shuffle(hosts)
        while True:
            self._host, self._port = hosts.pop(0)
            try:
                return super().reconnect()
            except socket.error:
                if not hosts:
                    raise

@MattBrittan
Copy link
Contributor

I'm going to close this because it looks like a solution has been proposed (alternatives include using on_pre_connect or implement via DNS).

It would be possible to add the ability to specify multiple hosts but that creates further complications. If those hosts are not clustered then they will have differing session state stores...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants