/*
 * Repository web-viewer residue captured with this file (not part of the program):
 * Branch: master; 251 lines (209 sloc); 6.59 KB.
 */
/*******************************************************************************
* Copyright (c) 2012, 2018 IBM Corp., and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
* Ian Craggs - change delimiter option from char to string
* Guilherme Maciel Ferreira - add keep alive option
* Ian Craggs - add full capability
*******************************************************************************/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include "pubsub_opts.h"
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#define sleep Sleep
#else
#include <sys/time.h>
#endif
/*
 * Shutdown flag: set to 1 by the cfinish() signal handler and polled by the
 * receive loop in main(). Declared volatile sig_atomic_t because that is the
 * integer type the C standard designates for objects written from a signal
 * handler.
 */
volatile sig_atomic_t toStop = 0;
/*
 * Program-wide option values, initialised here with this sample's defaults.
 * main() passes &opts to getopts() so command-line arguments can replace them.
 *
 * The initializer is positional: each value is matched to a member purely by
 * its position in struct pubsub_opts (declared in pubsub_opts.h, not visible
 * in this file). The trailing comment on each row names the group of members
 * that row is meant to fill.
 * NOTE(review): presumably "paho-cs-sub" is the default client id and
 * "localhost" / "1883" are the default host and port (main() builds the server
 * URL from opts.host and opts.port) - confirm against the struct declaration
 * before adding, removing or reordering any value.
 */
struct pubsub_opts opts =
{
0, 0, 0, 0, "\n", 100, /* debug/app options */
NULL, NULL, 1, 0, 0, /* message options */
MQTTVERSION_DEFAULT, NULL, "paho-cs-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* MQTT V5 options */
};
/*
 * Connect to the server using the settings held in the global opts.
 *
 * Builds the connect options (keep-alive, credentials, protocol version, an
 * optional will message, and TLS settings when opts.connection starts with
 * "ssl://" or "wss://"), then calls MQTTClient_connect5() for MQTT V5 or
 * MQTTClient_connect() otherwise, always requesting a clean start/session.
 *
 * client: forwarded unchanged as the first argument of the connect call.
 *         NOTE(review): the parameter is typed MQTTClient* yet is passed
 *         straight through where the connect functions take the client
 *         handle - callers should confirm what they pass.
 *
 * Returns the connect result: the V5 response's reasonCode, or the return
 * value of MQTTClient_connect(). On failure a message is written to stderr
 * unless opts.quiet is set; on success "Connected" is written to stderr when
 * opts.verbose is set.
 */
int myconnect(MQTTClient* client)
{
    MQTTClient_connectOptions connect_options = MQTTClient_connectOptions_initializer;
    MQTTClient_SSLOptions tls_options = MQTTClient_SSLOptions_initializer;
    MQTTClient_willOptions last_will = MQTTClient_willOptions_initializer;
    const int use_v5 = (opts.MQTTVersion == MQTTVERSION_5);
    int rc = 0;

    if (opts.verbose)
        printf("Connecting\n");

    /* V5 connections start from the V5 flavour of the default options. */
    if (use_v5)
    {
        MQTTClient_connectOptions v5_defaults = MQTTClient_connectOptions_initializer5;

        connect_options = v5_defaults;
    }

    connect_options.MQTTVersion = opts.MQTTVersion;
    connect_options.keepAliveInterval = opts.keepalive;
    connect_options.username = opts.username;
    connect_options.password = opts.password;

    /* A will message is attached only when a will topic was supplied. */
    if (opts.will_topic != NULL)
    {
        last_will.topicName = opts.will_topic;
        last_will.message = opts.will_payload;
        last_will.qos = opts.will_qos;
        last_will.retained = opts.will_retain;
        connect_options.will = &last_will;
    }

    /* TLS options apply only to explicit ssl:// or wss:// connection URLs. */
    if (opts.connection != NULL)
    {
        const int secure_scheme =
            strncmp(opts.connection, "ssl://", 6) == 0 ||
            strncmp(opts.connection, "wss://", 6) == 0;

        if (secure_scheme)
        {
            if (opts.insecure)
                tls_options.verify = 0;
            tls_options.trustStore = opts.cafile;
            tls_options.CApath = opts.capath;
            tls_options.keyStore = opts.cert;
            tls_options.privateKey = opts.key;
            tls_options.privateKeyPassword = opts.keypass;
            tls_options.enabledCipherSuites = opts.ciphers;
            connect_options.ssl = &tls_options;
        }
    }

    if (use_v5)
    {
        MQTTProperties connect_props = MQTTProperties_initializer;
        MQTTProperties will_props = MQTTProperties_initializer;
        MQTTResponse response = MQTTResponse_initializer;

        connect_options.cleanstart = 1;
        response = MQTTClient_connect5(client, &connect_options, &connect_props, &will_props);
        rc = response.reasonCode;
        MQTTResponse_free(response);
    }
    else
    {
        connect_options.cleansession = 1;
        rc = MQTTClient_connect(client, &connect_options);
    }

    if (rc == MQTTCLIENT_SUCCESS)
    {
        if (opts.verbose)
            fprintf(stderr, "Connected\n");
    }
    else if (!opts.quiet)
    {
        fprintf(stderr, "Connect failed return code: %s\n", MQTTClient_strerror(rc));
    }

    return rc;
}
/*
 * Signal handler that main() installs for SIGINT and SIGTERM.
 *
 * Puts SIGINT back to its default disposition and sets toStop so the receive
 * loop in main() finishes. SIG_DFL is used rather than a null pointer because
 * it is the only portable way to name the default disposition.
 *
 * sig: the signal number delivered; not used.
 */
