Skip to content

Commit

Permalink
Remove obsolete wrapper around sweeper state data
Browse files Browse the repository at this point in the history
We no longer need the {?MODULE, State} construct since we're not using
the State:update_whatever style of coding anymore.
  • Loading branch information
nickelization committed Nov 23, 2016
1 parent a17c768 commit 3f84f3f
Showing 1 changed file with 37 additions and 46 deletions.
83 changes: 37 additions & 46 deletions src/riak_kv_sweeper_state.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
%% ====================================================================
%% Types
%% ====================================================================
-type state() :: {?MODULE, #state{}}.
-type state() :: #state{}.
-type index() :: non_neg_integer().

-type scheduler_event() :: {request, index()} | tick.
Expand Down Expand Up @@ -97,28 +97,28 @@ new() ->
#state{sweep_participants = SP}
end,
State1 = State0#state{sweeps = get_persistent_sweeps()},
update_sweep_specs({?MODULE, State1}).
update_sweep_specs(State1).

-spec add_sweep_participant(Participant:: #sweep_participant{}, state()) -> state().
add_sweep_participant(Participant, {?MODULE, State}) ->
add_sweep_participant(Participant, State) ->
#state{sweep_participants = SP} = State,
SP1 = dict:store(Participant#sweep_participant.module, Participant, SP),
persist_participants(SP1),
new(State#state{sweep_participants = SP1}).
State#state{sweep_participants = SP1}.

-spec remove_sweep_participant(Module :: atom(), state()) -> {ok, boolean(), state()}.
remove_sweep_participant(Module, {?MODULE, State}) ->
remove_sweep_participant(Module, State) ->
#state{sweeps = Sweeps,
sweep_participants = SP} = State,
Removed = dict:is_key(Module, SP),
SP1 = dict:erase(Module, SP),
persist_participants(SP1),
disable_sweep_participant_in_running_sweep(Module, Sweeps),
State1 = new(State#state{sweep_participants = SP1}),
State1 = State#state{sweep_participants = SP1},
{ok, Removed, State1}.

-spec update_sweep_specs(state()) -> state().
update_sweep_specs({?MODULE, State}) ->
update_sweep_specs(State) ->
#state{sweeps = Sweeps} = State,
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Indices = riak_core_ring:my_indices(Ring),
Expand All @@ -130,29 +130,29 @@ update_sweep_specs({?MODULE, State}) ->
not lists:member(Index, Indices)],
Sweeps2 = remove_sweeps(NotOwnerIdx, Sweeps1),

new(State#state{sweeps = Sweeps2}).
State#state{sweeps = Sweeps2}.

-spec status(state()) -> {ok,
{Participant :: [#sweep_participant{}],
Sweeps :: [#sweep{}]},
state()}.
status({?MODULE, State}) ->
status(State) ->
Participants =
[Participant ||
{_Mod, Participant} <- dict:to_list(State#state.sweep_participants)],
Sweeps = [Sweep || {_Index, Sweep} <- dict:to_list(State#state.sweeps)],
{ok, {Participants, Sweeps}, new(State)}.
{ok, {Participants, Sweeps}, State}.

-spec stop_all_sweeps(state()) -> {ok, Running :: non_neg_integer(), state()}.
stop_all_sweeps({?MODULE, State}) ->
stop_all_sweeps(State) ->
#state{sweeps = Sweeps} = State,
Running = [Sweep || Sweep <- get_running_sweeps(Sweeps)],
[stop_sweep(Sweep) || Sweep <- Running],
{ok, length(Running), new(State)}.
{ok, length(Running), State}.

-spec get_active_participants(index(), state()) ->
{ok, Participants :: [#sweep_participant{}], state()}.
get_active_participants(Index, {?MODULE, State}) ->
get_active_participants(Index, State) ->
#state{sweep_participants = Participants} = State,
Funs =
[{Participant, Module:participate_in_sweep(Index, self())} ||
Expand All @@ -162,33 +162,30 @@ get_active_participants(Index, {?MODULE, State}) ->
ActiveParticipants =
[Participant#sweep_participant{sweep_fun = Fun, acc = InitialAcc} ||
{Participant, {ok, Fun, InitialAcc}} <- Funs],
{ok, ActiveParticipants, new(State)}.
{ok, ActiveParticipants, State}.

-spec get_estimate_keys(index(), state()) ->
{ok, OldEstimate :: {EstimatedNrKeys :: non_neg_integer(), erlang:timestamp()}, state()} |
{ok, OldEstimate :: undefined, state()}.
get_estimate_keys(Index, {?MODULE, State}) ->
get_estimate_keys(Index, State) ->
#state{sweeps = Sweeps} = State,
#sweep{estimated_keys = OldEstimate} = dict:fetch(Index, Sweeps),
{ok, OldEstimate, new(State)}.
{ok, OldEstimate, State}.

-spec update_finished_sweep(index(), any(), state()) -> state().
update_finished_sweep(Index, Result, {?MODULE, State}) ->
update_finished_sweep(Index, Result, State) ->
#state{sweeps = Sweeps} = State,
State1 =
case dict:find(Index, Sweeps) of
{ok, Sweep} ->
Sweep1 = store_result(Result, Sweep),
finish_sweep(Sweep1,
State#state{sweeps = dict:store(Index, Sweep1, Sweeps)});
finish_sweep(Sweep1, State#state{sweeps = dict:store(Index, Sweep1, Sweeps)});
_ ->
State
end,
new(State1).
end.

-spec update_started_sweep(index(), ActiveParticipants :: [],
Estimate :: non_neg_integer(), state()) -> state().
update_started_sweep(Index, ActiveParticipants, Estimate, {?MODULE, State}) ->
update_started_sweep(Index, ActiveParticipants, Estimate, State) ->
Sweeps = State#state.sweeps,
SweepParticipants = State#state.sweep_participants,
TS = os:timestamp(),
Expand All @@ -204,24 +201,21 @@ update_started_sweep(Index, ActiveParticipants, Estimate, {?MODULE, State}) ->
start_time = TS,
end_time = undefined}
end, Sweeps),
State1 = State#state{sweeps = Sweeps1},
new(State1).
State#state{sweeps = Sweeps1}.

-spec update_progress(index(), SweptKeys :: integer(), state()) -> state().
update_progress(Index, SweptKeys, {?MODULE, State}) ->
update_progress(Index, SweptKeys, State) ->
#state{sweeps = Sweeps} = State,
State1 =
case dict:find(Index, Sweeps) of
{ok, Sweep} ->
Sweep1 = Sweep#sweep{swept_keys = SweptKeys},
State#state{sweeps = dict:store(Index, Sweep1, Sweeps)};
_ ->
State
end,
new(State1).
end.

-spec maybe_schedule_sweep(state()) -> {ok, index(), state()} | state().
maybe_schedule_sweep({?MODULE, State}) ->
maybe_schedule_sweep(State) ->
Enabled = scheduler_enabled(),
ConcurrenyLimit = get_concurrency_limit(),
SweepWindow = sweep_window(),
Expand All @@ -237,13 +231,13 @@ maybe_schedule_sweep({?MODULE, State}) ->
end,
case Result of
{#sweep{index=Index}, State1} ->
{ok, Index, new(State1)};
{ok, Index, State1};
#state{} = State1 ->
new(State1)
State1
end.

-spec sweep_request(index(), state()) -> {ok, index(), state()} | state().
sweep_request(Index, {?MODULE, State}) ->
sweep_request(Index, State) ->
#state{sweeps = Sweeps} = State,
ConcurrenyLimit = get_concurrency_limit(),

Expand All @@ -265,12 +259,12 @@ sweep_request(Index, {?MODULE, State}) ->
end,
case Result of
{#sweep{index=Index}, State1} ->
{ok, Index, new(State1)};
{ok, Index, State1};
#state{} = State1 ->
new(State1)
State1
end.

start_sweep(Index, Pid, {?MODULE, State}) ->
start_sweep(Index, Pid, State) ->
#state{ sweeps = Sweeps} = State,
Sweeps1 =
dict:update(Index,
Expand All @@ -279,14 +273,11 @@ start_sweep(Index, Pid, {?MODULE, State}) ->
pid = Pid,
queue_time = undefined}
end, Sweeps),
new(State#state{sweeps = Sweeps1}).
State#state{sweeps = Sweeps1}.

%% =============================================================================
%% Internal Functions
%% =============================================================================
new(#state{} = State) ->
{?MODULE, State}.

finish_sweep(#sweep{index = Index}, #state{sweeps = Sweeps} = State) ->
Sweeps1 =
dict:update(Index,
Expand Down Expand Up @@ -526,7 +517,7 @@ queue_sweep(Index, #state{sweeps = Sweeps} = State) ->
State
end.

persist_sweeps({?MODULE, State}) ->
persist_sweeps(State) ->
#state{sweeps = Sweeps} = State,
CleanedSweep =
dict:map(fun(_Key, Sweep) ->
Expand All @@ -537,7 +528,7 @@ persist_sweeps({?MODULE, State}) ->
}
end, Sweeps),
file:write_file(sweep_file(?SWEEPS_FILE) , io_lib:fwrite("~p.\n",[CleanedSweep])),
{?MODULE, State}.
State.

sweep_file() ->
sweep_file(?SWEEPS_FILE).
Expand Down Expand Up @@ -611,7 +602,7 @@ setup() ->
meck:expect(riak_core_ring, my_indices, fun(ring) -> MyRingPart end),
meck:new(riak_kv_vnode),
meck:expect(riak_kv_vnode, sweep, fun(_, _, _) -> [] end),
State = update_sweep_specs({?MODULE, #state{}}),
State = update_sweep_specs(#state{}),
State1 = add_test_sweep_participant(State, Participants),
{MyRingPart, Participants, State1}.

Expand All @@ -635,7 +626,7 @@ test_sweep_participant(N) ->
get_module(N) ->
list_to_atom(integer_to_list(N)).

test_initiate_sweeps({MyRingPart, _Participants, {?MODULE, State}}) ->
test_initiate_sweeps({MyRingPart, _Participants, State}) ->
fun() ->
?assertEqual(length(MyRingPart), dict:size(State#state.sweeps))
end.
Expand All @@ -646,7 +637,7 @@ test_find_never_sweeped({MyRingPart, Participants, State}) ->
%% so it will be returnd by get_never_sweeped
[NoResult | Rest] = MyRingPart,
Result = [{get_module(Part), succ} ||Part <- Participants],
{?MODULE, State1} =
State1 =
lists:foldl(fun(Index, AccState) ->
update_finished_sweep(Index, {0, Result}, AccState)
end, State, Rest),
Expand All @@ -667,7 +658,7 @@ test_find_missing_part({MyRingPart, Participants, State}) ->
end, State, Rest),

Result2 = [{get_module(Part), succ} || Part <- tl(Participants)],
{?MODULE, State2} = update_finished_sweep(NotAllResult, {0, Result2}, State1),
State2 = update_finished_sweep(NotAllResult, {0, Result2}, State1),
?assertEqual([], get_never_runned_sweeps(State2#state.sweeps)),
MissingPart = find_expired_participant(os:timestamp(), State2#state.sweeps,
State2#state.sweep_participants),
Expand Down

0 comments on commit 3f84f3f

Please sign in to comment.