[bug]The subscriber stop receiving messages because keepalive failed #506

Closed
neo502721 opened this issue May 11, 2021 · 4 comments
Labels
Details Required Further information (logs etc) is required before this issue can be investigated



neo502721 commented May 11, 2021

What happened

I used emqtt_bench (https://github.com/emqx/emqtt-bench) to create 100 publishers that publish to the test topic in parallel. A Go subscriber then subscribes to the topic and saves each message to a database.
However, the subscriber stops receiving messages after running for some time.

DEBUG    13:40:32 [net]      logic waiting for msg on ibound
DEBUG    13:40:32 [net]      startIncomingComms: got msg on ibound
DEBUG    13:40:32 [net]      startIncomingComms: received publish, msgId: 0
DEBUG    13:40:32 [net]      startIncoming Received Message

DEBUG    13:40:32 [pinger]   ping check 14.999959993000001
CRITICAL 13:40:32 [pinger]   pingresp not received, disconnecting     ### Note that this line
DEBUG    13:40:32 [client]   internalConnLost called
DEBUG    13:40:32 [client]   stopCommsWorkers called
DEBUG    13:40:32 [client]   internalConnLost waiting on workers
DEBUG    13:40:32 [client]   stopCommsWorkers waiting for workers

DEBUG    13:40:33 [router]   matchAndDispatch exiting
DEBUG    13:40:33 [client]   startCommsWorkers output redirector finished
DEBUG    13:40:33 [client]   stopCommsWorkers waiting for comms
DEBUG    13:40:33 [net]      outgoing waiting for an outbound message
DEBUG    13:40:33 [net]      outgoing waiting for an outbound message
DEBUG    13:40:33 [net]      outgoing comms stopping
DEBUG    13:40:33 [net]      startComms closing outError
DEBUG    13:40:33 [client]   incoming comms goroutine done
DEBUG    13:40:33 [client]   stopCommsWorkers done
DEBUG    13:40:33 [client]   internalConnLost workers stopped
DEBUG    13:40:33 [client]   internalConnLost complete
DEBUG    13:40:33 Connection lost: pingresp not received, disconnecting
DEBUG    13:40:33 [client]   enter reconnect
DEBUG    13:40:33 [client]   about to write new connect msg
DEBUG    13:40:33 [client]   socket connected to broker
DEBUG    13:40:33 [client]   Using MQTT 3.1.1 protocol
DEBUG    13:40:33 [net]      connect started
DEBUG    13:40:33 [net]      received connack
DEBUG    13:40:33 [client]   startCommsWorkers called
DEBUG    13:40:33 [client]   client is connected/reconnected
DEBUG    13:40:33 [net]      incoming started
DEBUG    13:40:33 [net]      startIncomingComms started
DEBUG    13:40:33 [net]      outgoing started
DEBUG    13:40:33 [net]      startComms started
DEBUG    13:40:33 [client]   startCommsWorkers done
DEBUG    13:40:33 [store]    enter Resume
DEBUG    13:40:33 [store]    exit resume
DEBUG    13:40:33 [pinger]   keepalive starting
DEBUG    13:40:33 [net]      logic waiting for msg on ibound
DEBUG    13:40:33 [net]      startIncomingComms: inboundFromStore complete
DEBUG    13:40:33 [net]      logic waiting for msg on ibound
DEBUG    13:40:33 [net]      outgoing waiting for an outbound message

What caused it?

I spent some time and found this bug might be caused by the keepalive.

paho.mqtt.golang/ping.go

Lines 26 to 28 in 8e87e5f

// keepalive - Send ping when connection unused for set period
// connection passed in to avoid race condition on shutdown
func keepalive(c *client, conn io.Writer) {

This function is called by startCommsWorkers when the MQTT client's KeepAlive option is not 0:
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {

The comment on the keepalive function says "Send ping when connection unused", but a ping is still sent while my connection is in use. With a large number of parallel messages, the ping response sometimes fails to arrive. The MQTT client then calls stopCommsWorkers and reconnects to the MQTT broker, but my subscriber's callback function never runs again.

How to verify

To test this theory, I set the KeepAlive option to 0, and the bug seems to be solved.

Environment

golang: go version go1.14.2 linux/amd64
github.com/eclipse/paho.mqtt.golang v1.3.3
emqx: 4.2.5

@neo502721 neo502721 changed the title The subscriber stop receiving messages because keepalive failed [bug]The subscriber stop receiving messages because keepalive failed May 11, 2021
Contributor

MattBrittan commented May 11, 2021

The ping is sent if:

  • Nothing has been sent for options.KeepAlive OR;
  • Nothing has been received for options.KeepAlive.

So your log does not contain enough information to assess whether the ping is being sent appropriately. From what is logged, it looks like the last send was 15 seconds ago, but I don't know what your keepalive interval was set to, and the ping was sent before the section of log you included. As per the readme, please attempt to provide a Minimal, Reproducible Example, or at a minimum the code you are using to connect, so I can see what options are set. This library has so many options that without knowing your configuration it's impossible to assess issues. Note that, as per the readme, the most common cause of this kind of issue is a handler that does not return in a timely manner (try setting SetOrderMatters(false) to avoid this restriction).
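To illustrate how a handler can end up not returning promptly, here is a minimal sketch using only the standard library. It assumes a handler that forwards messages over a small buffered channel to a slower consumer; the `message` type and the `tryEnqueue` helper are hypothetical names and not part of the paho API. A plain `ch <- m` would block in exactly the cases where `tryEnqueue` returns false.

```go
package main

import "fmt"

// message is a hypothetical stand-in for whatever the handler forwards.
type message struct {
	topic   string
	payload []byte
}

// tryEnqueue attempts a non-blocking send. It returns false when the
// channel buffer is full, which is the situation in which a plain
// `ch <- m` inside a message handler would block.
func tryEnqueue(ch chan<- message, m message) bool {
	select {
	case ch <- m:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan message, 1) // small buffer, nothing draining it yet

	fmt.Println(tryEnqueue(ch, message{topic: "test/topic", payload: []byte("a")})) // true
	fmt.Println(tryEnqueue(ch, message{topic: "test/topic", payload: []byte("b")})) // false: buffer full

	<-ch // the consumer finally takes one message
	fmt.Println(tryEnqueue(ch, message{topic: "test/topic", payload: []byte("c")})) // true
}
```

Whether to drop, buffer more, or block is an application decision; the sketch only shows that a blocking send keeps the handler from returning until the consumer catches up.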

The mqtt client called stopCommsWorkers and reconnect to mqtt broker. But my subscriber's callback function never run again.

That is to be expected if you have set CleanSession to true (or potentially with QOS=0, if the messages are all published before the connection is re-established).

In order to prove my idea, I set the KeepAlive option to 0. And this bug seems solved.

This just turns off the keep alive; so yes it would prevent the issue occurring but does not really help (and with unreliable links keepalives are recommended).

@MattBrittan MattBrittan added the Details Required Further information (logs etc) is required before this issue can be investigated label May 11, 2021
@neo502721
Author

Hi @MattBrittan, this is my source code. The main goroutine subscribes to the EMQ broker. When the handler function receives a message, it sends the message to a channel. Another goroutine reads from the channel and inserts the message into the ClickHouse DB.
I set up 100 producers to send messages continuously.

The log is attached. I recorded the time taken to insert each message into the ClickHouse DB. You can see in the log file that it is almost always 60ms.

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"log"
	"net/http"
	"os"
	"time"

	_ "net/http/pprof"
)

type MessageStruct struct {
	Topic   string
	Message []byte
}

func main() {
	mqtt.DEBUG = log.New(os.Stdout, "DEBUG    ", log.Ltime)
	mqtt.WARN = log.New(os.Stdout, "WARNING  ", log.Ltime)
	mqtt.CRITICAL = log.New(os.Stderr, "CRITICAL ", log.Ltime)
	mqtt.ERROR = log.New(os.Stderr, "ERROR    ", log.Ltime)
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	ch := make(chan MessageStruct, 1)
	var c ClickHouseOption
	c.newConnect()
	c.newClilcHoseTable()
	go c.insertFromChannel(ch)
	mymqtt(ch)
}

func mymqtt(ch chan MessageStruct) {
	quit := make(chan int)
	fmt.Println("sub start")
	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.3.32:1883")
	opts.SetClientID("123456678")
	opts.SetUsername("admin")
	opts.SetPassword("admin")
	opts.SetKeepAlive(5 * time.Second)
	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// Forward each received message to the channel read by the ClickHouse goroutine.
	var msgRcvd mqtt.MessageHandler = func(client mqtt.Client, message mqtt.Message) {
		ch <- MessageStruct{Topic: message.Topic(), Message: message.Payload()}
		//fmt.Println("buff len when recive msg", len(ch))
	}
	fmt.Println("start read")
	if token := c.Subscribe("test/topic", 0, msgRcvd); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
	}
	// Block forever: nothing ever sends on quit.
	<-quit
}

192.168.3.33_2021-05-14_14-08-37.log

Contributor

MattBrittan commented May 14, 2021

So the ping is triggered here:

DEBUG    14:08:52 [pinger]   ping check 6.000124424
DEBUG    14:08:52 [pinger]   keepalive sending ping

and time out occurs here:

CRITICAL 14:09:04 [pinger]   pingresp not received, disconnecting
DEBUG    14:09:04 [client]   internalConnLost called
DEBUG    14:09:04 [client]   stopCommsWorkers called
DEBUG    14:09:04 [client]   internalConnLost waiting on workers
DEBUG    14:09:04 [client]   stopCommsWorkers waiting for workers
...
DEBUG    14:09:04 [client]   internalConnLost complete
DEBUG    14:09:04 Connection lost: pingresp not received, disconnecting
DEBUG    14:09:04 [client]   enter reconnect

  1. Why did the ping check occur when data is being received continuously?

You are using QOS=0 so the client does not send any response when a message is received. As mentioned above a PING is triggered when the KeepAlive is exceeded on EITHER sending or receiving (sending in this case). As nothing had been sent for over 5 seconds a PING was triggered.

This does mean that if all you are doing is receiving QOS0 messages, you should expect a PING to be sent fairly regularly. This check was added back in 2017 (in this commit) and appears to be in line with the spec requirement that:

It is the responsibility of the Client to ensure that the interval between Control Packets being sent does not exceed the Keep Alive value. In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23].
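The rule described above can be sketched as a small predicate. This is a simplified model for illustration, not the library's actual code; `pingDue` and its parameter names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// pingDue models the keepalive rule: a ping is due when nothing has been
// sent for keepAlive OR nothing has been received for keepAlive.
// It assumes keepAlive > 0 (a KeepAlive of 0 disables the keepalive routine).
func pingDue(sinceSent, sinceReceived, keepAlive time.Duration) bool {
	return sinceSent >= keepAlive || sinceReceived >= keepAlive
}

func main() {
	keepAlive := 5 * time.Second // value from the posted code

	// A subscriber receiving a steady stream of QOS0 messages sends nothing
	// itself, so the "sent" side alone is enough to trigger a ping.
	fmt.Println(pingDue(6*time.Second, 10*time.Millisecond, keepAlive)) // true

	// Traffic in both directions within the keepalive interval: no ping needed.
	fmt.Println(pingDue(1*time.Second, 10*time.Millisecond, keepAlive)) // false
}
```

The first call mirrors the "ping check 6.000124424" log line: about six seconds since the last send, against a five second KeepAlive.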

  2. Why was the PINGRESP not received?

Insufficient info; however if you are hammering the link with messages it's quite possible that the ping response will be delayed (you would need to check the broker logs to see if it received/responded to the request). Perhaps try increasing PingTimeout (e.g. SetPingTimeout(x)).
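As a rough model of why a longer PingTimeout could help, the timeout side can be sketched the same way. `pingTimedOut` is a hypothetical name, the 10 second value assumes the library default applies (the posted code does not call SetPingTimeout), and 30 seconds is an arbitrary illustration rather than a recommendation.

```go
package main

import (
	"fmt"
	"time"
)

// pingTimedOut models the disconnect decision: the connection is treated as
// lost when a ping is outstanding and no PINGRESP arrived within pingTimeout.
func pingTimedOut(outstanding bool, sincePing, pingTimeout time.Duration) bool {
	return outstanding && sincePing >= pingTimeout
}

func main() {
	// In the log, the ping goes out at 14:08:52 and the disconnect is
	// reported at 14:09:04, i.e. about 12 seconds later.
	waited := 12 * time.Second

	fmt.Println(pingTimedOut(true, waited, 10*time.Second)) // true: assumed default timeout exceeded
	fmt.Println(pingTimedOut(true, waited, 30*time.Second)) // false: a longer timeout tolerates the delay
}
```

A longer timeout only helps if the broker does eventually answer, which is why the broker logs are still worth checking.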

  3. Why were no messages received after the connection came back up?

Messages were received, but only ping responses (PINGRESP):

DEBUG    14:09:10 [pinger]   keepalive sending ping
DEBUG    14:09:10 [net]      startIncoming Received Message
DEBUG    14:09:10 [net]      startIncomingComms: got msg on ibound
DEBUG    14:09:10 [net]      startIncomingComms: received pingresp

You are using CleanSession = true (the default), so the broker will not remember your subscription (if you want it to, call SetCleanSession(false)). You will also want to use QOS 1+ if you don't want the broker to drop messages while you are disconnected (note that some brokers can be configured to do this at QOS=0).
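The effect of the clean session flag on subscriptions can be shown with a toy in-memory model. This is a sketch under simplifying assumptions, not how any real broker is implemented: the `broker` and `session` types and all method names are hypothetical, and QoS, wildcards and message queueing are ignored.

```go
package main

import "fmt"

// session is the state this toy broker keeps for one client ID.
type session struct {
	clean  bool
	topics map[string]bool
}

// broker is a hypothetical stand-in for an MQTT broker's session store.
type broker struct {
	sessions map[string]*session
}

func newBroker() *broker {
	return &broker{sessions: make(map[string]*session)}
}

// connect models CONNECT handling and reports whether an earlier session
// (including its subscriptions) was resumed. A session is resumed only if
// both the old and the new connection asked for a persistent session.
func (b *broker) connect(clientID string, cleanSession bool) bool {
	if old, ok := b.sessions[clientID]; ok && !cleanSession && !old.clean {
		return true
	}
	b.sessions[clientID] = &session{clean: cleanSession, topics: make(map[string]bool)}
	return false
}

func (b *broker) subscribe(clientID, topic string) {
	if s, ok := b.sessions[clientID]; ok {
		s.topics[topic] = true
	}
}

func (b *broker) isSubscribed(clientID, topic string) bool {
	s, ok := b.sessions[clientID]
	return ok && s.topics[topic]
}

func main() {
	b := newBroker()

	// Clean session: the subscription is gone after the reconnect.
	b.connect("sub-clean", true)
	b.subscribe("sub-clean", "test/topic")
	b.connect("sub-clean", true)
	fmt.Println(b.isSubscribed("sub-clean", "test/topic")) // false

	// Persistent session: the subscription survives the reconnect.
	b.connect("sub-persistent", false)
	b.subscribe("sub-persistent", "test/topic")
	resumed := b.connect("sub-persistent", false)
	fmt.Println(resumed, b.isSubscribed("sub-persistent", "test/topic")) // true true
}
```

In the posted code the first case applies: the client reconnects with a clean session and never subscribes again, so the broker has nothing to deliver to it.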

In summary I'm not seeing a bug here; everything seems to be operating as per the spec (but I don't know what is happening on the broker side in regards to responding to the PING).

@neo502721
Author

Hi @MattBrittan, thank you very much for your help. I will close this issue.