void cfinish(int sig)
{
    (void)sig;
    signal(SIGINT, SIG_DFL);
    toStop = 1;
}
/*
 * Trace sink that main() registers through MQTTClient_setTraceCallback() when
 * opts.tracelevel is greater than zero.
 *
 * level:   trace level of the entry, printed as a number.
 * message: text of the trace entry.
 *
 * Writes one line per entry to stderr.
 */
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{
    const int level_number = (int)level;

    fprintf(stderr, "Trace : %d, %s\n", level_number, message);
}
/*
 * Entry point of the paho_cs_sub sample subscriber.
 *
 * Parses the command line into the global opts, creates a client for the
 * server URL, installs cfinish() for SIGINT/SIGTERM, connects, subscribes to
 * opts.topic and then prints every received message to stdout until toStop
 * is set. If MQTTClient_receive() reports a non-zero code, a reconnect is
 * attempted through myconnect().
 *
 * argc/argv: command-line arguments, handed to getopts().
 *
 * Returns EXIT_SUCCESS after the receive loop or a failed connect/subscribe;
 * calls exit(EXIT_FAILURE) if the URL buffer cannot be allocated or the
 * client cannot be created.
 */
int main(int argc, char** argv)
{
    MQTTClient client;
    MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
    int rc = 0;
    char* url = NULL;
    char* url_buf = NULL; /* heap copy of "host:port"; stays NULL when opts.connection is used */
#if !defined(WIN32)
    struct sigaction sa;
#endif
    const char* program_name = "paho_cs_sub";
    MQTTClient_nameValue* infos = MQTTClient_getVersionInfo();

    if (argc < 2)
        usage(&opts, (pubsub_opts_nameValue*)infos, program_name);

    if (getopts(argc, argv, &opts) != 0)
        usage(&opts, (pubsub_opts_nameValue*)infos, program_name);

    /* A wildcard subscription turns on verbose output, which prints the topic with each message. */
    if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
        opts.verbose = 1;

    if (opts.connection)
        url = opts.connection;
    else
    {
        /* Size the buffer from the actual strings: host and port come from
         * the command line and must not be assumed to fit a fixed size. */
        size_t url_size = strlen(opts.host) + strlen(opts.port) + 2; /* ':' and terminating NUL */

        url_buf = malloc(url_size);
        if (url_buf == NULL)
        {
            if (!opts.quiet)
                fprintf(stderr, "Out of memory building server URL\n");
            exit(EXIT_FAILURE);
        }
        snprintf(url_buf, url_size, "%s:%s", opts.host, opts.port);
        url = url_buf;
    }

    if (opts.verbose)
        printf("URL is %s\n", url);

    if (opts.tracelevel > 0)
    {
        MQTTClient_setTraceCallback(trace_callback);
        MQTTClient_setTraceLevel(opts.tracelevel);
    }

    if (opts.MQTTVersion >= MQTTVERSION_5)
        createOpts.MQTTVersion = MQTTVERSION_5;

    rc = MQTTClient_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
            NULL, &createOpts);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        if (!opts.quiet)
            fprintf(stderr, "Failed to create client, return code: %s\n", MQTTClient_strerror(rc));
        exit(EXIT_FAILURE);
    }

#if defined(WIN32)
    signal(SIGINT, cfinish);
    signal(SIGTERM, cfinish);
#else
    memset(&sa, 0, sizeof(struct sigaction));
    sa.sa_handler = cfinish;
    sa.sa_flags = 0;
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGTERM, &sa, NULL);
#endif

    if (myconnect(client) != MQTTCLIENT_SUCCESS)
        goto exit;

    if (opts.MQTTVersion >= MQTTVERSION_5)
    {
        MQTTResponse response = MQTTClient_subscribe5(client, opts.topic, opts.qos, NULL, NULL);

        rc = response.reasonCode;
        MQTTResponse_free(response);
    }
    else
        rc = MQTTClient_subscribe(client, opts.topic, opts.qos);

    /* A result equal to the requested QoS is also accepted as success. */
    if (rc != MQTTCLIENT_SUCCESS && rc != opts.qos)
    {
        if (!opts.quiet)
            fprintf(stderr, "Error %d subscribing to topic %s\n", rc, opts.topic);
        goto exit;
    }

    while (!toStop)
    {
        char* topicName = NULL;
        int topicLen;
        MQTTClient_message* message = NULL;

        rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
        if (message)
        {
            size_t delimlen = 0;

            if (opts.verbose)
                printf("%s\t", topicName);
            if (opts.delimiter)
                delimlen = strlen(opts.delimiter);

            /* Append the delimiter unless there is none or the payload already ends with it. */
            if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
                strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
                printf("%.*s", message->payloadlen, (char*)message->payload);
            else
                printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);

            if (message->struct_version == 1 && opts.verbose)
                logProperties(&message->properties);
            fflush(stdout);
            MQTTClient_freeMessage(&message);
            MQTTClient_free(topicName);
        }
        /* Reconnect with the client handle itself, exactly as the initial
         * connect above does; myconnect() forwards its argument as the handle. */
        if (rc != 0)
            myconnect(client);
    }

exit:
    MQTTClient_disconnect(client, 0);
    MQTTClient_destroy(&client);
    free(url_buf); /* released only after the client is destroyed; free(NULL) is a no-op */
    return EXIT_SUCCESS;
}
/* Repository web-viewer residue (not part of the program): "You can't perform that action at this time." */