Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: future packet buffer #173

Merged
merged 4 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ xref:
$(REBAR) xref

.PHONY: eunit
eunit: compile
$(REBAR) eunit verbose=truen
eunit:
$(REBAR) eunit -v -c --cover_export_name eunit

.PHONY: ct
ct:
QUICER_USE_SNK=1 $(REBAR) as test ct -v

.PHONY: cover
cover:
cover: eunit
mkdir -p coverage
QUICER_TEST_COVER=1 QUICER_USE_SNK=1 $(REBAR) as test ct --cover -v
$(REBAR) cover
QUICER_TEST_COVER=1 QUICER_USE_SNK=1 $(REBAR) as test ct --cover --cover_export_name=ct -v
$(REBAR) as test cover -v
lcov -c --directory c_build/CMakeFiles/quicer_nif.dir/c_src/ \
--output-file ./coverage/lcov.info

Expand All @@ -51,7 +51,7 @@ dialyzer:
$(REBAR) dialyzer

.PHONY: test
test: ct
test: eunit ct

.PHONY: check
check: clang-format
Expand Down
7 changes: 7 additions & 0 deletions include/quicer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@
-define(QUIC_RECEIVE_FLAG_0_RTT , 16#0001).
-define(QUIC_RECEIVE_FLAG_FIN , 16#0002).

-record(quic_data, {
offset = 0 :: non_neg_integer(),
size = 0 :: non_neg_integer(),
flags = 0 :: integer(),
bin :: binary()
}).

-endif. %% QUICER_HRL
15 changes: 15 additions & 0 deletions include/quicer_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
, open_flag => stream_open_flags()
, start_flag => stream_start_flags()
, event_mask => uint32()
, disable_fpbuffer => boolean()
}. %% @TODO expand

-type stream_open_flags() :: ?QUIC_STREAM_OPEN_FLAG_NONE |
Expand Down Expand Up @@ -345,4 +346,18 @@
%% @doc addr in quicer, IP and Port
-type quicer_addr() :: string().

%% @doc quic_data fragment with offset index
-type ifrag() :: {Index::non_neg_integer(), quic_data()}.


-type quic_data_buffer() :: ordsets:ordset(ifrag()).

%% @doc future packet buffer
-type fpbuffer() :: #{ next_offset := non_neg_integer()
, buffer := quic_data_buffer()
}.

%% @doc binary data with offset and size info
-type quic_data() :: #quic_data{}.

-endif. %% QUICER_TYPES_HRL
9 changes: 6 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

