Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

What is the right way to use this code (Paho MQTT) as GoRoutine and pass messages via channel to publish via websockets #357

Closed
KabDeveloper opened this issue Sep 30, 2019 · 3 comments

Comments

@KabDeveloper
Copy link

As standard code I am using to publish message for testing purpose:

func main() {

	opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
	opts.SetClientID("myclientid_")
	opts.SetDefaultPublishHandler(f)
	opts.SetConnectionLostHandler(connLostHandler)

	opts.OnConnect = func(c MQTT.Client) {
		fmt.Printf("Client connected, subscribing to: test/topic\n")

		if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			os.Exit(1)
		}
	}

	c := MQTT.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}


	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("logs", 0, false, text)
		token.Wait()
	}

	time.Sleep(3 * time.Second)

	if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)
}

This works well ! but passing messages in mass while doing high latency tasks, performance of my program will be low, so I have to use goroutine and channel.

So, I was looking for a way to make a Worker inside goroutine for PUBLISHING messages to the browser using Paho MQTT library for GOlang, I had a hard time to find a better solution that feet my need, but after some searches, I found this code:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "strings"
    "time"

    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "linksmart.eu/lc/core/catalog"
    "linksmart.eu/lc/core/catalog/service"
)

// MQTTConnector provides MQTT protocol connectivity
type MQTTConnector struct {
    config        *MqttProtocol
    clientID      string
    client        *MQTT.Client
    pubCh         chan AgentResponse
    subCh         chan<- DataRequest
    pubTopics     map[string]string
    subTopicsRvsd map[string]string // store SUB topics "reversed" to optimize lookup in messageHandler
}

const defaultQoS = 1

func (c *MQTTConnector) start() {
    logger.Println("MQTTConnector.start()")

    if c.config.Discover && c.config.URL == "" {
        err := c.discoverBrokerEndpoint()
        if err != nil {
            logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
            return
        }
    }

    // configure the mqtt client
    c.configureMqttConnection()

    // start the connection routine
    logger.Printf("MQTTConnector.start() Will connect to the broker %v\n", c.config.URL)
    go c.connect(0)

    // start the publisher routine
    go c.publisher()
}

// reads outgoing messages from the pubCh und publishes them to the broker
func (c *MQTTConnector) publisher() {
    for resp := range c.pubCh {
        if !c.client.IsConnected() {
            logger.Println("MQTTConnector.publisher() got data while not connected to the broker. **discarded**")
            continue
        }
        if resp.IsError {
            logger.Println("MQTTConnector.publisher() data ERROR from agent manager:", string(resp.Payload))
            continue
        }
        topic := c.pubTopics[resp.ResourceId]
        c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
        // We dont' wait for confirmation from broker (avoid blocking here!)
        //<-r
        logger.Println("MQTTConnector.publisher() published to", topic)
    }
}


func (c *MQTTConnector) stop() {
    logger.Println("MQTTConnector.stop()")
    if c.client != nil && c.client.IsConnected() {
        c.client.Disconnect(500)
    }
}

func (c *MQTTConnector) connect(backOff int) {
    if c.client == nil {
        logger.Printf("MQTTConnector.connect() client is not configured")
        return
    }
    for {
        logger.Printf("MQTTConnector.connect() connecting to the broker %v, backOff: %v sec\n", c.config.URL, backOff)
        time.Sleep(time.Duration(backOff) * time.Second)
        if c.client.IsConnected() {
            break
        }
        token := c.client.Connect()
        token.Wait()
        if token.Error() == nil {
            break
        }
        logger.Printf("MQTTConnector.connect() failed to connect: %v\n", token.Error().Error())
        if backOff == 0 {
            backOff = 10
        } else if backOff <= 600 {
            backOff *= 2
        }
    }

    logger.Printf("MQTTConnector.connect() connected to the broker %v", c.config.URL)
    return
}

