Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QoS>0 messages stop prematurely in Windows. #1417

Open
ghaqua opened this issue Sep 20, 2019 · 14 comments

Comments

@ghaqua
Copy link

@ghaqua ghaqua commented Sep 20, 2019

Qos>0 messages in Windows are not published more than 20 messages. Specifying max_inflight_messages 0 in the broker's configuration file does not publish.

@karlp

This comment has been minimized.

Copy link
Contributor

@karlp karlp commented Sep 23, 2019

You're going to need to provide more detail, but I suspect that you're client applications are not calling mosquitto_loop or similar, so you simply hit max inflight and then it blocks waiting for the rest of the qos handshakes to complete.

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Sep 24, 2019

Thank you for your comment.

Mosquitto_loop is not used, but subscribes using mosquitto_loop_forever.

Take a look at the sample source below. Is mosquitto_loop_forever not good?

// Mosquitto_Sample.cpp
//

#include "stdafx.h"
#include <mosquitto.h>

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
	printf("my_Message_callback start!\n");
	if (message->payloadlen) {
		printf("%s %s\n", message->topic, message->payload);
	}
	else {
		printf("%s (null)\n", message->topic);
	}
	fflush(stdout);
	printf("my_Message_callback end!\n");
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
	int i;
	if (!result) {
		/* Subscribe to broker information topics on successful connect. */
		mosquitto_subscribe(mosq, NULL, "sndata", 1);
	}
	else {
		fprintf(stderr, "Connect failed\n");
	}
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
	int i;

	printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
	for (i = 1; i < qos_count; i++) {
		printf(", %d", granted_qos[i]);
	}
	printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
	/* Pring all log messages regardless of level. */
	printf("%s\n", str);
}

int main()
{
	int i;
	char *host = (char *)"localhost";
	int port = 1883;
	int keepalive = 60;
	bool clean_session = true;
	struct mosquitto *mosq = NULL;

	mosquitto_lib_init();
	mosq = mosquitto_new(NULL, clean_session, NULL);
	if (!mosq) {
		fprintf(stderr, "Error: Out of memory.\n");
		return 1;
	}

	mosquitto_log_callback_set(mosq, my_log_callback);
	mosquitto_connect_callback_set(mosq, my_connect_callback);
	mosquitto_message_callback_set(mosq, my_message_callback);
	mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

	if (mosquitto_connect(mosq, host, port, keepalive)) {
		fprintf(stderr, "Unable to connect.\n");
		return 1;
	}
	mosquitto_max_inflight_messages_set(mosq, 0);

	mosquitto_loop_forever(mosq, -1, 1);

	mosquitto_destroy(mosq);
	mosquitto_lib_cleanup();
	
	return 0;
}
// Mosquitto_Sample_Pub_Console.cpp
//

#include "stdafx.h"
#include <windows.h>
#include <mosquitto.h>

void on_connect(struct mosquitto *mosq, void *userdata, int result)
{
	fprintf(stderr, "on_connect callback!\n");
}

int main()
{
	char *host = (char *)"localhost";
	int port = 1883;
	int keepalive = 60;
	bool clean_session = true;
	int imid = 1;
	char	id[5] = "1212";

	struct mosquitto*	mosq;

	mosquitto_lib_init();
	mosq = mosquitto_new(NULL, clean_session, NULL);
	if (!mosq) {
		fprintf(stderr, "Error: Out of memory.\n");
	}

	int ii = mosquitto_max_inflight_messages_set(mosq, 0);

	mosquitto_connect_callback_set(mosq, on_connect);

	int major, minor, revision;

	mosquitto_lib_version(&major, &minor, &revision);

	mosquitto_connect_callback_set(mosq, on_connect);

	if (mosquitto_connect(mosq, host, port, keepalive)) {
		fprintf(stderr, "Unable to connect.\n");
	}


	char	cPayload[4];
	for (int iCnt = 0; iCnt <= 500; iCnt++) {
		sprintf_s(cPayload, sizeof(cPayload), "%0d", iCnt);
		Sleep(1000);
		mosquitto_publish(mosq, &imid, "sndata", sizeof(cPayload), cPayload, 1, false);
	}

	mosquitto_destroy(mosq);
	mosquitto_lib_cleanup();

	return 0;
}
@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Sep 24, 2019

You're missing a mosquitto_loop_start() call in the publishing client code, which means the network loop doesn't run. That means the client doesn't think it has received any PUBACK messages from the server, and so stops when it hits the limit of 20 inflight messages. It looks like you've found a bug, in that mosquitto_max_inflight_messages_set() isn't working properly.

@karlp

This comment has been minimized.

Copy link
Contributor

@karlp karlp commented Sep 24, 2019

I don't think anything is wrong with max in flight set(), they only call that in their subscriber call, and it's the publisher that's missing a loop_start/loop_forever that is simply unable to get any more messages out before it exits. (missed it in the publisher, yes looks valid problem)

@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Sep 24, 2019

	int ii = mosquitto_max_inflight_messages_set(mosq, 0);

It's there in the pub code as well.

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Sep 26, 2019

Thank you for the release and correction of 1.6.7.
Confirmation to set max_inflight_messages 0 with QoS=1 setting and publish more than 20 messages. However, after the client receives PUBREC when QoS=2 is made, it stops action and the message cannot be sent.

@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Sep 26, 2019

Have you fixed your code to have one of mosquitto_loop_start() or mosquitto_loop_forever()? You can confirm whether the problem is with your code by using mosquitto_sub to receive the messages. If that fails, it's a problem with the mosquitto code somewhere. If it works, I'm afraid it's your end.

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Sep 27, 2019

Yes, I am using mosauitto _ loop _ forever (). As shown in the above sample program. I checked with mosquitto _ sub, but I still can't receive messages when QoS = 2.

@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Sep 27, 2019

Sorry, I got the programs the wrong way round. I meant that your publishing code above doesn't have a loop call in it, so it will not process messages.

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Sep 30, 2019

If QoS = 2 is specified in mosquitto _ sub and mosquitto _ pub, no message is sent.

mosquitto_sub -h localhost -t sndata -q 2
mosquitto_pub -h localhost -t sndata -q 2 -m ABCD

Sorry, the mosquitto _ sub and mosquitto _ pub commands worked fine.

@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Oct 1, 2019

Good to hear. So now what happens if you call mosquitto_loop_start() after mosquitto_connect() in your publishing client?

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Oct 2, 2019

I don't know if I'm using it correctly, but calling mosquitto _ loop () after mosquitto _ connect () and calling mosquitto _ loop () after mosquitto _ publish () produced the expected results.

@ralight

This comment has been minimized.

Copy link
Contributor

@ralight ralight commented Oct 2, 2019

If your application does not want to do anything other than process incoming MQTT messages, then the recommendation is to use mosquitto_loop_forever().

If your application does need to do anything other than process incoming messages (for example you are publishing messages on a fixed interval), then the recommendation is to use mosquitto_loop_start() after calling mosquitto_connect(). That's it, nothing more is needed.

If you have other requirements, so as not wanting to use the extra thread created in mosquitto_loop_start(), then you may use mosquitto_loop(), but it must be called frequently. You should only do this if you are sure of what you are doing. You will need to implement reconnection logic, this is already covered for you by mosquitto_loop_forever() and mosquitto_loop_start().

In short - use mosquitto_loop_start() for your publisher unless you have a compelling reason not to.

@ghaqua

This comment has been minimized.

Copy link
Author

@ghaqua ghaqua commented Oct 3, 2019

I understand.

However, calling mosquitto _ loop _ start () returned MOSQ _ ERR _ NOT _ SUPPORTED and was not available.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.