{profiles,
[ {test,
[{erl_opts, [{d, 'SNK_COLLECTOR'}]}]}
, {doc,
[ {erl_opts, [{d, 'SNK_COLLECTOR'}]}
, {src_dirs, ["src", "test/example"] }
]
},
{doc,
[{plugins, [rebar3_hex, rebar3_ex_doc]},
{ex_doc, [
{extras, [ "README.md"
Expand All @@ -37,6 +40,6 @@
{plugins , [coveralls]}. % use hex package
{cover_enabled , true}.
{cover_export_enabled , true}.
{coveralls_coverdata , "_build/test/cover/ct.coverdata"}. % or a string with wildcards or a list of files
{coveralls_coverdata , "_build/test/cover/*.coverdata"}. % or a string with wildcards or a list of files
{coveralls_service_name , "github"}.
{coveralls_parallel, true}.
156 changes: 156 additions & 0 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@
%% helpers
-export([ %% Stream flags tester
is_unidirectional/1
%% Future Packets Buffering
, quic_data/1
, merge_quic_datalist/1
, new_fpbuffer/0
, new_fpbuffer/1
, update_fpbuffer/2
, defrag_fpbuffer/2
]).
%% Exports for test
-export([ get_conn_rid/1
Expand Down Expand Up @@ -935,7 +942,52 @@ perf_counters() ->
Error
end.

%% @doc Convert quic data event to quic_data for fpbuffer
-spec quic_data({quic, binary(), stream_handle(), recv_data_props()}) -> quic_data().
quic_data({quic, Bin, _Handle, #{absolute_offset := Offset, len := Len,
flags := Flags }}) when is_binary(Bin) ->
#quic_data{offset = Offset, size = Len, bin = Bin, flags = Flags}.

-spec merge_quic_datalist([quic_data()]) -> {iolist(), Size :: non_neg_integer(), Flag :: integer()}.
merge_quic_datalist(QuicDataList) ->
lists:foldr(fun(#quic_data{bin = B, size = Size, flags = Flags}, {Acc, TotalSize, AFlags}) ->
{[B | Acc], Size + TotalSize, AFlags bor Flags}
end, {[], 0, 0}, QuicDataList).

-spec new_fpbuffer() -> fpbuffer().
new_fpbuffer() ->
new_fpbuffer(0).
new_fpbuffer(StartOffset) ->
#{next_offset => StartOffset, buffer => ordsets:new()}.

%% @doc update fpbuffer and return *next* continuous data.
-spec update_fpbuffer(quic_data(), fpbuffer()) -> {list(quic_data()), NewBuff :: fpbuffer()}.
update_fpbuffer(#quic_data{offset = Offset, size = Size} = Data, #{next_offset := Offset, buffer := []} = This) ->
%% Fast Path:. Offset is expected offset and buffer is empty.
{[Data], This#{next_offset := Offset + Size}};
update_fpbuffer(#quic_data{} = Data, #{next_offset := NextOffset, buffer := Buffer} = This) ->
Buffer1 = ordsets:add_element(ifrag(Data), Buffer),
{NewOffset, NewBuffer, NewData} = defrag_fpbuffer(NextOffset, Buffer1),
{NewData, This#{next_offset := NewOffset, buffer := NewBuffer}}.

%% @doc Pop out continuous data from the buffer start from the offset.
-spec defrag_fpbuffer(Offset :: non_neg_integer(), quic_data_buffer()) ->
{NewOffset :: non_neg_integer(), NewBuffer :: quic_data_buffer(), Res :: [quic_data()]}.
defrag_fpbuffer(Offset, Buffer) ->
defrag_fpbuffer(Offset, Buffer, []).
defrag_fpbuffer(Offset, [{Offset, Data} | T], Res) ->
defrag_fpbuffer(Offset + Data#quic_data.size, T, [Data | Res]);
defrag_fpbuffer(Offset, [], Res) ->
{Offset, [], lists:reverse(Res)};
defrag_fpbuffer(Offset, [{HeadOffset, _Data} | _T] = Buffer, Res) when HeadOffset >= Offset ->
% Nomatch
{Offset, Buffer, lists:reverse(Res)}.

%%% Internal helpers
-spec ifrag(quic_data()) -> ifrag().
ifrag(#quic_data{offset = Offset} = Data) ->
{Offset, Data}.

stats_map(recv_cnt) ->
"Recv.TotalPackets";
stats_map(recv_oct) ->
Expand Down Expand Up @@ -984,6 +1036,110 @@ flush(QuicEventName, Handle) when is_atom(QuicEventName) ->
%% Event must come, do not timeout
end.


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

update_fpbuffer_test_() ->
Frag0 = #quic_data{offset = 0, size = 1, bin = <<1>>},
Frag1 = #quic_data{offset = 1, size = 2, bin = <<2, 3>>},
Frag3 = #quic_data{offset = 3, size = 3, bin = <<4, 5, 6>>},
Frag6 = #quic_data{offset = 6, size = 6, bin = <<7, 8, 9, 10, 11, 12>>},
FPBuffer1 = #{next_offset => 0, buffer => []},
FPBuffer1_3 = #{next_offset => 0, buffer => [ifrag(Frag1)]},
FPBuffer1_3_6 = #{next_offset => 0, buffer => [ifrag(Frag1), ifrag(Frag3), ifrag(Frag6)]},
FPBuffer1_6 = #{next_offset => 0, buffer => [ifrag(Frag1), ifrag(Frag6)]},
FPBuffer2 = #{next_offset => 1, buffer => []},
FPBuffer3 = #{next_offset => 3, buffer => []},
FPBuffer6 = #{next_offset => 3, buffer => [ifrag(Frag6)]},
FPBufferEnd = #{next_offset => 12, buffer => []},
[ ?_assertEqual({[Frag0], FPBuffer2}, update_fpbuffer(Frag0, FPBuffer1))
, ?_assertEqual({[Frag1], FPBuffer3}, update_fpbuffer(Frag1, FPBuffer2))
, ?_assertEqual({[], FPBuffer1_3}, update_fpbuffer(Frag1, FPBuffer1))
, ?_assertEqual(FPBuffer1_3_6, lists:foldl(fun(Frag, Acc) ->
{[], NewAcc} = update_fpbuffer(Frag, Acc),
NewAcc
end,
FPBuffer1,
[Frag1, Frag3, Frag6]
))
, ?_assertEqual(FPBuffer1_3_6, lists:foldl(fun(Frag, Acc) ->
{[], NewAcc} = update_fpbuffer(Frag, Acc),
NewAcc
end,
FPBuffer1,
[Frag6, Frag3, Frag1]
))
, ?_assertEqual({[Frag0, Frag1, Frag3, Frag6], FPBufferEnd}, update_fpbuffer(Frag0, FPBuffer1_3_6))
, ?_assertEqual({[Frag0, Frag1], FPBuffer6}, update_fpbuffer(Frag0, FPBuffer1_6))
].

defrag_fpbuffer_test_() ->
Frag0 = #quic_data{offset = 0, size = 1, bin = <<1>>},
Frag1 = #quic_data{offset = 1, size = 2, bin = <<2, 3>>},
Frag3 = #quic_data{offset = 3, size = 3, bin = <<4, 5, 6>>},
Frag6 = #quic_data{offset = 6, size = 6, bin = <<7, 8, 9, 10, 11, 12>>},

Buffer1 = orddict:from_list([ifrag(Frag0)]),
Buffer2 = orddict:from_list([ifrag(Frag0), ifrag(Frag3)]),
Buffer3 = orddict:from_list([ifrag(Frag0), ifrag(Frag3), ifrag(Frag6)]),
Buffer4 = orddict:from_list([ifrag(Frag0), ifrag(Frag1), ifrag(Frag6)]),
Buffer5 = orddict:from_list([ifrag(Frag1), ifrag(Frag0), ifrag(Frag6), ifrag(Frag3)]),
Buffer6 = orddict:from_list([ifrag(Frag1), ifrag(Frag6), ifrag(Frag3)]),
Buffer7 = orddict:from_list([ifrag(Frag1), ifrag(Frag6)]),
[
?_assertEqual(
{1, [], [Frag0]},
defrag_fpbuffer(0, Buffer1)
),
?_assertEqual(
{1, [ifrag(Frag3)], [Frag0]},
defrag_fpbuffer(0, Buffer2)
),
?_assertEqual(
{1, [ifrag(Frag3), ifrag(Frag6)], [Frag0]},
defrag_fpbuffer(0, Buffer3)
),
?_assertEqual(
{3, [ifrag(Frag6)], [Frag0, Frag1]},
defrag_fpbuffer(0, Buffer4)
),
?_assertEqual(
{12, [], [Frag0, Frag1, Frag3, Frag6]},
defrag_fpbuffer(0, Buffer5)
),
?_assertEqual(
{0, Buffer6, []},
defrag_fpbuffer(0, Buffer6)
),
?_assertEqual(
{12, [], [Frag1, Frag3, Frag6]},
defrag_fpbuffer(1, Buffer6)
),
?_assertEqual(
{3, [ifrag(Frag6)], [Frag1]},
defrag_fpbuffer(1, Buffer7)
)
].

merge_quic_datalist_test_() ->
Frag0 = #quic_data{offset = 0, size = 1, flags = ?QUIC_RECEIVE_FLAG_0_RTT, bin = <<1>>},
Frag1 = #quic_data{offset = 1, size = 2, bin = <<2, 3>>},
Frag3 = #quic_data{offset = 3, size = 3, bin = <<4, 5, 6>>},
Frag6 = #quic_data{offset = 6, size = 6, flags = ?QUIC_RECEIVE_FLAG_FIN, bin = <<7, 8, 9, 10, 11, 12>>},
[ ?_assertEqual({[], 0, 0}, merge_quic_datalist([]))
, ?_assertEqual({[<<1>>], 1, ?QUIC_RECEIVE_FLAG_0_RTT}, merge_quic_datalist([Frag0]))
, ?_assertEqual({[<<1>>, <<2, 3>>], 3, ?QUIC_RECEIVE_FLAG_0_RTT}, merge_quic_datalist([Frag0, Frag1]))
, ?_assertEqual({[<<2,3>>, <<4, 5, 6>>], 5, 0}, merge_quic_datalist([Frag1, Frag3]))
, ?_assertEqual({[<<7, 8, 9, 10, 11, 12>>], 6, ?QUIC_RECEIVE_FLAG_FIN}, merge_quic_datalist([Frag6]))
, ?_assertEqual({[<<2,3>>, <<4, 5, 6>>, <<7, 8, 9, 10, 11, 12>>], 11, ?QUIC_RECEIVE_FLAG_FIN},
merge_quic_datalist([Frag1, Frag3, Frag6]))
, ?_assertEqual({[<<1>>, <<2,3>>, <<4, 5, 6>>, <<7, 8, 9, 10, 11, 12>>], 12, ?QUIC_RECEIVE_FLAG_FIN bor ?QUIC_RECEIVE_FLAG_0_RTT},
merge_quic_datalist([Frag0, Frag1, Frag3, Frag6]))
].
% TEST
-endif.

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
30 changes: 28 additions & 2 deletions src/quicer_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ init([Callback, Conn, StreamOpts]) ->
, conn => Conn
, callback => Callback
, callback_state => undefined
, fpbuffer => maybe_buffer(StreamOpts)
},
case IsLocal of
false ->
Expand Down Expand Up @@ -254,6 +255,7 @@ init([Callback, Stream, Conn, StreamOpts, Props, PrevOwner]) ->
, stream => Stream
, callback => Callback
, callback_state => CBState
, fpbuffer => maybe_buffer(StreamOpts)
},
{ok, State, {continue , {?post_init, PrevOwner}}};
{error, _} = E ->
Expand Down Expand Up @@ -341,12 +343,31 @@ handle_info({quic, new_stream, Stream, #{flags := Flags, is_orphan := false} = P
{stop, {new_stream_crash, Reason}, State#{stream := Stream}}
end;
handle_info({quic, Bin, Stream, Props},
#{ stream := Stream, callback := M
#{ fpbuffer := disabled
, stream := Stream, callback := M
, callback_state := CallbackState } = State)
when is_binary(Bin) ->
%% FPbuffer is disabled, callback module should handle out of order delivery
?tp(debug, stream_data, #{module=>?MODULE, stream=>Stream}),
default_cb_ret(M:handle_stream_data(Stream, Bin, Props, CallbackState), State);

handle_info({quic, Bin, Stream, Props} = Evt,
#{ stream := Stream, callback := M
, fpbuffer := Buffer
, callback_state := CallbackState } = State)
when is_binary(Bin) andalso Buffer =/= disabled ->
%% FPbuffer is enabled, callback module get ordered data
?tp(debug, stream_data, #{module=>?MODULE, stream=>Stream, buffer => Buffer}),
case quicer:update_fpbuffer(quicer:quic_data(Evt), Buffer) of
{[], NewBuffer} ->
{noreply, State#{ fpbuffer := NewBuffer} };
{DataList, NewBuffer} ->
{IoListData, NewSize, NewFlag} = quicer:merge_quic_datalist(DataList),
AppData = iolist_to_binary(IoListData),
default_cb_ret(M:handle_stream_data(Stream, AppData,
Props#{len := NewSize, flags := NewFlag},
CallbackState),
State#{fpbuffer := NewBuffer})
end;
handle_info({quic, start_completed, Stream,
#{ status := _AtomStatus
, stream_id := _StreamId
Expand Down Expand Up @@ -475,3 +496,8 @@ maybe_log_stracetrace(ST) ->
logger:error("~p~n", [ST]),
ok.

maybe_buffer(#{disable_fpbuffer := true}) ->
disabled;
maybe_buffer(_) ->
%% Default enable
quicer:new_fpbuffer().
10 changes: 4 additions & 6 deletions test/example/rev.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
%% 1> rev:start().

%% To test new streams going "down"
%% 2> rev:d_test().
%% 2> rev:d_test_run().


%% To test new streams going "up"
%% 3> rev:u_test().
%% 3> rev:u_test_run().



Expand All @@ -39,8 +39,7 @@ start() ->


%% Test code

d_test() ->
d_test_run() ->
d_test(1).

d_test(N) ->
Expand All @@ -52,7 +51,7 @@ d_test(N) ->
io:format("Streams closed \n",[]).


u_test() ->
u_test_run() ->
u_test(1).
u_test(N) ->
L = lists:seq(1, N),
Expand All @@ -63,7 +62,6 @@ u_test(N) ->
io:format("Streams closed \n",[]).



send_and_close() ->
S = get_down_stream(),
quicer:send(S, <<"kalle ">>),
Expand Down
3 changes: 2 additions & 1 deletion test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,8 @@ tc_accept_stream_active_once(Config) ->
ConnectionOpts = [ {conn_callback, ServerConnCallback}
, {stream_acceptors, 32}
| default_conn_opts()],
StreamOpts = [ {stream_callback, ServerStreamCallback}
StreamOpts = [ {stream_callback, ServerStreamCallback},
{disable_fpbuffer, true}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},
ct:pal("Listener Options: ~p", [Options]),
Expand Down