Skip to content

Commit

Permalink
Skip invalid state manifests in GC bucket and add warning log
Browse files Browse the repository at this point in the history
See #827

Logged information includes:

- Key in GC bucket
- UUID (in manifest)
- CS bucket and key (also in manifest)

It's noteworthy that if riak_cs_delete_fsm replies ok with the same
total blocks and deleted blocks to riak_cs_gc_worker, then the worker
attempt to delete manifest entry in twop_set.  Doing it before
deleting manfiests and blocks completely poses orphan
manifests/blocks.  In this PR, riak_cs_delete_fsm replies with total
blocks = 1 and deleted_blocks = 0.

Conflicts:
	src/riak_cs_delete_fsm.erl
Add GC riak_test for invalid state manifet in GC bucket
  • Loading branch information
shino committed Sep 5, 2014
1 parent 2311e6c commit d7c6d07
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 31 deletions.
83 changes: 54 additions & 29 deletions src/riak_cs_delete_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
-endif.

%% API
-export([start_link/4,
-export([start_link/5,
block_deleted/2]).

%% gen_fsm callbacks
Expand All @@ -51,6 +51,9 @@
manifest :: lfs_manifest(),
riak_client :: riak_client(),
gc_worker_pid :: pid(),
%% Key in GC bucket which this manifest belongs to.
%% Set only once at init and unchanged. Used only for logs.
gc_key :: binary(),
delete_blocks_remaining :: ordsets:ordset({binary(), integer()}),
unacked_deletes=ordsets:new() :: ordsets:ordset(integer()),
all_delete_workers=[] :: list(pid()),
Expand All @@ -65,8 +68,8 @@
%% ===================================================================

%% @doc Start a `riak_cs_delete_fsm'.
start_link(RcPid, Manifest, GCWorkerPid, Options) ->
Args = [RcPid, Manifest, GCWorkerPid, Options],
start_link(RcPid, Manifest, GCWorkerPid, GCKey, Options) ->
Args = [RcPid, Manifest, GCWorkerPid, GCKey, Options],
gen_fsm:start_link(?MODULE, Args, []).

-spec block_deleted(pid(), {ok, {binary(), integer()}} | {error, binary()}) -> ok.
Expand All @@ -77,14 +80,15 @@ block_deleted(Pid, Response) ->
%% gen_fsm callbacks
%% ====================================================================

init([RcPid, {UUID, Manifest}, GCWorkerPid, _Options]) ->
init([RcPid, {UUID, Manifest}, GCWorkerPid, GCKey, _Options]) ->
{Bucket, Key} = Manifest?MANIFEST.bkey,
State = #state{bucket=Bucket,
key=Key,
manifest=Manifest,
uuid=UUID,
riak_client=RcPid,
gc_worker_pid=GCWorkerPid},
gc_worker_pid=GCWorkerPid,
gc_key=GCKey},
{ok, prepare, State, 0}.

%% @TODO Make sure we avoid any race conditions here
Expand Down Expand Up @@ -160,15 +164,31 @@ deleting_state_result(_, State) ->
{next_state, deleting, UpdState}.

-spec handle_receiving_manifest(state()) ->
{next_state, atom(), state()}.
handle_receiving_manifest(State=#state{riak_client=RcPid,
manifest=Manifest}) ->
{NewManifest, BlocksToDelete} = blocks_to_delete_from_manifest(Manifest),
BlockCount = ordsets:size(BlocksToDelete),
NewState = State#state{manifest=NewManifest,
delete_blocks_remaining=BlocksToDelete,
total_blocks=BlockCount},
{next_state, atom(), state()} | {stop, normal, state()}.
handle_receiving_manifest(State=#state{manifest=Manifest,
gc_key=GCKey}) ->
case blocks_to_delete_from_manifest(Manifest) of
{ok, {NewManifest, BlocksToDelete}} ->
BlockCount = ordsets:size(BlocksToDelete),
NewState = State#state{manifest=NewManifest,
delete_blocks_remaining=BlocksToDelete,
total_blocks=BlockCount},
start_block_servers(NewState);
{error, invalid_state} ->
_ = lager:warning("Invalid state manifest in GC bucket at ~p: ~p",
[GCKey, Manifest]),
%% If total blocks and deleted blocks are the same,
%% gc worker attempt to delete the manifest in fileset.
%% Then manifests and blocks becomes orphan.
%% To avoid it, set total_blocks > 0 here.
{stop, normal, State#state{total_blocks=1}}
end.

-spec start_block_servers(state()) -> {next_state, atom(), state()} |
{stop, normal, state()}.
start_block_servers(#state{riak_client=RcPid,
manifest=Manifest,
delete_blocks_remaining=BlocksToDelete} = State) ->
%% Handle the case where there are 0 blocks to delete,
%% i.e. content length of 0,
%% and can not check-out any workers.
Expand All @@ -180,16 +200,16 @@ handle_receiving_manifest(State=#state{riak_client=RcPid,
riak_cs_lfs_utils:delete_concurrency()),
case length(AllDeleteWorkers) of
0 ->
{stop, normal, NewState};
{stop, normal, State};
_ ->
FreeDeleters = ordsets:from_list(AllDeleteWorkers),
NewState1 = NewState#state{all_delete_workers=AllDeleteWorkers,
free_deleters=FreeDeleters},
StateAfterDeleteStart = maybe_delete_blocks(NewState1),
NewState = State#state{all_delete_workers=AllDeleteWorkers,
free_deleters=FreeDeleters},
StateAfterDeleteStart = maybe_delete_blocks(NewState),
{next_state, deleting, StateAfterDeleteStart}
end;
false ->
{stop, normal, NewState}
{stop, normal, State}
end.

maybe_delete_blocks(State=#state{free_deleters=[]}) ->
Expand Down Expand Up @@ -240,21 +260,26 @@ manifest_cleanup(_, _, _, _, _) ->
ok.

-spec blocks_to_delete_from_manifest(lfs_manifest()) ->
{lfs_manifest(), ordsets:ordset(integer())}.
{ok, {lfs_manifest(), ordsets:ordset(integer())}} |
{error, term()}.
blocks_to_delete_from_manifest(Manifest=?MANIFEST{state=State,
delete_blocks_remaining=undefined})
when State =:= pending_delete;State =:= writing; State =:= scheduled_delete ->
case riak_cs_lfs_utils:block_sequences_for_manifest(Manifest) of
[]=Blocks ->
UpdManifest = Manifest?MANIFEST{delete_blocks_remaining=[],
state=deleted};
Blocks ->
UpdManifest = Manifest?MANIFEST{delete_blocks_remaining=Blocks}
end,
{UpdManifest, Blocks};
{UpdState, Blocks} =
case riak_cs_lfs_utils:block_sequences_for_manifest(Manifest) of
[] ->
{deleted, ordsets:new()};
BlockSequence ->
{State, BlockSequence}

end,
UpdManifest = Manifest?MANIFEST{delete_blocks_remaining=Blocks,
state=UpdState},
{ok, {UpdManifest, Blocks}};
blocks_to_delete_from_manifest(?MANIFEST{delete_blocks_remaining=undefined}) ->
{error, invalid_state};
blocks_to_delete_from_manifest(Manifest) ->
{Manifest,
Manifest?MANIFEST.delete_blocks_remaining}.
{ok, {Manifest, Manifest?MANIFEST.delete_blocks_remaining}}.

%% ===================================================================
%% Test API
Expand Down
5 changes: 3 additions & 2 deletions src/riak_cs_gc_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@ initiating_file_delete(continue, ?STATE{batch=[_ManiSetKey | RestKeys],
ok = continue(),
{next_state, fetching_next_fileset, State?STATE{batch=RestKeys,
batch_count=1+BatchCount}};
initiating_file_delete(continue, ?STATE{current_files=[Manifest | _RestManifests],
initiating_file_delete(continue, ?STATE{batch=[CurrentFileSetKey | _],
current_files=[Manifest | _RestManifests],
riak_client=RcPid}=State) ->
%% Use an instance of `riak_cs_delete_fsm' to handle the
%% deletion of the file blocks.
%% Don't worry about delete_fsm failures. Manifests are
%% rescheduled after a certain time.
Args = [RcPid, Manifest, self(), []],
Args = [RcPid, Manifest, self(), CurrentFileSetKey, []],
%% The delete FSM is hard-coded to send a sync event to our registered
%% name upon terminate(), so we do not have to pass our pid to it
%% in order to get a reply.
Expand Down

0 comments on commit d7c6d07

Please sign in to comment.