diff --git a/examples/mqtt_client_example/.gitignore b/examples/mqtt_client_example/.gitignore index 92f3b79..00a92a3 100644 --- a/examples/mqtt_client_example/.gitignore +++ b/examples/mqtt_client_example/.gitignore @@ -1,4 +1,4 @@ -src/config.erl _build/** _checkouts/** +src/config.erl rebar.lock diff --git a/examples/mqtt_client_example/rebar.config b/examples/mqtt_client_example/rebar.config index 36bbcac..e15d233 100644 --- a/examples/mqtt_client_example/rebar.config +++ b/examples/mqtt_client_example/rebar.config @@ -1,5 +1,5 @@ {erl_opts, [debug_info]}. {deps, [ - {mqtt_client, {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}} + {atomvm_mqtt_client, {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}} ]}. {plugins, [atomvm_rebar3_plugin]}. diff --git a/examples/mqtt_client_example/src/mqtt_client_example.app.src b/examples/mqtt_client_example/src/mqtt_client_example.app.src index a7ccdec..eee0099 100644 --- a/examples/mqtt_client_example/src/mqtt_client_example.app.src +++ b/examples/mqtt_client_example/src/mqtt_client_example.app.src @@ -1,5 +1,5 @@ {application, mqtt_client_example, [ - {description, "An OTP library"}, + {description, "An AtomVM application"}, {vsn, "0.1.0"}, {registered, []}, {applications, [ diff --git a/examples/mqtt_client_example/src/mqtt_client_example.erl b/examples/mqtt_client_example/src/mqtt_client_example.erl index 8b7b4cb..08f26d6 100644 --- a/examples/mqtt_client_example/src/mqtt_client_example.erl +++ b/examples/mqtt_client_example/src/mqtt_client_example.erl @@ -1,5 +1,5 @@ %% -%% Copyright (c) 2021 dushin.net +%% Copyright (c) 2021-2023 dushin.net %% All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -33,16 +33,8 @@ start() -> {ok, _MQTT} = mqtt_client:start(Config), io:format("MQTT started.~n"), - loop_forever(). + timer:sleep(infinity). -loop_forever() -> - receive - halt -> halt - end. - -%% -%% connected callback. This function will be called -%% handle_connected(MQTT) -> Config = mqtt_client:get_config(MQTT), Topic = <<"atomvm/qos0">>, @@ -60,15 +52,10 @@ handle_subscribed(MQTT, Topic) -> handle_data(_MQTT, Topic, Data) -> io:format("Received data on topic ~p: ~p ~n", [Topic, Data]), - % io:format("Pending publishes: ~p~n", [mqtt_client:get_pending_publishes(MQTT)]), - % io:format("Pending subscriptions: ~p~n", [mqtt_client:get_pending_subscriptions(MQTT)]), - % io:format("Pending unsubscriptions: ~p~n", [mqtt_client:get_pending_unsubscriptions(MQTT)]), - io:format("process count: ~p~n", [erlang:system_info(process_count)]), - io:format("Free heap on handle_data: ~p~n", [erlang:system_info(esp32_free_heap_size)]), ok. start_network(StaConfig) -> - case network_fsm:wait_for_sta(StaConfig) of + case network:wait_for_sta(StaConfig) of {ok, {Address, Netmask, Gateway}} -> io:format( "Acquired IP address: ~s Netmask: ~s Gateway: ~s~n", @@ -81,8 +68,27 @@ start_network(StaConfig) -> publish_loop(MQTT, Topic, Seq) -> io:format("Publishing data on topic ~p~n", [Topic]), - _ = mqtt_client:publish(MQTT, Topic, list_to_binary("echo" ++ integer_to_list(Seq))), - timer:sleep(5000), - io:format("process count: ~p~n", [erlang:system_info(process_count)]), - io:format("Free heap after publish: ~p~n", [erlang:system_info(esp32_free_heap_size)]), + try + Self = self(), + HandlePublished = fun(MQTT2, Topic2, MsgId) -> + Self ! published, + handle_published(MQTT2, Topic2, MsgId) + end, + PublishOptions = #{qos => at_least_once, published_handler => HandlePublished}, + Msg = list_to_binary("echo" ++ integer_to_list(Seq)), + _ = mqtt_client:publish(MQTT, Topic, Msg, PublishOptions), + receive + published -> + ok + after 10000 -> + io:format("Timed out waiting for publish ack~n") + end + catch + C:E:S -> + io:format("Error in publish: ~p:~p~p~n", [C, E, S]) + end, + timer:sleep(1000), publish_loop(MQTT, Topic, Seq + 1). + +handle_published(MQTT, Topic, MsgId) -> + io:format("MQTT ~p published to topic ~p msg_id=~p~n", [MQTT, Topic, MsgId]). diff --git a/ports/atomvm_mqtt_client.c b/ports/atomvm_mqtt_client.c index ee27e09..481e4f2 100644 --- a/ports/atomvm_mqtt_client.c +++ b/ports/atomvm_mqtt_client.c @@ -1,5 +1,5 @@ // -// Copyright (c) 2021 dushin.net +// Copyright (c) 2021-2022 dushin.net // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -33,114 +33,161 @@ #include #include +#include #include // #define ENABLE_TRACE #include - #define TAG "atomvm_mqtt" -static const char *const mqtt_atom = "\x4" "mqtt"; -static const char *const stop_atom = "\x4" "stop"; -static const char *const receiver_atom = "\x8" "receiver"; -static const char *const url = "\x3" "url"; -static const char *const connected_atom = "\x9" "connected"; -static const char *const disconnected_atom = "\xC" "disconnected"; -static const char *const disconnect_atom = "\xA" "disconnect"; -static const char *const reconnect_atom = "\x9" "reconnect"; -static const char *const published_atom = "\x9" "published"; -static const char *const subscribed_atom = "\xA" "subscribed"; -static const char *const unsubscribed_atom = "\xC" "unsubscribed"; -static const char *const data_atom = "\x4" "data"; -static const char *const publish_atom = "\x7" "publish"; -static const char *const subscribe_atom = "\x9" "subscribe"; -static const char *const unsubscribe_atom = "\xB" "unsubscribe"; +// static const char *const cert_atom = ATOM_STR("\x4", "cert"); +static const char *const connected_atom = ATOM_STR("\x9", "connected"); +static const char *const data_atom = ATOM_STR("\x4", "data"); +static const char *const disconnected_atom = ATOM_STR("\xC", "disconnected"); +static const char *const host_atom = ATOM_STR("\x4", "host"); +static const char *const mqtt_atom = ATOM_STR("\x4", "mqtt"); +static const char *const password_atom = ATOM_STR("\x8", "password"); +static const char *const port_atom = ATOM_STR("\x4", "port"); +static const char *const published_atom = ATOM_STR("\x9", "published"); +static const char *const receiver_atom = ATOM_STR("\x8", "receiver"); +static const char *const subscribed_atom = ATOM_STR("\xA", "subscribed"); +// static const char *const transport_atom = ATOM_STR("\x9", "transport"); +static const char *const unsubscribed_atom = ATOM_STR("\xC", "unsubscribed"); +static const char *const url_atom = ATOM_STR("\x3", "url"); +static const char *const username_atom = ATOM_STR("\x8", "username"); // error codes -static const char *const esp_tls_atom = "\x07" "esp_tls"; -static const char *const connection_refused_atom = "\x12" "connection_refused"; -static const char *const connection_accepted_atom = "\x13" "connection_accepted"; -static const char *const protocol_atom = "\x08" "protocol"; -static const char *const id_rejected_atom = "\x0B" "id_rejected"; -static const char *const server_unavailable_atom = "\x12" "server_unavailable"; -static const char *const bad_username_atom = "\x0C" "bad_username"; -static const char *const not_authorized_atom = "\x0E" "not_authorized"; -// 0123456789ABCDEF0123456789ABCDEF -// 0 1 - - -static void consume_mailbox(Context *ctx); -static term do_publish(Context *ctx, term topic, term data, term qos, term retain); -static term do_subscribe(Context *ctx, term topic, term qos); -static term do_unsubscribe(Context *ctx, term topic); -static void do_stop(Context *ctx); -static term do_disconnect(Context *ctx); -static term do_reconnect(Context *ctx); -static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event); -static term make_atom(GlobalContext *global, const char *string); -static term error_type_to_atom(Context *ctx, esp_mqtt_error_type_t error_type); -static term connect_return_code_to_atom(Context *ctx, esp_mqtt_connect_return_code_t connect_return_code); -static term create_tuple4(Context *ctx, term a, term b, term c, term d); -static term create_tuple5(Context *ctx, term a, term b, term c, term d, term e); +static const char *const bad_username_atom = ATOM_STR("\x0C", "bad_username"); +static const char *const connection_accepted_atom = ATOM_STR("\x13", "connection_accepted"); +static const char *const connection_refused_atom = ATOM_STR("\x12", "connection_refused"); +static const char *const esp_tls_atom = ATOM_STR("\x07", "esp_tls"); +static const char *const id_rejected_atom = ATOM_STR("\x0B", "id_rejected"); +static const char *const not_authorized_atom = ATOM_STR("\x0E", "not_authorized"); +static const char *const protocol_atom = ATOM_STR("\x08", "protocol"); +static const char *const server_unavailable_atom = ATOM_STR("\x12", "server_unavailable"); + +enum mqtt_cmd +{ + MQTTInvalidCmd = 0, + MQTTStopCmd, + MQTTDisconnectCmd, + MQTTReconnectCmd, + MQTTPublishCmd, + MQTTSubscribeCmd, + MQTTUnSubscribeCmd +}; + +static const AtomStringIntPair cmd_table[] = { + { ATOM_STR("\x4", "stop"), MQTTStopCmd }, + { ATOM_STR("\xA", "disconnect"), MQTTDisconnectCmd }, + { ATOM_STR("\x9", "reconnect"), MQTTReconnectCmd }, + { ATOM_STR("\x7", "publish"), MQTTPublishCmd }, + { ATOM_STR("\x9", "subscribe"), MQTTSubscribeCmd }, + { ATOM_STR("\xB", "unsubscribe"), MQTTUnSubscribeCmd }, + SELECT_INT_DEFAULT(MQTTInvalidCmd) +}; + +// TODO support configuration of MQTT transport + +// enum mqtt_transport +// { +// MQTTInvalidTransport = 0, +// MQTTMQTTTransport, +// MQTTMQTTSTransport, +// MQTTWSTransport, +// MQTTWSSTransport +// }; + +// static const AtomStringIntPair transport_table[] = { +// { ATOM_STR("\x4", "mqtt"), MQTTMQTTTransport }, +// { ATOM_STR("\x5", "mqtts"), MQTTMQTTSTransport }, +// { ATOM_STR("\x2", "ws"), MQTTWSTransport }, +// { ATOM_STR("\x3", "wss"), MQTTWSSTransport }, +// SELECT_INT_DEFAULT(MQTTInvalidTransport) +// }; struct platform_data { esp_mqtt_client_handle_t client; term receiver; }; +static term make_atom(GlobalContext *global, const char *string) +{ + int global_atom_index = globalcontext_insert_atom(global, (AtomString) string); + return term_from_atom_index(global_atom_index); +} -static void consume_mailbox(Context *ctx) +static term create_tuple4(Context *ctx, term a, term b, term c, term d) { - Message *message = mailbox_dequeue(ctx); - term msg = message->message; - term pid = term_get_tuple_element(msg, 0); - term ref = term_get_tuple_element(msg, 1); - uint64_t ref_ticks = term_to_ref_ticks(ref); - term req = term_get_tuple_element(msg, 2); + term terms[4]; + terms[0] = a; + terms[1] = b; + terms[2] = c; + terms[3] = d; - int local_process_id = term_to_local_process_id(pid); - Context *target = globalcontext_get_process(ctx->global, local_process_id); + return port_create_tuple_n(ctx, 4, terms); +} - term ret = ERROR_ATOM; - if (term_is_atom(req)) { - if (req == context_make_atom(ctx, stop_atom)) { - do_stop(ctx); - return; - } else if (req == context_make_atom(ctx, disconnect_atom)) { - ret = do_disconnect(ctx); - } else if (req == context_make_atom(ctx, reconnect_atom)) { - ret = do_reconnect(ctx); - } - } - else if (term_is_tuple(req)) { - term cmd = term_get_tuple_element(req, 0); - if (cmd == context_make_atom(ctx, publish_atom)) { - term topic = term_get_tuple_element(req, 1); - term data = term_get_tuple_element(req, 2); - term qos = term_get_tuple_element(req, 3); - term retain = term_get_tuple_element(req, 4); - ret = do_publish(ctx, topic, data, qos, retain); - } else if (cmd == context_make_atom(ctx, subscribe_atom)) { - term topic = term_get_tuple_element(req, 1); - term qos = term_get_tuple_element(req, 2); - ret = do_subscribe(ctx, topic, qos); - } else if (cmd == context_make_atom(ctx, unsubscribe_atom)) { - term topic = term_get_tuple_element(req, 1); - ret = do_unsubscribe(ctx, topic); - } +static term create_tuple5(Context *ctx, term a, term b, term c, term d, term e) +{ + term terms[4]; + terms[0] = a; + terms[1] = b; + terms[2] = c; + terms[3] = d; + terms[4] = e; + + return port_create_tuple_n(ctx, 5, terms); +} + +static term error_type_to_atom(GlobalContext *global, esp_mqtt_error_type_t error_type) +{ + switch (error_type) { + case MQTT_ERROR_TYPE_ESP_TLS: + return make_atom(global, esp_tls_atom); + case MQTT_ERROR_TYPE_CONNECTION_REFUSED: + return make_atom(global, connection_refused_atom); + default: + return UNDEFINED_ATOM; } +} - mailbox_destroy_message(message); +static char *get_default_client_id() +{ + uint8_t mac[6]; + esp_efuse_mac_get_default(mac); - if (UNLIKELY(memory_ensure_free(ctx, 3 + 2) != MEMORY_GC_OK)) { - mailbox_send(target, MEMORY_ATOM); - } else { - term ret_msg = port_create_tuple2(ctx, term_from_ref_ticks(ref_ticks, ctx), ret); - mailbox_send(target, ret_msg); + size_t buf_size = strlen("atomvm-") + 12 + 1; + char *buf = malloc(buf_size); + if (IS_NULL_PTR(buf)) { + ESP_LOGE(TAG, "Failed to allocate client_id buf"); + return NULL; } + snprintf(buf, buf_size, + "atomvm-%02x%02x%02x%02x%02x%02x", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); + return buf; } +static term connect_return_code_to_atom(GlobalContext *global, esp_mqtt_connect_return_code_t connect_return_code) +{ + switch (connect_return_code) { + case MQTT_CONNECTION_ACCEPTED: + return make_atom(global, connection_accepted_atom); + case MQTT_CONNECTION_REFUSE_PROTOCOL: + return make_atom(global, protocol_atom); + case MQTT_CONNECTION_REFUSE_ID_REJECTED: + return make_atom(global, id_rejected_atom); + case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE: + return make_atom(global, server_unavailable_atom); + case MQTT_CONNECTION_REFUSE_BAD_USERNAME: + return make_atom(global, bad_username_atom); + case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED: + return make_atom(global, not_authorized_atom); + default: + return UNDEFINED_ATOM; + } +} static term do_publish(Context *ctx, term topic, term data, term qos, term retain) { @@ -275,253 +322,393 @@ static term do_reconnect(Context *ctx) } -static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { + esp_mqtt_event_handle_t event = event_data; + Context *ctx = (Context *) event->user_context; struct platform_data *plfdat = (struct platform_data *) ctx->platform_data; int pid = term_to_local_process_id(plfdat->receiver); Context *target = globalcontext_get_process(ctx->global, pid); - switch (event->event_id) { + esp_mqtt_event_id_t mqtt_event_id = (esp_mqtt_event_id_t) event_id; + switch (mqtt_event_id) { + case MQTT_EVENT_CONNECTED: { TRACE(TAG ": MQTT_EVENT_CONNECTED\n"); - if (UNLIKELY(memory_ensure_free(ctx, 3) != MEMORY_GC_OK)) { + + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } - term msg = port_create_tuple2(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, connected_atom) + + term msg = port_create_tuple2( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, connected_atom) ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_DISCONNECTED: { TRACE(TAG ": MQTT_EVENT_DISCONNECTED\n"); - if (UNLIKELY(memory_ensure_free(ctx, 3) != MEMORY_GC_OK)) { + + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } - term msg = port_create_tuple2(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, disconnected_atom) + + term msg = port_create_tuple2( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, disconnected_atom) ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_SUBSCRIBED: { - TRACE(TAG ": MQTT_EVENT_SUBSCRIBED, msg_id=%d\n", event->msg_id); - if (UNLIKELY(memory_ensure_free(ctx, 4) != MEMORY_GC_OK)) { + TRACE(TAG ": MQTT_EVENT_SUBSCRIBED, msg_id=%d\n", event_id); + + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } - term msg = port_create_tuple3(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, subscribed_atom), + + term msg = port_create_tuple3( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, subscribed_atom), term_from_int(event->msg_id) ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_UNSUBSCRIBED: { - TRACE(TAG ": MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\n", event->msg_id); - if (UNLIKELY(memory_ensure_free(ctx, 4) != MEMORY_GC_OK)) { + TRACE(TAG ": MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\n", event_id); + + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } - term msg = port_create_tuple3(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, unsubscribed_atom), + + term msg = port_create_tuple3( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, unsubscribed_atom), term_from_int(event->msg_id) ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_PUBLISHED: { TRACE(TAG ": MQTT_EVENT_PUBLISHED, msg_id=%d\n", event->msg_id); - if (UNLIKELY(memory_ensure_free(ctx, 4) != MEMORY_GC_OK)) { + + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } - term msg = port_create_tuple3(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, published_atom), + + term msg = port_create_tuple3( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, published_atom), term_from_int(event->msg_id) ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_DATA: { - TRACE(TAG ": MQTT_EVENT_DATA, msg_id=%d\n", event->msg_id); + TRACE(TAG ": MQTT_EVENT_DATA, event_id=%d\n", event_id); TRACE(TAG ": TOPIC=%.*s\n", event->topic_len, event->topic); TRACE(TAG ": DATA=%.*s\n", event->data_len, event->data); + int topic_size = term_binary_data_size_in_terms(event->topic_len); int data_size = term_binary_data_size_in_terms(event->data_len); - if (UNLIKELY(memory_ensure_free(ctx, 5 + topic_size + data_size) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(4) + topic_size + data_size) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } + term topic = term_from_literal_binary(event->topic, event->topic_len, ctx); term data = term_from_literal_binary(event->data, event->data_len, ctx); - term msg = create_tuple4(ctx, - context_make_atom(ctx, mqtt_atom), - context_make_atom(ctx, data_atom), + term msg = create_tuple4( + ctx, + make_atom(ctx->global, mqtt_atom), + make_atom(ctx->global, data_atom), topic, data ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_ERROR: { ESP_LOGE(TAG, "MQTT_EVENT_ERROR"); - if (UNLIKELY(memory_ensure_free(ctx, 3 + 6) != MEMORY_GC_OK)) { + + // {mqtt, {ErrorType :: atom(), ConnectReturnCode :: atom(), tls_last_esp_err :: integer(), tls_stack_err :: integer(), tls_cert_verify_flags :: integer()}} + size_t heap_size = TUPLE_SIZE(3) + TUPLE_SIZE(5); + if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { mailbox_send(target, MEMORY_ATOM); - return ESP_OK; + return; } + esp_mqtt_error_codes_t *mqtt_error = event->error_handle; - term error = create_tuple5(ctx, - error_type_to_atom(ctx, mqtt_error->error_type), - connect_return_code_to_atom(ctx, mqtt_error->connect_return_code), + term error = create_tuple5( + ctx, + error_type_to_atom(ctx->global, mqtt_error->error_type), + connect_return_code_to_atom(ctx->global, mqtt_error->connect_return_code), term_from_int(mqtt_error->esp_tls_last_esp_err), term_from_int(mqtt_error->esp_tls_stack_err), term_from_int(mqtt_error->esp_tls_cert_verify_flags) ); - term msg = port_create_tuple3(ctx, - context_make_atom(ctx, mqtt_atom), + + term msg = port_create_tuple3( + ctx, + make_atom(ctx->global, mqtt_atom), ERROR_ATOM, error ); + mailbox_send(target, msg); break; } + case MQTT_EVENT_BEFORE_CONNECT: { - ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT msg_id: %d", event->msg_id); + ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT event_id: %d", event_id); break; } + default: - ESP_LOGW(TAG, "Other event. event_id: %d", event->event_id); + ESP_LOGW(TAG, "Other event. event_id: %d", event_id); break; } - return ESP_OK; } -static term error_type_to_atom(Context *ctx, esp_mqtt_error_type_t error_type) +static void consume_mailbox(Context *ctx) { - switch (error_type) { - case MQTT_ERROR_TYPE_ESP_TLS: - return context_make_atom(ctx, esp_tls_atom); - case MQTT_ERROR_TYPE_CONNECTION_REFUSED: - return context_make_atom(ctx, connection_refused_atom); - default: - return UNDEFINED_ATOM; - } -} + Message *message = mailbox_dequeue(ctx); + term msg = message->message; + term pid = term_get_tuple_element(msg, 0); + term ref = term_get_tuple_element(msg, 1); + term req = term_get_tuple_element(msg, 2); + int local_process_id = term_to_local_process_id(pid); + Context *target = globalcontext_get_process(ctx->global, local_process_id); -static term connect_return_code_to_atom(Context *ctx, esp_mqtt_connect_return_code_t connect_return_code) -{ - switch (connect_return_code) { - case MQTT_CONNECTION_ACCEPTED: - return context_make_atom(ctx, connection_accepted_atom); - case MQTT_CONNECTION_REFUSE_PROTOCOL: - return context_make_atom(ctx, protocol_atom); - case MQTT_CONNECTION_REFUSE_ID_REJECTED: - return context_make_atom(ctx, id_rejected_atom); - case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE: - return context_make_atom(ctx, server_unavailable_atom); - case MQTT_CONNECTION_REFUSE_BAD_USERNAME: - return context_make_atom(ctx, bad_username_atom); - case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED: - return context_make_atom(ctx, not_authorized_atom); - default: - return UNDEFINED_ATOM; - } -} + term ret = ERROR_ATOM; -static term make_atom(GlobalContext *global, const char *string) -{ - int global_atom_index = globalcontext_insert_atom(global, (AtomString) string); - return term_from_atom_index(global_atom_index); -} + if (term_is_atom(req)) { + int cmd = interop_atom_term_select_int(cmd_table, req, ctx->global); + switch (cmd) { -static term create_tuple4(Context *ctx, term a, term b, term c, term d) -{ - term terms[4]; - terms[0] = a; - terms[1] = b; - terms[2] = c; - terms[3] = d; + case MQTTStopCmd: + do_stop(ctx); + break; - return port_create_tuple_n(ctx, 4, terms); -} + case MQTTDisconnectCmd: + ret = do_disconnect(ctx); + break; + case MQTTReconnectCmd: + ret = do_reconnect(ctx); + break; -static term create_tuple5(Context *ctx, term a, term b, term c, term d, term e) -{ - term terms[4]; - terms[0] = a; - terms[1] = b; - terms[2] = c; - terms[3] = d; - terms[4] = e; + default: + ESP_LOGE(TAG, "Unknown command"); + break; + } - return port_create_tuple_n(ctx, 5, terms); + } else if (term_is_tuple(req) && term_get_tuple_arity(req) > 0) { + + int cmd = interop_atom_term_select_int(cmd_table, term_get_tuple_element(req, 0), ctx->global); + switch (cmd) { + case MQTTPublishCmd: { + term topic = term_get_tuple_element(req, 1); + term data = term_get_tuple_element(req, 2); + term qos = term_get_tuple_element(req, 3); + term retain = term_get_tuple_element(req, 4); + ret = do_publish(ctx, topic, data, qos, retain); + } + break; + + case MQTTSubscribeCmd: { + term topic = term_get_tuple_element(req, 1); + term qos = term_get_tuple_element(req, 2); + ret = do_subscribe(ctx, topic, qos); + } + break; + + case MQTTUnSubscribeCmd: { + term topic = term_get_tuple_element(req, 1); + ret = do_unsubscribe(ctx, topic); + } + break; + + default: + ESP_LOGE(TAG, "Unknown command"); + break; + } + } else { + ESP_LOGE(TAG, "Invalid command"); + } + + // {Ref, ok | error} + size_t heap_size = TUPLE_SIZE(2) + REF_SIZE; + if (UNLIKELY(memory_ensure_free(ctx, heap_size) != MEMORY_GC_OK)) { + mailbox_send(target, MEMORY_ATOM); + } else { + term ret_msg = port_create_tuple2(ctx, ref, ret); + mailbox_send(target, ret_msg); + } + + mailbox_destroy_message(message); } +// +// entrypoints +// + void atomvm_mqtt_client_init(GlobalContext *global) { esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE); } -Context *atomvm_mqtt_client_create_port(GlobalContext *global, term opts) +// NB. Caller assumes ownership of returned string +static char *maybe_get_string(term kv, AtomString key, GlobalContext *global) { - term receiver = interop_proplist_get_value(opts, make_atom(global, receiver_atom)); - term url_term = interop_proplist_get_value(opts, make_atom(global, url)); + term value_term = interop_kv_get_value(kv, key, global); + if (!term_is_string(value_term) && !term_is_binary(value_term)) { + return NULL; + } int ok; - char *url_str = interop_term_to_string(url_term, &ok); - if (!ok) { - ESP_LOGE(TAG, "Error: url is not a proper string or binary."); + char *value_str = interop_term_to_string(value_term, &ok); + if (UNLIKELY(!ok)) { + ESP_LOGE(TAG, "Error: value is not a proper string or binary."); + return NULL; + } + return value_str; +} + +// NB. Caller assumes ownership of returned string +static char *get_string_default(term kv, AtomString key, AtomString default_value, GlobalContext *global) +{ + term value_term = interop_kv_get_value(kv, key, global); + if (!term_is_string(value_term) && !term_is_binary(value_term)) { + int len = atom_string_len(default_value); + char *buf = malloc(len + 1); + if (IS_NULL_PTR(buf)) { + ESP_LOGW(TAG, "Unable to allocate memory for default value"); + return NULL; + } + atom_string_to_c(default_value, buf, len); + return buf; + } + + int ok; + char *value_str = interop_term_to_string(value_term, &ok); + if (UNLIKELY(!ok)) { + ESP_LOGE(TAG, "Error: value is not a proper string or binary."); return NULL; } - if (UNLIKELY(IS_NULL_PTR(url_str))) { - ESP_LOGE(TAG, "Error: Unable to allocate url string."); + return value_str; +} + +// static esp_mqtt_transport_t get_transport(term kv, GlobalContext *global) +// { +// int transport = interop_atom_term_select_int(transport_table, transport_atom, global); +// switch (transport) { +// case MQTTMQTTTransport: +// return MQTT_TRANSPORT_OVER_TCP; +// case MQTTMQTTSTransport: +// return MQTT_TRANSPORT_OVER_SSL; +// case MQTTWSTransport: +// return MQTT_TRANSPORT_OVER_WS; +// case MQTTWSSTransport: +// return MQTT_TRANSPORT_OVER_WSS; +// default: +// ESP_LOGW(TAG, "Unknown transport"); +// return MQTT_TRANSPORT_UNKNOWN; +// } +// } + +Context *atomvm_mqtt_client_create_port(GlobalContext *global, term opts) +{ + term receiver_term = interop_kv_get_value(opts, receiver_atom, global); + if (UNLIKELY(!term_is_pid(receiver_term))) { + ESP_LOGE(TAG, "Missing receiver pid during port creation"); return NULL; } Context *ctx = context_new(global); ctx->native_handler = consume_mailbox; + struct platform_data *plfdat = malloc(sizeof(struct platform_data)); + plfdat->receiver = receiver_term; + ctx->platform_data = plfdat; + + char *url_str = maybe_get_string(opts, url_atom, global); + // esp_mqtt_transport_t transport = get_transport(opts, global); + char *host_str = maybe_get_string(opts, host_atom, global); + term port_term = interop_kv_get_value(opts, port_atom, global); + int port = 0; + if (term_is_integer(port_term)) { + port = term_from_int(port_term); + } + char *username_str = maybe_get_string(opts, username_atom, global); + char *password_str = maybe_get_string(opts, password_atom, global); + // char *cert_str = maybe_get_string(opts, cert_atom, global); + + // Note that char * values passed into this struct are copied into the MQTT state + const char *client_id = get_default_client_id(); esp_mqtt_client_config_t mqtt_cfg = { .uri = url_str, - .event_handle = mqtt_event_handler, + .client_id = client_id, .user_context = (void *) ctx }; esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + free(url_str); + free(host_str); + free(username_str); + free(password_str); + if (UNLIKELY(IS_NULL_PTR(client))) { ESP_LOGE(TAG, "Error: Unable to initialize MQTT client.\n"); return NULL; } + + esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, ctx); + esp_err_t err = esp_mqtt_client_start(client); if (err != ESP_OK) { context_destroy(ctx); ESP_LOGE(TAG, "Error: Unable to start MQTT client. Error: %i.\n", err); return NULL; } - - struct platform_data *plfdat = malloc(sizeof(struct platform_data)); plfdat->client = client; - plfdat->receiver = receiver; - ctx->platform_data = plfdat; TRACE(TAG ": MQTT started.\n"); return ctx; } #include + #ifdef CONFIG_AVM_MQTT_CLIENT_ENABLE + REGISTER_PORT_DRIVER(atomvm_mqtt_client, atomvm_mqtt_client_init, atomvm_mqtt_client_create_port) + #endif diff --git a/src/mqtt_client.erl b/src/mqtt_client.erl index b61f872..50ec6d0 100644 --- a/src/mqtt_client.erl +++ b/src/mqtt_client.erl @@ -1,5 +1,5 @@ %% -%% Copyright (c) 2021 dushin.net +%% Copyright (c) 2021-2023 dushin.net %% All rights reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -46,7 +46,7 @@ -module(mqtt_client). -export([ - start/1, stop/1, disconnect/1, reconnect/1, + start/1, start/2, start_link/1, start_link/2, stop/1, disconnect/1, reconnect/1, get_config/1, get_pending_publishes/1, get_pending_subscriptions/1, get_pending_unsubscriptions/1, publish/3, publish/4, subscribe/3, unsubscribe/3 ]). @@ -54,7 +54,7 @@ -behavior(gen_server). -% -define(TRACE(A, B), io:format(A, B)). +% -define(TRACE(A, B), io:format("[mqtt_client] " ++ A, B)). -define(TRACE(A, B), ok). -record(state, { @@ -82,7 +82,9 @@ disconnected_handler => fun((mqtt()) -> any()), error_handler => fun((mqtt(), error()) -> any()), username => binary_or_string(), - password => binary_or_string() + password => binary_or_string(), + client_id => binary_or_string(), + trusted_cert => binary_or_string() }. -type error_type() :: esp_tls | connection_refused | undefined. @@ -189,6 +191,18 @@ start(Config) -> gen_server:start(?MODULE, Config, []). +-spec start(ServerName :: {local, atom()}, Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +start(ServerName, Config) -> + gen_server:start(ServerName, ?MODULE, Config, []). + +-spec start_link(Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +start_link(Config) -> + gen_server:start_link(?MODULE, Config, []). + +-spec start_link(ServerName :: {local, atom()}, Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +start_link(ServerName, Config) -> + gen_server:start_link(ServerName, ?MODULE, Config, []). + %%----------------------------------------------------------------------------- %% @param MQTT the MQTT client instance created via `start/1' %% @returns ok @@ -473,6 +487,7 @@ handle_call({publish, Topic, Message, PublishOptions}, _From, State) -> {reply, ok, State}; _ -> PendingPublishes = State#state.pending_publishes, + ?TRACE("qos=~p msg_id=~p PendingPublishes=~p~n", [Qos, MsgId, PendingPublishes]), NewPendingPublishes = PendingPublishes#{MsgId => {Topic, PublishOptions}}, {reply, MsgId, State#state{pending_publishes=NewPendingPublishes}} end; @@ -568,7 +583,7 @@ handle_info({mqtt, published, MsgId}, State) -> PendingPublishes = State#state.pending_publishes, NewPendingPublishes = case maps:get(MsgId, PendingPublishes, undefined) of undefined -> - io:format("WARNING: `published` message received but no subscriber was found for msg id ~p~n", [MsgId]), + io:format("WARNING: `published` message received but no callback was found for msg id ~p~n", [MsgId]), PendingPublishes; {Topic, PublishOptions} -> ?TRACE("Found pending publish. Topic=~p PublishOptions=~p~n", [Topic, PublishOptions]), @@ -672,37 +687,28 @@ code_change(_OldVsn, State, _Extra) -> %% @private do_publish(Port, Topic, Message, Qos, Retain) -> - call(Port, {publish, Topic, Message, qos_to_int(Qos), Retain}). + port:call(Port, {publish, Topic, Message, qos_to_int(Qos), Retain}). %% @private do_subscribe(Port, Topic, Qos) -> - call(Port, {subscribe, Topic, qos_to_int(Qos)}). + port:call(Port, {subscribe, Topic, qos_to_int(Qos)}). %% @private do_unsubscribe(Port, Topic) -> - call(Port, {unsubscribe, Topic}). + port:call(Port, {unsubscribe, Topic}). %% @private do_stop(Port) -> - do_disconnect(Port), + port:do_disconnect(Port), Port ! stop. %% @private do_disconnect(Port) -> - call(Port, disconnect). + port:call(Port, disconnect). %% @private do_reconnect(Port) -> - call(Port, reconnect). - -%% @private -call(Port, Msg) -> - Ref = make_ref(), - Port ! {self(), Ref, Msg}, - receive - {Ref, Ret} -> - Ret - end. + port:call(Port, reconnect). validate_publish_options(Options) -> validate_function_or_pid_or_undefined(published_handler, Options),