Skip to content

Commit 97503cc

Browse files
committed
client: Add support for user events
Also supporting configurable queue size for the internal event loop. Closes #230
1 parent 9186e5f commit 97503cc

File tree

4 files changed

+77
-4
lines changed

4 files changed

+77
-4
lines changed

include/mqtt_client.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ typedef enum esp_mqtt_event_id_t {
8282
- Additional context: msg_id (id of the deleted
8383
message).
8484
*/
85+
MQTT_USER_EVENT, /*!< Custom event used to queue tasks into mqtt event handler
86+
All fields from the esp_mqtt_event_t type could be used to pass
87+
an additional context data to the handler.
88+
*/
8589
} esp_mqtt_event_id_t;
8690

8791
/**
@@ -569,6 +573,16 @@ esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_
569573
*/
570574
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client);
571575

576+
/**
577+
* @brief Dispatch user event to the mqtt internal event loop
578+
*
579+
* @param client *MQTT* client handle
580+
* @param event *MQTT* event handle structure
581+
* @return ESP_OK on success
582+
* ESP_ERR_TIMEOUT if the event couldn't be queued (ref also CONFIG_MQTT_EVENT_QUEUE_SIZE)
583+
*/
584+
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event);
585+
572586
#ifdef __cplusplus
573587
}
574588
#endif //__cplusplus

lib/include/mqtt_client_priv.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <stdio.h>
1111
#include <stdlib.h>
12+
#include <stdatomic.h>
1213
#include "esp_err.h"
1314
#include "platform.h"
1415

@@ -123,6 +124,9 @@ struct esp_mqtt_client {
123124
EventGroupHandle_t status_bits;
124125
SemaphoreHandle_t api_lock;
125126
TaskHandle_t task_handle;
127+
#if MQTT_EVENT_QUEUE_SIZE > 1
128+
atomic_int queued_events;
129+
#endif
126130
};
127131

128132
bool esp_mqtt_set_if_config(char const *const new_config, char **old_config);

lib/include/mqtt_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@
9696
#define MQTT_ENABLE_WS CONFIG_MQTT_TRANSPORT_WEBSOCKET
9797
#define MQTT_ENABLE_WSS CONFIG_MQTT_TRANSPORT_WEBSOCKET_SECURE
9898

99+
#ifdef CONFIG_MQTT_EVENT_QUEUE_SIZE
100+
#define MQTT_EVENT_QUEUE_SIZE CONFIG_MQTT_EVENT_QUEUE_SIZE
101+
#else
102+
#define MQTT_EVENT_QUEUE_SIZE 10
103+
#endif
99104

100105
#define OUTBOX_MAX_SIZE (4*1024)
101106
#endif

mqtt_client.c

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,14 @@ static bool create_client_data(esp_mqtt_client_handle_t client)
765765

766766
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
767767
{
768-
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
768+
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
769+
#if MQTT_EVENT_QUEUE_SIZE > 1
770+
// if supporting multiple queued events, we keep track of them
771+
// using atomic variable, so need to make sure it won't get allocated in PSRAM
772+
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
773+
#else
774+
MALLOC_CAP_DEFAULT);
775+
#endif
769776
ESP_MEM_CHECK(TAG, client, return NULL);
770777
if (!create_client_data(client)) {
771778
goto _mqtt_init_failed;
@@ -776,10 +783,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
776783
}
777784
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
778785
esp_event_loop_args_t no_task_loop = {
779-
.queue_size = 1,
786+
.queue_size = MQTT_EVENT_QUEUE_SIZE,
780787
.task_name = NULL,
781788
};
782789
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
790+
#if MQTT_EVENT_QUEUE_SIZE > 1
791+
atomic_init(&client->queued_events, 0);
792+
#endif
783793
#endif
784794

785795
client->keepalive_tick = platform_tick_get_ms();
@@ -939,6 +949,17 @@ static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t cli
939949
return esp_mqtt_dispatch_event(client);
940950
}
941951

952+
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event)
953+
{
954+
esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0);
955+
#if MQTT_EVENT_QUEUE_SIZE > 1
956+
if (ret == ESP_OK) {
957+
atomic_fetch_add(&client->queued_events, 1);
958+
}
959+
#endif
960+
return ret;
961+
}
962+
942963
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
943964
{
944965
client->event.client = client;
@@ -1447,6 +1468,34 @@ static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
14471468
}
14481469
}
14491470

1471+
/**
1472+
* @brief When using multiple queued item, we'd like to reduce the poll timeout to proceed with event loop exacution
1473+
*/
1474+
static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout)
1475+
{
1476+
return
1477+
#if MQTT_EVENT_QUEUE_SIZE > 1
1478+
atomic_load(&client->queued_events) > 0 ? 10: max_timeout;
1479+
#else
1480+
max_timeout;
1481+
#endif
1482+
}
1483+
1484+
static inline void run_event_loop(esp_mqtt_client_handle_t client)
1485+
{
1486+
#if MQTT_EVENT_QUEUE_SIZE > 1
1487+
if (atomic_load(&client->queued_events) > 0) {
1488+
atomic_fetch_sub(&client->queued_events, 1);
1489+
#else
1490+
{
1491+
#endif
1492+
esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0);
1493+
if (ret != ESP_OK) {
1494+
ESP_LOGE(TAG, "Error in running event_loop %d", ret);
1495+
}
1496+
}
1497+
}
1498+
14501499
static void esp_mqtt_task(void *pv)
14511500
{
14521501
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
@@ -1470,6 +1519,7 @@ static void esp_mqtt_task(void *pv)
14701519
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
14711520
while (client->run) {
14721521
MQTT_API_LOCK(client);
1522+
run_event_loop(client);
14731523
switch (client->state) {
14741524
case MQTT_STATE_DISCONNECTED:
14751525
break;
@@ -1571,7 +1621,7 @@ static void esp_mqtt_task(void *pv)
15711621
}
15721622
MQTT_API_UNLOCK(client);
15731623
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
1574-
client->wait_timeout_ms / 2 / portTICK_PERIOD_MS);
1624+
max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS));
15751625
// continue the while loop instead of break, as the mutex is unlocked
15761626
continue;
15771627
default:
@@ -1580,7 +1630,7 @@ static void esp_mqtt_task(void *pv)
15801630
}
15811631
MQTT_API_UNLOCK(client);
15821632
if (MQTT_STATE_CONNECTED == client->state) {
1583-
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
1633+
if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
15841634
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
15851635
esp_mqtt_abort_connection(client);
15861636
}

0 commit comments

Comments
 (0)