Skip to content

Commit

Permalink
sha1 checksumming of latest write, and a one off checksum for the who…
Browse files Browse the repository at this point in the history
…le file contents
  • Loading branch information
Cliff Moon committed May 31, 2010
1 parent d265e75 commit a790f15
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 17 deletions.
3 changes: 2 additions & 1 deletion apps/luwak/ebin/luwak.app
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
luwak_tree,
luwak_tree_utils,
luwak_put_stream,
luwak_get_stream
luwak_get_stream,
luwak_checksum
]},
{registered, []},
{applications, [
Expand Down
14 changes: 14 additions & 0 deletions apps/luwak/src/luwak_checksum.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-module(luwak_checksum).

-export([sha1/2]).

sha1(Riak, File) ->
Length = luwak_file:length(Riak, File),
sha1_int(crypto:sha_init(), luwak_get_stream:start(Riak, File, 0, Length)).

sha1_int(Ctx, GetStream) ->
case luwak_get_stream:recv(GetStream, 1000) of
{error, timeout} -> {error, timeout};
eos -> crypto:sha_final(Ctx);
{Data, _} -> sha1_int(crypto:sha_update(Ctx,Data),GetStream)
end.
26 changes: 24 additions & 2 deletions apps/luwak/src/luwak_file.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
-module(luwak_file).

-export([create/3, create/4, set_attributes/3, get_attributes/1, exists/2,
delete/2, get/2, get_property/2, update_root/3, name/1, length/2]).
delete/2, get/2, get_property/2, update_root/3, update_checksum/3,
name/1, length/2]).

-include_lib("luwak/include/luwak.hrl").

Expand All @@ -21,9 +22,13 @@ create(Riak, Name, Attributes) when is_binary(Name) ->
%% is 1000000.
%% {tree_order, int()} - The maximum number of children for an individual tree node. Default
%% is 250.
%% {checksumming, boolean()} - This controls whether or not checksumming will occur. Checksumming
%% will leave a checksum of the most recent write request as a property
%% of the filehandle.
create(Riak, Name, Properties, Attributes) when is_binary(Name) ->
BlockSize = proplists:get_value(block_size, Properties, ?BLOCK_DEFAULT),
Order = proplists:get_value(tree_order, Properties, ?ORDER_DEFAULT),
Checksumming = proplists:get_value(checksumming, Properties, false),
if
Order < 2 -> throw("tree_order cannot be less than 2");
BlockSize < 1 -> throw("block_size cannot be less than 1");
Expand All @@ -34,6 +39,8 @@ create(Riak, Name, Properties, Attributes) when is_binary(Name) ->
{block_size, BlockSize},
{created, now()},
{modified, now()},
{checksumming, Checksumming},
{checksum, undefined},
{tree_order, Order},
{ancestors, []},
{root, undefined}
Expand Down Expand Up @@ -118,7 +125,22 @@ update_root(Riak, Obj, NewRoot) ->
ObjVal2 = lists:keyreplace(ancestors, 1, ObjVal1, {ancestors, [OldRoot|Ancestors]}),
ObjVal3 = lists:keyreplace(root, 1, ObjVal2, {root, NewRoot}),
Obj2 = riak_object:apply_updates(riak_object:update_value(Obj, ObjVal3)),
{Riak:put(Obj2, 2), Obj2}.
ok = Riak:put(Obj2, 2),
{ok, Obj2}.

%% @private
update_checksum(Riak, Obj, ChecksumFun) ->
case get_property(Obj, checksumming) of
true ->
Values = riak_object:get_value(Obj),
ObjVal1 = riak_object:get_value(Obj),
ObjVal2 = lists:keyreplace(checksum, 1, ObjVal1, {checksum, {sha1, ChecksumFun()}}),
Obj2 = riak_object:apply_updates(riak_object:update_value(Obj, ObjVal2)),
ok = Riak:put(Obj2, 2),
{ok, Obj2};
_ ->
{ok, Obj}
end.

