Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/mqtt_client_example/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
src/config.erl
_build/**
_checkouts/**
src/config.erl
rebar.lock
2 changes: 1 addition & 1 deletion examples/mqtt_client_example/rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{erl_opts, [debug_info]}.
{deps, [
{mqtt_client, {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}}
{atomvm_mqtt_client, {git, "https://github.com/atomvm/atomvm_mqtt_client.git", {branch, "master"}}}
]}.
{plugins, [atomvm_rebar3_plugin]}.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{application, mqtt_client_example, [
{description, "An OTP library"},
{description, "An AtomVM application"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
Expand Down
46 changes: 26 additions & 20 deletions examples/mqtt_client_example/src/mqtt_client_example.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%
%% Copyright (c) 2021 dushin.net
%% Copyright (c) 2021-2023 dushin.net
%% All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -33,16 +33,8 @@ start() ->
{ok, _MQTT} = mqtt_client:start(Config),
io:format("MQTT started.~n"),

loop_forever().
timer:sleep(infinity).

loop_forever() ->
receive
halt -> halt
end.

%%
%% connected callback. This function will be called
%%
handle_connected(MQTT) ->
Config = mqtt_client:get_config(MQTT),
Topic = <<"atomvm/qos0">>,
Expand All @@ -60,15 +52,10 @@ handle_subscribed(MQTT, Topic) ->

handle_data(_MQTT, Topic, Data) ->
io:format("Received data on topic ~p: ~p ~n", [Topic, Data]),
% io:format("Pending publishes: ~p~n", [mqtt_client:get_pending_publishes(MQTT)]),
% io:format("Pending subscriptions: ~p~n", [mqtt_client:get_pending_subscriptions(MQTT)]),
% io:format("Pending unsubscriptions: ~p~n", [mqtt_client:get_pending_unsubscriptions(MQTT)]),
io:format("process count: ~p~n", [erlang:system_info(process_count)]),
io:format("Free heap on handle_data: ~p~n", [erlang:system_info(esp32_free_heap_size)]),
ok.

start_network(StaConfig) ->
case network_fsm:wait_for_sta(StaConfig) of
case network:wait_for_sta(StaConfig) of
{ok, {Address, Netmask, Gateway}} ->
io:format(
"Acquired IP address: ~s Netmask: ~s Gateway: ~s~n",
Expand All @@ -81,8 +68,27 @@ start_network(StaConfig) ->

publish_loop(MQTT, Topic, Seq) ->
io:format("Publishing data on topic ~p~n", [Topic]),
_ = mqtt_client:publish(MQTT, Topic, list_to_binary("echo" ++ integer_to_list(Seq))),
timer:sleep(5000),
io:format("process count: ~p~n", [erlang:system_info(process_count)]),
io:format("Free heap after publish: ~p~n", [erlang:system_info(esp32_free_heap_size)]),
try
Self = self(),
HandlePublished = fun(MQTT2, Topic2, MsgId) ->
Self ! published,
handle_published(MQTT2, Topic2, MsgId)
end,
PublishOptions = #{qos => at_least_once, published_handler => HandlePublished},
Msg = list_to_binary("echo" ++ integer_to_list(Seq)),
_ = mqtt_client:publish(MQTT, Topic, Msg, PublishOptions),
receive
published ->
ok
after 10000 ->
io:format("Timed out waiting for publish ack~n")
end
catch
C:E:S ->
io:format("Error in publish: ~p:~p~p~n", [C, E, S])
end,
timer:sleep(1000),
publish_loop(MQTT, Topic, Seq + 1).

handle_published(MQTT, Topic, MsgId) ->
io:format("MQTT ~p published to topic ~p msg_id=~p~n", [MQTT, Topic, MsgId]).
Loading