Skip to content
This repository has been archived by the owner on Dec 26, 2022. It is now read-only.

Commit

Permalink
feat(MQTT): Implement request handler
Browse files Browse the repository at this point in the history
The request handler would parse the requests and
send responses to the device's exclusive topic `topic_root/<API>/<device_id>`.

An example request which can be sent by a mosquitto publishing client is
```
mosquitto_pub -h <Broker IP> -t root/topics/generate_address -m "{\"device_id\":\"cb692268436743c9bbe17c1721741db1\"}"

```
  • Loading branch information
howjmay committed Aug 28, 2019
1 parent 00edce5 commit e5ab76d
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 25 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ Use the `--host_force_python=PY2` parameter to force the Bazel to use the Python
$ make && bazel run //accelerator:push_docker --host_force_python=PY2
```

### Optional: Enable MQTT connectivity
MQTT connectivity is an optional feature allowing IoT endpoint devices to collaborate with `Tangle-Accelerator`.

```
make && bazel run //accelerator_mqtt
```

Note you may need to set up the `MQTT_HOST` and `TOPIC_ROOT` in `config.h` to connect to a MQTT broker.
For more information for MQTT connectivity of `tangle-accelerator`, you could read `connectivity/mqtt/usage.md`.

## Developing

The codebase of this repository follows [Google's C++ guidelines](https://google.github.io/styleguide/cppguide.html):
Expand Down
10 changes: 9 additions & 1 deletion accelerator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,23 @@ cc_binary(

cc_binary(
name = "accelerator_mqtt",
srcs = ["mqtt_interface.c"],
srcs = [
"config.c",
"config.h",
"conn_mqtt.c",
],
copts = [
"-DLOGGER_ENABLE",
"-DENABLE_MQTT",
],
deps = [
":message",
":ta_config",
":ta_errors",
"//connectivity/mqtt:mqtt_utils",
"//utils:cache",
"//utils:pow",
"@entangled//cclient/api",
"@entangled//utils/handles:signal",
],
)
Expand Down
5 changes: 4 additions & 1 deletion accelerator/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ status_t ta_config_default_init(ta_config_t* const info, iota_config_t* const ic
info->host = TA_HOST;
info->port = TA_PORT;
info->thread_count = TA_THREAD_COUNT;

#ifdef ENABLE_MQTT
info->mqtt_host = MQTT_HOST;
info->mqtt_topic_root = TOPIC_ROOT;
#endif
log_info(config_logger_id, "Initializing Redis information\n");
cache->host = REDIS_HOST;
cache->port = REDIS_PORT;
Expand Down
6 changes: 6 additions & 0 deletions accelerator/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ extern "C" {

#define TA_VERSION "tangle-accelerator/0.5.0"
#define TA_HOST "localhost"

#ifdef ENABLE_MQTT
#define MQTT_HOST "localhost"
#define TOPIC_ROOT "root/topics"
#endif

#define TA_PORT "8000"
#define TA_THREAD_COUNT 10
#define IRI_HOST "localhost"
Expand All @@ -56,6 +58,10 @@ typedef struct ta_info_s {
char* host; /**< Binding address of tangle-accelerator */
char* port; /**< Binding port of tangle-accelerator */
uint8_t thread_count; /**< Thread count of tangle-accelerator instance */
#ifdef ENABLE_MQTT
char* mqtt_host; /**< Address of MQTT broker host */
char* mqtt_topic_root; /**< The topic root of MQTT topic */
#endif
} ta_config_t;

/** struct type of iota configuration */
Expand Down
30 changes: 26 additions & 4 deletions accelerator/mqtt_interface.c → accelerator/conn_mqtt.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "accelerator/config.h"
#include "config.h"
#include "connectivity/mqtt/client_common.h"
#include "connectivity/mqtt/duplex_callback.h"
#include "connectivity/mqtt/duplex_utils.h"
#include "errors.h"

#define MQTT_INTERFACE_LOGGER "mqtt-interface"
#define CONN_MQTT_LOGGER "conn-mqtt"

ta_core_t ta_core;
static logger_id_t mqtt_logger_id;

int main(int argc, char *argv[]) {
Expand All @@ -18,7 +20,22 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

mqtt_logger_id = logger_helper_enable(MQTT_INTERFACE_LOGGER, LOGGER_DEBUG, true);
mqtt_logger_id = logger_helper_enable(CONN_MQTT_LOGGER, LOGGER_DEBUG, true);

// Initialize configurations with default value
if (ta_config_default_init(&ta_core.info, &ta_core.iconf, &ta_core.cache, &ta_core.service) != SC_OK) {
return EXIT_FAILURE;
}

// Initialize configurations with CLI value
if (ta_config_cli_init(&ta_core, argc, argv) != SC_OK) {
return EXIT_FAILURE;
}

if (ta_config_set(&ta_core.cache, &ta_core.service) != SC_OK) {
log_critical(mqtt_logger_id, "[%s:%d] Configure failed %s.\n", __func__, __LINE__, CONN_MQTT_LOGGER);
return EXIT_FAILURE;
}

if (verbose_mode) {
mqtt_utils_logger_init();
Expand All @@ -36,6 +53,7 @@ int main(int argc, char *argv[]) {
// if we want to opertate this program under multi-threading, see https://github.com/eclipse/mosquitto/issues/450
ret = duplex_config_init(&mosq, &cfg);
if (ret != SC_OK) {
log_critical(mqtt_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
goto done;
}

Expand All @@ -47,18 +65,22 @@ int main(int argc, char *argv[]) {
// gossip_channel_set(&cfg, MQTT_HOST, "NB/test/room1", "NB/test/room2");

// Set the configures and message for testing
ret = gossip_api_channels_set(&cfg, MQTT_HOST, TOPIC_ROOT);
ret = gossip_api_channels_set(&cfg, ta_core.info.mqtt_host, ta_core.info.mqtt_topic_root);
if (ret != SC_OK) {
log_critical(mqtt_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
goto done;
}

// Set cfg as `userdata` field of `mosq` which allows the callback functions to use `cfg`.
mosquitto_user_data_set(mosq, &cfg);

log_info(mqtt_logger_id, "Starting...\n");

// Start listening subscribing topics, once we received a message from the listening topics, we can send corresponding
// message.
// if we need to take the above task forever, just put it in a infinite loop.
do {
// TODO Use logger to log some important steps in processing requests.
log_info(mqtt_logger_id, "Listening new requests.\n");
ret = duplex_client_start(mosq, &cfg);
} while (!ret);

Expand Down
5 changes: 5 additions & 0 deletions connectivity/mqtt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ cc_library(
"duplex_callback.h",
"duplex_utils.h",
],
copts = ["-DENABLE_MQTT"],
visibility = ["//visibility:public"],
deps = [
":mqtt_common",
"//accelerator:apis",
"//accelerator:common_core",
"//accelerator:ta_errors",
"@entangled//common/model:transaction",
],
)

Expand All @@ -27,6 +31,7 @@ cc_library(
"pub_utils.h",
"sub_utils.h",
],
copts = ["-DENABLE_MQTT"],
deps = [
"//accelerator:ta_config",
"//accelerator:ta_errors",
Expand Down
2 changes: 1 addition & 1 deletion connectivity/mqtt/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ extern "C" {
*/

#define ID_LEN 32
#define API_NUM 9
#define API_NUM 7

typedef enum client_type_s { client_pub, client_sub, client_duplex } client_type_t;

Expand Down
83 changes: 69 additions & 14 deletions connectivity/mqtt/duplex_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

#include "duplex_callback.h"
#include <stdlib.h>
#include "utils/logger_helper.h"
#include <string.h>

#define MQTT_CALLBACK_LOGGER "mqtt-callback"
static logger_id_t mqtt_callback_logger_id;

extern ta_core_t ta_core;

void mqtt_callback_logger_init() {
mqtt_callback_logger_id = logger_helper_enable(MQTT_CALLBACK_LOGGER, LOGGER_DEBUG, true);
}
Expand All @@ -28,18 +30,74 @@ int mqtt_callback_logger_release() {
return 0;
}

static status_t mqtt_request_handler(mosq_config_t *cfg, char *sub_topic, char *req) {
static status_t mqtt_request_handler(mosq_config_t *cfg, char *subscribe_topic, char *req) {
if (cfg == NULL || subscribe_topic == NULL || req == NULL) {
return SC_MQTT_NULL;
}

status_t ret = SC_OK;
// TODO: process MQTT requests here. Deserialize the request and process it against corresponding api_* functions
char *json_result = NULL;
char device_id[ID_LEN];

// get the Device ID.
ret = mqtt_device_id_deserialize(req, device_id);
if (ret != SC_OK) {
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
goto done;
}

// after finishing processing the request, set the response message with the following functions
char *api_sub_topic = subscribe_topic + strlen(ta_core.info.mqtt_topic_root);
char *p;
if (!strncmp(api_sub_topic, "address", 7)) {
ret = api_generate_address(&ta_core.iconf, &ta_core.service, &json_result);
} else if (p = strstr(api_sub_topic, "tag")) {
if (!strncmp(p + 4, "hashes", 6)) {
char tag[NUM_TRYTES_TAG + 1];
mqtt_tag_req_deserialize(req, tag);
ret = api_find_transactions_by_tag(&ta_core.service, tag, &json_result);
} else if (!strncmp(p + 4, "object", 6)) {
char tag[NUM_TRYTES_TAG + 1];
mqtt_tag_req_deserialize(req, tag);
ret = api_find_transactions_obj_by_tag(&ta_core.service, tag, &json_result);
}
} else if (p = strstr(api_sub_topic, "transaction")) {
if (!strncmp(p + 12, "object", 6)) {
char hash[NUM_TRYTES_HASH + 1];
mqtt_transaction_hash_req_deserialize(req, hash);
ret = api_find_transaction_object_single(&ta_core.service, hash, &json_result);
} else if (!strncmp(p + 12, "send", 4)) {
ret = api_send_transfer(&ta_core.iconf, &ta_core.service, req, &json_result);
}
} else if (p = strstr(api_sub_topic, "tips")) {
if (!strncmp(p + 5, "all", 3)) {
ret = api_get_tips(&ta_core.service, &json_result);
} else if (!strncmp(p + 5, "pair", 4)) {
ret = api_get_tips_pair(&ta_core.iconf, &ta_core.service, &json_result);
}
}
if (ret != SC_OK) {
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
goto done;
}

// 1. Set response publishing topic with the topic we got message and the Device ID (client ID) we got in the message/
// gossip_channel_set()
// Set response publishing topic with the topic we got message and the Device ID (client ID) we got in the message
int res_topic_len = strlen(subscribe_topic) + 1 + ID_LEN + 1;
char *res_topic = (char *)malloc(res_topic_len);
snprintf(res_topic, res_topic_len, "%s/%s", subscribe_topic, device_id);
ret = gossip_channel_set(cfg, NULL, NULL, res_topic);
if (ret != SC_OK) {
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
goto done;
}

// 2. Set recv_message as publishing message
// gossip_message_set(cfg, cfg->sub_config->recv_message);
// Set recv_message as publishing message
ret = gossip_message_set(cfg, json_result);
if (ret != SC_OK) {
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
}

done:
free(json_result);
return ret;
}

Expand All @@ -57,11 +115,6 @@ static void message_callback_duplex_func(struct mosquitto *mosq, void *obj, cons
// Process received requests
mqtt_request_handler(cfg, message->topic, cfg->sub_config->recv_message);

// After running `message_callback_duplex_func()`, the message will be sent by the `publish_loop()` in
// `duplex_client_start()`
// printf("listening: %s \n", message->topic);
// printf("recv msg: %s \n", cfg->sub_config->recv_message);

// The following one line is used for testing if this server work fine with requests with given topics.
// Uncomment it if it is necessitated
// gossip_message_set(cfg, cfg->sub_config->recv_message);
Expand All @@ -88,7 +141,7 @@ static void subscribe_callback_duplex_func(struct mosquitto *mosq, void *obj, in
}

static void log_callback_duplex_func(struct mosquitto *mosq, void *obj, int level, const char *str) {
log_error(mqtt_callback_logger_id, "log: %s\n", str);
log_info(mqtt_callback_logger_id, "log: %s\n", str);
}

status_t duplex_callback_func_set(struct mosquitto *mosq) {
Expand All @@ -103,4 +156,6 @@ status_t duplex_callback_func_set(struct mosquitto *mosq) {
mosquitto_disconnect_v5_callback_set(mosq, disconnect_callback_duplex_func);
mosquitto_publish_v5_callback_set(mosq, publish_callback_duplex_func);
mosquitto_message_v5_callback_set(mosq, message_callback_duplex_func);

return SC_OK;
}
5 changes: 5 additions & 0 deletions connectivity/mqtt/duplex_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
#ifndef DUPLEX_CALLBACK_H
#define DUPLEX_CALLBACK_H

#include "accelerator/apis.h"
#include "accelerator/common_core.h"
#include "common/model/transaction.h"
#include "duplex_utils.h"
#include "serializer/serializer.h"
#include "utils/logger_helper.h"

#ifdef __cplusplus
extern "C" {
Expand Down
5 changes: 2 additions & 3 deletions connectivity/mqtt/duplex_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ status_t gossip_api_channels_set(mosq_config_t *channel_cfg, char *host, char *r
char *sub_topic = NULL;
int sub_topic_len, api_name_len;
int root_path_len = strlen(root_path);
char *api_names[API_NUM] = {"generate_address", "get_tips_pair", "get_tips", "receive_mam_message",
"mam_send_message", "send_transfer", "find_transactions", "find_transaction_objects",
"send_trytes"};
char *api_names[API_NUM] = {"address", "tag/hashes", "tag/object", "transaction/object",
"transaction/send", "tips/all", "tips/pair"};

for (int i = 0; i < API_NUM; i++) {
api_name_len = strlen(api_names[i]);
Expand Down
2 changes: 1 addition & 1 deletion connectivity/mqtt/sub_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void subscribe_callback_sub_func(struct mosquitto *mosq, void *obj, int mid, int
snprintf(qos_digit, 4, ", %d", granted_qos[i]);
strcat(qos_str, qos_digit);
}
log_error(mqtt_sub_logger_id, "Subscribed (mid: %d): %d%s\n", mid, granted_qos[0], qos_str);
log_info(mqtt_sub_logger_id, "Subscribed (mid: %d): %d%s\n", mid, granted_qos[0], qos_str);

free(qos_str);
}
49 changes: 49 additions & 0 deletions connectivity/mqtt/usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
## General Usage on MQTT Protocol support
The format of MQTT request is the same as http request, but MQTT request has one more field is `Device ID`.

## Topic vs API table

| topic | API | HTTP Method |
| ------------- |:-------------: | -------------:|
| address | api_generate_address | GET |
| tag/hashes | api_find_transactions_by_tag | GET |
| tag/object | api_find_transactions_obj_by_tag | GET |
| transaction/object | api_find_transaction_object_single | GET |
| transaction/send | api_send_transfer | POST |
| tips/all | api_get_tips | GET |
| tips/pair | api_get_tips_pair | GET |

## API request format
APIs in POST method have almost the same format as MQTT requests have, there are one more field, `device_id` in MQTT requests.
However, APIs in GET method would in a more different format, so the following are the examples of the requests of these APIs.

### api_generate_address
```
{"device_id":"<device_id>"}
```
### api_find_transactions_by_tag
```
{"device_id":"<device_id>", "tag":"<tag>"}
```
### api_find_transactions_obj_by_tag
```
{"device_id":"<device_id>", "tag":"<tag>"}
```
### api_find_transaction_object_single
```
{"device_id":"<device_id>", "hash":"<transaction hash>"}
```
### api_get_tips
```
{"device_id":"<device_id>"}
```
### api_get_tips_pair
```
{"device_id":"<device_id>"}
```

## Examples
Here is an example which uses mosquitto client to publish requests.
```
mosquitto_pub -h <Broker IP> -t root/topics/tips/all -m "{\"device_id\":\"<device_id>\"}"
```
Loading

0 comments on commit e5ab76d

Please sign in to comment.