Skip to content

Commit

Permalink
Merge pull request #243 from qzhuyan/feat/william/stream-dead-accepto…
Browse files Browse the repository at this point in the history
…r-fallback

feat: dead stream acceptor fallback to conn owner
  • Loading branch information
qzhuyan committed Dec 6, 2023
2 parents 06ac82d + c2e5fb9 commit bbf19f0
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 10 deletions.
52 changes: 42 additions & 10 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,10 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
// but that will buffer more in msquic stack and hit control limit.
acc->active = ACCEPTOR_RECV_MODE_PASSIVE;
}
else
{
TP_CB_3(acceptor_available, (uintptr_t)c_ctx->Connection, 0);
}

assert(acc);
acc_pid = &(acc->Pid);
Expand All @@ -1389,18 +1393,46 @@ handle_connection_event_peer_stream_started(QuicerConnCTX *c_ctx,
props_name,
props_value,
2);
if (enif_send(NULL, acc_pid, NULL, report))
{
MsQuic->SetCallbackHandler(
Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx);
// We should return success only when callback is set
return QUIC_STATUS_SUCCESS;
}
else
if (!enif_send(NULL, acc_pid, NULL, report))
{
// NOTE: we must return non sucess status
return QUIC_STATUS_UNREACHABLE;
if (is_orphan)
{
// Connection acceptor is dead
// we don't need to destroy acceptor, we don't have the ownwership
return QUIC_STATUS_UNREACHABLE;
}
else
{
TP_CB_3(acceptor_down_fallback, (uintptr_t)c_ctx->Connection, 0);
// Lets try the the connection owner
//
// Destroy this dead acceptor
AcceptorDestroy(acc);
// Set is_orphan to true, connection owner takeover
props_value[1] = ATOM_TRUE;
acc = AcceptorAlloc();
CxPlatCopyMemory(acc, c_ctx->owner, sizeof(ACCEPTOR));
s_ctx->owner = acc;
// this is our protocol
acc->active = ACCEPTOR_RECV_MODE_PASSIVE;
acc_pid = &(acc->Pid);

report = make_event_with_props(env,
ATOM_NEW_STREAM,
enif_make_resource(env, s_ctx),
props_name,
props_value,
2);
if (!enif_send(NULL, acc_pid, NULL, report))
{
// Sad...
return QUIC_STATUS_UNREACHABLE;
}
}
}
MsQuic->SetCallbackHandler(
Event->PEER_STREAM_STARTED.Stream, stream_callback, s_ctx);
return QUIC_STATUS_SUCCESS;
}

static QUIC_STATUS
Expand Down
88 changes: 88 additions & 0 deletions test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
-export([ tc_app_echo_server/1,
tc_slow_conn/1,
tc_stream_owner_down/1,
tc_stream_acceptor_down/1,
tc_conn_owner_down/1,
tc_conn_close_flag_1/1,
tc_conn_close_flag_2/1,
Expand Down Expand Up @@ -182,6 +183,7 @@ all() ->
[ tc_app_echo_server
, tc_slow_conn
, tc_stream_owner_down
, tc_stream_acceptor_down
, tc_conn_owner_down
, tc_conn_close_flag_1
, tc_conn_close_flag_2
Expand Down Expand Up @@ -402,6 +404,92 @@ tc_stream_owner_down(Config) ->
ok.


tc_stream_acceptor_down(Config) ->
ServerConnCallback = example_server_connection,
ServerStreamCallback = example_server_stream,
Port = select_port(),
application:ensure_all_started(quicer),
ListenerOpts = [{conn_acceptors, 32}, {peer_bidi_stream_count, 10},
{peer_unidi_stream_count, 0} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 2}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},
ct:pal("Listener Options: ~p", [Options]),
?check_trace(#{timetrap => 10000},
begin
{ok, _QuicApp} = quicer:spawn_listener(mqtt, Port, Options),
{ok, Conn} = quicer:connect("localhost", Port,
[{peer_bidi_stream_count, 10}, {peer_unidi_stream_count, 1} | default_conn_opts()], 5000),
{ok, Stm} = quicer:start_stream(Conn, [{active, true}]),
{ok, Stm2} = quicer:start_stream(Conn, [{active, true}]),
{ok, 5} = quicer:async_send(Stm, <<"ping1">>),
ct:pal("ping1 sent"),
{ok, 5} = quicer:async_send(Stm2, <<"ping2">>),
ct:pal("ping2 sent"),
receive
{quic, <<"ping1">>, Stm, _} -> ok
after 100 -> ct:fail("no ping1")
end,
receive
{quic, <<"ping2">>, Stm2, _} -> ok
after 100 -> ct:fail("no ping2")
end,
{DeadPid, DMRef } = spawn_monitor(fun() ->
quicer:async_accept_stream(Conn, [])
end),
%% GIVEN: one remote stream acceptor is DOWN
receive
{'DOWN', DMRef, process, DeadPid, normal} -> ok
after 500 -> ct:fail("no DOWN message for dead pid")
end,
%% WHEN: We trigger peer (server) to initiate remote stream to us
{ok, Stm3Out} = quicer:start_stream(Conn, [{active, true}, {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}]),
quicer:async_send(Stm3Out, <<"ping3">>),
Stm3In = receive
%% THEN: This process is selected as stream owner fallback (is_orphan = true)
{quic, new_stream, Incoming, #{flags := Flag, is_orphan := true}} ->
ct:pal("incoming stream from server: ~p", [Incoming]),
true = quicer:is_unidirectional(Flag),
quicer:setopt(Incoming, active, true),
Incoming
after 1000 ->
ct:fail("no incoming stream")
end,
receive
{quic, Data, Stm3In, DFlag} ->
ct:pal("~p is received from ~p with flag: ~p", [Data, Stm3In, DFlag]),
?assertEqual(Data, <<"ping3">>)
after 1000 ->
ct:fail("no incoming data")
end,
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
receive
{quic, closed, Conn, _} ->
ct:pal("Connecion is closed")
end,
quicer:shutdown_connection(Conn)
end,
fun(_Result, Trace) ->
ct:pal("Trace is ~p", [Trace]),
%% check that acceptor is first picked but it is down and fallback is triggered
?assert(?causality(#{ ?snk_kind := debug
, function := "handle_connection_event_peer_stream_started"
, tag := "acceptor_available"
, resource_id := _Rid
},
#{ ?snk_kind := debug
, function := "handle_connection_event_peer_stream_started"
, tag := "acceptor_down_fallback"
, resource_id := _Rid
},
Trace))
end),
ok.


tc_conn_owner_down(Config) ->
Port = select_port(),
ListenerOpts = [{conn_acceptors, 32} | default_listen_opts(Config)],
Expand Down

0 comments on commit bbf19f0

Please sign in to comment.