Skip to content

Commit

Permalink
get stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Cliff Moon committed May 31, 2010
1 parent c55d8dc commit 0338f25
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 23 deletions.
3 changes: 2 additions & 1 deletion apps/luwak/ebin/luwak.app
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
luwak_stream,
luwak_tree,
luwak_tree_utils,
luwak_put_stream
luwak_put_stream,
luwak_get_stream
]},
{registered, []},
{applications, [
Expand Down
109 changes: 109 additions & 0 deletions apps/luwak/src/luwak_get_stream.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-module(luwak_get_stream).

-export([get_stream/4, recv/2]).

-record(map, {riak,blocksize,ref,pid,offset,endoffset}).

-include_lib("luwak/include/luwak.hrl").


%% @doc Starts an asynchronous read of the byte range
%% [Start, Start + Length) of File and returns a stream handle to be passed
%% to recv/2.
%%
%% A receiver process is started with proc_lib:spawn_link/1, so it is linked
%% to the caller. It launches the map/reduce traversal at the file's root
%% node with a starting tree offset of 0 and forwards the resulting chunks to
%% the calling process, tagged with a fresh reference.
%%
%% NOTE(review): the spec below advertises {error, Reason}, but this function
%% always returns the {get_stream, Ref, Pid} tuple; failures surface as exits
%% of the linked receiver instead.
%%
%% @spec get_stream(Riak :: riak(), File :: luwak_file(), Start :: int(), Length :: length()) ->
%%        get_stream() | {error, Reason}
get_stream(Riak, File, Start, Length) ->
  StreamRef = make_ref(),
  Map = #map{riak = Riak,
             blocksize = luwak_file:get_property(File, block_size),
             ref = StreamRef,
             offset = Start,
             endoffset = Start + Length},
  %% The traversal is seeded with the root node, positioned at tree offset 0.
  RootInput = [{{?N_BUCKET, luwak_file:get_property(File, root)}, 0}],
  Receiver = proc_lib:spawn_link(receiver_fun(RootInput, self(), Map)),
  {get_stream, StreamRef, Receiver}.

%% @doc Fetches the next message of a stream created by get_stream/4.
%%
%% Returns {Data, Offset} for a data chunk, the atom eos or closed when the
%% receiver signalled the end of the stream, or {error, timeout} when nothing
%% tagged with the stream's reference arrived within Timeout (the value is
%% handed straight to `receive ... after').
%%
%% Must be called from the process that called get_stream/4, since that is
%% the process the receiver sends to. The receiver pid carried in the handle
%% is not used here.
recv({get_stream, Ref, _Pid}, Timeout) ->
  receive
    {get, Ref, Chunk, Position} ->
      {Chunk, Position};
    {get, Ref, Marker} when Marker =:= eos; Marker =:= closed ->
      Marker
  after Timeout ->
    {error, timeout}
  end.

%% @doc Starts a streaming map/reduce job for Query with the calling process
%% as the client, feeds it MapInput and closes the input side without waiting
%% for any results.
%%
%% Returns whatever luke_flow:finish_inputs/1 returns. If the job cannot be
%% started, the calling process exits with the value mapred_stream returned.
nonblock_mr(Riak, Query, MapInput) ->
  Started = Riak:mapred_stream(Query, self(), 30000),
  case Started of
    {ok, {_ReqId, Flow}} ->
      luke_flow:add_inputs(Flow, MapInput),
      luke_flow:finish_inputs(Flow);
    _ ->
      exit(Started)
  end.

%% @doc Builds the fun executed by the receiver process of a stream.
%%
%% When run, the fun launches the map phase over MapInput, passing the
%% executing process's own pid in the #map{} argument so map workers know
%% where to send chunks, and then enters receive_loop/4 to relay them to
%% Parent.
receiver_fun(MapInput, Parent, Map = #map{}) ->
  #map{riak = Riak, ref = Ref, offset = Start, endoffset = Stop} = Map,
  fun() ->
    %% self() is evaluated inside the fun, i.e. it is the receiver's pid.
    Phase = {map, {qfun, map_fun()}, Map#map{pid = self()}, false},
    nonblock_mr(Riak, [Phase], MapInput),
    receive_loop(Ref, Start, Stop, Parent)
  end.

%% @doc Relays chunks to Parent in offset order until EndOffset is reached.
%%
%% Offset is the position of the next chunk to deliver. Because Offset is
%% already bound in the receive pattern, only the chunk that starts exactly
%% there is taken from the mailbox; chunks that arrived early stay queued
%% until their turn. Once Offset >= EndOffset, {get, Ref, eos} is sent to
%% Parent. A {close, Ref} message ends the loop early with {get, Ref, closed}.
%% Returns ok in both cases.
%%
%% NOTE(review): the receive has no `after' clause, so the loop waits
%% indefinitely if the expected chunk never arrives.
receive_loop(Ref, Offset, EndOffset, Parent) ->
  case Offset >= EndOffset of
    true ->
      ?debugMsg("receive_loop sending eos~n"),
      Parent ! {get, Ref, eos},
      ok;
    false ->
      receive
        {get, Ref, Data, Offset} = Chunk ->
          ?debugFmt("receive_loop got ~p~n", [Chunk]),
          Parent ! Chunk,
          receive_loop(Ref, Offset + byte_size(Data), EndOffset, Parent);
        {close, Ref} = Close ->
          ?debugFmt("receive_loop got ~p~n", [Close]),
          Parent ! {get, Ref, closed},
          ok
      end
  end.

%% @doc Returns the fun used as the `qfun' of the map phase.
%%
%% The fun applies map/3 to the value of the Riak object it is given. If that
%% evaluation terminates with an error or exit, the reason is logged through
%% error_logger and then rethrown with throw/1; any other result is returned
%% unchanged.
map_fun() ->
  fun(Obj, TreeOffset, Map) ->
    %% Old-style catch on purpose: it turns errors and exits into
    %% {'EXIT', Reason}, which is what the case below dispatches on.
    Outcome = (catch map(riak_object:get_value(Obj), TreeOffset, Map)),
    case Outcome of
      {'EXIT', Reason} ->
        error_logger:error_msg("map failed with: ~p~n", [Reason]),
        throw(Reason);
      _ ->
        Outcome
    end
  end.

%% @doc Body of the map phase; dispatches on the kind of value stored in the
%% object being mapped. Always returns [] because results are delivered by
%% message passing, not through the map/reduce result stream.
%%
%% Tree node (#n{}): selects the children overlapping the requested range
%% [offset, endoffset) via luwak_tree:get_range/7, pairing each child with the
%% tree offset at which it starts, and launches a follow-up map/reduce job
%% over those children from a freshly spawned process.
%%
%% Anything else is treated as a data block that starts at TreeOffset within
%% the file: the part of the block that lies inside [offset, endoffset) is
%% sent to the receiver pid as {get, Ref, Data, FileOffset}.
map(Parent=#n{}, TreeOffset, Map=#map{riak=Riak,offset=Offset,endoffset=EndOffset,blocksize=BlockSize}) ->
  ?debugFmt("A map(~p, ~p, ~p)~n", [Parent, TreeOffset, Map]),
  %% Tags every selected child with its starting tree offset and advances the
  %% accumulator by the child's length.
  Fun = fun({Name,Length},AccOffset) ->
    {[{{?N_BUCKET, Name}, AccOffset}], AccOffset+Length}
  end,
  Blocks = luwak_tree:get_range(Riak, Fun, Parent, BlockSize, TreeOffset, Offset, EndOffset),
  Query = [{map,{qfun,map_fun()},Map,false}],
  %% The nested job is started from a separate process so this map call can
  %% return immediately.
  %% NOTE(review): the process is not linked or monitored, so a failure to
  %% start the nested job is not reported to anyone.
  spawn(fun() ->
    ?debugFmt("launching MR on ~p~n", [Blocks]),
    nonblock_mr(Riak, Query, Blocks)
  end),
  [];
map(Block, TreeOffset, Map=#map{offset=Offset,endoffset=EndOffset,ref=Ref,pid=Pid}) ->
  ?debugFmt("B map(~p, ~p, ~p)~n", [Block, TreeOffset, Map]),
  Data = luwak_block:data(Block),
  %% Clip the block to the requested range on BOTH sides. A block may contain
  %% the start of the range, the end of it, or both at once, and the final
  %% block of a file may hold fewer bytes than the range would still allow.
  Skip = erlang:max(Offset - TreeOffset, 0),
  Stop = erlang:min(EndOffset - TreeOffset, byte_size(Data)),
  case Stop - Skip of
    Size when Size > 0 ->
      <<_:Skip/binary, Slice:Size/binary, _/binary>> = Data,
      Msg = {get, Ref, Slice, TreeOffset + Skip},
      ?debugFmt("sending ~p~n", [Msg]),
      Pid ! Msg;
    _ ->
      %% No byte of this block falls inside the range; send nothing.
      ok
  end,
  [].

%% @doc Returns a fold fun that passes a {Name, Length} child through
%% untouched: the child is emitted as a one-element list and the accumulator
%% is returned unchanged.
ident_fun() ->
  fun({_Name, _Length} = Child, NodeOffset) ->
    {[Child], NodeOffset}
  end.

6 changes: 0 additions & 6 deletions apps/luwak/src/luwak_stream.erl

This file was deleted.

42 changes: 27 additions & 15 deletions apps/luwak/src/luwak_tree.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(luwak_tree).

-export([update/4, get/2, block_at/3, visualize_tree/2, get_range/6, truncate/7]).
-export([update/4, get/2, block_at/3, visualize_tree/2, get_range/6, get_range/7, truncate/7]).

-include_lib("luwak/include/luwak.hrl").

Expand Down Expand Up @@ -32,25 +32,33 @@ update(Riak, File, StartingPos, Blocks) ->
luwak_file:update_root(Riak, File, NewRootName)
end.

get_range(_, _, _, _, _, 0) ->
get_range(Riak, Parent, BlockSize, TreeStart, Start, End) ->
Fun = fun({Name,Length}, _) when Length =< BlockSize ->
?debugFmt("A foldrflatmap({~p,~p}, _)~n", [Name, Length]),
{[{Name,BlockSize}], 0};
({Name,NodeLength}, AccLength) ->
?debugFmt("B foldrflatmap({~p,~p}, ~p)~n", [Name, NodeLength, AccLength]),
{ok, Node} = get(Riak, Name),
Blocks = get_range(Riak, Node, BlockSize, AccLength, Start, End),
{Blocks, AccLength+NodeLength}
end,
get_range(Riak, Fun, Parent, BlockSize, TreeStart, Start, End).

get_range(_, _, _, _, _, _, 0) ->
[];
get_range(Riak, Parent = #n{children=[]}, BlockSize, TreeStart, Start, End) ->
get_range(Riak, Fun, Parent = #n{children=[]}, BlockSize, TreeStart, Start, End) ->
?debugMsg("D get_range(_, _, _, _, _, _)~n"),
[];
%% children are individual blocks
%% we can do this because trees are guaranteed to be full
get_range(Riak, Parent = #n{children=[{_,BlockSize}|_]=Children}, BlockSize, TreeStart, Start, End) ->
get_range(Riak, Fun, Parent = #n{children=[{_,BlockSize}|_]=Children}, BlockSize, TreeStart, Start, End) ->
?debugFmt("A get_range(Riak, ~p, ~p, ~p, ~p, ~p)~n", [Parent, BlockSize, TreeStart, Start, End]),
{Nodes,_} = read_split(Children, TreeStart, Start, End);
get_range(Riak, Parent = #n{children=Children}, BlockSize, TreeStart, Start, End) ->
{Nodes, Length} = read_split(Children, TreeStart, Start, End),
luwak_tree_utils:foldrflatmap(Fun, Nodes, Length);
get_range(Riak, Fun, Parent = #n{children=Children}, BlockSize, TreeStart, Start, End) ->
?debugFmt("B get_range(Riak, ~p, ~p, ~p, ~p, ~p)~n", [Parent, BlockSize, TreeStart, Start, End]),
{Nodes, Length} = read_split(Children, TreeStart, Start, End),
luwak_tree_utils:foldrflatmap(fun({Name,NodeLength}, AccLength) ->
?debugFmt("foldrflatmap({~p,~p}, ~p)~n", [Name, NodeLength, AccLength]),
{ok, Node} = get(Riak, Name),
{Blocks,_} = get_range(Riak, Node, BlockSize, AccLength, Start, End),
{Blocks, AccLength+NodeLength}
end, Nodes, Length).
luwak_tree_utils:foldrflatmap(Fun, Nodes, Length).

truncate(_Riak, File, _Start, undefined, _Order, _NodeOffset, _BlockSize) ->
?debugFmt("A truncate(Riak, File, ~p, undefined, ~p, ~p, ~p)~n", [_Start, _Order, _NodeOffset, _BlockSize]),
Expand Down Expand Up @@ -157,12 +165,16 @@ subtree_update(Riak, File, Order, InsertPos, TreePos, Parent = #n{}, Blocks) ->

list_into_nodes(Riak, Children, Order, StartingPos) ->
?debugFmt("list_into_nodes(Riak, ~p, ~p, ~p)~n", [Children, Order, StartingPos]),
map_sublist(fun(Sublist) ->
Written = map_sublist(fun(Sublist) ->
Length = luwak_tree_utils:blocklist_length(Sublist),
{ok, Obj} = create_node(Riak, Sublist),
{riak_object:key(Obj), Length}
end, Order, Children).

end, Order, Children),
if
length(Written) > Order -> list_into_nodes(Riak, Written, Order, StartingPos);
true -> Written
end.


%% @spec block_at(Riak::riak(), File::luwak_file(), Pos::int()) ->
%% {ok, BlockObj} | {error, Reason}
Expand Down
37 changes: 37 additions & 0 deletions apps/luwak/test/luwak_get_stream_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-module(luwak_get_stream_tests).

-include_lib("eunit/include/eunit.hrl").

%% Streams a whole 26-byte file stored with block size 3 and tree order 3 and
%% checks that the chunks arrive block by block, in offset order, followed by
%% the end-of-stream marker.
simple_get_stream_test() ->
  test_helper:riak_test(fun(Riak) ->
    {ok, File} = luwak_file:create(Riak, <<"file1">>, [{block_size,3},{tree_order,3}], dict:new()),
    {ok, _Written, File1} = luwak_io:put_range(Riak, File, 0, <<"wontyoupleasetouchmymonkey">>),
    GetStream = luwak_get_stream:get_stream(Riak, File1, 0, 26),
    timer:sleep(100),
    %% Messages expected from recv/2, in order.
    Expected = [{<<"won">>, 0},
                {<<"tyo">>, 3},
                {<<"upl">>, 6},
                {<<"eas">>, 9},
                {<<"eto">>, 12},
                {<<"uch">>, 15},
                {<<"mym">>, 18},
                {<<"onk">>, 21},
                {<<"ey">>, 24},
                eos],
    lists:foreach(fun(Message) ->
      ?assertEqual(Message, luwak_get_stream:recv(GetStream, 1000))
    end, Expected)
  end).

%% Streams bytes 3..12 of a file stored with block size 2 and tree order 2 (a
%% tree with more than one level of nodes). The range starts and ends in the
%% middle of a block, so the first and last chunks are partial.
three_level_tree_stream_test_() ->
  Test = fun() ->
    test_helper:riak_test(fun(Riak) ->
      {ok, File} = luwak_file:create(Riak, <<"file1">>, [{block_size,2},{tree_order,2}], dict:new()),
      {ok, _Written, File1} = luwak_io:put_range(Riak, File, 0, <<"wontyoupleasetouchmymonkey">>),
      GetStream = luwak_get_stream:get_stream(Riak, File1, 3, 10),
      ?assertEqual({<<"t">>, 3}, luwak_get_stream:recv(GetStream, 1000)),
      ?assertEqual({<<"yo">>, 4}, luwak_get_stream:recv(GetStream, 1000)),
      ?assertEqual({<<"up">>, 6}, luwak_get_stream:recv(GetStream, 1000)),
      ?assertEqual({<<"le">>, 8}, luwak_get_stream:recv(GetStream, 1000)),
      ?assertEqual({<<"as">>, 10}, luwak_get_stream:recv(GetStream, 1000)),
      ?assertEqual({<<"e">>, 12}, luwak_get_stream:recv(GetStream, 1000))
    end)
  end,
  %% EUnit timeouts are given in seconds, not milliseconds: 30 seconds is
  %% ample for six recv calls of at most one second each, whereas the former
  %% value of 30000 let a hung run block for more than eight hours.
  {timeout, 30, Test}.
3 changes: 2 additions & 1 deletion apps/luwak/test/test_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ start_riak() ->
% application:set_env(riak_core, ring_state_dir, Dir),
application:set_env(riak_kv, storage_backend, riak_kv_cache_backend),
load_and_start_apps([kernel, stdlib, sasl, crypto, webmachine,
riak_core, riak_kv]).
riak_core, riak_kv, luke]).

%% Stops luke, riak_kv and riak_core, in that order, and returns the result
%% of stopping the last one (riak_core).
stop_riak() ->
  Apps = [luke, riak_kv, riak_core],
  lists:last([application:stop(App) || App <- Apps]).

Expand Down

0 comments on commit 0338f25

Please sign in to comment.