Skip to content

Commit

Permalink
Merge pull request #12514 from savonarola/0214-fix-ft-responses
Browse files Browse the repository at this point in the history
fix(ft): report ft assemble status from a dedicated process
  • Loading branch information
savonarola committed Apr 25, 2024
2 parents 2cbf4dc + 05f5444 commit c425835
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 476 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_ft/src/emqx_ft.app.src
@@ -1,6 +1,6 @@
{application, emqx_ft, [
{description, "EMQX file transfer over MQTT"},
{vsn, "0.1.14"},
{vsn, "0.1.15"},
{registered, []},
{mod, {emqx_ft_app, []}},
{applications, [
Expand Down
176 changes: 76 additions & 100 deletions apps/emqx_ft/src/emqx_ft.erl
Expand Up @@ -30,10 +30,7 @@

-export([
on_message_publish/1,
on_message_puback/4,
on_client_timeout/3,
on_process_down/4,
on_channel_unregistered/1
on_message_puback/4
]).

-export([
Expand Down Expand Up @@ -88,8 +85,6 @@
checksum => checksum()
}.

-define(FT_EVENT(EVENT), {?MODULE, EVENT}).

-define(ACK_AND_PUBLISH(Result), {true, Result}).
-define(ACK(Result), {false, Result}).
-define(DELAY_ACK, delay).
Expand All @@ -100,21 +95,11 @@

hook() ->
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST),
ok = emqx_hooks:put(
'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST
),
ok = emqx_hooks:put(
'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST
).
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).

unhook() ->
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}),
ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}),
ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}),
ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}).
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).

%%--------------------------------------------------------------------
%% API
Expand Down Expand Up @@ -152,40 +137,6 @@ on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes,
ignore
end.

on_channel_unregistered(ChannelPid) ->
ok = emqx_ft_async_reply:deregister_all(ChannelPid).

on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) ->
_ = erlang:demonitor(MRef, [flush]),
Result = {error, timeout},
_ = publish_response(Result, TopicReplyData),
case emqx_ft_async_reply:take_by_mref(MRef) of
{ok, undefined, _TRef1, _TopicReplyData} ->
{stop, Acc};
{ok, PacketId, _TRef1, _TopicReplyData} ->
{stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]};
not_found ->
{ok, Acc}
end;
on_client_timeout(_TRef, _Event, Acc) ->
{ok, Acc}.

on_process_down(MRef, _Pid, DownReason, Acc) ->
case emqx_ft_async_reply:take_by_mref(MRef) of
{ok, PacketId, TRef, TopicReplyData} ->
_ = emqx_utils:cancel_timer(TRef),
Result = down_reason_to_result(DownReason),
_ = publish_response(Result, TopicReplyData),
case PacketId of
undefined ->
{stop, Acc};
_ ->
{stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}
end;
not_found ->
{ok, Acc}
end.

%%--------------------------------------------------------------------
%% Handlers for transfer messages
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -253,7 +204,7 @@ on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) ->
filemeta => Meta
}),
%% Currently synchronous.
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
%% If we want to make it async, we need to use `with_responder`,
%% like in `on_fin`.
?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)).

Expand All @@ -271,51 +222,92 @@ on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) ->
}),
Segment = {Offset, Msg#message.payload},
%% Currently synchronous.
%% If we want to make it async, we need to use `emqx_ft_async_reply`,
%% If we want to make it async, we need to use `with_responder`,
%% like in `on_fin`.
?ACK_AND_PUBLISH(store_segment(Transfer, Segment)).

on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) ->
on_fin(
#{packet_id := PacketId, mode := Mode} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum
) ->
?tp(info, "file_transfer_fin", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
final_size => FinalSize,
checksum => FinalChecksum
}),
%% TODO: handle checksum? Do we need it?
with_new_packet(
TopicReplyData,
PacketId,
fun() ->
case assemble(Transfer, FinalSize, FinalChecksum) of
ok ->
?ACK_AND_PUBLISH(ok);
%% Assembling started, packet will be acked/replied by monitor or timeout
{async, Pid} ->
register_async_worker(Pid, TopicReplyData);
{error, _} = Error ->
?ACK_AND_PUBLISH(Error)
end
FinPacketKey = {self(), PacketId},
Callback = fun(Result) ->
on_complete("assemble", TopicReplyData, FinPacketKey, Transfer, Result)
end,
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
case assemble(Transfer, FinalSize, FinalChecksum) of
%% Assembling completed, ack through the responder right away
ok ->
emqx_ft_responder:ack(FinPacketKey, ok),
?DELAY_ACK;
%% Assembling started, packet will be acked by the responder
{async, Pid} ->
ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),
ack_if_async(Mode);
%% Assembling failed, ack through the responder
{error, _} = Error ->
emqx_ft_responder:ack(FinPacketKey, Error),
?DELAY_ACK
end
).

register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) ->
MRef = erlang:monitor(process, Pid),
TRef = erlang:start_timer(
emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData})
),
case Mode of
async ->
ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData),
ok = emqx_ft_storage:kickoff(Pid),
?ACK(ok);
sync ->
ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData),
ok = emqx_ft_storage:kickoff(Pid),
end).

with_responder(Key, Callback, Timeout, CriticalSection) ->
case emqx_ft_responder:start(Key, Callback, Timeout) of
%% We have new packet
{ok, _} ->
CriticalSection();
%% Packet already received.
%% Since we are still handling the previous one,
%% we probably have retransmit here
{error, {already_started, _}} ->
?DELAY_ACK
end.

ack_if_async(sync) ->
?DELAY_ACK;
ack_if_async(async) ->
?ACK(ok).

on_complete(Op, #{mode := Mode} = TopicReplyData, {ChanPid, PacketId}, Transfer, ResponderResult) ->
?tp(debug, "on_complete", #{
operation => Op,
packet_id => PacketId,
transfer => Transfer
}),
Result =
case ResponderResult of
{RespMode, ok} when RespMode == ack orelse RespMode == down ->
ok;
{RespMode, {error, _} = Reason} when RespMode == ack orelse RespMode == down ->
?tp(error, Op ++ "_failed", #{
transfer => Transfer,
reason => Reason
}),
Reason;
timeout ->
?tp(error, Op ++ "_timed_out", #{
transfer => Transfer
}),
{error, timeout}
end,
NeedAck =
case {ResponderResult, Mode} of
{{down, _}, async} -> false;
{timeout, async} -> false;
_ -> true
end,
NeedAck andalso ack_packet(ChanPid, PacketId, Result),
_ = publish_response(Result, TopicReplyData).

ack_packet(ChanPid, PacketId, Result) ->
erlang:send(ChanPid, {puback, PacketId, [], result_to_rc(Result)}).

topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) ->
Props = maps:get(properties, Headers, #{}),
#{
Expand Down Expand Up @@ -483,19 +475,3 @@ clientid_to_binary(A) when is_atom(A) ->
atom_to_binary(A);
clientid_to_binary(B) when is_binary(B) ->
B.

down_reason_to_result(normal) ->
ok;
down_reason_to_result(shutdown) ->
ok;
down_reason_to_result({shutdown, Result}) ->
Result;
down_reason_to_result(noproc) ->
{error, noproc};
down_reason_to_result(Error) ->
{error, {internal_error, Error}}.

with_new_packet(#{mode := async}, _PacketId, Fun) ->
Fun();
with_new_packet(#{mode := sync}, PacketId, Fun) ->
emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined).
122 changes: 0 additions & 122 deletions apps/emqx_ft/src/emqx_ft_async_reply.erl

This file was deleted.

0 comments on commit c425835

Please sign in to comment.