Skip to content

Commit

Permalink
put_stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Cliff Moon committed May 27, 2010
1 parent 9fd968d commit f67572e
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 3 deletions.
3 changes: 2 additions & 1 deletion apps/luwak/ebin/luwak.app
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
luwak_file,
luwak_stream,
luwak_tree,
luwak_tree_utils
luwak_tree_utils,
luwak_put_stream
]},
{registered, []},
{applications, [
Expand Down
16 changes: 14 additions & 2 deletions apps/luwak/src/luwak_io.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(luwak_io).

-export([put_range/4, get_range/4, truncate/3]).
-export([put_range/4, get_range/4, truncate/3, no_tree_put_range/4]).

put_range(Riak, File, Start, Data) ->
internal_put_range(Riak, File, Start, Data).
Expand Down Expand Up @@ -28,6 +28,17 @@ truncate(Riak, File, Start) ->
%%==============================================
%% internal api
%%==============================================
no_tree_put_range(Riak, File, Start, Data) ->
error_logger:info_msg("no_tree_put_range(Riak, File, ~p, ~p)~n", [Start, Data]),
BlockSize = luwak_file:get_property(File, block_size),
BlockAlignedStart = Start - (Start rem BlockSize),
case (Start rem BlockSize) of
0 -> write_blocks(Riak, File, undefined, Start, Data, BlockSize, []);
BlockOffset ->
error_logger:info_msg("blockoffset ~p~n", [BlockOffset]),
{ok, Block} = luwak_tree:block_at(Riak, File, Start),
write_blocks(Riak, File, Block, Start, Data, BlockSize, [])
end.

internal_put_range(Riak, File, Start, Data) ->
BlockSize = luwak_file:get_property(File, block_size),
Expand Down Expand Up @@ -74,7 +85,8 @@ write_blocks(Riak, File, undefined, Start, Data, BlockSize, Written) when is_lis
write_blocks(Riak, File, PartialStartBlock, Start, Data, BlockSize, Written) when is_list(Written), byte_size(Data) < BlockSize ->
% error_logger:info_msg("D write_blocks(Riak, File, ~p, ~p, ~p, ~p, ~p) ~n", [PartialStartBlock, Start, Data, BlockSize, Written]),
DataSize = byte_size(Data),
<<Head:Start/binary, _:DataSize/binary, Tail/binary>> = luwak_block:data(PartialStartBlock),
PartialStart = Start rem BlockSize,
<<Head:PartialStart/binary, _:DataSize/binary, Tail/binary>> = luwak_block:data(PartialStartBlock),
BlockData = <<Head/binary, Data/binary, Tail/binary>>,
{ok, Block} = luwak_block:create(Riak, BlockData),
{ok, lists:reverse([{luwak_block:name(Block),byte_size(BlockData)}|Written])};
Expand Down
100 changes: 100 additions & 0 deletions apps/luwak/src/luwak_put_stream.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
-module(luwak_put_stream).

-define(BUFFER_SIZE, 20).
-record(state, {file,offset,blocksize,ref,ttl,written=[],buffer=[],buffersize=0}).
%% API
-export([start_link/4, start/4, send/2, ping/1, close/1, status/2]).

start_link(Riak, File, Offset, TTL) ->
Ref = make_ref(),
BlockSize = luwak_file:get_property(File, block_size),
Pid = proc_lib:spawn_link(fun() ->
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL})
end),
{put_stream, Ref, Pid}.

start(Riak, File, Offset, TTL) ->
Ref = make_ref(),
BlockSize = luwak_file:get_property(File, block_size),
Pid = proc_lib:spawn(fun() ->
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL})
end),
{put_stream, Ref, Pid}.

send({put_stream, Ref, Pid}, Data) ->
Pid ! {put, Ref, Data}.

ping({put_stream, Ref, Pid}) ->
Pid ! {ping, Ref}.

close({put_stream, Ref, Pid}) ->
Pid ! {close, Ref}.

status({put_stream, Ref, Pid}, Timeout) ->
Pid ! {status, self(), Ref},
MonitorRef = erlang:monitor(process, Pid),
receive
{stream, Ref, File} ->
erlang:demonitor(MonitorRef),
{ok, File};
{'DOWN', MonitorRef, _, Pid, Reason} -> {error, Reason}
after Timeout ->
erlang:demonitor(MonitorRef),
{error, timeout}
end.