%% @spec name(Obj :: luwak_file()) -> binary()
%% @doc returns the name of the given file handle.
Expand Down
3 changes: 2 additions & 1 deletion apps/luwak/src/luwak_get_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ start(Riak, File, Start, Length) ->
Root = luwak_file:get_property(File, root),
MapStart = [{{?N_BUCKET, Root}, 0}],
Map = #map{riak=Riak,blocksize=BlockSize,ref=Ref,offset=Start,endoffset=Start+Length},
Receiver = proc_lib:spawn_link(receiver_fun(MapStart, self(), Map)),
Receiver = proc_lib:spawn(receiver_fun(MapStart, self(), Map)),
{get_stream, Ref, Receiver}.

%% @spec recv(Stream :: get_stream(), Timeout :: int()) ->
Expand Down Expand Up @@ -51,6 +51,7 @@ nonblock_mr(Riak,Query,MapInput) ->

receiver_fun(MapInput, Parent, Map=#map{riak=Riak,offset=Offset,endoffset=EndOffset,ref=Ref}) ->
fun() ->
?debugFmt("receiver_fun(~p, ~p, ~p)~n", [MapInput, Parent, Map]),
Query = [{map,{qfun,map_fun()},Map#map{pid=self()}, false}],
nonblock_mr(Riak, Query, MapInput),
receive_loop(Ref, Offset, EndOffset, Parent)
Expand Down
6 changes: 4 additions & 2 deletions apps/luwak/src/luwak_io.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ internal_put_range(Riak, File, Start, Data) ->
0 ->
{ok, Written} = write_blocks(Riak, File, undefined, Start, Data, BlockSize, []),
{ok, NewFile} = luwak_tree:update(Riak, File, BlockAlignedStart, Written),
{ok, Written, NewFile};
{ok, NewFile1} = luwak_file:update_checksum(Riak, NewFile, fun() -> crypto:sha(Data) end),
{ok, Written, NewFile1};
BlockOffset ->
{ok, Block} = luwak_tree:block_at(Riak, File, Start),
{ok, Written} = write_blocks(Riak, File, Block, Start, Data, BlockSize, []),
{ok, NewFile} = luwak_tree:update(Riak, File, BlockAlignedStart, Written),
{ok, Written, NewFile}
{ok, NewFile1} = luwak_file:update_checksum(Riak, NewFile, fun() -> crypto:sha(Data) end),
{ok, Written, NewFile1}
end.

write_blocks(_, _, _, Start, <<>>, _, Written) when is_list(Written) ->
Expand Down
35 changes: 24 additions & 11 deletions apps/luwak/src/luwak_put_stream.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(luwak_put_stream).

-define(BUFFER_SIZE, 20).
-record(state, {file,offset,blocksize,ref,ttl,written=[],buffer=[],buffersize=0}).
-record(state, {file,offset,blocksize,ref,ttl,written=[],buffer=[],buffersize=0,checksumming=false,ctx=crypto:sha_init()}).

-include_lib("luwak/include/luwak.hrl").

Expand All @@ -19,16 +19,18 @@
start_link(Riak, File, Offset, TTL) ->
Ref = make_ref(),
BlockSize = luwak_file:get_property(File, block_size),
Checksumming = luwak_file:get_property(File, checksumming),
Pid = proc_lib:spawn_link(fun() ->
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL})
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL,checksumming=Checksumming})
end),
{put_stream, Ref, Pid}.

start(Riak, File, Offset, TTL) ->
Ref = make_ref(),
BlockSize = luwak_file:get_property(File, block_size),
Checksumming = luwak_file:get_property(File, checksumming),
Pid = proc_lib:spawn(fun() ->
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL})
recv(Riak, #state{file=File,offset=Offset,blocksize=BlockSize,ref=Ref,ttl=TTL,checksumming=Checksumming})
end),
{put_stream, Ref, Pid}.

Expand Down Expand Up @@ -97,36 +99,47 @@ handle_data(Riak, State=#state{file=File,offset=Offset,blocksize=BlockSize,buffe
PartialSize = BlockSize - Offset rem BlockSize,
<<PartialData:PartialSize/binary, TailData/binary>> = iolist_to_binary(lists:reverse(Buffer)),
{ok, [W]} = luwak_io:no_tree_put_range(Riak, File, Offset, PartialData),
update_tree(Riak, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=[W]});
update_tree(Riak, checksum(PartialData, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=[W]}));
handle_data(Riak, State=#state{file=File,offset=Offset,blocksize=BlockSize,buffer=Buffer,written=Written,buffersize=BufferSize}) when BufferSize >= BlockSize ->
BufferSize = iolist_size(Buffer),
PartialSize = BufferSize - BufferSize rem BlockSize,
<<PartialData:PartialSize/binary, TailData/binary>> = iolist_to_binary(lists:reverse(Buffer)),
{ok, Written1} = luwak_io:no_tree_put_range(Riak, File, Offset, PartialData),
update_tree(Riak, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=Written ++ Written1});
update_tree(Riak, checksum(PartialData, State#state{offset=Offset+PartialSize,buffer=[TailData],buffersize=byte_size(TailData),written=Written ++ Written1}));
handle_data(Riak, State) ->
update_tree(Riak, State).

update_tree(Riak, State=#state{offset=Offset,file=File,written=Written}) when length(Written) >= ?BUFFER_SIZE ->
OriginalOffset = Offset - luwak_tree_utils:blocklist_length(Written),
{ok, NewFile} = luwak_tree:update(Riak, File, OriginalOffset, Written),
State#state{file=NewFile};
update_checksum(Riak, State#state{file=NewFile});
update_tree(Riak, State) ->
State.

flush(Riak, State=#state{offset=Offset,file=File,buffer=Buffer,written=Written}) when length(Buffer) > 0 ->
?debugFmt("A flush(Riak, ~p)~n", [State]),
{ok, Written1} = luwak_io:no_tree_put_range(Riak, File, Offset, iolist_to_binary(lists:reverse(Buffer))),
Data = iolist_to_binary(lists:reverse(Buffer)),
{ok, Written1} = luwak_io:no_tree_put_range(Riak, File, Offset, Data),
WriteSize = luwak_tree_utils:blocklist_length(Written1),
flush(Riak, State#state{offset=Offset+WriteSize,buffer=[],buffersize=0,written=Written++Written1});
flush(Riak, checksum(Data, State#state{offset=Offset+WriteSize,buffer=[],buffersize=0,written=Written++Written1}));
flush(Riak, State=#state{offset=Offset,file=File,written=Written}) when length(Written) > 0 ->
?debugFmt("B flush(Riak, ~p)~n", [State]),
OriginalOffset = Offset - luwak_tree_utils:blocklist_length(Written),
{ok, NewFile} = luwak_tree:update(Riak, File, OriginalOffset, Written),
State#state{file=NewFile};
update_checksum(Riak, State#state{file=NewFile});
flush(Riak, State) ->
State.


checksum(_, State=#state{checksumming=false}) ->
State;
checksum(Data, State=#state{ctx=Ctx}) ->
State#state{ctx=crypto:sha_update(Ctx,Data)}.

update_checksum(_, State=#state{checksumming=false}) ->
State;
update_checksum(Riak, State=#state{ctx=Ctx,file=File}) ->
{ok, File1} = luwak_file:update_checksum(Riak,File,fun() -> crypto:sha_final(Ctx) end),
State#state{file=File1}.

close(Riak, State) ->
{closed, flush(Riak, State)}.

21 changes: 21 additions & 0 deletions apps/luwak/test/luwak_checksum_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-module(luwak_checksum_tests).

-include_lib("eunit/include/eunit.hrl").

one_off_checksum_test() ->
test_helper:riak_test(fun(Riak) ->
Sha = crypto:sha(<<"chilled monkey brains">>),
{ok, File} = luwak_file:create(Riak, <<"file1">>, [{tree_order,2},{block_size,2}], dict:new()),
{ok, _, File1} = luwak_io:put_range(Riak, File, 0, <<"chilled monkey brains">>),
timer:sleep(100),
Checksum = luwak_checksum:sha1(Riak, File1),
?assertEqual(Sha, Checksum)
end).

do_a_simple_checksum_test() ->
test_helper:riak_test(fun(Riak) ->
Sha = crypto:sha(<<"chilled monkey brains">>),
{ok, File} = luwak_file:create(Riak, <<"file1">>, [{checksumming,true}], dict:new()),
{ok, _, File1} = luwak_io:put_range(Riak, File, 0, <<"chilled monkey brains">>),
?assertEqual({sha1,Sha}, luwak_file:get_property(File1,checksum))
end).

0 comments on commit a790f15

Please sign in to comment.