diff --git a/include/riak_cs_gc_d.hrl b/include/riak_cs_gc.hrl
similarity index 87%
rename from include/riak_cs_gc_d.hrl
rename to include/riak_cs_gc.hrl
index 4670cceea..87aaf7265 100644
--- a/include/riak_cs_gc_d.hrl
+++ b/include/riak_cs_gc.hrl
@@ -25,33 +25,20 @@
%% The name is general so declare local type for readability.
-type index_result_keys() :: keys().
--record(gc_d_state, {
- interval :: 'infinity' | non_neg_integer(),
- %% the last time a deletion was scheduled
- last :: undefined | non_neg_integer(),
- %% the next scheduled gc time
- next :: undefined | non_neg_integer(),
+-record(gc_batch_state, {
%% start of the current gc interval
batch_start :: undefined | non_neg_integer(),
- %% caller of manual_batch
- %% Currently only used in `riak_cs_gc_single_run_eqc`.
- batch_caller :: undefined | pid(),
batch_count=0 :: non_neg_integer(),
%% Count of filesets skipped in this batch
batch_skips=0 :: non_neg_integer(),
batch=[] :: undefined | [index_result_keys()], % `undefined' only for testing
manif_count=0 :: non_neg_integer(),
block_count=0 :: non_neg_integer(),
- %% state of the fsm when a delete batch was paused
- pause_state :: undefined | atom(),
%% used when moving from paused -> idle
interval_remaining :: undefined | non_neg_integer(),
- timer_ref :: reference(),
- initial_delay :: non_neg_integer(),
leeway :: non_neg_integer(),
worker_pids=[] :: [pid()],
max_workers :: non_neg_integer(),
- active_workers=0 :: non_neg_integer(),
%% Used for paginated 2I querying of GC bucket
key_list_state :: undefined | gc_key_list_state(),
%% Options to use when start workers
@@ -106,3 +93,14 @@
-define(DEFAULT_GC_BATCH_SIZE, 1000).
-define(DEFAULT_GC_WORKERS, 2).
-define(EPOCH_START, <<"0">>).
+
+-record(gc_manager_state, {
+ next :: undefined | non_neg_integer(),
+ gc_batch_pid :: undefined | pid(),
+ batch_history = [] :: list(#gc_batch_state{}),
+ current_batch :: undefined | #gc_batch_state{},
+ interval = ?DEFAULT_GC_INTERVAL:: non_neg_integer() | infinity,
+ initial_delay :: non_neg_integer(),
+ timer_ref :: undefined | reference()
+ }).
+
diff --git a/src/riak_cs_config.erl b/src/riak_cs_config.erl
index de1eda371..7ca6e18a4 100644
--- a/src/riak_cs_config.erl
+++ b/src/riak_cs_config.erl
@@ -101,7 +101,7 @@
os_tokens_url/0,
os_users_url/0]).
--include("riak_cs_gc_d.hrl").
+-include("riak_cs_gc.hrl").
-include("oos_api.hrl").
-include("s3_api.hrl").
-include("list_objects.hrl").
diff --git a/src/riak_cs_gc.erl b/src/riak_cs_gc.erl
index b518f314b..e28fa622d 100644
--- a/src/riak_cs_gc.erl
+++ b/src/riak_cs_gc.erl
@@ -22,7 +22,7 @@
-module(riak_cs_gc).
--include("riak_cs_gc_d.hrl").
+-include("riak_cs_gc.hrl").
-ifdef(TEST).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
@@ -202,7 +202,7 @@ handle_move_result({error, _Reason}=Error, _RiakObject, _Bucket, _Key, _PDUUIDs,
%% @doc Return the number of seconds to wait after finishing garbage
%% collection of a set of files before starting the next.
--spec gc_interval() -> non_neg_integer().
+-spec gc_interval() -> non_neg_integer() | infinity.
gc_interval() ->
case application:get_env(riak_cs, gc_interval) of
undefined ->
diff --git a/src/riak_cs_gc_batch.erl b/src/riak_cs_gc_batch.erl
new file mode 100644
index 000000000..f28a8529f
--- /dev/null
+++ b/src/riak_cs_gc_batch.erl
@@ -0,0 +1,303 @@
+%% ---------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% ---------------------------------------------------------------------
+
+%% @doc The process that handles garbage collection of deleted file
+%% manifests and blocks.
+%%
+%% Simpler State Diagram
+%%
+%% init -> waiting_for_workers --(batch_complete)--> stop
+%% ^ |
+%% +--------------------+
+
+-module(riak_cs_gc_batch).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/1,
+ current_state/1,
+ status_data/1,
+ stop/1]).
+
+%% gen_fsm callbacks
+-export([init/1,
+ prepare/2,
+ prepare/3,
+ waiting_for_workers/2,
+ waiting_for_workers/3,
+ handle_event/3,
+ handle_sync_event/4,
+ handle_info/3,
+ terminate/3,
+ code_change/4]).
+
+-include("riak_cs_gc.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+-endif.
+
+-define(SERVER, ?MODULE).
+-define(STATE, #gc_batch_state).
+
+-define(GC_WORKER, riak_cs_gc_worker).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Start the garbage collection server
+start_link(Options) ->
+ gen_fsm:start_link({local, ?SERVER}, ?MODULE, [Options], []).
+
+current_state(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid, current_state, infinity).
+
+%% @doc Stop the daemon
+-spec stop(pid()) -> ok | {error, term()}.
+stop(Pid) ->
+ gen_fsm:sync_send_all_state_event(Pid, stop, infinity).
+
+%%%===================================================================
+%%% gen_fsm callbacks
+%%%===================================================================
+
+%% @doc Read the storage schedule and go to idle.
+
+init([State]) ->
+ {ok, prepare, State, 0}.
+
+has_batch_finished(?STATE{worker_pids=[],
+ batch=[],
+ key_list_state=KeyListState} = _State) ->
+ case KeyListState of
+ undefined -> true;
+ _ -> not riak_cs_gc_key_list:has_next(KeyListState)
+ end;
+has_batch_finished(_) ->
+ false.
+
+%% Asynchronous events
+
+prepare(timeout, State) ->
+ State1 = fetch_first_keys(State),
+ NextState = maybe_start_workers(State1),
+ case has_batch_finished(NextState) of
+ true ->
+ {stop, normal, NextState};
+ _ ->
+ {next_state, waiting_for_workers, NextState}
+ end.
+
+%% @doc This state initiates the deletion of a file from
+%% a set of manifests stored for a particular key in the
+%% garbage collection bucket.
+waiting_for_workers(_Msg, State) ->
+ {next_state, waiting_for_workers, State}.
+
+%% Synchronous events
+
+%% Some race condition?
+prepare(_, _, State) ->
+ {reply, {error, preparing}, prepare, State, 0}.
+
+waiting_for_workers(_Msg, _From, State) ->
+ {reply, ok, waiting_for_workers, State}.
+
+%% @doc there are no all-state events for this fsm
+handle_event({batch_complete, WorkerPid, WorkerState}, StateName, State0) ->
+ %%?debugFmt("~w", [State0]),%% WorkerState#gc_worker_state.batch_count,
+ State = handle_batch_complete(WorkerPid, WorkerState, State0),
+ %%?debugFmt("StateName ~p, ~p ~w", [StateName, has_batch_finished(State), State]),
+ case {has_batch_finished(State), StateName} of
+ {true, _} ->
+ {stop, normal, State};
+ {false, waiting_for_workers} ->
+ State2 = maybe_start_workers(State),
+ {next_state, waiting_for_workers, State2}
+ end;
+handle_event(_Event, StateName, State) ->
+ {next_state, StateName, State}.
+
+%% @doc Handle synchronous events that should be handled
+%% the same regardless of the current state.
+-spec handle_sync_event(term(), term(), atom(), ?STATE{}) ->
+ {reply, term(), atom(), ?STATE{}}.
+handle_sync_event(current_state, _From, StateName, State) ->
+ {reply, {StateName, State}, StateName, State};
+handle_sync_event(stop, _From, _StateName, State) ->
+ _ = cancel_batch(State),
+ {stop, cancel, {ok, State}, State};
+handle_sync_event(_Event, _From, StateName, State) ->
+ ok_reply(StateName, State).
+
+handle_info(_Info, StateName, State) ->
+ {next_state, StateName, State}.
+
+%% @doc TODO: log warnings if this fsm is asked to terminate in the
+%% middle of running a gc batch
+terminate(normal, _StateName, State) ->
+ _ = lager:info("Finished garbage collection: "
+ "~b seconds, ~p batch_count, ~p batch_skips, "
+ "~p manif_count, ~p block_count\n",
+ [elapsed(State?STATE.batch_start), State?STATE.batch_count,
+ State?STATE.batch_skips, State?STATE.manif_count,
+ State?STATE.block_count]),
+ riak_cs_gc_manager:finished(State);
+terminate(_Reason, _StateName, _State) ->
+ ok.
+
+%% @doc this fsm has no special upgrade process
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+fetch_first_keys(?STATE{batch_start=BatchStart,
+ leeway=Leeway} = State) ->
+
+ %% [Fetch the first set of manifests for deletion]
+ %% this does not check out a worker from the riak connection pool;
+ %% instead it creates a fresh new worker, the idea being that we
+ %% don't want to delay deletion just because the normal request
+ %% pool is empty; pool workers just happen to be literally the
+ %% socket process, so "starting" one here is the same as opening a
+ %% connection, and avoids duplicating the configuration lookup
+ %% code.
+ {KeyListRes, KeyListState} =
+ riak_cs_gc_key_list:new(BatchStart, Leeway),
+ #gc_key_list_result{bag_id=BagId, batch=Batch} = KeyListRes,
+ _ = lager:debug("Initial batch keys: ~p", [Batch]),
+ State?STATE{batch=Batch,
+ key_list_state=KeyListState,
+ bag_id=BagId}.
+
+%% @doc Handle a `batch_complete' event from a GC worker process.
+-spec handle_batch_complete(pid(), #gc_worker_state{}, ?STATE{}) -> ?STATE{}.
+handle_batch_complete(WorkerPid, WorkerState, State) ->
+ ?STATE{
+ worker_pids=WorkerPids,
+ batch_count=BatchCount,
+ batch_skips=BatchSkips,
+ manif_count=ManifestCount,
+ block_count=BlockCount} = State,
+ #gc_worker_state{batch_count=WorkerBatchCount,
+ batch_skips=WorkerBatchSkips,
+ manif_count=WorkerManifestCount,
+ block_count=WorkerBlockCount} = WorkerState,
+ _ = lager:debug("~p completed (~p)", [WorkerPid, WorkerState]),
+ UpdWorkerPids = lists:delete(WorkerPid, WorkerPids),
+ %% @TODO Workout the terminiology for these stats. i.e. Is batch
+ %% count just an increment or represenative of something else.
+ State?STATE{
+ worker_pids=UpdWorkerPids,
+ batch_count=BatchCount + WorkerBatchCount,
+ batch_skips=BatchSkips + WorkerBatchSkips,
+ manif_count=ManifestCount + WorkerManifestCount,
+ block_count=BlockCount + WorkerBlockCount}.
+
+%% @doc Start a GC worker and return the apprpriate next state and
+%% updated state record.
+-spec start_worker(?STATE{}) -> ?STATE{}.
+start_worker(?STATE{batch=[NextBatch|RestBatches],
+ bag_id=BagId,
+ worker_pids=WorkerPids} = State) ->
+ case ?GC_WORKER:start_link(BagId, NextBatch) of
+ {ok, Pid} ->
+ _ = lager:debug("GC worker ~p for bag ~p has started", [Pid, BagId]),
+ State?STATE{batch=RestBatches,
+ worker_pids=[Pid | WorkerPids]};
+ {error, _Reason} ->
+ State
+ end.
+
+%% @doc Cancel the current batch of files set for garbage collection.
+-spec cancel_batch(?STATE{}) -> any().
+cancel_batch(?STATE{batch_start=BatchStart,
+ worker_pids=WorkerPids}=_State) ->
+ %% Interrupt the batch of deletes
+ _ = lager:info("Canceled garbage collection batch after ~b seconds.",
+ [elapsed(BatchStart)]),
+ [riak_cs_gc_worker:stop(P) || P <- WorkerPids].
+
+-spec ok_reply(atom(), ?STATE{}) -> {reply, ok, atom(), ?STATE{}}.
+ok_reply(NextState, NextStateData) ->
+ {reply, ok, NextState, NextStateData}.
+
+maybe_start_workers(?STATE{max_workers=MaxWorkers,
+ worker_pids=WorkerPids} = State)
+ when MaxWorkers =:= length(WorkerPids) ->
+ State;
+maybe_start_workers(?STATE{max_workers=MaxWorkers,
+ worker_pids=WorkerPids,
+ key_list_state=undefined,
+ batch=[]} = State)
+ when MaxWorkers > length(WorkerPids) ->
+ State;
+maybe_start_workers(?STATE{max_workers=MaxWorkers,
+ worker_pids=WorkerPids,
+ key_list_state=KeyListState,
+ batch=[]} = State)
+ when MaxWorkers > length(WorkerPids) ->
+ %% Fetch the next set of manifests for deletion
+ {KeyListRes, UpdKeyListState} = riak_cs_gc_key_list:next(KeyListState),
+ #gc_key_list_result{bag_id=BagId, batch=Batch} = KeyListRes,
+ _ = lager:debug("Next batch keys: ~p", [Batch]),
+ State2 = State?STATE{batch=Batch,
+ key_list_state=UpdKeyListState,
+ bag_id=BagId},
+ case UpdKeyListState of
+ undefined -> State2;
+ _ -> maybe_start_workers(State2)
+ end;
+maybe_start_workers(?STATE{max_workers=MaxWorkers,
+ worker_pids=WorkerPids,
+ batch=Batch} = State)
+ when MaxWorkers > length(WorkerPids) ->
+ _ = lager:debug("Batch: ~p", [Batch, WorkerPids]),
+ State2 = start_worker(State),
+ maybe_start_workers(State2).
+
+-spec status_data(?STATE{}) -> [{atom(), term()}].
+status_data(State) ->
+ [{current, State?STATE.batch_start},
+ {elapsed, elapsed(State?STATE.batch_start)},
+ {files_deleted, State?STATE.batch_count},
+ {files_skipped, State?STATE.batch_skips},
+ {files_left, if is_list(State?STATE.batch) -> length(State?STATE.batch);
+ true -> 0
+ end}].
+
+%% @doc How many seconds have passed from `Time' to now.
+-spec elapsed(undefined | non_neg_integer()) -> non_neg_integer().
+elapsed(undefined) ->
+ riak_cs_gc:timestamp();
+elapsed(Time) ->
+ Now = riak_cs_gc:timestamp(),
+ case (Diff = Now - Time) > 0 of
+ true ->
+ Diff;
+ false ->
+ 0
+ end.
diff --git a/src/riak_cs_gc_console.erl b/src/riak_cs_gc_console.erl
index 663e9cb30..1e35bb85d 100644
--- a/src/riak_cs_gc_console.erl
+++ b/src/riak_cs_gc_console.erl
@@ -22,6 +22,8 @@
-module(riak_cs_gc_console).
+-export([human_time/1]).
+
-export([batch/1,
status/1,
pause/1,
@@ -61,11 +63,14 @@ status(_Opts) ->
cancel(_Opts) ->
?SAFELY(cancel_batch(), "Canceling the garbage collection batch").
-pause(_Opts) ->
- ?SAFELY(pause(), "Pausing the garbage collection daemon").
+pause(_) ->
+ output("Warning: Subcommand 'pause' will be removed in future version."),
+ _ = riak_cs_gc_manager:set_interval(infinity),
+ cancel([]).
-resume(_Opts) ->
- ?SAFELY(resume(), "Resuming the garbage collection daemon").
+resume(_) ->
+ output("Warning: Subcommand 'resume' will be removed in future version."),
+ set_interval(riak_cs_gc:gc_interval()).
'set-interval'(Opts) ->
?SAFELY(set_interval(parse_interval_opts(Opts)), "Setting the garbage collection interval").
@@ -78,19 +83,13 @@ resume(_Opts) ->
%%%===================================================================
start_batch(Options) ->
- handle_batch_start(riak_cs_gc_d:manual_batch(Options)).
+ handle_batch_start(riak_cs_gc_manager:start_batch(Options)).
get_status() ->
- handle_status(riak_cs_gc_d:status()).
+ handle_status(riak_cs_gc_manager:pp_status()).
cancel_batch() ->
- handle_batch_cancellation(riak_cs_gc_d:cancel_batch()).
-
-pause() ->
- handle_pause(riak_cs_gc_d:pause()).
-
-resume() ->
- handle_resumption(riak_cs_gc_d:resume()).
+ handle_batch_cancellation(riak_cs_gc_manager:cancel_batch()).
set_interval(undefined) ->
output("Error: No interval value specified"),
@@ -99,7 +98,7 @@ set_interval({'EXIT', _}) ->
output("Error: Invalid interval specified."),
error;
set_interval(Interval) ->
- case riak_cs_gc_d:set_interval(Interval) of
+ case riak_cs_gc_manager:set_interval(Interval) of
ok ->
output("The garbage collection interval was updated."),
ok;
@@ -127,56 +126,29 @@ set_leeway(Leeway) ->
handle_batch_start(ok) ->
output("Garbage collection batch started."),
ok;
-handle_batch_start({error, already_deleting}) ->
- output("Error: A garbage collection batch"
- " is already in progress."),
- error;
-handle_batch_start({error, already_paused}) ->
- output("The garbage collection daemon was already paused."),
+handle_batch_start({error, running}) ->
+ output("The garbage collection daemon is already running."),
error.
handle_status({ok, {State, Details}}) ->
- print_status(State, Details),
+ _ = print_state(State),
+ _ = print_details(Details),
ok.
handle_batch_cancellation(ok) ->
output("The garbage collection batch was canceled.");
-handle_batch_cancellation({error, no_batch}) ->
+handle_batch_cancellation({error, idle}) ->
output("No garbage collection batch was running."),
error.
-handle_pause(ok) ->
- output("The garbage collection daemon was paused."),
- ok;
-handle_pause({error, already_paused}) ->
- output("The garbage collection daemon was already paused."),
- error.
-
-handle_resumption(ok) ->
- output("The garbage collection daemon was resumed."),
- ok;
-handle_resumption({error, not_paused}) ->
- output("The garbage collection daemon was not paused."),
- error.
-
output(Output) ->
io:format(Output ++ "~n").
-print_status(State, Details) ->
- _ = print_state(State),
- _ = print_details(Details),
- ok.
-
+-spec print_state(riak_cs_gc_manager:statename()) -> ok.
print_state(idle) ->
output("There is no garbage collection in progress");
-print_state(fetching_next_batch) ->
- output("A garbage collection batch is in progress");
-print_state(feeding_workers) ->
- output("A garbage collection batch is in progress");
-print_state(waiting_for_workers) ->
- output("A garbage collection batch is in progress");
-print_state(paused) ->
- output("A garbage collection batch is currently paused").
+print_state(running) ->
+ output("A garbage collection batch is in progress").
%% @doc Pretty-print the status returned from the gc daemon.
print_details(Details) ->
diff --git a/src/riak_cs_gc_d.erl b/src/riak_cs_gc_d.erl
deleted file mode 100644
index 0b8ba3d4a..000000000
--- a/src/riak_cs_gc_d.erl
+++ /dev/null
@@ -1,628 +0,0 @@
-%% ---------------------------------------------------------------------
-%%
-%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% ---------------------------------------------------------------------
-
-%% @doc The daemon that handles garbage collection of deleted file
-%% manifests and blocks.
-%%
-%% @TODO Differences in the fsm state and the state record
-%% get confusing. Maybe s/State/StateData.
-
-%% State transitions by event (not including transitions by command)
-%% 1. There is outer loop of `idle' <==> `fetching_next_batch'.
-%% Single GC run starts by `idle'->`fetching_next_batch' and ends by
-%% `fetching_next_batch'->`idle'.
-%% 2. `fetching_next_batch', `feeding_workers' and `waiting_for_workers'
-%% make up inner loop. If there is a room for workers, this process fetches
-%% fileset keys by 2i. Then it moves to `feeding_workers' and feeds sets of
-%% fileset keys to workers. If workers are full or 2i reaches the end,
-%% it collects results from workers at `waiting_for_workers'.
-%%
-%% idle <---------------+ Extra:{paused, StateToResume}
-%% | |
-%% [manual_batch or timer] | [no fileset keys AND
-%% | | no worker]
-%% V |
-%% fetching_next_batch ------+
-%% | ^
-%% | |
-%% | +----------------------+
-%% [no fileset keys] | |
-%% +----------------+ |
-%% | | [no fileset keys]
-%% | [fileset keys] |
-%% | | |
-%% | feeding_workers ---------------------+
-%% | | ^ |
-%% | | | |
-%% | | +---------+--+ |
-%% | | | | |
-%% | | [more workers AND |
-%% | +--------+ more fileset keys] |
-%% | | | | | |
-%% | [all active] | | | |
-%% | | +---------------+ | |
-%% | V | |
-%% +--> waiting_for_workers -----------+------------+
-
--module(riak_cs_gc_d).
-
--behaviour(gen_fsm).
-
-%% API
--export([start_link/0,
- status/0,
- manual_batch/1,
- cancel_batch/0,
- pause/0,
- resume/0,
- set_interval/1,
- stop/0]).
-
-%% gen_fsm callbacks
--export([init/1,
- idle/2,
- idle/3,
- fetching_next_batch/2,
- fetching_next_batch/3,
- feeding_workers/2,
- feeding_workers/3,
- waiting_for_workers/2,
- waiting_for_workers/3,
- paused/2,
- paused/3,
- handle_event/3,
- handle_sync_event/4,
- handle_info/3,
- terminate/3,
- code_change/4]).
-
--export([current_state/0]).
-
--include("riak_cs_gc_d.hrl").
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-%% Test API
--export([test_link/0,
- test_link/1,
- change_state/1,
- status_data/1]).
-
--endif.
-
--define(SERVER, ?MODULE).
--define(STATE, #gc_d_state).
--define(GC_WORKER, riak_cs_gc_worker).
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-%% @doc Start the garbage collection server
-start_link() ->
- gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%% @doc Status is returned as a 2-tuple of `{State, Details}'. State
-%% should be `idle', `deleting', or `paused'. When `idle' the
-%% details (a proplist) will include the schedule, as well as the
-%% times of the last GC and the next planned GC.
-%% When `deleting' or `paused' details also the scheduled time of
-%% the active GC, the number of seconds the process has been
-%% running so far, and counts of how many filesets have been
-%% processed.
-status() ->
- gen_fsm:sync_send_event(?SERVER, status, infinity).
-
-%% @doc Force a garbage collection sweep manually.
-%%
-%% Allowed options are:
-%%
-%% - `testing'
-%% - Indicate the daemon is started as part of a test suite.
-%%
-manual_batch(Options) ->
- gen_fsm:sync_send_event(?SERVER, {manual_batch, Options}, infinity).
-
-%% @doc Cancel the garbage collection currently in progress. Returns `ok' if
-%% a batch was canceled, or `{error, no_batch}' if there was no batch
-%% in progress.
-cancel_batch() ->
- gen_fsm:sync_send_event(?SERVER, cancel_batch, infinity).
-
-%% @doc Pause the garbage collection daemon. Returns `ok' if
-%% the daemon was paused, or `{error, already_paused}' if the daemon
-%% was already paused.
-pause() ->
- gen_fsm:sync_send_event(?SERVER, pause, infinity).
-
-%% @doc Resume the garbage collection daemon. Returns `ok' if the
-%% daemon was resumed, or `{error, not_paused}' if the daemon was
-%% not paused.
-resume() ->
- gen_fsm:sync_send_event(?SERVER, resume, infinity).
-
-%% @doc Adjust the interval at which the daemon attempts to perform
-%% a garbage collection sweep. Setting the interval to a value of
-%% `infinity' effectively disable garbage collection. The daemon still
-%% runs, but does not carry out any file deletion.
--spec set_interval(term()) -> ok | {error, term()}.
-set_interval(Interval) ->
- gen_fsm:sync_send_all_state_event(?SERVER, {set_interval, Interval}, infinity).
-
-%% @doc Stop the daemon
--spec stop() -> ok | {error, term()}.
-stop() ->
- gen_fsm:sync_send_all_state_event(?SERVER, stop, infinity).
-
-%%%===================================================================
-%%% gen_fsm callbacks
-%%%===================================================================
-
-%% @doc Read the storage schedule and go to idle.
-
-init(Args) ->
- Interval = riak_cs_gc:gc_interval(),
- InitialDelay = riak_cs_gc:initial_gc_delay(),
- MaxWorkers = riak_cs_gc:gc_max_workers(),
- Testing = lists:member(testing, Args),
- SchedState = schedule_next(?STATE{interval=Interval,
- initial_delay=InitialDelay,
- max_workers=MaxWorkers,
- testing=Testing}),
- {ok, idle, SchedState}.
-
-%% Asynchronous events
-
-%% @doc Transitions out of idle are all synchronous events
-idle(_, State=?STATE{interval_remaining=undefined}) ->
- {next_state, idle, State};
-idle(_, State=?STATE{interval_remaining=IntervalRemaining}) ->
- TimerRef = erlang:send_after(IntervalRemaining,
- self(),
- {start_batch, riak_cs_gc:leeway_seconds()}),
- {next_state, idle, State?STATE{timer_ref=TimerRef}}.
-
-%% @doc Async transitions from `fetching_next_batch' are all due to
-%% messages the FSM sends itself, in order to have opportunities to
-%% handle messages from the outside world (like `status').
-fetching_next_batch(_, State=?STATE{batch=undefined}) ->
- %% This clause is for testing only
- {next_state, fetching_next_batch, State};
-fetching_next_batch(continue, ?STATE{batch_start=undefined,
- leeway=Leeway}=State) ->
- BatchStart = riak_cs_gc:timestamp(),
- %% Fetch the next set of manifests for deletion
- {KeyListRes, KeyListState} =
- riak_cs_gc_key_list:new(BatchStart, Leeway),
- #gc_key_list_result{bag_id=BagId, batch=Batch} = KeyListRes,
- _ = lager:debug("Initial batch keys: ~p", [Batch]),
- NewStateData = State?STATE{batch=Batch,
- batch_start=BatchStart,
- key_list_state=KeyListState,
- bag_id=BagId},
- ok = continue(),
- {next_state, feeding_workers, NewStateData};
-fetching_next_batch(continue, ?STATE{active_workers=0,
- batch=[],
- key_list_state=undefined,
- batch_caller=Caller} = State) ->
- %% finished with this GC run
- _ = case Caller of
- undefined -> ok;
- _ -> Caller ! {batch_finished, State}
- end,
- _ = lager:info("Finished garbage collection: "
- "~b seconds, ~p batch_count, ~p batch_skips, "
- "~p manif_count, ~p block_count\n",
- [elapsed(State?STATE.batch_start), State?STATE.batch_count,
- State?STATE.batch_skips, State?STATE.manif_count,
- State?STATE.block_count]),
- NewState = schedule_next(State?STATE{batch_start=undefined,
- batch_caller=undefined}),
- {next_state, idle, NewState};
-fetching_next_batch(continue, ?STATE{batch=[],
- key_list_state=undefined}=State) ->
- {next_state, waiting_for_workers, State};
-fetching_next_batch(continue, ?STATE{key_list_state=undefined}=State) ->
- {next_state, feeding_workers, State};
-fetching_next_batch(continue, ?STATE{key_list_state=KeyListState}=State) ->
- %% Fetch the next set of manifests for deletion
- {KeyListRes, UpdKeyListState} = riak_cs_gc_key_list:next(KeyListState),
- #gc_key_list_result{bag_id=BagId, batch=Batch} = KeyListRes,
- _ = lager:debug("Next batch keys: ~p", [Batch]),
- NewStateData = State?STATE{batch=Batch,
- key_list_state=UpdKeyListState,
- bag_id=BagId},
- ok = continue(),
- {next_state, feeding_workers, NewStateData};
-fetching_next_batch({batch_complete, WorkerPid, WorkerState}, State) ->
- NewStateData = handle_batch_complete(WorkerPid, WorkerState, State),
- ok = continue(),
- {next_state, fetching_next_batch, NewStateData};
-fetching_next_batch(_, State) ->
- {next_state, fetching_next_batch, State}.
-
-%% @doc Async transitions from `feeding_workers' are all due to
-%% messages the FSM sends itself, in order to have opportunities to
-%% handle messages from the outside world (like `status').
-feeding_workers(continue, ?STATE{max_workers=WorkerCount,
- active_workers=WorkerCount}=State)
- when WorkerCount > 0 ->
- %% Worker capacity has been reached so must wait for a worker to
- %% finish before assigning more work.
- {next_state, waiting_for_workers, State};
-feeding_workers(continue, ?STATE{batch=[]}=State) ->
- %% No outstanding work to hand out
- ok = continue(),
- {next_state, fetching_next_batch, State};
-feeding_workers(continue, State) ->
- %% Start worker process
- NextStateData = start_worker(State),
- ok = continue(),
- {next_state, feeding_workers, NextStateData};
-feeding_workers({batch_complete, WorkerPid, WorkerState}, State) ->
- NewStateData = handle_batch_complete(WorkerPid, WorkerState, State),
- ok = continue(),
- {next_state, feeding_workers, NewStateData};
-feeding_workers(_, State) ->
- {next_state, feeding_workers, State}.
-
-%% @doc This state initiates the deletion of a file from
-%% a set of manifests stored for a particular key in the
-%% garbage collection bucket.
-waiting_for_workers({batch_complete, WorkerPid, WorkerState}, State=?STATE{batch=[]}) ->
- NewStateData = handle_batch_complete(WorkerPid, WorkerState, State),
- ok = continue(),
- {next_state, fetching_next_batch, NewStateData};
-waiting_for_workers({batch_complete, WorkerPid, WorkerState}, State) ->
- NewStateData = handle_batch_complete(WorkerPid, WorkerState, State),
- ok = continue(),
- {next_state, feeding_workers, NewStateData};
-waiting_for_workers(_, State) ->
- {next_state, waiting_for_workers, State}.
-
-paused({batch_complete, WorkerPid, WorkerState}, State=?STATE{batch=[]}) ->
- NewStateData = handle_batch_complete(WorkerPid, WorkerState, State),
- {next_state, paused, NewStateData};
-paused(_, State) ->
- {next_state, paused, State}.
-
-%% Synchronous events
-
-idle({manual_batch, Options}, {CallerPid, _Tag}=_From, State) ->
- Leeway = leeway_option(Options),
- ok_reply(fetching_next_batch, start_manual_batch(
- lists:member(testing, Options),
- Leeway,
- State?STATE{batch_caller=CallerPid}));
-idle(pause, _From, State) ->
- ok_reply(paused, pause_gc(idle, State));
-idle(Msg, _From, State) ->
- Common = [{status, {ok, {idle, [{interval, State?STATE.interval},
- {leeway, riak_cs_gc:leeway_seconds()},
- {last, State?STATE.last},
- {next, State?STATE.next}]}}},
- {cancel_batch, {error, no_batch}},
- {resume, {error, not_paused}}],
- {reply, handle_common_sync_reply(Msg, Common, State), idle, State}.
-
-fetching_next_batch(pause, _From, State) ->
- ok_reply(paused, pause_gc(fetching_next_batch, State));
-fetching_next_batch(cancel_batch, _From, State) ->
- ok_reply(idle, cancel_batch(State));
-fetching_next_batch(Msg, _From, State) ->
- Common = [{status, {ok, {fetching_next_batch, status_data(State)}}},
- {manual_batch, {error, already_deleting}},
- {resume, {error, not_paused}}],
- {reply, handle_common_sync_reply(Msg, Common, State), fetching_next_batch, State}.
-
-feeding_workers(pause, _From, State) ->
- ok_reply(paused, pause_gc(feeding_workers, State));
-feeding_workers(cancel_batch, _From, State) ->
- ok_reply(idle, cancel_batch(State));
-feeding_workers(Msg, _From, State) ->
- Common = [{status, {ok, {feeding_workers, status_data(State)}}},
- {manual_batch, {error, already_deleting}},
- {resume, {error, not_paused}}],
- {reply, handle_common_sync_reply(Msg, Common, State), feeding_workers, State}.
-
-waiting_for_workers(pause, _From, State) ->
- ok_reply(paused, pause_gc(waiting_for_workers, State));
-waiting_for_workers(cancel_batch, _From, State) ->
- ok_reply(idle, cancel_batch(State));
-waiting_for_workers(Msg, _From, State) ->
- Common = [{status, {ok, {waiting_for_workers, status_data(State)}}},
- {manual_batch, {error, already_deleting}},
- {resume, {error, not_paused}}],
- {reply, handle_common_sync_reply(Msg, Common, State), waiting_for_workers, State}.
-
-paused(resume, _From, State=?STATE{pause_state=PauseState}) ->
- ok_reply(PauseState, resume_gc(State));
-paused(cancel_batch, _From, State) ->
- ok_reply(paused, cancel_batch(State?STATE{pause_state=idle}));
-paused(Msg, _From, State) ->
- Common = [{status, {ok, {paused, status_data(State)}}},
- {pause, {error, already_paused}},
- {manual_batch, {error, already_paused}}],
- {reply, handle_common_sync_reply(Msg, Common, State), paused, State}.
-
-%% @doc there are no all-state events for this fsm
-handle_event(_Event, StateName, State) ->
- {next_state, StateName, State}.
-
-%% @doc Handle synchronous events that should be handled
-%% the same regardless of the current state.
--spec handle_sync_event(term(), term(), atom(), ?STATE{}) ->
- {reply, term(), atom(), ?STATE{}}.
-handle_sync_event({set_interval, Interval}, _From, StateName, State) ->
- {Reply, NewState} = case riak_cs_gc:set_gc_interval(Interval) of
- ok -> {ok, State?STATE{interval=Interval}};
- {error, Reason} -> {{error, Reason}, State}
- end,
- {reply, Reply, StateName, NewState};
-handle_sync_event(current_state, _From, StateName, State) ->
- {reply, {StateName, State}, StateName, State};
-handle_sync_event({change_state, NewStateName}, _From, _StateName, State) ->
- ok_reply(NewStateName, State);
-handle_sync_event(stop, _From, _StateName, State) ->
- {stop, normal, ok, State};
-handle_sync_event(_Event, _From, StateName, State) ->
- ok_reply(StateName, State).
-
-handle_info({start_batch, Leeway}, idle, State) ->
- NewState = start_batch(Leeway, State),
- {next_state, fetching_next_batch, NewState};
-handle_info({start_batch, _}, InBatch, State) ->
- _ = lager:info("Unable to start garbage collection batch"
- " because a previous batch is still working."),
- {next_state, InBatch, State};
-handle_info(_Info, StateName, State) ->
- {next_state, StateName, State}.
-
-%% @doc TODO: log warnings if this fsm is asked to terminate in the
-%% middle of running a gc batch
-terminate(_Reason, _StateName, _State) ->
- ok.
-
-%% @doc this fsm has no special upgrade process
-code_change(_OldVsn, StateName, State, _Extra) ->
- {ok, StateName, State}.
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-
-%% @doc Handle a `batch_complete' event from a GC worker process.
--spec handle_batch_complete(pid(), #gc_worker_state{}, ?STATE{}) -> ?STATE{}.
-handle_batch_complete(WorkerPid, WorkerState, State) ->
- ?STATE{active_workers=ActiveWorkers,
- worker_pids=WorkerPids,
- batch_count=BatchCount,
- batch_skips=BatchSkips,
- manif_count=ManifestCount,
- block_count=BlockCount} = State,
- #gc_worker_state{batch_count=WorkerBatchCount,
- batch_skips=WorkerBatchSkips,
- manif_count=WorkerManifestCount,
- block_count=WorkerBlockCount} = WorkerState,
- UpdWorkerPids = lists:delete(WorkerPid, WorkerPids),
- %% @TODO Workout the terminiology for these stats. i.e. Is batch
- %% count just an increment or represenative of something else.
- State?STATE{active_workers=ActiveWorkers - 1,
- worker_pids=UpdWorkerPids,
- batch_count=BatchCount + WorkerBatchCount,
- batch_skips=BatchSkips + WorkerBatchSkips,
- manif_count=ManifestCount + WorkerManifestCount,
- block_count=BlockCount + WorkerBlockCount}.
-
-%% @doc Start a GC worker and return the apprpriate next state and
-%% updated state record.
--spec start_worker(?STATE{}) -> ?STATE{}.
-start_worker(State=?STATE{testing=true}) ->
- State;
-start_worker(State=?STATE{batch=[NextBatch | RestBatches],
- bag_id=BagId,
- active_workers=ActiveWorkers,
- worker_pids=WorkerPids}) ->
- case ?GC_WORKER:start_link(BagId, NextBatch) of
- {ok, Pid} ->
- State?STATE{batch=RestBatches,
- active_workers=ActiveWorkers + 1,
- worker_pids=[Pid | WorkerPids]};
- {error, _Reason} ->
- State
- end.
-
-%% @doc Cancel the current batch of files set for garbage collection.
--spec cancel_batch(?STATE{}) -> ?STATE{}.
-cancel_batch(?STATE{batch_start=BatchStart,
- worker_pids=WorkerPids}=State) ->
- %% Interrupt the batch of deletes
- _ = lager:info("Canceled garbage collection batch after ~b seconds.",
- [elapsed(BatchStart)]),
- _ = [riak_cs_gc_worker:stop(P) || P <- WorkerPids],
- schedule_next(State?STATE{batch=[],
- worker_pids=[],
- active_workers=0}).
-
-%% @private
-%% @doc Send an asynchronous `continue' event. This is used to advance
-%% the FSM to the next state and perform some blocking action. The blocking
-%% actions are done in the beginning of the next state to give the FSM a
-%% chance to respond to events from the outside world.
--spec continue() -> ok.
-continue() ->
- gen_fsm:send_event(self(), continue).
-
-%% @doc How many seconds have passed from `Time' to now.
--spec elapsed(undefined | non_neg_integer()) -> non_neg_integer().
-elapsed(undefined) ->
- riak_cs_gc:timestamp();
-elapsed(Time) ->
- Now = riak_cs_gc:timestamp(),
- case (Diff = Now - Time) > 0 of
- true ->
- Diff;
- false ->
- 0
- end.
-
-%% @doc Take required actions to pause garbage collection and update
-%% the state record for the transition to `paused'.
--spec pause_gc(atom(), ?STATE{}) -> ?STATE{}.
-pause_gc(idle, State=?STATE{interval=Interval,
- timer_ref=TimerRef}) ->
- _ = lager:info("Pausing garbage collection"),
- Remainder = cancel_timer(Interval, TimerRef),
- State?STATE{pause_state=idle,
- interval_remaining=Remainder};
-pause_gc(State, StateData) ->
- _ = lager:info("Pausing garbage collection"),
- StateData?STATE{pause_state=State}.
-
--spec cancel_timer(timeout(), 'undefined' | timer:tref()) -> 'undefined' | integer().
-cancel_timer(_, undefined) ->
- undefined;
-cancel_timer(infinity, TimerRef) ->
- %% Cancel the timer in case the interval has
- %% recently be set to `infinity'.
- _ = erlang:cancel_timer(TimerRef),
- undefined;
-cancel_timer(_, TimerRef) ->
- handle_cancel_timer(erlang:cancel_timer(TimerRef)).
-
-handle_cancel_timer(false) ->
- 0;
-handle_cancel_timer(RemainderMillis) ->
- RemainderMillis.
-
--spec resume_gc(?STATE{}) -> ?STATE{}.
-resume_gc(State) ->
- _ = lager:info("Resuming garbage collection"),
- ok = continue(),
- State?STATE{pause_state=undefined}.
-
--spec ok_reply(atom(), ?STATE{}) -> {reply, ok, atom(), ?STATE{}}.
-ok_reply(NextState, NextStateData) ->
- {reply, ok, NextState, NextStateData}.
-
-%% @doc Setup the automatic trigger to start the next
-%% scheduled batch calculation.
--spec schedule_next(?STATE{}) -> ?STATE{}.
-schedule_next(?STATE{interval=infinity}=State) ->
- %% nothing to schedule, all triggers manual
- State;
-schedule_next(?STATE{initial_delay=undefined}=State) ->
- schedule_next(State, 0);
-schedule_next(?STATE{initial_delay=InitialDelay}=State) ->
- schedule_next(State?STATE{initial_delay=undefined}, InitialDelay).
-
-schedule_next(?STATE{next=Last, interval=Interval}=State, InitialDelay) ->
- RevisedNext = riak_cs_gc:timestamp() + Interval,
- TimerValue = Interval * 1000 + InitialDelay * 1000,
- TimerRef = erlang:send_after(TimerValue, self(),
- {start_batch, riak_cs_gc:leeway_seconds()}),
- State?STATE{batch_start=undefined,
- last=Last,
- next=RevisedNext,
- timer_ref=TimerRef}.
-
-%% @doc Actually kick off the batch. After calling this function, you
-%% must advance the FSM state to `fetching_next_batch'.
-%% Intentionally pattern match on an undefined Riak handle.
-start_batch(Leeway, State) ->
- %% this does not check out a worker from the riak
- %% connection pool; instead it creates a fresh new worker,
- %% the idea being that we don't want to delay deletion
- %% just because the normal request pool is empty; pool
- %% workers just happen to be literally the socket process,
- %% so "starting" one here is the same as opening a
- %% connection, and avoids duplicating the configuration
- %% lookup code
- ok = continue(),
- State?STATE{batch_count=0,
- batch_skips=0,
- manif_count=0,
- block_count=0,
- leeway=Leeway}.
-
--spec start_manual_batch(Testing::boolean(), non_neg_integer(), ?STATE{}) -> ?STATE{}.
-start_manual_batch(true, _, State) ->
- State?STATE{batch=undefined};
-start_manual_batch(false, Leeway, State) ->
- start_batch(Leeway, State?STATE{batch=[]}).
-
-%% @doc Extract a list of status information from a state record.
-%%
-%% CAUTION: Do not add side-effects to this function: it is called specutively.
--spec status_data(?STATE{}) -> [{atom(), term()}].
-status_data(State) ->
- [{interval, State?STATE.interval},
- {leeway, riak_cs_gc:leeway_seconds()},
- {last, State?STATE.last},
- {current, State?STATE.batch_start},
- {next, State?STATE.next},
- {elapsed, elapsed(State?STATE.batch_start)},
- {files_deleted, State?STATE.batch_count},
- {files_skipped, State?STATE.batch_skips},
- {files_left, if is_list(State?STATE.batch) -> length(State?STATE.batch);
- true -> 0
- end}].
-
-handle_common_sync_reply(Msg, Common, _State) when is_atom(Msg) ->
- proplists:get_value(Msg, Common, unknown_command);
-handle_common_sync_reply({MsgBase, _}, Common, State) when is_atom(MsgBase) ->
- handle_common_sync_reply(MsgBase, Common, State).
-
--spec leeway_option(list()) -> non_neg_integer().
-leeway_option(Options) ->
- case lists:keyfind(leeway, 1, Options) of
- {leeway, Leeway} ->
- Leeway;
- false ->
- riak_cs_gc:leeway_seconds()
- end.
-
-%% ===================================================================
-%% Test API and tests
-%% ===================================================================
-
-%% @doc Get the current state of the fsm for testing inspection
--spec current_state() -> {atom(), ?STATE{}} | {error, term()}.
-current_state() ->
- gen_fsm:sync_send_all_state_event(?SERVER, current_state).
-
--ifdef(TEST).
-
-%% Start the garbage collection server
-test_link() ->
- gen_fsm:start_link({local, ?SERVER}, ?MODULE, [testing], []).
-
-%% Start the garbage collection server
-test_link(Interval) ->
- application:set_env(riak_cs, gc_interval, Interval),
- test_link().
-
-%% Manipulate the current state of the fsm for testing
-change_state(State) ->
- gen_fsm:sync_send_all_state_event(?SERVER, {change_state, State}).
-
--endif.
diff --git a/src/riak_cs_gc_key_list.erl b/src/riak_cs_gc_key_list.erl
index b69c3a123..d81c64f20 100644
--- a/src/riak_cs_gc_key_list.erl
+++ b/src/riak_cs_gc_key_list.erl
@@ -23,9 +23,9 @@
-module(riak_cs_gc_key_list).
%% API
--export([new/2, next/1]).
+-export([new/2, next/1, has_next/1]).
--include("riak_cs_gc_d.hrl").
+-include("riak_cs_gc.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -41,7 +41,8 @@ new(BatchStart, Leeway) ->
next_pool(State).
%% @doc Fetch next key list and returns it with updated state
--spec next(gc_key_list_state()) -> {gc_key_list_result(), gc_key_list_state()}.
+-spec next(gc_key_list_state()) ->
+ {gc_key_list_result(), gc_key_list_state()|undefined}.
next(#gc_key_list_state{current_riak_client=RcPid,
continuation=undefined} = State) ->
ok = riak_cs_riak_client:stop(RcPid),
@@ -56,8 +57,14 @@ next(#gc_key_list_state{current_riak_client=RcPid,
{#gc_key_list_result{bag_id=BagId, batch=Batch},
State#gc_key_list_state{continuation=UpdContinuation}}.
+-spec has_next(gc_key_list_state()) -> boolean().
+has_next(#gc_key_list_state{remaining_bags=[], continuation=undefined}) ->
+ false;
+has_next(_) ->
+ true.
+
%% @doc Fetch next key list and returns it with updated state
--spec next_pool(gc_key_list_state()) -> {gc_key_list_result(), gc_key_list_state()}.
+-spec next_pool(gc_key_list_state()) -> {gc_key_list_result(), gc_key_list_state()|undefined}.
next_pool(#gc_key_list_state{remaining_bags=[]}) ->
{#gc_key_list_result{bag_id=undefined, batch=[]},
undefined};
diff --git a/src/riak_cs_gc_manager.erl b/src/riak_cs_gc_manager.erl
new file mode 100644
index 000000000..0ab9ca677
--- /dev/null
+++ b/src/riak_cs_gc_manager.erl
@@ -0,0 +1,323 @@
+%% ---------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% ---------------------------------------------------------------------
+%%
+%% State Diagram
+%%
+%% start -> idle -(start)-> running
+%% ^ |
+%% +----(finish)---+
+%% +----(cancel)---+
+%%
+%% Message excange chart (not a sequence, but just a list)
+%%
+%% Message\ sdr/rcver gc_manager gc_batch
+%% spawn_link) --->
+%% start) --->
+%% cancel) --->
+%% finished) <---
+%%
+-module(riak_cs_gc_manager).
+
+-behaviour(gen_fsm).
+
+%% Console API
+-export([start_batch/1,
+ cancel_batch/0,
+ set_interval/1,
+ status/0,
+ pp_status/0]).
+
+%% FSM API
+-export([start_link/0, finished/1]).
+
+-ifdef(TEST).
+-export([test_link/0]).
+-endif.
+
+%% gen_fsm callbacks
+-export([init/1,
+ handle_event/3,
+ handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
+
+-export([idle/2, idle/3,
+ running/2, running/3]).
+
+-type statename() :: idle | running.
+-export_type([statename/0]).
+
+-include("riak_cs_gc.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+start_batch(Options) ->
+ gen_fsm:sync_send_event(?SERVER, {start, Options}, infinity).
+
+cancel_batch() ->
+ gen_fsm:sync_send_event(?SERVER, cancel, infinity).
+
+-spec status() -> {ok, {statename(), #gc_manager_state{}}}.
+status() ->
+ gen_fsm:sync_send_all_state_event(?SERVER, status, infinity).
+
+-spec pp_status() -> {ok, {statename(), proplists:proplist()}}.
+pp_status() ->
+ {ok, {StateName, State}} = status(),
+ D = lists:zip(record_info(fields, gc_manager_state),
+ tl(tuple_to_list(State))),
+ Details = lists:flatten(lists:map(fun({Type, Value}) ->
+ translate(Type, Value)
+ end, D)),
+ {ok, {StateName,
+ [{leeway, riak_cs_gc:leeway_seconds()}] ++ Details}}.
+
+%% @doc Adjust the interval at which the daemon attempts to perform
+%% a garbage collection sweep. Setting the interval to a value of
+%% `infinity' effectively disable garbage collection. The daemon still
+%% runs, but does not carry out any file deletion.
+-spec set_interval(term()) -> ok | {error, term()}.
+set_interval(Interval) when is_integer(Interval)
+ orelse Interval =:= infinity ->
+ gen_fsm:sync_send_all_state_event(?SERVER, {set_interval, Interval}, infinity).
+
+start_link() ->
+ gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+-ifdef(TEST).
+test_link() ->
+ gen_fsm:start_link({local, ?SERVER}, ?MODULE, [testing], []).
+
+-endif.
+
+
+finished(Report) ->
+ gen_fsm:sync_send_event(?SERVER, {finished, Report}, infinity).
+
+%%%===================================================================
+%%% gen_fsm callbacks
+%%%===================================================================
+
+init([]) ->
+ process_flag(trap_exit, true),
+ InitialDelay = riak_cs_gc:initial_gc_delay(),
+ State = case riak_cs_gc:gc_interval() of
+ infinity ->
+ #gc_manager_state{};
+ Interval when is_integer(Interval) ->
+ Interval2 = Interval + InitialDelay,
+ Next = riak_cs_gc:timestamp() + Interval2,
+ TimerRef = erlang:send_after(Interval2 * 1000, self(), {start, []}),
+ _ = lager:info("Scheduled next batch at ~s",
+ [riak_cs_gc_console:human_time(Next)]),
+
+ #gc_manager_state{next=Next,
+ interval=Interval,
+ initial_delay=InitialDelay,
+ timer_ref=TimerRef}
+ end,
+ {ok, idle, State};
+init([testing]) ->
+ {ok, idle, #gc_manager_state{}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc All gen_fsm:send_event/2 call should be ignored
+idle(_Event, State) ->
+ {next_state, idle, State}.
+running(_Event, State) ->
+ {next_state, running, State}.
+
+%% @private
+%% @doc
+idle({start, Options}, _From, State) ->
+ case start_batch(State, Options) of
+ {ok, NextState} ->
+ {reply, ok, running, NextState};
+ Error ->
+ {reply, Error, idle, State}
+ end;
+idle(_, _From, State) ->
+ {reply, {error, idle}, idle, State}.
+
+running(cancel, _From, State = #gc_manager_state{gc_batch_pid=Pid}) ->
+ %% stop gc_batch here
+ catch riak_cs_gc_batch:stop(Pid),
+ NextState = schedule_next(State),
+ {reply, ok, idle, NextState};
+running({finished, Report}, _From, State) ->
+ %% Add report to history
+ NextState=schedule_next(State),
+ {reply, ok, idle,
+ NextState#gc_manager_state{batch_history=[Report]}};
+running(_Event, _From, State) ->
+ Reply = {error, running},
+ {reply, Reply, running, State}.
+
+%% @private
+%% @doc Not used.
+handle_event(_Event, StateName, State) ->
+ {next_state, StateName, State}.
+
+%% @private
+%% @doc
+handle_sync_event({set_interval, Initerval}, _From, StateName, State) ->
+ NewState0 = maybe_cancel_timer(State#gc_manager_state{interval=Initerval}),
+ NewState = schedule_next(NewState0),
+ {reply, ok, StateName, NewState};
+handle_sync_event(status, _From, StateName, #gc_manager_state{gc_batch_pid=Pid} = State) ->
+ {_GCDStateName, GCDState} = maybe_current_state(Pid),
+ NewState = State#gc_manager_state{current_batch=GCDState},
+ {reply, {ok, {StateName, NewState}}, StateName, NewState};
+handle_sync_event(stop, _, _, State) ->
+ %% for tests
+ {stop, normal, ok, State};
+handle_sync_event(_Event, _From, StateName, State) ->
+ Reply = {error, illegal_sync_send_all_state_call},
+ {reply, Reply, StateName, State}.
+
+%% @private
+%% @doc Because process flag of trap_exit is true, gc_batch
+%% failure will be delivered as a message.
+handle_info({'EXIT', Pid, Reason}, _StateName,
+ #gc_manager_state{gc_batch_pid=Pid} = State) ->
+ case Reason of
+ Reason when Reason =/= normal andalso Reason =/= cancel ->
+ _ = lager:warning("GC batch has terminated for reason: ~p", [Reason]);
+ _ ->
+ ok
+ end,
+ NextState = schedule_next(State#gc_manager_state{gc_batch_pid=undefined}),
+ {next_state, idle, NextState};
+handle_info({start, Options}, idle, State) ->
+ case start_batch(State, Options) of
+ {ok, NextState} ->
+ {next_state, running, NextState};
+ Error ->
+ _ = lager:error("Cannot start batch. Reason: ~p", [Error]),
+ NextState = schedule_next(State),
+ {next_state, idle, NextState}
+ end;
+handle_info(Info, StateName, State) ->
+ _ = lager:warning("Unexpected message received at GC process (~p): ~p",
+ [StateName, Info]),
+ {next_state, StateName, State}.
+
+%% @private
+%% @doc
+terminate(_Reason, _StateName, _State = #gc_manager_state{gc_batch_pid=Pid}) ->
+ catch riak_cs_gc_batch:stop(Pid),
+ ok.
+
+%% @private
+%% @doc Not used
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+-spec start_batch(#gc_manager_state{}, proplists:proplist()) ->
+ {ok, #gc_manager_state{}} |
+ {error, term()}.
+start_batch(State, Options) ->
+ MaxWorkers = riak_cs_gc:gc_max_workers(),
+ %% StartKey = proplists:get_value(start, Options,
+ %% riak_cs_gc:epoch_start()),
+ BatchStart = riak_cs_gc:timestamp(),
+ %% EndKey = proplists:get_value('end', Options, BatchStart),
+ Leeway = proplists:get_value(leeway, Options,
+ riak_cs_gc:leeway_seconds()),
+
+ %% set many items to GCDState here
+ GCDState = #gc_batch_state{
+ batch_start=BatchStart,
+ leeway=Leeway,
+ max_workers=MaxWorkers},
+
+ case riak_cs_gc_batch:start_link(GCDState) of
+ {ok, Pid} ->
+ _ = lager:info("Starting garbage collection in ~p: "
+ "leeway=~p, batch_start=~p, max_workers=~p",
+ [Pid, Leeway, BatchStart, MaxWorkers]),
+ {ok, State#gc_manager_state{gc_batch_pid=Pid,
+ current_batch=GCDState}};
+ Error ->
+ Error
+ end.
+
+-ifdef(TEST).
+maybe_current_state(undefined) -> {not_running, undefined};
+maybe_current_state(mock_pid) -> {not_running, undefined}.
+-else.
+maybe_current_state(undefined) -> {not_running, undefined};
+maybe_current_state(Pid) when is_pid(Pid) ->
+ riak_cs_gc_batch:current_state(Pid).
+-endif.
+
+maybe_cancel_timer(#gc_manager_state{timer_ref=Ref}=State)
+ when is_reference(Ref) ->
+ _ = erlang:cancel_timer(Ref),
+ State#gc_manager_state{timer_ref=undefined,
+ next=undefined};
+maybe_cancel_timer(State) ->
+ State.
+
+%% @doc Setup the automatic trigger to start the next
+%% scheduled batch calculation.
+-spec schedule_next(#gc_manager_state{}) -> #gc_manager_state{}.
+schedule_next(#gc_manager_state{timer_ref=Ref}=State)
+ when Ref =/= undefined ->
+ case erlang:read_timer(Ref) of
+ false ->
+ schedule_next(State#gc_manager_state{timer_ref=undefined});
+ _ ->
+ _ = lager:debug("Timer is already scheduled, maybe manually triggered?"),
+ %% Timer is already scheduled, do nothing
+ State
+ end;
+schedule_next(#gc_manager_state{interval=infinity}=State) ->
+ %% nothing to schedule, all triggers manual
+ State#gc_manager_state{next=undefined};
+schedule_next(#gc_manager_state{interval=Interval}=State) ->
+ RevisedNext = riak_cs_gc:timestamp() + Interval,
+ TimerValue = Interval * 1000,
+ TimerRef = erlang:send_after(TimerValue, self(), {start, []}),
+ _ = lager:info("Scheduled next batch at ~s",
+ [riak_cs_gc_console:human_time(RevisedNext)]),
+ State#gc_manager_state{next=RevisedNext,
+ timer_ref=TimerRef}.
+
+translate(gc_batch_pid, _) -> [];
+translate(batch_history, []) -> [];
+translate(batch_history, [H|_]) ->
+ #gc_batch_state{batch_start = Last} = H,
+ [{last, Last}];
+translate(batch_history, #gc_batch_state{batch_start = Last}) ->
+ [{last, Last}];
+translate(current_batch, undefined) -> [];
+translate(current_batch, GCDState) ->
+ riak_cs_gc_batch:status_data(GCDState);
+translate(initial_delay, _) -> [];
+translate(timer_ref, _) -> [];
+translate(T, V) -> {T, V}.
diff --git a/src/riak_cs_gc_worker.erl b/src/riak_cs_gc_worker.erl
index 8dd30c242..8983731d4 100644
--- a/src/riak_cs_gc_worker.erl
+++ b/src/riak_cs_gc_worker.erl
@@ -44,12 +44,16 @@
terminate/3,
code_change/4]).
--include("riak_cs_gc_d.hrl").
+-include("riak_cs_gc.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
-export([current_state/1]).
-define(STATE, #gc_worker_state).
--define(GC_D, riak_cs_gc_d).
+-define(GC_D, riak_cs_gc_batch).
%%%===================================================================
%%% API
@@ -86,7 +90,8 @@ init([BagId, Keys]) ->
%% handle messages from the outside world (like `status').
fetching_next_fileset(continue, ?STATE{batch=[]}=State) ->
%% finished with this batch
- gen_fsm:send_event(?GC_D, {batch_complete, self(), State}),
+ %% gen_fsm:send_event(?GC_D, {batch_complete, self(), State}),
+ gen_fsm:send_all_state_event(?GC_D, {batch_complete, self(), State}),
{stop, normal, State};
fetching_next_fileset(continue, State=?STATE{batch=[FileSetKey | RestKeys],
batch_skips=BatchSkips,
@@ -165,7 +170,8 @@ initiating_file_delete(_Msg, _From, State) ->
{next_state, initiating_file_delete, State}.
waiting_file_delete({Pid, DelFsmReply}, _From, State=?STATE{delete_fsm_pid=Pid}) ->
- ok_reply(initiating_file_delete, handle_delete_fsm_reply(DelFsmReply, State));
+ Reply=handle_delete_fsm_reply(DelFsmReply, State),
+ ok_reply(initiating_file_delete, Reply);
waiting_file_delete(_Msg, _From, State) ->
{next_state, initiating_file_delete, State}.
diff --git a/src/riak_cs_sup.erl b/src/riak_cs_sup.erl
index a694e02bd..7dbbcb79f 100644
--- a/src/riak_cs_sup.erl
+++ b/src/riak_cs_sup.erl
@@ -75,9 +75,9 @@ process_specs() ->
Storage = {riak_cs_storage_d,
{riak_cs_storage_d, start_link, []},
permanent, 5000, worker, [riak_cs_storage_d]},
- GC = {riak_cs_gc_d,
- {riak_cs_gc_d, start_link, []},
- permanent, 5000, worker, [riak_cs_gc_d]},
+ GC = {riak_cs_gc_manager,
+ {riak_cs_gc_manager, start_link, []},
+ permanent, 5000, worker, [riak_cs_gc_manager]},
Stats = {riak_cs_stats, {riak_cs_stats, start_link, []},
permanent, 5000, worker, dynamic},
DeleteFsmSup = {riak_cs_delete_fsm_sup,
diff --git a/test/riak_cs_gc_d_eqc.erl b/test/riak_cs_gc_d_eqc.erl
deleted file mode 100644
index dfa3bbdb0..000000000
--- a/test/riak_cs_gc_d_eqc.erl
+++ /dev/null
@@ -1,281 +0,0 @@
-%% ---------------------------------------------------------------------
-%%
-%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% ---------------------------------------------------------------------
-
-%% @doc Quickcheck test module for `riak_cs_gc_d'.
-
--module(riak_cs_gc_d_eqc).
-
--include("riak_cs_gc_d.hrl").
-
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--include_lib("eqc/include/eqc_fsm.hrl").
--include_lib("eunit/include/eunit.hrl").
-
-%% eqc properties
--export([prop_set_interval/0,
- prop_manual_commands/0,
- prop_status/0]).
-
-%% States
--export([idle/1,
- fetching_next_batch/1,
- feeding_workers/1,
- waiting_for_workers/1,
- paused/2]).
-
-%% eqc_fsm callbacks
--export([initial_state/0,
- initial_state_data/0,
- next_state_data/5,
- precondition/4,
- postcondition/5]).
-
-%% Helpers
--export([test/0,
- test/1]).
-
--compile(export_all).
-
--define(QC_OUT(P),
- eqc:on_output(fun(Str, Args) ->
- io:format(user, Str, Args) end, P)).
-
--define(TEST_ITERATIONS, 500).
--define(GCD_MODULE, riak_cs_gc_d).
-
--define(P(EXPR), PPP = (EXPR),
- case PPP of
- true -> ok;
- _ -> io:format(user, "PPP=~p at line ~p: ~s~n", [PPP, ?LINE, ??EXPR])
- end,
- PPP).
-
--define(STATE, #gc_d_state).
--record(mc_state, {}).
-
-%%====================================================================
-%% Eunit tests
-%%====================================================================
-
-eqc_test_() ->
- {spawn,
- [
- {timeout, 20, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(10, ?QC_OUT(prop_set_interval()))))},
- {timeout, 60, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(30, ?QC_OUT(prop_manual_commands()))))},
- {timeout, 20, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(10, ?QC_OUT(prop_status()))))}
- ]
- }.
-
-%% ====================================================================
-%% EQC Properties
-%% ====================================================================
-
-prop_set_interval() ->
- ?FORALL(Interval, ?LET(Nat, nat(), Nat + 1),
- catch begin
- catch riak_cs_gc_d:stop(),
- {ok, _} = riak_cs_gc_d:test_link(?DEFAULT_GC_INTERVAL),
- {_, State1} = riak_cs_gc_d:current_state(),
- riak_cs_gc_d:set_interval(Interval),
- {_, State2} = riak_cs_gc_d:current_state(),
- riak_cs_gc_d:stop(),
- conjunction([{initial_interval, equals(?DEFAULT_GC_INTERVAL, State1?STATE.interval)},
- {updated_interval, equals(Interval, State2?STATE.interval)}
- ])
- end).
-
-prop_manual_commands() ->
- ?FORALL(Cmds,
- commands(?MODULE),
- begin
- catch riak_cs_gc_d:stop(),
- {ok, _} = riak_cs_gc_d:test_link(infinity),
- {H, {_F, _S}, Res} = run_commands(?MODULE, Cmds),
- riak_cs_gc_d:stop(),
- aggregate(zip(state_names(H), command_names(Cmds)),
- ?WHENFAIL(
- begin
- eqc:format("Cmds: ~p~n~n",
- [zip(state_names(H),
- command_names(Cmds))]),
- eqc:format("Result: ~p~n~n", [Res]),
- eqc:format("History: ~p~n~n", [H])
- end,
- equals(ok, Res)))
- end).
-
-prop_status() ->
- ?FORALL({Interval, Last, Next,
- Start, Count, Skips, Batch},
- {nat(), riak_cs_gen:datetime(), riak_cs_gen:datetime(),
- nat(), nat(), nat(), list(nat())},
- begin
- State = ?STATE{interval=Interval,
- last=Last,
- next=Next,
- batch_start=Start,
- batch_count=Count,
- batch_skips=Skips,
- batch=Batch},
- Status = orddict:from_list(riak_cs_gc_d:status_data(State)),
- conjunction([{interval, equals(orddict:fetch(interval, Status), Interval)},
- {current, equals(orddict:fetch(current, Status), Start)},
- {next, equals(orddict:fetch(next, Status), Next)},
- {files_deleted, equals(orddict:fetch(files_deleted, Status), Count)},
- {files_skipped, equals(orddict:fetch(files_skipped, Status), Skips)},
- {files_left, equals(orddict:fetch(files_left, Status), length(Batch)) }
- ])
- end).
-
-%%====================================================================
-%% eqc_fsm callbacks
-%%====================================================================
-
-idle(_S) ->
- [
- {history, {call, ?GCD_MODULE, cancel_batch, []}},
- {history, {call, ?GCD_MODULE, resume, []}},
- {history, {call, ?GCD_MODULE, set_interval, [infinity]}},
- {{paused, idle}, {call, ?GCD_MODULE, pause, []}},
- {fetching_next_batch, {call, ?GCD_MODULE, manual_batch, [[testing]]}}
- ].
-
-fetching_next_batch(_S) ->
- [
- {history, {call, ?GCD_MODULE, manual_batch, [[testing]]}},
- {history, {call, ?GCD_MODULE, resume, []}},
- {history, {call, ?GCD_MODULE, set_interval, [infinity]}},
- {idle, {call, ?GCD_MODULE, cancel_batch, []}},
- {{paused, fetching_next_batch}, {call, ?GCD_MODULE, pause, []}},
- {feeding_workers, {call, ?GCD_MODULE, change_state, [feeding_workers]}}
- ].
-
-feeding_workers(_S) ->
- [
- {history, {call, ?GCD_MODULE, manual_batch, [[testing]]}},
- {history, {call, ?GCD_MODULE, resume, []}},
- {history, {call, ?GCD_MODULE, set_interval, [infinity]}},
- {idle, {call, ?GCD_MODULE, cancel_batch, []}},
- {{paused, feeding_workers}, {call, ?GCD_MODULE, pause, []}},
- {fetching_next_batch, {call, ?GCD_MODULE, change_state, [fetching_next_batch]}},
- {waiting_for_workers, {call, ?GCD_MODULE, change_state, [waiting_for_workers]}}
- ].
-
-waiting_for_workers(_S) ->
- [
- {history, {call, ?GCD_MODULE, manual_batch, [[testing]]}},
- {history, {call, ?GCD_MODULE, resume, []}},
- {history, {call, ?GCD_MODULE, set_interval, [infinity]}},
- {idle, {call, ?GCD_MODULE, cancel_batch, []}},
- {{paused, waiting_for_workers}, {call, ?GCD_MODULE, pause, []}},
- {feeding_workers, {call, ?GCD_MODULE, change_state, [feeding_workers]}}
- ].
-
-paused(PrevState, _S) ->
- [
- {history, {call, ?GCD_MODULE, manual_batch, [[testing]]}},
- {history, {call, ?GCD_MODULE, pause, []}},
- {history, {call, ?GCD_MODULE, set_interval, [infinity]}},
- {{paused, idle}, {call, ?GCD_MODULE, cancel_batch, []}},
- {PrevState, {call, ?GCD_MODULE, resume, []}}
- ].
-
-initial_state() ->
- idle.
-
-initial_state_data() ->
- #mc_state{}.
-
-next_state_data(_From, _To, S, _R, _C) ->
- S.
-
-precondition(_From, _To, _S, _C) ->
- true.
-
-postcondition(From, To, S , {call, _M, ManualCommad, _A}=C, R) ->
- {Actual, _} = riak_cs_gc_d:current_state(),
- ?assertEqual(actual_state(To), Actual),
- ExpectedRes = expected_result(From, To, ManualCommad),
- case R of
- ExpectedRes -> true;
- _ ->
- eqc:format("Result: ~p~n", [R]),
- eqc:format("Expected: ~p~n", [ExpectedRes]),
- eqc:format("when {From, To, S, C}: ~p~n", [{From, To, S, C}]),
- false
- end.
-
-actual_state({State, _}) -> State;
-actual_state(S) -> S.
-
-%% Handling of `set_interval' calls. Always succeeds.
-expected_result(From, From, set_interval) ->
- ok;
-
-%% Handling of `pause' and `resume' calls. Just becomes paused and back.
-expected_result({paused, StateToResume}, {paused, StateToResume}, pause) ->
- {error, already_paused};
-expected_result(From, {paused, From}, pause) ->
- ok;
-expected_result({paused, StateToResume}, StateToResume, resume) ->
- ok;
-expected_result(From, From, resume) when is_atom(From) ->
- {error, not_paused};
-
-%% Handling of `manual_batch' and `cancel_batch'.
-%% Almost just becomes fetching_next_batch and back to idle,
-%% only `cancel_batch' at paused is special.
-expected_result({paused, From}, {paused, From}, manual_batch) ->
- {error, already_paused};
-expected_result({paused, _StateAtPause}, {paused, idle}, cancel_batch) ->
- ok;
-expected_result(idle, fetching_next_batch, manual_batch) ->
- ok;
-expected_result(From, From, manual_batch) ->
- {error, already_deleting};
-expected_result(idle, idle, cancel_batch) ->
- {error, no_batch};
-expected_result(_From, idle, cancel_batch) ->
- ok;
-
-%% `change_state'
-expected_result(_From, _To, change_state) ->
- ok.
-
-weight(idle, fetching_next_batch, _) -> 10;
-weight(fetching_next_batch, feeding_workers, _) -> 5;
-weight(feeding_workers, waiting_for_workers, _) -> 3;
-weight(_, _, _) -> 1.
-
-%%====================================================================
-%% Helpers
-%%====================================================================
-
-test() ->
- test(500).
-
-test(Iterations) ->
- eqc:quickcheck(eqc:numtests(Iterations, prop_status())).
-
-test(Iterations, Prop) ->
- eqc:quickcheck(eqc:numtests(Iterations, ?MODULE:Prop())).
-
--endif.
diff --git a/test/riak_cs_gc_manager_eqc.erl b/test/riak_cs_gc_manager_eqc.erl
new file mode 100644
index 000000000..3dcb89df8
--- /dev/null
+++ b/test/riak_cs_gc_manager_eqc.erl
@@ -0,0 +1,198 @@
+%% ---------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% ---------------------------------------------------------------------
+
+%% @doc Quickcheck test module for `riak_cs_gc_manager'.
+
+-module(riak_cs_gc_manager_eqc).
+
+-include("include/riak_cs_gc.hrl").
+
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-include_lib("eqc/include/eqc_fsm.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% eqc properties
+-export([prop_set_interval/0,
+ prop_manual_commands/0]).
+
+%% States
+-export([idle/1,
+ running/1]).
+
+%% eqc_fsm callbacks
+-export([initial_state/0,
+ initial_state_data/0,
+ next_state_data/5,
+ precondition/4,
+ postcondition/5]).
+
+-define(QC_OUT(P),
+ eqc:on_output(fun(Str, Args) ->
+ io:format(user, Str, Args) end, P)).
+
+-define(TEST_ITERATIONS, 500).
+
+-define(P(EXPR), PPP = (EXPR),
+ case PPP of
+ true -> ok;
+ _ -> io:format(user, "PPP=~p at line ~p: ~s~n", [PPP, ?LINE, ??EXPR])
+ end,
+ PPP).
+
+-define(STATE, #gc_batch_state).
+-record(mc_state, {}).
+
+%%====================================================================
+%% Eunit tests
+%%====================================================================
+
+eqc_test_() ->
+ {spawn,
+ [{foreach,
+ fun() ->
+ meck:new(riak_cs_gc_batch, []),
+ meck:expect(riak_cs_gc_batch, start_link,
+ fun(_GCDState) -> {ok, mock_pid} end),
+ meck:expect(riak_cs_gc_batch, stop, fun(_) -> error(from_meck) end)
+ end,
+ fun(_) ->
+ meck:unload()
+ end,
+ [
+ {timeout, 20, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(10, ?QC_OUT(prop_set_interval()))))},
+ {timeout, 60, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(30, ?QC_OUT(prop_manual_commands()))))}
+ ]
+ }]}.
+
+%% ====================================================================
+%% EQC Properties
+%% ====================================================================
+
+prop_set_interval() ->
+ ?FORALL(Interval,
+ oneof([?LET(Nat, nat(), Nat + 1), infinity]),
+ begin
+ ok = application:set_env(riak_cs, initial_gc_delay, 0),
+ {ok, Pid} = riak_cs_gc_manager:test_link(),
+ try
+ {ok, {_, State1}} = riak_cs_gc_manager:status(),
+ ok = riak_cs_gc_manager:set_interval(Interval),
+ {ok, {_, State2}} = riak_cs_gc_manager:status(),
+ conjunction([{initial_interval,
+ equals(?DEFAULT_GC_INTERVAL, State1#gc_manager_state.interval)},
+ {updated_interval,
+ equals(Interval, State2#gc_manager_state.interval)}
+ ])
+ after
+ ok = gen_fsm:sync_send_all_state_event(Pid, stop)
+ end
+ end).
+
+prop_manual_commands() ->
+ ?FORALL(Cmds,
+ commands(?MODULE),
+ begin
+ {ok, Pid} = riak_cs_gc_manager:test_link(),
+ try
+ {H, {_F, _S}, Res} = run_commands(?MODULE, Cmds),
+ aggregate(zip(state_names(H), command_names(Cmds)),
+ ?WHENFAIL(
+ begin
+ eqc:format("Cmds: ~p~n~n",
+ [zip(state_names(H),
+ command_names(Cmds))]),
+ eqc:format("Result: ~p~n~n", [Res]),
+ eqc:format("History: ~p~n~n", [H])
+ end,
+ equals(ok, Res)))
+ after
+ ok = gen_fsm:sync_send_all_state_event(Pid, stop)
+ end
+ end).
+
+%%====================================================================
+%% eqc_fsm callbacks
+%%====================================================================
+
+idle(_S) ->
+ [
+ {history, {call, riak_cs_gc_manager, cancel_batch, []}},
+ {history, {call, riak_cs_gc_manager, set_interval, [infinity]}},
+ {running, {call, riak_cs_gc_manager, start_batch, [[{leeway, 30}]]}},
+ {idle, {call, riak_cs_gc_manager, finished, [report]}}
+ ].
+
+running(_S) ->
+ [
+ {history, {call, riak_cs_gc_manager, start_batch, [[{leeway, 30}]]}},
+ {history, {call, riak_cs_gc_manager, set_interval, [infinity]}},
+ {idle, {call, riak_cs_gc_manager, cancel_batch, []}},
+ {idle, {call, riak_cs_gc_manager, finished, [report]}}
+ ].
+
+initial_state() ->
+ idle.
+
+initial_state_data() ->
+ #mc_state{}.
+
+next_state_data(_From, _To, S, _R, _C) ->
+ S.
+
+precondition(_From, _To, _S, _C) ->
+ true.
+
+postcondition(From, To, S , {call, _M, ManualCommad, _A}=C, R) ->
+ {ok, {Actual, _}} = riak_cs_gc_manager:status(),
+ ?assertEqual(To, Actual),
+ ExpectedRes = expected_result(From, To, ManualCommad),
+ case R of
+ ExpectedRes when ManualCommad =/= status -> true;
+ {ok, {S, _}} -> true;
+ _ ->
+ eqc:format("Result: ~p~n", [R]),
+ eqc:format("Expected: ~p~n", [ExpectedRes]),
+ eqc:format("when {From, To, S, C}: ~p <- ~p~n", [{From, To, S, C}, ManualCommad]),
+ false
+ end.
+
+expected_result(S, S, set_interval) ->
+ ok;
+
+expected_result(idle, running, start_batch) ->
+ ok;
+expected_result(idle, idle, _) ->
+ {error, idle};
+
+expected_result(running, idle, finished) ->
+ ok;
+expected_result(running, running, _) ->
+ {error, running};
+
+expected_result(_From, idle, cancel_batch) ->
+ ok.
+
+%% weight(idle, fetching_next_batch, _) -> 10;
+%% weight(fetching_next_batch, feeding_workers, _) -> 5;
+%% weight(feeding_workers, waiting_for_workers, _) -> 3;
+%% weight(_, _, _) -> 1.
+
+-endif.
diff --git a/test/riak_cs_gc_single_run_eqc.erl b/test/riak_cs_gc_single_run_eqc.erl
index 4daa47fdf..281471983 100644
--- a/test/riak_cs_gc_single_run_eqc.erl
+++ b/test/riak_cs_gc_single_run_eqc.erl
@@ -19,12 +19,12 @@
%% ---------------------------------------------------------------------
%% @doc EQC test module for single gc run.
-%% Test targets is a combination of `riak_cs_gc_d' and `riak_cs_gc_worker'.
+%% Test targets is a combination of `riak_cs_gc_batch' and `riak_cs_gc_worker'.
%% All calls to riak, 2i/GET/DELETE, are mocked away by `meck'.
-module(riak_cs_gc_single_run_eqc).
--include("riak_cs_gc_d.hrl").
+-include("riak_cs_gc.hrl").
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@@ -64,12 +64,8 @@
eqc_test_() ->
{foreach,
fun() ->
- error_logger:tty(false),
- error_logger:logfile({open, "riak_cs_gc_single_run_eqc.log"}),
-
application:set_env(riak_cs, gc_batch_size, 7),
- application:set_env(riak_cs, gc_interval, infinity),
- application:set_env(riak_cs, gc_paginated_indexes, true),
+ meck:new(riak_cs_gc_manager, []),
meck:new(riakc_pb_socket, [passthrough]),
%% For riak_cs_gc_worker, it starts/stops pool worker directly.
@@ -81,38 +77,44 @@ eqc_test_() ->
end,
fun(_) ->
meck:unload(),
- stop_and_wait_for_gc_d()
+ stop_and_wait_for_gc_batch()
end,
[
{timeout, ?TESTING_TIME*2,
?_assert(quickcheck(eqc:testing_time(?TESTING_TIME,
- ?QC_OUT(prop_gc_manual_batch(no_error)))))},
+ ?QC_OUT(prop_gc_batch(no_error)))))},
{timeout, ?TESTING_TIME*2,
?_assert(quickcheck(eqc:testing_time(?TESTING_TIME,
- ?QC_OUT(prop_gc_manual_batch(with_errors)))))}
+ ?QC_OUT(prop_gc_batch(with_errors)))))}
]}.
%% EQC of single GC runs.
%% 1. EQC generates `ListOfFilesetKeysInput', for exapmle
%% `[{3, no_error}, {14, with_errors}, {15, with_errors}, {92, no_error}]'.
-%% 2. `riak_cs_gc_d' requests 2i for `riak-cs-gc' and gets response.
+%% 2. `riak_cs_gc_batch' requests 2i for `riak-cs-gc' and gets response.
%% including list of fileset keys, such as `[<<"1">>, <<"2">>, <<"3">>]'.
-%% 3. `riak_cs_gc_d' starts workers for each fileset key,
+%% 3. `riak_cs_gc_batch' starts workers for each fileset key,
%% GET the fileset, spawns riak_cs_delete_fsm and DELETE fileset key at the end.
-%% 4. `riak_cs_gc_d' gathers workers' results and this test asserts them.
-prop_gc_manual_batch(ErrorOrNot) ->
- ?FORALL(ListOfFilesetKeysInput, non_empty(list(fileset_keys_input(ErrorOrNot))),
+%% 4. `riak_cs_gc_batch' gathers workers' results and this test asserts them.
+prop_gc_batch(ErrorOrNot) ->
+ ?FORALL(ListOfFilesetKeysInput,
+ non_empty(list(fileset_keys_input(ErrorOrNot))),
begin
- Res = gc_manual_batch(ListOfFilesetKeysInput),
+ Self = self(),
+ meck:expect(riak_cs_gc_manager, finished,
+ fun(State) ->
+ Self ! {batch_finished, State}
+ end),
+ Res = gc_batch(ListOfFilesetKeysInput),
{ExpectedBatchCount,
ExpectedBatchSkips,
ExpectedManifCount,
- ExpectedBlockCount} =
- expectations(ListOfFilesetKeysInput),
- stop_and_wait_for_gc_d(),
+ ExpectedBlockCount} = expectations(ListOfFilesetKeysInput),
+ stop_and_wait_for_gc_batch(),
?WHENFAIL(
begin
- eqc:format("ListOfFilesetKeysInput: ~p", [ListOfFilesetKeysInput])
+ eqc:format("ListOfFilesetKeysInput: ~p~n",
+ [ListOfFilesetKeysInput])
end,
conjunction([{batch_count, equals(ExpectedBatchCount, element(1, Res))},
{batch_skips, equals(ExpectedBatchSkips, element(2, Res))},
@@ -120,9 +122,9 @@ prop_gc_manual_batch(ErrorOrNot) ->
{block_count, equals(ExpectedBlockCount, element(4, Res))}]))
end).
-stop_and_wait_for_gc_d() ->
- Pid = whereis(riak_cs_gc_d),
- catch riak_cs_gc_d:stop(),
+stop_and_wait_for_gc_batch() ->
+ Pid = whereis(riak_cs_gc_batch),
+ catch riak_cs_gc_batch:stop(),
wait_for_stop(Pid).
wait_for_stop(undefined) ->
@@ -136,19 +138,22 @@ wait_for_stop(Pid) ->
ok
end.
--spec gc_manual_batch([fileset_keys_input()]) -> eqc:property().
-gc_manual_batch(ListOfFilesetKeysInput) ->
+-spec gc_batch([fileset_keys_input()]) -> eqc:property().
+gc_batch(ListOfFilesetKeysInput) ->
%% For `riak-cs-gc' 2i query, use a process to hold `ListOfFilesetKeysInput'.
+ %% ?debugVal(ListOfFilesetKeysInput),
meck:expect(riakc_pb_socket, get_index_range,
dummy_get_index_range_fun(ListOfFilesetKeysInput)),
- {ok, _} = riak_cs_gc_d:start_link(),
- riak_cs_gc_d:manual_batch([]),
+ {ok, _} = riak_cs_gc_batch:start_link(#gc_batch_state{
+ batch_start=riak_cs_gc:timestamp(),
+ max_workers=5,
+ leeway=1}),
receive
- {batch_finished, #gc_d_state{batch_count=BatchCount,
- batch_skips=BatchSkips,
- manif_count=ManifCount,
- block_count=BlockCount} = _State} ->
+ {batch_finished, #gc_batch_state{batch_count=BatchCount,
+ batch_skips=BatchSkips,
+ manif_count=ManifCount,
+ block_count=BlockCount} = _State} ->
{BatchCount, BatchSkips, ManifCount, BlockCount};
OtherMsg ->
eqc:format("OtherMsg: ~p~n", [OtherMsg]),