func (c *MQTTConnector) onConnected(client *MQTT.Client) {
    // subscribe if there is at least one resource with SUB in MQTT protocol is configured
    if len(c.subTopicsRvsd) > 0 {
        logger.Println("MQTTPulbisher.onConnected() will (re-)subscribe to all configured SUB topics")

        topicFilters := make(map[string]byte)
        for topic, _ := range c.subTopicsRvsd {
            logger.Printf("MQTTPulbisher.onConnected() will subscribe to topic %s", topic)
            topicFilters[topic] = defaultQoS
        }
        client.SubscribeMultiple(topicFilters, c.messageHandler)
    } else {
        logger.Println("MQTTPulbisher.onConnected() no resources with SUB configured")
    }
}

func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
    logger.Println("MQTTPulbisher.onConnectionLost() lost connection to the broker: ", reason.Error())

    // Initialize a new client and reconnect
    c.configureMqttConnection()
    go c.connect(0)
}

func (c *MQTTConnector) configureMqttConnection() {
    connOpts := MQTT.NewClientOptions().
        AddBroker(c.config.URL).
        SetClientID(c.clientID).
        SetCleanSession(true).
        SetConnectionLostHandler(c.onConnectionLost).
        SetOnConnectHandler(c.onConnected).
        SetAutoReconnect(false) // we take care of re-connect ourselves

    // Username/password authentication
    if c.config.Username != "" && c.config.Password != "" {
        connOpts.SetUsername(c.config.Username)
        connOpts.SetPassword(c.config.Password)
    }

    // SSL/TLS
    if strings.HasPrefix(c.config.URL, "ssl") {
        tlsConfig := &tls.Config{}
        // Custom CA to auth broker with a self-signed certificate
        if c.config.CaFile != "" {
            caFile, err := ioutil.ReadFile(c.config.CaFile)
            if err != nil {
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to read CA file %s:%s\n", c.config.CaFile, err.Error())
            } else {
                tlsConfig.RootCAs = x509.NewCertPool()
                ok := tlsConfig.RootCAs.AppendCertsFromPEM(caFile)
                if !ok {
                    logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to parse CA certificate %s\n", c.config.CaFile)
                }
            }
        }
        // Certificate-based client authentication
        if c.config.CertFile != "" && c.config.KeyFile != "" {
            cert, err := tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
            if err != nil {
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to load client TLS credentials: %s\n",
                    err.Error())
            } else {
                tlsConfig.Certificates = []tls.Certificate{cert}
            }
        }

        connOpts.SetTLSConfig(tlsConfig)
    }

    c.client = MQTT.NewClient(connOpts)
}

This code do exactly what I am looking for !

But as noob in Golang, I can't figure out how to run START() function inside my main function and what argument to pass !

And espacially, how I will process to pass messages to the worker (Publisher) using channel ?!

Your help will be appreciated !

@MattBrittan
Copy link
Contributor

Hi @bcashier,

When you say "passing messages in mass while doing high latency tasks" I assume that you mean that you want to send the messages asynchronously (so the message is handled by a different go-routine than your main code is running on).

If that is the case then a very simple change to your initial example will give you that:

for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("logs", 0, false, text)
		// comment out... token.Wait()
	}

The only reason that your initial code stopped until the message was sent was that you called token.Wait(). If you don't care about errors (and you are not checking for them so I assume you dont care) then there is little point in calling token.Wait() (it just waits until the message is sent; the message will go out whether you call token.Wait or not)..

If you want to log any errors you could use something like:

for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("logs", 0, false, text)
                go func(){
		      token.Wait()
                      err := token.Error()
                      if err != nil {
                           fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
                      }
                }()
	}

Note that there are a few more things that you need to do if message delivery is critical (but as you are not checking for errors I'm assuming its not).

In terms of the code you found; I suspect that this would add complexity you dont need (and more info would be required to work this out; for example the MqttProtocol struct is not defined within the bit you pasted).

PS. It's better to ask this kind of question somewhere like stack overflow (and I can see that you have now done this).

@MattBrittan
Copy link
Contributor

Same question was asked on Stack Overflow and answer has been accepted.

@KabDeveloper
Copy link
Author

This question was solved by @MattBrittan on StackOverflow.

Thanks again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants