From 8cbbdb729047fd67d0fbce841f17e6c9311c3667 Mon Sep 17 00:00:00 2001 From: Fred Dushin Date: Wed, 12 Apr 2023 12:57:15 -0400 Subject: [PATCH] Added CI build and cleaned up code to conform to erlfmt --- .github/workflows/build.yaml | 155 ++++++++++ examples/mqtt_client_example/rebar.config | 5 +- .../src/mqtt_client_example.app.src | 4 +- rebar.config | 2 +- src/mqtt_client.app.src | 4 +- src/mqtt_client.erl | 278 +++++++++++------- 6 files changed, 333 insertions(+), 115 deletions(-) create mode 100644 .github/workflows/build.yaml diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..0ab44ae --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,155 @@ +# +# Copyright 2023 Fred Dushin +# +# SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +# + +name: "Build MQTT Client" + +on: ["push", "pull_request"] + +jobs: + esp32-build: + runs-on: ubuntu-latest + container: espressif/idf:v${{ matrix.idf-version }} + + strategy: + matrix: + idf-version: ["4.4.3"] + otp: ["24"] + elixir_version: ["1.11"] + soc: ["esp32", "esp32c3", "esp32s2", "esp32s3"] + + env: + ImageOS: "ubuntu20" + + steps: + - name: Checkout repo + uses: actions/checkout@v2 + + - uses: erlef/setup-beam@v1 + with: + otp-version: ${{ matrix.otp }} + elixir-version: ${{ matrix.elixir_version }} + + - name: "APT update" + run: apt update -y + + - name: "Install deps" + run: DEBIAN_FRONTEND=noninteractive apt install -y git cmake + + - name: "System info" + run: | + echo "**uname:**" + uname -a + echo "**OTP version:**" + cat $(dirname $(which erlc))/../releases/RELEASES || true + + - name: "Check out AtomVM" + run: | + pwd + ls -l + git clone https://github.com/atomvm/AtomVM + cd AtomVM + git rev-parse --short HEAD + + - name: "Build: run cmake" + working-directory: AtomVM + run: | + mkdir build + cd build + cmake .. + + - name: "Build erlang and Elixir libs" + working-directory: AtomVM/build/libs + run: | + make + + - name: "Build ${{ matrix.soc }} AtomVM with atomvm_mqtt_client" + working-directory: ./AtomVM/src/platforms/esp32/ + run: | + cd components + ln -s /__w/atomvm_mqtt_client/atomvm_mqtt_client + cd .. + . $IDF_PATH/export.sh + idf.py set-target ${{ matrix.soc }} + idf.py reconfigure + idf.py build + + - name: "Create a ${{ matrix.soc }} atomvm_mqtt_client image" + working-directory: ./AtomVM/src/platforms/esp32/build + run: | + ./mkimage.sh + for i in atomvm-${{ matrix.soc }}-*.img; do + mv -- "${i}" "${i%.img}-atomvm_mqtt_client.img" + done + ls -l *.img + + - name: "Upload ${{ matrix.soc }} artifacts" + uses: actions/upload-artifact@v3 + with: + name: atomvm-${{ matrix.soc }}-atomvm_mqtt_client-image + path: ./AtomVM/src/platforms/esp32/build/atomvm-${{ matrix.soc }}-*.img + if-no-files-found: error + + avm-build: + + runs-on: ubuntu-latest + steps: + - name: Checkout repo + uses: actions/checkout@v2 + + - uses: erlef/setup-beam@v1 + with: + otp-version: "24" + + - name: "Build rebar3" + run: | + cd /tmp + git clone https://github.com/erlang/rebar3.git + cd rebar3 + ./bootstrap + + - name: "Build atomvm_mqtt_client AVM" + run: | + REBAR="/tmp/rebar3/rebar3" + ${REBAR} fmt -c + ${REBAR} packbeam -f -i + + - name: "Upload atomvm_mqtt_client AVM" + uses: actions/upload-artifact@v3 + with: + name: atomvm-mqtt_client-avm + path: ./_build/default/lib/mqtt_client.avm + if-no-files-found: error + + - name: "Build Example Programs" + run: | + REBAR="/tmp/rebar3/rebar3" + EXAMPLES="mqtt_client_example" + for i in ${EXAMPLES}; do cd ./examples/$i; ${REBAR} fmt -c || exit 1; ${REBAR} packbeam -p -f -i || exit 1; cd ../..; done + + release-if-master: + if: github.ref_name == 'master' + needs: ["esp32-build", "avm-build"] + runs-on: ubuntu-latest + steps: + - name: "Download artifacts" + uses: actions/download-artifact@v3 + + - name: Display structure of downloaded files + run: | + ls -R + + - name: Upload latest release + uses: pyTooling/Actions/releaser@main + with: + tag: latest + rm: true + token: ${{ secrets.GITHUB_TOKEN }} + files: | + atomvm-esp32-atomvm_mqtt_client-image/*.img + atomvm-esp32c3-atomvm_mqtt_client-image/*.img + atomvm-esp32s2-atomvm_mqtt_client-image/*.img + atomvm-esp32s3-atomvm_mqtt_client-image/*.img + atomvm-mqtt_client-avm/*.avm diff --git a/examples/mqtt_client_example/rebar.config b/examples/mqtt_client_example/rebar.config index e15d233..de3d3ba 100644 --- a/examples/mqtt_client_example/rebar.config +++ b/examples/mqtt_client_example/rebar.config @@ -1,5 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ - {atomvm_mqtt_client, {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}} + {atomvm_mqtt_client, + {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}} ]}. -{plugins, [atomvm_rebar3_plugin]}. +{plugins, [atomvm_rebar3_plugin, erlfmt]}. diff --git a/examples/mqtt_client_example/src/mqtt_client_example.app.src b/examples/mqtt_client_example/src/mqtt_client_example.app.src index eee0099..ee8969f 100644 --- a/examples/mqtt_client_example/src/mqtt_client_example.app.src +++ b/examples/mqtt_client_example/src/mqtt_client_example.app.src @@ -5,8 +5,8 @@ {applications, [ kernel, stdlib ]}, - {env,[]}, + {env, []}, {modules, []}, {licenses, ["Apache 2.0"]}, {links, []} - ]}. +]}. diff --git a/rebar.config b/rebar.config index ccd0808..daabeee 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,3 @@ {erl_opts, [debug_info]}. {deps, []}. -{plugins, [atomvm_rebar3_plugin]}. +{plugins, [atomvm_rebar3_plugin, erlfmt]}. diff --git a/src/mqtt_client.app.src b/src/mqtt_client.app.src index f1221c2..b97500a 100644 --- a/src/mqtt_client.app.src +++ b/src/mqtt_client.app.src @@ -5,8 +5,8 @@ {applications, [ kernel, stdlib ]}, - {env,[]}, + {env, []}, {modules, []}, {licenses, ["Apache 2.0"]}, {links, []} - ]}. +]}. diff --git a/src/mqtt_client.erl b/src/mqtt_client.erl index 50ec6d0..f2a7859 100644 --- a/src/mqtt_client.erl +++ b/src/mqtt_client.erl @@ -46,9 +46,18 @@ -module(mqtt_client). -export([ - start/1, start/2, start_link/1, start_link/2, stop/1, disconnect/1, reconnect/1, - get_config/1, get_pending_publishes/1, get_pending_subscriptions/1, get_pending_unsubscriptions/1, - publish/3, publish/4, subscribe/3, unsubscribe/3 + start/1, start/2, + start_link/1, start_link/2, + stop/1, + disconnect/1, + reconnect/1, + get_config/1, + get_pending_publishes/1, + get_pending_subscriptions/1, + get_pending_unsubscriptions/1, + publish/3, publish/4, + subscribe/3, + unsubscribe/3 ]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -88,11 +97,24 @@ }. -type error_type() :: esp_tls | connection_refused | undefined. --type connect_return_code() :: connection_accepted | protocol | id_rejected | server_unavailable | bad_username | not_authorized | undefined. +-type connect_return_code() :: + connection_accepted + | protocol + | id_rejected + | server_unavailable + | bad_username + | not_authorized + | undefined. -type tls_last_esp_err() :: integer(). -type tls_stack_err() :: integer(). -type tls_cert_verify_flags() :: integer(). --type error() :: {error_type(), connect_return_code(), tls_last_esp_err(), tls_stack_err(), tls_cert_verify_flags()}. +-type error() :: { + error_type(), + connect_return_code(), + tls_last_esp_err(), + tls_stack_err(), + tls_cert_verify_flags() +}. -type qos() :: at_most_once | at_least_once | exactly_once. @@ -187,19 +209,21 @@ %% %% @end %%----------------------------------------------------------------------------- --spec start(Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +-spec start(Config :: config()) -> {ok, mqtt()} | {error, Reason :: term()}. start(Config) -> gen_server:start(?MODULE, Config, []). --spec start(ServerName :: {local, atom()}, Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +-spec start(ServerName :: {local, atom()}, Config :: config()) -> + {ok, mqtt()} | {error, Reason :: term()}. start(ServerName, Config) -> gen_server:start(ServerName, ?MODULE, Config, []). --spec start_link(Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +-spec start_link(Config :: config()) -> {ok, mqtt()} | {error, Reason :: term()}. start_link(Config) -> gen_server:start_link(?MODULE, Config, []). --spec start_link(ServerName :: {local, atom()}, Config::config()) -> {ok, mqtt()} | {error, Reason::term()}. +-spec start_link(ServerName :: {local, atom()}, Config :: config()) -> + {ok, mqtt()} | {error, Reason :: term()}. start_link(ServerName, Config) -> gen_server:start_link(ServerName, ?MODULE, Config, []). @@ -209,7 +233,7 @@ start_link(ServerName, Config) -> %% @doc Stop the specified MQTT. %% @end %%----------------------------------------------------------------------------- --spec stop(MQTT::mqtt()) -> ok. +-spec stop(MQTT :: mqtt()) -> ok. stop(MQTT) -> gen_server:stop(MQTT). @@ -219,7 +243,7 @@ stop(MQTT) -> %% @doc Disconnect the specified MQTT from the MQTT broker. %% @end %%----------------------------------------------------------------------------- --spec disconnect(MQTT::mqtt()) -> ok. +-spec disconnect(MQTT :: mqtt()) -> ok. disconnect(MQTT) -> gen_server:call(MQTT, disconnect). @@ -229,7 +253,7 @@ disconnect(MQTT) -> %% @doc Reconnect the specified MQTT to the MQTT broker. %% @end %%----------------------------------------------------------------------------- --spec reconnect(MQTT::mqtt()) -> ok | error. +-spec reconnect(MQTT :: mqtt()) -> ok | error. reconnect(MQTT) -> gen_server:call(MQTT, reconnect). @@ -239,7 +263,7 @@ reconnect(MQTT) -> %% @doc Get the configuation used to initialize this MQTT instance %% @end %%----------------------------------------------------------------------------- --spec get_config(MQTT::mqtt()) -> config(). +-spec get_config(MQTT :: mqtt()) -> config(). get_config(MQTT) -> gen_server:call(MQTT, config). @@ -249,7 +273,7 @@ get_config(MQTT) -> %% @doc Get the list of pending publishes associated with this MQTT instance %% @end %%----------------------------------------------------------------------------- --spec get_pending_publishes(MQTT::mqtt()) -> [msg_id()]. +-spec get_pending_publishes(MQTT :: mqtt()) -> [msg_id()]. get_pending_publishes(MQTT) -> gen_server:call(MQTT, pending_publishes). @@ -259,7 +283,7 @@ get_pending_publishes(MQTT) -> %% @doc Get the list of pending subscriptions associated with this MQTT instance %% @end %%----------------------------------------------------------------------------- --spec get_pending_subscriptions(MQTT::mqtt()) -> [msg_id()]. +-spec get_pending_subscriptions(MQTT :: mqtt()) -> [msg_id()]. get_pending_subscriptions(MQTT) -> gen_server:call(MQTT, pending_subscriptions). @@ -269,7 +293,7 @@ get_pending_subscriptions(MQTT) -> %% @doc Get the list of pending unsubscriptions associated with this MQTT instance %% @end %%----------------------------------------------------------------------------- --spec get_pending_unsubscriptions(MQTT::mqtt()) -> [msg_id()]. +-spec get_pending_unsubscriptions(MQTT :: mqtt()) -> [msg_id()]. get_pending_unsubscriptions(MQTT) -> gen_server:call(MQTT, pending_unsubscriptions). @@ -283,11 +307,14 @@ get_pending_unsubscriptions(MQTT) -> %% This function is equivalent to `publish(MQTT, Topic, Message, #{})' %% @end %%----------------------------------------------------------------------------- --spec publish(MQTT::mqtt(), Topic::topic(), Message::binary_or_string()) -> ok | {error, Reason::term()}. -publish(MQTT, Topic, Message) - when is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso (is_binary(Message) orelse is_list(Message)) -> +-spec publish(MQTT :: mqtt(), Topic :: topic(), Message :: binary_or_string()) -> + ok | {error, Reason :: term()}. +publish(MQTT, Topic, Message) when + is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso + (is_binary(Message) orelse is_list(Message)) +-> ?MODULE:publish(MQTT, Topic, Message, maps:new()); -publish(_,_,_) -> +publish(_, _, _) -> throw(badarg). %%----------------------------------------------------------------------------- @@ -338,11 +365,20 @@ publish(_,_,_) -> %% %% @end %%----------------------------------------------------------------------------- --spec publish(MQTT::mqtt(), Topic::topic(), Message::binary_or_string(), PublishOptions::publish_options()) -> msg_id() | {error, Reason::term()}. -publish(MQTT, Topic, Message, PublishOptions) - when is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso (is_binary(Message) orelse is_list(Message)) andalso is_map(PublishOptions) -> - gen_server:call(MQTT, {publish, Topic, Message, validate_publish_options(PublishOptions)}, 30000); -publish(_,_,_, _) -> +-spec publish( + MQTT :: mqtt(), + Topic :: topic(), + Message :: binary_or_string(), + PublishOptions :: publish_options() +) -> msg_id() | {error, Reason :: term()}. +publish(MQTT, Topic, Message, PublishOptions) when + is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso + (is_binary(Message) orelse is_list(Message)) andalso is_map(PublishOptions) +-> + gen_server:call( + MQTT, {publish, Topic, Message, validate_publish_options(PublishOptions)}, 30000 + ); +publish(_, _, _, _) -> throw(badarg). %%----------------------------------------------------------------------------- @@ -394,11 +430,13 @@ publish(_,_,_, _) -> %% %% @end %%----------------------------------------------------------------------------- --spec subscribe(MQTT::mqtt(), Topic::topic(), SubscribeOptions::subscribe_options()) -> ok | {error, Reason::term()}. -subscribe(MQTT, Topic, SubscribeOptions) - when is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso is_map(SubscribeOptions) -> +-spec subscribe(MQTT :: mqtt(), Topic :: topic(), SubscribeOptions :: subscribe_options()) -> + ok | {error, Reason :: term()}. +subscribe(MQTT, Topic, SubscribeOptions) when + is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso is_map(SubscribeOptions) +-> gen_server:call(MQTT, {subscribe, Topic, validate_subscribe_options(SubscribeOptions)}, 30000); -subscribe(_,_,_) -> +subscribe(_, _, _) -> throw(badarg). %%----------------------------------------------------------------------------- @@ -434,14 +472,17 @@ subscribe(_,_,_) -> %% %% @end %%----------------------------------------------------------------------------- --spec unsubscribe(MQTT::mqtt(), Topic::topic(), UnSubscribeOptions::unsubscribe_options()) -> ok | {error, Reason::term()}. -unsubscribe(MQTT, Topic, UnSubscribeOptions) - when is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso is_map(UnSubscribeOptions) -> - gen_server:call(MQTT, {unsubscribe, Topic, validate_unsubscribe_options(UnSubscribeOptions)}, 30000); -unsubscribe(_,_,_) -> +-spec unsubscribe(MQTT :: mqtt(), Topic :: topic(), UnSubscribeOptions :: unsubscribe_options()) -> + ok | {error, Reason :: term()}. +unsubscribe(MQTT, Topic, UnSubscribeOptions) when + is_pid(MQTT) andalso (is_binary(Topic) orelse is_list(Topic)) andalso is_map(UnSubscribeOptions) +-> + gen_server:call( + MQTT, {unsubscribe, Topic, validate_unsubscribe_options(UnSubscribeOptions)}, 30000 + ); +unsubscribe(_, _, _) -> throw(badarg). - %% ============================================================================ %% %% gen_server API @@ -452,10 +493,12 @@ unsubscribe(_,_,_) -> init(Config) -> try Self = self(), - Port = erlang:open_port({spawn, "atomvm_mqtt_client"}, [{receiver, Self}, {url, maps:get(url, Config)}]), + Port = erlang:open_port({spawn, "atomvm_mqtt_client"}, [ + {receiver, Self}, {url, maps:get(url, Config)} + ]), {ok, #state{ - port=Port, - config=Config + port = Port, + config = Config }} catch _:Error -> @@ -478,7 +521,9 @@ handle_call(pending_subscriptions, _From, State) -> handle_call(pending_unsubscriptions, _From, State) -> {reply, maps:keys(State#state.pending_unsubscriptions), State}; handle_call({publish, Topic, Message, PublishOptions}, _From, State) -> - ?TRACE("Handling call for publish. Topic=~p Message: ~p PublishOptions=~p~n", [Topic, Message, PublishOptions]), + ?TRACE("Handling call for publish. Topic=~p Message: ~p PublishOptions=~p~n", [ + Topic, Message, PublishOptions + ]), Qos = maps:get(qos, PublishOptions, at_most_once), Retain = maps:get(retain, PublishOptions, false), MsgId = do_publish(State#state.port, Topic, Message, Qos, Retain), @@ -489,7 +534,7 @@ handle_call({publish, Topic, Message, PublishOptions}, _From, State) -> PendingPublishes = State#state.pending_publishes, ?TRACE("qos=~p msg_id=~p PendingPublishes=~p~n", [Qos, MsgId, PendingPublishes]), NewPendingPublishes = PendingPublishes#{MsgId => {Topic, PublishOptions}}, - {reply, MsgId, State#state{pending_publishes=NewPendingPublishes}} + {reply, MsgId, State#state{pending_publishes = NewPendingPublishes}} end; handle_call({subscribe, Topic, Options}, _From, State) -> ?TRACE("Handling call for subscribe. Topic=~p Options=~p~n", [Topic, Options]), @@ -501,14 +546,14 @@ handle_call({subscribe, Topic, Options}, _From, State) -> MsgId when is_integer(MsgId) -> ?TRACE("Subscription msg_id: ~p~n", [MsgId]), NewSubscriber = #subscriber{ - msg_id=MsgId, - topic=Topic, + msg_id = MsgId, + topic = Topic, subscribed_handler = maps:get(subscribed_handler, Options, undefined), data_handler = maps:get(data_handler, Options, undefined) }, PendingSubscriptions = State#state.pending_subscriptions, NewPendingSubscriptions = PendingSubscriptions#{MsgId => NewSubscriber}, - {reply, ok, State#state{pending_subscriptions=NewPendingSubscriptions}}; + {reply, ok, State#state{pending_subscriptions = NewPendingSubscriptions}}; Error -> {reply, Error, State} end; @@ -526,7 +571,7 @@ handle_call({unsubscribe, Topic, Options}, _From, State) -> MsgId when is_integer(MsgId) -> PendingUnsubscriptions = State#state.pending_unsubscriptions, NewPendingUnsubscriptions = PendingUnsubscriptions#{MsgId => {Topic, Options}}, - {reply, ok, State#state{pending_unsubscriptions=NewPendingUnsubscriptions}}; + {reply, ok, State#state{pending_unsubscriptions = NewPendingUnsubscriptions}}; Error -> {reply, Error, State} end @@ -581,80 +626,98 @@ handle_info({mqtt, error, Error}, State) -> handle_info({mqtt, published, MsgId}, State) -> ?TRACE("handle_info({mqtt, published, ~p}~n", [MsgId]), PendingPublishes = State#state.pending_publishes, - NewPendingPublishes = case maps:get(MsgId, PendingPublishes, undefined) of - undefined -> - io:format("WARNING: `published` message received but no callback was found for msg id ~p~n", [MsgId]), - PendingPublishes; - {Topic, PublishOptions} -> - ?TRACE("Found pending publish. Topic=~p PublishOptions=~p~n", [Topic, PublishOptions]), - Self = self(), - case maps:get(published_handler, PublishOptions, default) of - default -> - ok; - Pid when is_pid(Pid) -> - Pid ! {mqtt, published, Self, Topic, MsgId}; - Fun when is_function(Fun) -> - spawn(fun() -> Fun(Self, Topic, MsgId) end) - end, - maps:remove(MsgId, PendingPublishes) - end, - {noreply, State#state{pending_publishes=NewPendingPublishes}}; + NewPendingPublishes = + case maps:get(MsgId, PendingPublishes, undefined) of + undefined -> + io:format( + "WARNING: `published` message received but no callback was found for msg id ~p~n", + [MsgId] + ), + PendingPublishes; + {Topic, PublishOptions} -> + ?TRACE("Found pending publish. Topic=~p PublishOptions=~p~n", [ + Topic, PublishOptions + ]), + Self = self(), + case maps:get(published_handler, PublishOptions, default) of + default -> + ok; + Pid when is_pid(Pid) -> + Pid ! {mqtt, published, Self, Topic, MsgId}; + Fun when is_function(Fun) -> + spawn(fun() -> Fun(Self, Topic, MsgId) end) + end, + maps:remove(MsgId, PendingPublishes) + end, + {noreply, State#state{pending_publishes = NewPendingPublishes}}; handle_info({mqtt, subscribed, MsgId}, State) -> ?TRACE("handle_info({mqtt, subscribed, ~p}, State~n", [MsgId]), SubscriberMap = State#state.subscriber_map, PendingSubscriptions = State#state.pending_subscriptions, - NewSubscriberMap = case maps:get(MsgId, PendingSubscriptions, undefined) of - undefined -> - io:format("WARNING: `subscribed` message received but no pending subscription was found for msg id ~p~n", [MsgId]), - SubscriberMap; - Subscriber -> - ?TRACE("Found subscriber. Subscriber=~p~n", [Subscriber]), - Topic = Subscriber#subscriber.topic, - Self = self(), - case Subscriber#subscriber.subscribed_handler of - undefined -> - ok; - Pid when is_pid(Pid) -> - Pid ! {mqtt, subscribed, Self, Topic}; - Fun when is_function(Fun) -> - spawn(fun() -> Fun(Self, Topic) end) - end, - SubscriberMap#{Topic => Subscriber} - end, + NewSubscriberMap = + case maps:get(MsgId, PendingSubscriptions, undefined) of + undefined -> + io:format( + "WARNING: `subscribed` message received but no pending subscription was found for msg id ~p~n", + [MsgId] + ), + SubscriberMap; + Subscriber -> + ?TRACE("Found subscriber. Subscriber=~p~n", [Subscriber]), + Topic = Subscriber#subscriber.topic, + Self = self(), + case Subscriber#subscriber.subscribed_handler of + undefined -> + ok; + Pid when is_pid(Pid) -> + Pid ! {mqtt, subscribed, Self, Topic}; + Fun when is_function(Fun) -> + spawn(fun() -> Fun(Self, Topic) end) + end, + SubscriberMap#{Topic => Subscriber} + end, {noreply, State#state{ - subscriber_map=NewSubscriberMap, - pending_subscriptions=maps:remove(MsgId, PendingSubscriptions) + subscriber_map = NewSubscriberMap, + pending_subscriptions = maps:remove(MsgId, PendingSubscriptions) }}; handle_info({mqtt, unsubscribed, MsgId}, State) -> ?TRACE("handle_info({mqtt, unsubscribed, ~p}~n", [MsgId]), SubscriberMap = State#state.subscriber_map, PendingUnSubscriptions = State#state.pending_unsubscriptions, - NewSubscriberMap = case maps:get(MsgId, PendingUnSubscriptions, undefined) of - undefined -> - io:format("WARNING: `unsubscribed` message received but no pending unsubscription was found for msg id ~p~n", [MsgId]), - SubscriberMap; - {Topic, Options} -> - Self = self(), - case maps:get(unsubscribed_handler, Options, default) of - default -> - ok; - Pid when is_pid(Pid) -> - Pid ! {mqtt, unsubscribed, Self, Topic}; - Fun when is_function(Fun) -> - spawn(fun() -> Fun(Self, Topic) end) - end, - maps:remove(Topic, SubscriberMap) - end, + NewSubscriberMap = + case maps:get(MsgId, PendingUnSubscriptions, undefined) of + undefined -> + io:format( + "WARNING: `unsubscribed` message received but no pending unsubscription was found for msg id ~p~n", + [MsgId] + ), + SubscriberMap; + {Topic, Options} -> + Self = self(), + case maps:get(unsubscribed_handler, Options, default) of + default -> + ok; + Pid when is_pid(Pid) -> + Pid ! {mqtt, unsubscribed, Self, Topic}; + Fun when is_function(Fun) -> + spawn(fun() -> Fun(Self, Topic) end) + end, + maps:remove(Topic, SubscriberMap) + end, {noreply, State#state{ - subscriber_map=NewSubscriberMap, - pending_unsubscriptions=maps:remove(MsgId, PendingUnSubscriptions) + subscriber_map = NewSubscriberMap, + pending_unsubscriptions = maps:remove(MsgId, PendingUnSubscriptions) }}; handle_info({mqtt, data, Topic, Data}, State) -> ?TRACE("handle_info({mqtt, data, ~p ~p}~n", [Topic, Data]), SubscriberMap = State#state.subscriber_map, case maps:get(Topic, SubscriberMap, undefined) of undefined -> - io:format("WARNING: `data` message received but no subscriber was found for topic ~p~n", [Topic]); + io:format( + "WARNING: `data` message received but no subscriber was found for topic ~p~n", [ + Topic + ] + ); Subscriber -> Self = self(), case Subscriber#subscriber.data_handler of @@ -673,7 +736,9 @@ handle_info(Info, State) -> %% @hidden terminate(Reason, State) -> - io:format("mqtt gen_server process ~p terminated with reason ~p. State: ~p~n", [self(), Reason, State]), + io:format("mqtt gen_server process ~p terminated with reason ~p. State: ~p~n", [ + self(), Reason, State + ]), do_stop(State#state.port), ok. @@ -730,8 +795,7 @@ validate_function_or_pid_or_undefined(Key, Options) -> case maps:get(Key, Options, undefined) of undefined -> ok; Value when is_function(Value) orelse is_pid(Value) -> ok; - _ -> - throw(badarg) + _ -> throw(badarg) end. validate_qos_or_undefined(Key, Options) -> @@ -740,8 +804,7 @@ validate_qos_or_undefined(Key, Options) -> at_most_once -> ok; at_least_once -> ok; exactly_once -> ok; - _ -> - throw(badarg) + _ -> throw(badarg) end. qos_to_int(Qos) -> @@ -756,6 +819,5 @@ validate_is_boolean_or_undefined(Key, Options) -> undefined -> ok; true -> ok; false -> ok; - _ -> - throw(badarg) + _ -> throw(badarg) end.