Skip to content

Commit

Permalink
feat(mqtt): Optimize mqtt transport list and remove unused transport
Browse files Browse the repository at this point in the history
  • Loading branch information
ESP-YJM authored and david-cermak committed May 23, 2022
1 parent 684843a commit 647e0ef
Showing 1 changed file with 87 additions and 57 deletions.
144 changes: 87 additions & 57 deletions mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ static const char *TAG = "mqtt_client";
ESP_EVENT_DEFINE_BASE(MQTT_EVENTS);
#endif

#define MQTT_OVER_TCP_SCHEME "mqtt"
#define MQTT_OVER_SSL_SCHEME "mqtts"
#define MQTT_OVER_WS_SCHEME "ws"
#define MQTT_OVER_WSS_SCHEME "wss"

typedef struct mqtt_state {
uint8_t *in_buffer;
uint8_t *out_buffer;
Expand Down Expand Up @@ -201,7 +206,7 @@ static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp

static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg)
{
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, "mqtts");
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, MQTT_OVER_SSL_SCHEME);

if (cfg->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
Expand Down Expand Up @@ -321,7 +326,7 @@ static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, c
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
bool is_ssl_scheme = false;
if (cfg->scheme) {
is_ssl_scheme = (strcasecmp(cfg->scheme, "mqtts") == 0) || (strcasecmp(cfg->scheme, "wss") == 0);
is_ssl_scheme = (strcasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME) == 0);
}

if (!is_ssl_scheme && ssl_cfg_enabled) {
Expand Down Expand Up @@ -353,6 +358,77 @@ static bool set_if_config(char const *const new_config, char **old_config)
return true;
}

static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t client)
{
esp_err_t ret = ESP_OK;
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
client->transport_list = NULL;
}
if (client->config->scheme) {
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return ESP_ERR_NO_MEM);

if ((strcasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0)) {
esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, tcp, MQTT_OVER_TCP_SCHEME);
if (strcasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(ws, client->config->path);
}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(ws, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, ws, MQTT_OVER_WS_SCHEME);
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
}
} else if ((strcasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME) == 0) || (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0)) {
#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, ssl, MQTT_OVER_SSL_SCHEME);
if (strcasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(wss, client->config->path);
}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(wss, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, wss, MQTT_OVER_WSS_SCHEME);
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
}
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_SSL to use %s", client->config->scheme);
ret = ESP_FAIL;
#endif
} else {
ESP_LOGE(TAG, "Not support this mqtt scheme %s", client->config->scheme);
ret = ESP_FAIL;
}
} else {
ESP_LOGE(TAG, "No scheme found");
ret = ESP_FAIL;
}
return ret;
}

esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
if (!client) {
Expand Down Expand Up @@ -531,24 +607,24 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
if (config->transport) {
free(client->config->scheme);
if (config->transport == MQTT_TRANSPORT_OVER_TCP) {
client->config->scheme = create_string("mqtt", 4);
client->config->scheme = create_string(MQTT_OVER_TCP_SCHEME, strlen(MQTT_OVER_TCP_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#if MQTT_ENABLE_WS
else if (config->transport == MQTT_TRANSPORT_OVER_WS) {
client->config->scheme = create_string("ws", 2);
client->config->scheme = create_string(MQTT_OVER_WS_SCHEME, strlen(MQTT_OVER_WS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_SSL
else if (config->transport == MQTT_TRANSPORT_OVER_SSL) {
client->config->scheme = create_string("mqtts", 5);
client->config->scheme = create_string(MQTT_OVER_SSL_SCHEME, strlen(MQTT_OVER_SSL_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
#if MQTT_ENABLE_WSS
else if (config->transport == MQTT_TRANSPORT_OVER_WSS) {
client->config->scheme = create_string("wss", 3);
client->config->scheme = create_string(MQTT_OVER_WSS_SCHEME, strlen(MQTT_OVER_WSS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}
#endif
Expand Down Expand Up @@ -749,42 +825,6 @@ static bool create_client_data(esp_mqtt_client_handle_t client)
client->api_lock = xSemaphoreCreateRecursiveMutex();
ESP_MEM_CHECK(TAG, client->api_lock, return false);

client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return false);

esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return false);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, tcp, "mqtt");

#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return false);
esp_transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(ws, "mqtt");
#endif
esp_transport_list_add(client->transport_list, ws, "ws");
#endif

#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return false);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);

esp_transport_list_add(client->transport_list, ssl, "mqtts");
#endif

#if MQTT_ENABLE_WSS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return false);
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(wss, "mqtt");
#endif
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
esp_transport_list_add(client->transport_list, wss, "wss");
#endif
ESP_MEM_CHECK(TAG, client->transport_list, return false);
return true;
}

Expand Down Expand Up @@ -915,21 +955,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
});
}

if (client->config->path) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws_trans = esp_transport_list_get_transport(client->transport_list, "ws");
if (ws_trans) {
esp_transport_ws_set_path(ws_trans, client->config->path);
}
#endif
#if MQTT_ENABLE_WSS
esp_transport_handle_t wss_trans = esp_transport_list_get_transport(client->transport_list, "wss");
if (wss_trans) {
esp_transport_ws_set_path(wss_trans, client->config->path);
}
#endif
}

if (puri.field_data[UF_PORT].len) {
client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10);
}
Expand Down Expand Up @@ -1579,6 +1604,11 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
if (esp_mqtt_client_create_transport(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to create transport list");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}
esp_err_t err = ESP_OK;
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
Expand Down

0 comments on commit 647e0ef

Please sign in to comment.