Skip to content

Commit

Permalink
Move calculation of delete_block_remaining to riak_cs_delete_fsm:init/1
Browse files Browse the repository at this point in the history
complementary to #827, this commit prevents Manifest's delete_block_remaining
being undefined, after init/1 returns. Before prepare/2 very rarely but
there could be a message that interrupts and moves to some invalid state.
This also reduces number of states about delete_block_remaining being
undefined or not.
  • Loading branch information
kuenishi committed Apr 18, 2014
1 parent 1c1c6f4 commit 56e58bc
Showing 1 changed file with 54 additions and 44 deletions.
98 changes: 54 additions & 44 deletions src/riak_cs_delete_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,35 @@ block_deleted(Pid, Response) ->

init([RiakcPid, {UUID, Manifest}, _Options]) ->
{Bucket, Key} = Manifest?MANIFEST.bkey,
State = #state{bucket=Bucket,
key=Key,
manifest=Manifest,
uuid=UUID,
riakc_pid=RiakcPid},
{ok, prepare, State, 0}.

case blocks_to_delete_from_manifest(Manifest) of

{ok, {NewManifest, BlocksToDelete}} ->
BlockCount = case BlocksToDelete of
undefined ->
0;
_ ->
ordsets:size(BlocksToDelete)
end,

%% Handle the case where there are 0 blocks to delete,
%% i.e. content length of 0
case BlockCount > 0 of
true ->
State = #state{bucket=Bucket,
key=Key,
manifest=NewManifest,
total_blocks=BlockCount,
uuid=UUID,
riakc_pid=RiakcPid},
{ok, prepare, State, 0};
false ->
{stop, normal}
end;

{error, _Reason} ->
{stop, normal}
end.

%% @TODO Make sure we avoid any race conditions here
%% like we had in the other fsms.
Expand Down Expand Up @@ -161,34 +184,19 @@ deleting_state_result(_, State) ->
{next_state, atom(), state()}.
handle_receiving_manifest(State=#state{riakc_pid=RiakcPid,
manifest=Manifest}) ->
{NewManifest, BlocksToDelete} = blocks_to_delete_from_manifest(Manifest),
BlockCount = case BlocksToDelete of
undefined ->
0;
_ ->
ordsets:size(BlocksToDelete)
end,
%% Handle the case where there are 0 blocks to delete,
%% i.e. content length of 0
case BlockCount > 0 of
true ->
AllDeleteWorkers =
riak_cs_block_server:start_block_servers(RiakcPid,
riak_cs_lfs_utils:delete_concurrency()),
FreeDeleters = ordsets:from_list(AllDeleteWorkers),

NewState = State#state{manifest=NewManifest,
delete_blocks_remaining=BlocksToDelete,
all_delete_workers=AllDeleteWorkers,
free_deleters=FreeDeleters,
total_blocks=BlockCount},

StateAfterDeleteStart = maybe_delete_blocks(NewState),
AllDeleteWorkers =
riak_cs_block_server:start_block_servers(RiakcPid,
riak_cs_lfs_utils:delete_concurrency()),
FreeDeleters = ordsets:from_list(AllDeleteWorkers),

{next_state, deleting, StateAfterDeleteStart};
false ->
{stop, normal, State}
end.
NewState = State#state{manifest=Manifest,
all_delete_workers=AllDeleteWorkers,
free_deleters=FreeDeleters},

StateAfterDeleteStart = maybe_delete_blocks(NewState),

{next_state, deleting, StateAfterDeleteStart}.

maybe_delete_blocks(State=#state{free_deleters=[]}) ->
State;
Expand Down Expand Up @@ -240,20 +248,22 @@ manifest_cleanup(_, _, _, _, _) ->
blocks_to_delete_from_manifest(Manifest=?MANIFEST{state=State,
delete_blocks_remaining=undefined})
when State =:= pending_delete;State =:= writing; State =:= scheduled_delete ->
case riak_cs_lfs_utils:block_sequences_for_manifest(Manifest) of
[]=Blocks ->
UpdManifest = Manifest?MANIFEST{delete_blocks_remaining=[],
Blocks = riak_cs_lfs_utils:block_sequences_for_manifest(Manifest),
UpdManifest = case Blocks of
[] ->
Manifest?MANIFEST{delete_blocks_remaining=[],
state=deleted};
Blocks ->
UpdManifest = Manifest?MANIFEST{delete_blocks_remaining=Blocks}
end,
{UpdManifest, Blocks};
blocks_to_delete_from_manifest(Manifest=?MANIFEST{delete_blocks_remaining=undefined}) ->
Blocks ->
Manifest?MANIFEST{delete_blocks_remaining=Blocks}
end,
{ok, {UpdManifest, Blocks}};
blocks_to_delete_from_manifest(Manifest=?MANIFEST{state=State,
delete_blocks_remaining=undefined}) ->
lager:warning("Manifest in invalid state ~p", [Manifest]),
{Manifest, undefined};
blocks_to_delete_from_manifest(Manifest) ->
{Manifest,
Manifest?MANIFEST.delete_blocks_remaining}.
{error, {invalid_manifest_state, State}};
blocks_to_delete_from_manifest(Manifest=?MANIFEST{state=State})
when State =/= active ->
{ok, {Manifest, Manifest?MANIFEST.delete_blocks_remaining}}.

%% ===================================================================
%% Test API
Expand Down

0 comments on commit 56e58bc

Please sign in to comment.