Skip to content

Commit

Permalink
Merge pull request #5792 from rickard-green/rickard/async-response-im…
Browse files Browse the repository at this point in the history
…provement/OTP-17784

Improve response handling for asynchronous gen and erpc requests
  • Loading branch information
rickard-green committed Mar 21, 2022
2 parents 70376ac + cb67a70 commit bdd6b69
Show file tree
Hide file tree
Showing 14 changed files with 4,325 additions and 491 deletions.
701 changes: 585 additions & 116 deletions lib/kernel/doc/src/erpc.xml

Large diffs are not rendered by default.

347 changes: 306 additions & 41 deletions lib/kernel/src/erpc.erl

Large diffs are not rendered by default.

332 changes: 331 additions & 1 deletion lib/kernel/test/erpc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
send_request_wait_reqtmo/1,
send_request_check_reqtmo/1,
send_request_against_ei_node/1,
send_request_receive_reqid_collection/1,
send_request_wait_reqid_collection/1,
send_request_check_reqid_collection/1,
multicall/1, multicall_reqtmo/1,
multicall_recv_opt/1,
multicall_recv_opt2/1,
Expand Down Expand Up @@ -56,7 +59,7 @@ suite() ->
[{ct_hooks,[ts_install_cth]},
{timetrap,{minutes,2}}].