recv(_, close) ->
ok;
recv(Riak, {flushed, State = #state{file=File,ttl=TTL}}) ->
receive
{status, Client, Ref} -> Client ! {stream, Ref, File}
after TTL ->
ok
end;
recv(Riak, State = #state{ref=Ref,ttl=TTL,buffer=Buffer,file=File}) ->
recv(Riak,
receive
{put, Ref, Data} -> handle_data(Riak, State#state{buffer=[Data|Buffer],buffersize=iolist_size([Data|Buffer])});
{ping, Ref} -> State;
{close, Ref} -> flush(Riak, State);
{status, Client, Ref} ->
Client ! {stream, Ref, File},
State
after TTL ->
flush(Riak, State)
end).

%% partial write on an unaligned block. buffer better be empty
handle_data(Riak, State=#state{file=File,offset=Offset,blocksize=BlockSize,buffer=Buffer,written=[],buffersize=BufferSize}) when Offset rem BlockSize =/= 0, BufferSize >= BlockSize - (Offset rem BlockSize) ->
PartialSize = BlockSize - Offset rem BlockSize,
<<PartialData:PartialSize/binary, TailData/binary>> = iolist_to_binary(lists:reverse(Buffer)),
{ok, [W]} = luwak_io:no_tree_put_range(Riak, File, Offset, PartialData),
update_tree(Riak, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=[W]});
handle_data(Riak, State=#state{file=File,offset=Offset,blocksize=BlockSize,buffer=Buffer,written=Written,buffersize=BufferSize}) when BufferSize >= BlockSize ->
BufferSize = iolist_size(Buffer),
PartialSize = BufferSize - BufferSize rem BlockSize,
<<PartialData:PartialSize/binary, TailData/binary>> = iolist_to_binary(lists:reverse(Buffer)),
{ok, Written1} = luwak_io:no_tree_put_range(Riak, File, Offset, PartialData),
update_tree(Riak, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=Written ++ Written1});
handle_data(Riak, State) ->
update_tree(Riak, State).

update_tree(Riak, State=#state{offset=Offset,file=File,written=Written}) when length(Written) >= ?BUFFER_SIZE ->
OriginalOffset = Offset - luwak_tree_utils:blocklist_length(Written),
{ok, NewFile} = luwak_tree:update(Riak, File, OriginalOffset, Written),
State#state{file=NewFile};
update_tree(Riak, State) ->
State.

flush(Riak, State=#state{offset=Offset,file=File,buffer=Buffer,written=Written}) when length(Buffer) > 0 ->
error_logger:info_msg("A flush(Riak, ~p)~n", [State]),
{ok, Written1} = luwak_io:no_tree_put_range(Riak, File, Offset, iolist_to_binary(lists:reverse(Buffer))),
WriteSize = luwak_tree_utils:blocklist_length(Written1),
flush(Riak, State#state{offset=Offset+WriteSize,buffer=[],buffersize=0,written=Written++Written1});
flush(Riak, State=#state{offset=Offset,file=File,written=Written}) when length(Written) > 0 ->
error_logger:info_msg("B flush(Riak, ~p)~n", [State]),
OriginalOffset = Offset - luwak_tree_utils:blocklist_length(Written),
{ok, NewFile} = luwak_tree:update(Riak, File, OriginalOffset, Written),
{flushed, State#state{file=NewFile}};
flush(Riak, State) ->
{flushed, State}.
31 changes: 31 additions & 0 deletions apps/luwak/test/luwak_put_stream_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-module(luwak_put_stream_tests).

-include_lib("eunit/include/eunit.hrl").

aligned_put_stream_test() ->
test_helper:riak_test(fun(Riak) ->
{ok, File} = luwak_file:create(Riak, <<"file1">>, [{block_size,3},{tree_order,3}], dict:new()),
PutStream = luwak_put_stream:start(Riak, File, 0, 1000),
Input = [<<"hey">>, <<"why">>, <<"are">>, <<"you">>, <<"drp">>],
lists:foreach(fun(B) -> luwak_put_stream:send(PutStream, B) end, Input),
luwak_put_stream:close(PutStream),
{ok, File2} = luwak_put_stream:status(PutStream, 1000),
Blocks = luwak_io:get_range(Riak, File2, 0, 15),
?assertEqual(<<"heywhyareyoudrp">>, iolist_to_binary(Blocks))
end).

unaligned_put_stream_test() ->
test_helper:riak_test(fun(Riak) ->
{ok, File} = luwak_file:create(Riak, <<"file1">>, [{block_size,3},{tree_order,3}], dict:new()),
{ok, _, File1} = luwak_io:put_range(Riak, File, 0, <<"heywhyareyoudrp">>),
ok = file:write_file("/Users/cliff/tree8.dot", luwak_tree:visualize_tree(Riak, luwak_file:get_property(File1, root))),
PutStream = luwak_put_stream:start(Riak, File1, 4, 1000),
Input = [<<"her">>, <<"pdr">>, <<"p">>],
lists:foreach(fun(B) -> luwak_put_stream:send(PutStream, B) end, Input),
luwak_put_stream:close(PutStream),
{ok, File2} = luwak_put_stream:status(PutStream, 1000),
ok = file:write_file("/Users/cliff/tree9.dot", luwak_tree:visualize_tree(Riak, luwak_file:get_property(File2, root))),
Blocks = luwak_io:get_range(Riak, File2, 0, 15),
timer:sleep(100),
?assertEqual(<<"heywherpdrpudrp">>, iolist_to_binary(Blocks))
end).

0 comments on commit f67572e

Please sign in to comment.