Skip to content

Commit

Permalink
Refactor the frame buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
iamaleksey committed Dec 27, 2013
1 parent cda96c0 commit ed17b65
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
45 changes: 45 additions & 0 deletions src/seestar_buffer.erl
@@ -0,0 +1,45 @@
%%% Copyright 2012 Aleksey Yeschenko
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.

%%% @private
-module(seestar_buffer).

-export([new/0, decode/2]).
-export_type([buffer/0]).

-record(buffer, {buffered :: iolist(),
current_size :: non_neg_integer(),
pending_size :: non_neg_integer() | undefined}).
-opaque buffer() :: #buffer{}.

%% -------------------------------------------------------------------------
%% API
%% -------------------------------------------------------------------------

-spec new() -> buffer().
new() ->
#buffer{buffered = [], current_size = 0, pending_size = undefined}.

-spec decode(buffer(), binary()) -> {[seestar_frame:frame()], buffer()}.
decode(#buffer{current_size = Current, pending_size = Pending} = Buffer, NewData)
when is_integer(Pending) andalso Current + size(NewData) < Pending ->
{[], Buffer#buffer{buffered = [NewData|Buffer#buffer.buffered],
current_size = Current + size(NewData)}};

decode(Buffer, NewData) ->
Data = list_to_binary(lists:reverse([NewData|Buffer#buffer.buffered])),
{Frames, Rest} = seestar_frame:decode(Data),
{Frames, #buffer{buffered = [Rest],
current_size = size(Rest),
pending_size = seestar_frame:pending_size(Rest)}}.
19 changes: 12 additions & 7 deletions src/seestar_frame.erl
Expand Up @@ -15,12 +15,13 @@
%%% @private
-module(seestar_frame).

-export([new/4, id/1, flags/1, has_flag/2, opcode/1, body/1, encode/1, decode/1]).
-export([new/4, id/1, flags/1, has_flag/2, opcode/1,
body/1, encode/1, pending_size/1, decode/1]).

-type stream_id() :: -1..127.
-type flag() :: compression | tracing.
-type opcode() :: 16#00..16#0C.
-export_type([stream_id/0, flag/0, opcode/0]).
-export_type([stream_id/0, flag/0, opcode/0, frame/0]).

-define(COMPRESSION, 16#01).
-define(TRACING, 16#02).
Expand All @@ -29,7 +30,7 @@
flags = [] :: [flag()],
opcode :: opcode(),
body :: binary()}).
-type frame() :: #frame{}.
-opaque frame() :: #frame{}.

%% -------------------------------------------------------------------------
%% API
Expand Down Expand Up @@ -69,16 +70,20 @@ encode_flags(Flags) ->
encode_flag(compression) -> ?COMPRESSION;
encode_flag(tracing) -> ?TRACING.

-spec decode(binary()) -> {[frame()], binary(), integer() | undefined}.
-spec pending_size(binary()) -> pos_integer().
pending_size(<<16#81, _Flags, _ID/signed, _Op, Size:32, _/binary>>) ->
Size + 8;
pending_size(_) ->
undefined.

-spec decode(binary()) -> {[frame()], binary()}.
decode(Stream) ->
decode(Stream, []).
decode(<<16#81, Flags, ID/signed, Op, Size:32, Body:Size/binary, Rest/binary>>, Acc) ->
Frame = #frame{id = ID, flags = decode_flags(Flags), opcode = Op, body = Body},
decode(Rest, [Frame|Acc]);
decode(<<16#81, _Flags, _ID/signed, _Op, Size:32, _Rest/binary>> = Stream, Acc) ->
{lists:reverse(Acc), Stream, Size + 8};
decode(Stream, Acc) ->
{lists:reverse(Acc), Stream, undefined}.
{lists:reverse(Acc), Stream}.

decode_flags(Byte) ->
F = fun(Mask, Flags) when Byte band Mask =:= Mask ->
Expand Down
28 changes: 5 additions & 23 deletions src/seestar_session.erl
Expand Up @@ -50,9 +50,7 @@
-record(st,
{host :: inet:hostname(),
sock :: inet:socket(),
incomplete = <<>> :: binary() | [binary()],
frame_size = undefined :: integer() | undefined,
incomplete_size = 0 :: integer(),
buffer :: seestar_buffer:buffer(),
free_ids :: [seestar_frame:stream_id()],
backlog = queue:new() :: queue(),
reqs :: ets:tid()}).
Expand Down Expand Up @@ -221,8 +219,8 @@ init([Host, Port, ConnectOptions]) ->
case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
{ok, Sock} ->
ok = inet:setopts(Sock, [binary, {packet, 0}, {active, true}]),
{ok, #st{host = Host, sock = Sock, free_ids = lists:seq(0, 127),
reqs = ets:new(seestar_reqs, [])}};
{ok, #st{host = Host, sock = Sock, buffer = seestar_buffer:new(),
free_ids = lists:seq(0, 127), reqs = ets:new(seestar_reqs, [])}};
{error, Reason} ->
{stop, {connection_error, Reason}}
end.
Expand Down Expand Up @@ -270,24 +268,8 @@ handle_cast(Request, St) ->

%% @private
handle_info({tcp, Sock, Data}, #st{sock = Sock} = St) ->
{Frames, St2} = case St#st.frame_size of
undefined ->
Stream = <<(St#st.incomplete)/binary, Data/binary>>,
{FramesI, Incomplete, FrameSize} = seestar_frame:decode(Stream),
{FramesI, St#st{incomplete = Incomplete, frame_size = FrameSize, incomplete_size = size(Incomplete)}};
_ ->
Stream = [Data, St#st.incomplete],
StreamSize = St#st.incomplete_size + size(Data),
case StreamSize >= St#st.frame_size of
true ->
Stream2 = iolist_to_binary(Stream),
{FramesI, Incomplete, FrameSize} = seestar_frame:decode(Stream2),
{FramesI, St#st{incomplete = Incomplete, frame_size = FrameSize, incomplete_size = size(Incomplete)}};
false ->
{[], St#st{incomplete = Stream, incomplete_size = StreamSize}}
end
end,
{noreply, process_frames(Frames, St2)};
{Frames, Buffer} = seestar_buffer:decode(St#st.buffer, Data),
{noreply, process_frames(Frames, St#st{buffer = Buffer})};

handle_info({tcp_closed, Sock}, #st{sock = Sock} = St) ->
{stop, socket_closed, St};
Expand Down

0 comments on commit ed17b65

Please sign in to comment.