all() ->
all() ->
[call,
call_against_old_node,
call_from_old_node,
Expand All @@ -69,6 +72,9 @@ all() ->
send_request_wait_reqtmo,
send_request_check_reqtmo,
send_request_against_ei_node,
send_request_receive_reqid_collection,
send_request_wait_reqid_collection,
send_request_check_reqid_collection,
multicall,
multicall_reqtmo,
multicall_recv_opt,
Expand Down Expand Up @@ -937,6 +943,327 @@ send_request_against_ei_node(Config) when is_list(Config) ->

ok = stop_ei_node(EiNode).

send_request_receive_reqid_collection(Config) when is_list(Config) ->
{ok, _P, N} = ?CT_PEER(#{connection => 0}),
send_request_receive_reqid_collection_success(N),
send_request_receive_reqid_collection_timeout(N),
send_request_receive_reqid_collection_error(N),
ok.

send_request_receive_reqid_collection_success(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 400 -> 400 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),
1 = erpc:reqids_size(ReqIdC1),

ReqIdC2 = erpc:send_request(N, fun () -> receive after 1 -> 1 end end, req2, ReqIdC1),
2 = erpc:reqids_size(ReqIdC2),

ReqIdC3 = erpc:send_request(N, fun () -> receive after 200 -> 200 end end, req3, ReqIdC2),
3 = erpc:reqids_size(ReqIdC3),

{1, req2, ReqIdC4} = erpc:receive_response(ReqIdC3, infinity, true),
2 = erpc:reqids_size(ReqIdC4),

{200, req3, ReqIdC5} = erpc:receive_response(ReqIdC4, 7654, true),
1 = erpc:reqids_size(ReqIdC5),

{400, req1, ReqIdC6} = erpc:receive_response(ReqIdC5, 5000, true),
0 = erpc:reqids_size(ReqIdC6),

no_request = erpc:receive_response(ReqIdC6, 5000, true),

0 = erpc:receive_response(ReqId0),

ok.

send_request_receive_reqid_collection_timeout(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 1000 -> 1000 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),

ReqIdC2 = erpc:send_request(N, fun () -> receive after 1 -> 1 end end, req2, ReqIdC1),

ReqId3 = erpc:send_request(N, fun () -> receive after 500 -> 500 end end),
ReqIdC3 = erpc:reqids_add(ReqId3, req3, ReqIdC2),

Deadline = erlang:monotonic_time(millisecond) + 100,

{1, req2, ReqIdC4} = erpc:receive_response(ReqIdC3, {abs, Deadline}, true),
2 = erpc:reqids_size(ReqIdC4),

try not_valid = erpc:receive_response(ReqIdC4, {abs, Deadline}, true)
catch error:{erpc, timeout} -> ok
end,

Abandoned = lists:sort([{ReqId1, req1}, {ReqId3, req3}]),
Abandoned = lists:sort(erpc:reqids_to_list(ReqIdC4)),

%% Make sure requests were abandoned...
try not_valid = erpc:receive_response(ReqIdC4, {abs, Deadline+1000}, false)
catch error:{erpc, timeout} -> ok
end,

0 = erpc:receive_response(ReqId0, infinity),

ok.

send_request_receive_reqid_collection_error(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 600 -> 600 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),
try
nope = erpc:reqids_add(ReqId1, req2, ReqIdC1)
catch
error:{erpc, badarg} -> ok
end,

ReqIdC2 = erpc:send_request(N, fun () -> receive after 800 -> erlang:halt() end end, req2, ReqIdC1),
ReqIdC3 = erpc:send_request(N, fun () -> receive after 200 -> error(errored) end end, req3, ReqIdC2),
ReqIdC4 = erpc:send_request(N, fun () -> exit(exited) end, req4, ReqIdC3),
ReqIdC5 = erpc:send_request(N, fun () -> receive after 400 -> throw(thrown) end end, req5, ReqIdC4),

5 = erpc:reqids_size(ReqIdC5),

ReqIdC6 = try not_valid = erpc:receive_response(ReqIdC5, infinity, true)
catch exit:{{exception, exited}, req4, RIC6} -> RIC6
end,

4 = erpc:reqids_size(ReqIdC6),

try not_valid = erpc:receive_response(ReqIdC6, 2000, false)
catch error:{{exception, errored, _Stk}, req3, _} -> ok
end,

try not_valid = erpc:receive_response(ReqIdC6, infinity, false)
catch throw:{thrown, req5, _} -> ok
end,

{600, req1, ReqIdC6} = erpc:receive_response(ReqIdC6, infinity, false),

try not_valid = erpc:receive_response(ReqIdC6, 5000, false)
catch error:{{erpc, noconnection}, req2, ReqIdC6} -> ok
end,

0 = erpc:receive_response(ReqId0),

ok.

send_request_wait_reqid_collection(Config) when is_list(Config) ->
{ok, _P, N} = ?CT_PEER(#{connection => 0}),
send_request_wait_reqid_collection_success(N),
send_request_wait_reqid_collection_timeout(N),
send_request_wait_reqid_collection_error(N),
ok.

send_request_wait_reqid_collection_success(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 400 -> 400 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),
1 = erpc:reqids_size(ReqIdC1),

ReqIdC2 = erpc:send_request(N, fun () -> receive after 1 -> 1 end end, req2, ReqIdC1),
2 = erpc:reqids_size(ReqIdC2),

ReqIdC3 = erpc:send_request(N, fun () -> receive after 200 -> 200 end end, req3, ReqIdC2),
3 = erpc:reqids_size(ReqIdC3),

{{response, 1}, req2, ReqIdC4} = erpc:wait_response(ReqIdC3, infinity, true),
2 = erpc:reqids_size(ReqIdC4),

{{response, 200}, req3, ReqIdC5} = erpc:wait_response(ReqIdC4, 7654, true),
1 = erpc:reqids_size(ReqIdC5),

{{response, 400}, req1, ReqIdC6} = erpc:wait_response(ReqIdC5, 5000, true),
0 = erpc:reqids_size(ReqIdC6),

no_request = erpc:wait_response(ReqIdC6, 5000, true),

{response, 0} = erpc:wait_response(ReqId0),

ok.

send_request_wait_reqid_collection_timeout(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 1000 -> 1000 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),

ReqIdC2 = erpc:send_request(N, fun () -> receive after 1 -> 1 end end, req2, ReqIdC1),

ReqId3 = erpc:send_request(N, fun () -> receive after 500 -> 500 end end),
ReqIdC3 = erpc:reqids_add(ReqId3, req3, ReqIdC2),

Deadline = erlang:monotonic_time(millisecond) + 100,

{{response, 1}, req2, ReqIdC4} = erpc:wait_response(ReqIdC3, {abs, Deadline}, true),
2 = erpc:reqids_size(ReqIdC4),

no_response = erpc:wait_response(ReqIdC4, {abs, Deadline}, true),

Unhandled = lists:sort([{ReqId1, req1}, {ReqId3, req3}]),
Unhandled = lists:sort(erpc:reqids_to_list(ReqIdC4)),

%% Make sure requests were not abandoned...
{{response, 500}, req3, ReqIdC4} = erpc:wait_response(ReqIdC4, {abs, Deadline+1500}, false),
{{response, 1000}, req1, ReqIdC4} = erpc:wait_response(ReqIdC4, {abs, Deadline+1500}, false),

{response, 0} = erpc:wait_response(ReqId0, infinity),

ok.

send_request_wait_reqid_collection_error(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 600 -> 600 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),
try
nope = erpc:reqids_add(ReqId1, req2, ReqIdC1)
catch
error:{erpc, badarg} -> ok
end,

ReqIdC2 = erpc:send_request(N, fun () -> receive after 800 -> erlang:halt() end end, req2, ReqIdC1),
ReqIdC3 = erpc:send_request(N, fun () -> receive after 200 -> error(errored) end end, req3, ReqIdC2),
ReqIdC4 = erpc:send_request(N, fun () -> exit(exited) end, req4, ReqIdC3),
ReqIdC5 = erpc:send_request(N, fun () -> receive after 400 -> throw(thrown) end end, req5, ReqIdC4),

5 = erpc:reqids_size(ReqIdC5),

ReqIdC6 = try not_valid = erpc:wait_response(ReqIdC5, infinity, true)
catch exit:{{exception, exited}, req4, RIC6} -> RIC6
end,

4 = erpc:reqids_size(ReqIdC6),

try not_valid = erpc:wait_response(ReqIdC6, 2000, false)
catch error:{{exception, errored, _Stk}, req3, _} -> ok
end,

try not_valid = erpc:wait_response(ReqIdC6, infinity, false)
catch throw:{thrown, req5, _} -> ok
end,

{{response, 600}, req1, ReqIdC6} = erpc:wait_response(ReqIdC6, infinity, false),

try not_valid = erpc:wait_response(ReqIdC6, 5000, false)
catch error:{{erpc, noconnection}, req2, ReqIdC6} -> ok
end,

{response, 0} = erpc:wait_response(ReqId0),

ok.

send_request_check_reqid_collection(Config) when is_list(Config) ->
{ok, _P, N} = ?CT_PEER(#{connection => 0}),
send_request_check_reqid_collection_success(N),
send_request_check_reqid_collection_error(N),
ok.

send_request_check_reqid_collection_success(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqIdC1 = erpc:send_request(N, fun () -> receive after 600 -> 600 end end, req1, ReqIdC0),
1 = erpc:reqids_size(ReqIdC1),

ReqId2 = erpc:send_request(N, fun () -> receive after 200 -> 200 end end),
ReqIdC2 = erpc:reqids_add(ReqId2, req2, ReqIdC1),
2 = erpc:reqids_size(ReqIdC2),

ReqIdC3 = erpc:send_request(N, fun () -> receive after 400 -> 400 end end, req3, ReqIdC2),
3 = erpc:reqids_size(ReqIdC3),

Msg0 = next_msg(),

no_response = erpc:check_response(Msg0, ReqIdC3, true),

{{response, 200}, req2, ReqIdC4} = erpc:check_response(next_msg(), ReqIdC3, true),
2 = erpc:reqids_size(ReqIdC4),

{{response, 400}, req3, ReqIdC5} = erpc:check_response(next_msg(), ReqIdC4, true),
1 = erpc:reqids_size(ReqIdC5),

{{response, 600}, req1, ReqIdC6} = erpc:check_response(next_msg(), ReqIdC5, true),
0 = erpc:reqids_size(ReqIdC6),

no_request = erpc:check_response(Msg0, ReqIdC6, true),

{response, 0} = erpc:check_response(Msg0, ReqId0),

ok.

send_request_check_reqid_collection_error(N) ->

ReqId0 = erpc:send_request(N, fun () -> 0 end),

ReqIdC0 = erpc:reqids_new(),

ReqId1 = erpc:send_request(N, fun () -> receive after 600 -> 600 end end),
ReqIdC1 = erpc:reqids_add(ReqId1, req1, ReqIdC0),

ReqIdC2 = erpc:send_request(N, fun () -> receive after 800 -> erlang:halt() end end, req2, ReqIdC1),

ReqIdC3 = erpc:send_request(N, fun () -> receive after 200 -> error(errored) end end, req3, ReqIdC2),

ReqIdC4 = erpc:send_request(N, fun () -> exit(exited) end, req4, ReqIdC3),

ReqIdC5 = erpc:send_request(N, fun () -> receive after 400 -> throw(thrown) end end, req5, ReqIdC4),

5 = erpc:reqids_size(ReqIdC5),

Msg0 = next_msg(),

no_response = erpc:check_response(Msg0, ReqIdC5, true),

ReqIdC6 = try not_valid = erpc:check_response(next_msg(), ReqIdC5, true)
catch exit:{{exception, exited}, req4, RIC6} -> RIC6
end,

try not_valid = erpc:check_response(next_msg(), ReqIdC6, false)
catch error:{{exception, errored, _Stk}, req3, ReqIdC6} -> ok
end,

try not_valid = erpc:check_response(next_msg(), ReqIdC6, false)
catch throw:{thrown, req5, ReqIdC6} -> ok
end,

{{response, 600}, req1, ReqIdC6} = erpc:check_response(next_msg(), ReqIdC6, false),

try not_valid = erpc:check_response(next_msg(), ReqIdC6, false)
catch error:{{erpc, noconnection}, req2, ReqIdC6} -> ok
end,

{response, 0} = erpc:check_response(Msg0, ReqId0),

ok.

multicall(Config) ->
{ok, _Peer1, Node1} = ?CT_PEER(#{connection => 0}),
{ok, Peer2, Node2} = ?CT_PEER(#{connection => 0}),
Expand Down Expand Up @@ -1743,6 +2070,9 @@ f2() ->
timer:sleep(500),
halt().

next_msg() ->
receive M -> M end.

flush_msgq() ->
flush_msgq(0).
flush_msgq(N) ->
Expand Down
Loading

0 comments on commit bdd6b69

Please sign in to comment.