Skip to content

Commit

Permalink
prevent streaming gets from streaming to end of block
Browse files Browse the repository at this point in the history
basically the same issue as just fixed for get_range, get_stream was streaming through
the end of a block, whether or not you had asked for it
  • Loading branch information
beerriot committed Aug 5, 2010
1 parent 74217cc commit b39201a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 7 deletions.
32 changes: 25 additions & 7 deletions src/luwak_get_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,40 @@ map(Parent=#n{}, TreeOffset,
end)
end,
[];
map(Block, TreeOffset, _Map=#map{offset=Offset,ref=Ref,pid=Pid})
map(Block, TreeOffset,
_Map=#map{offset=Offset,ref=Ref,pid=Pid,endoffset=EndOffset,blocksize=
BlockSize})
when TreeOffset < Offset ->
?debugFmt("B map(~p, ~p, ~p)~n", [Block, TreeOffset, _Map]),
PartialSize = Offset - TreeOffset,
<<_:PartialSize/binary, Tail/binary>> = luwak_block:data(Block),
?debugFmt("sending ~p~n", [{get, Ref, Tail, Offset}]),
Pid ! {get, Ref, Tail, Offset},
case BlockSize >= EndOffset - TreeOffset of
%% should be the same as BlockSize >= EndOffset-Offset
false ->
%% wanted the rest of the block
?debugFmt("sending ~p~n", [{get, Ref, Tail, Offset}]),
Pid ! {get, Ref, Tail, Offset};
true ->
%% wanted only a middle chunk of the block
SubPartialSize = EndOffset-Offset,
<<SubTail:SubPartialSize/binary, _/binary>> = Tail,
?debugFmt("sending ~p~n", [{get, Ref, SubTail, Offset}]),
Pid ! {get, Ref, SubTail, Offset}
end,
[];
map(Block, TreeOffset,
_Map=#map{endoffset=EndOffset,ref=Ref,pid=Pid,blocksize=BlockSize})
when BlockSize >= EndOffset - TreeOffset ->
?debugFmt("C map(~p, ~p, ~p)~n", [Block, TreeOffset, _Map]),
PartialSize = EndOffset - TreeOffset,
<<PartialData:PartialSize/binary, _/binary>> = luwak_block:data(Block),
?debugFmt("sending ~p~n", [{get, Ref, PartialData, TreeOffset}]),
Pid ! {get, Ref, PartialData, TreeOffset},
case EndOffset - TreeOffset of
PartialSize when PartialSize > 0 ->
<<PartialData:PartialSize/binary, _/binary>> = luwak_block:data(Block),
?debugFmt("sending ~p~n", [{get, Ref, PartialData, TreeOffset}]),
Pid ! {get, Ref, PartialData, TreeOffset};
_PartialSize ->
%% boundary case where this block was looked up, but not needed
ok
end,
[];
map(Block, TreeOffset, _Map=#map{ref=Ref,pid=Pid}) ->
?debugFmt("D map(~p, ~p, ~p)~n", [Block, TreeOffset, _Map]),
Expand Down
74 changes: 74 additions & 0 deletions test/luwak_get_stream_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,77 @@ read_beyond_file_end_test() ->
GetStream = luwak_get_stream:start(Riak, File1, 30, 10),
?assertEqual(eos, luwak_get_stream:recv(GetStream, 1000))
end).

%% @doc Makes a simple file, named Name, with BlockCount blocks, each
%% filled with BlockSize characters.
%% Ex: make_standard_range_file(R, <<"foo">>, 5, 3)
%% will make a file named "foo" with the contents:
%% aaaaabbbbbccccc
%% @spec make_standard_range_file(riak_client(), binary(),
%% integer(), integer())
%% -> {ok, Contents::binary(), FileHandle::luwak_file()}
make_standard_range_file(Riak, Name, BlockSize, BlockCount) ->
{ok, File} = luwak_file:create(Riak, Name,
[{block_size, BlockSize}],
dict:new()),
Data = iolist_to_binary(
[ lists:duplicate(BlockSize, $a+N) ||
N <- lists:seq(0, BlockCount-1) ]),
{ok, _, NewFile} = luwak_io:put_range(Riak, File, 0, Data),
{ok, Data, NewFile}.

%% tests a get_range that specifies an offset+length that ends in
%% the middle of a block
%% Ex: aaaaaaaaaa bbbbbbbbbb cccccccccc
%% ^start end^
partial_end_block_get_range_test() ->
test_helper:riak_test(
fun(Riak) ->
{ok, Data, File} = make_standard_range_file(
Riak, <<"endblockrange">>, 10, 3),
Read = read_stream(
luwak_get_stream:start(Riak, File, 10, 14)),
<<_:10/binary, Expected:14/binary, _/binary>> = Data,
?assertEqual(Expected, Read)
end).

%% tests a get_range that specifies an offset that lands in the
%% middle of a block
%% Ex: aaaaaaaaaa bbbbbbbbbb cccccccccc
%% ^start end^
partial_start_block_get_range_test() ->
test_helper:riak_test(
fun(Riak) ->
{ok, Data, File} = make_standard_range_file(
Riak, <<"startblockrange">>, 10, 3),
Read = read_stream(
luwak_get_stream:start(Riak, File, 5, 15)),
<<_:5/binary, Expected:15/binary, _/binary>> = Data,
?assertEqual(Expected, Read)
end).


%% tests a get_range that starts and ends in the same block, but
%% is unaligned with that block
%% Ex: aaaaaaaaaa bbbbbbbbbb cccccccccc
%% ^st end^
partial_middle_block_get_range_test() ->
test_helper:riak_test(
fun(Riak) ->
{ok, Data, File} = make_standard_range_file(
Riak, <<"midblockrange">>, 10, 3),
Read = read_stream(
luwak_get_stream:start(Riak, File, 11, 8)),
<<_:11/binary, Expected:8/binary, _/binary>> = Data,
?assertEqual(Expected, Read)
end).

read_stream(Stream) ->
read_stream(Stream, luwak_get_stream:recv(Stream, 1000), []).

read_stream(_, eos, Acc) ->
iolist_to_binary(lists:reverse(Acc));
read_stream(Stream, {Data, _}, Acc) ->
read_stream(Stream,
luwak_get_stream:recv(Stream, 1000),
[Data|Acc]).

0 comments on commit b39201a

Please sign in to comment.