Skip to content
Closed

Pr 3766 #3877

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rel/overlay/etc/default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,12 @@ partitioned||* = true
;[smoosh.slack_views]
;priority = slack
;min_priority = 536870912
;
; Directory to store the state of the smoosh priority queue
;state_dir = .
;
; Interval between writes of state of smoosh priority queue to save file
;state_checkpoint_interval_in_sec = 180

[ioq]
; The maximum number of concurrent in-flight IO requests that
Expand Down
177 changes: 153 additions & 24 deletions src/smoosh/src/smoosh_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@

% public api.
-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
-export([enqueue/3, last_updated/2, flush/1]).
-export([enqueue/3, last_updated/2, flush/1, is_key/2]).

% gen_server api.
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2
]).

-define(DEFAULT_CHECKPOINT_INTERVAL_IN_SEC, 180).

-define(VSN, 1).

% records.

-record(state, {
active = [],
name,
waiting = smoosh_priority_queue:new(),
waiting,
paused = true,
starting = []
starting = [],
opened = false
}).

% public functions.
Expand Down Expand Up @@ -65,32 +69,39 @@ close(ServerRef) ->
flush(ServerRef) ->
gen_server:call(ServerRef, flush).

is_key(ServerRef, Key) ->
gen_server:call(ServerRef, {is_key, Key}).

% gen_server functions.

init(Name) ->
schedule_unpause(),
erlang:send_after(60 * 1000, self(), check_window),
{ok, #state{name = Name}}.
process_flag(trap_exit, true),
Queue = smoosh_priority_queue:new(Name),
State = #state{name = Name, waiting = Queue},
ok = gen_server:cast(self(), init),
{ok, State}.

handle_call({last_updated, Object}, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
{reply, LastUpdated, State};
handle_call(suspend, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
#state{active = Active} = State,
[
catch erlang:suspend_process(Pid, [unless_suspending])
|| {_, Pid} <- Active
],
{reply, ok, State#state{paused = true}};
handle_call(resume, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
#state{active = Active} = State,
[catch erlang:resume_process(Pid) || {_, Pid} <- Active],
{reply, ok, State#state{paused = false}};
handle_call(status, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
{reply,
{ok, [
{active, length(State#state.active)},
Expand All @@ -99,17 +110,28 @@ handle_call(status, _From, State0) ->
]},
State};
handle_call(close, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
{stop, normal, ok, State};
State = maybe_open_queue(State0),
#state{waiting = Q} = State,
smoosh_priority_queue:close(Q),
{stop, normal, ok, State#state{waiting = nil, opened = false}};
handle_call(flush, _From, State0) ->
{ok, State} = code_change(nil, State0, nil),
{reply, ok, State#state{waiting = smoosh_priority_queue:new()}}.
#state{waiting = Q} = State = maybe_open_queue(State0),
{reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
handle_call({is_key, Key}, _From, State0) ->
State = maybe_open_queue(State0),
#state{waiting = Waiting} = State,
{reply, smoosh_priority_queue:is_key(Key, Waiting), State}.

handle_cast(init, State0) ->
State1 = maybe_recover_state(State0),
State2 = maybe_open_queue(State1),
{noreply, State3} = handle_info(persist_queue, State2),
{noreply, State3};
handle_cast({enqueue, _Object, 0}, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
{noreply, State};
handle_cast({enqueue, Object, Priority}, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
{noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.

% We accept noproc here due to possibly having monitored a restarted compaction
Expand All @@ -118,7 +140,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
Reason == normal;
Reason == noproc
->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
#state{active = Active, starting = Starting} = State,
{noreply,
maybe_start_compaction(
Expand All @@ -128,7 +150,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
}
)};
handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
#state{active = Active0, starting = Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
Expand All @@ -151,7 +173,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
end
end;
handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
case lists:keytake(Ref, 1, State#state.starting) of
{value, {_, Key}, Starting1} ->
couch_log:notice(
Expand All @@ -168,7 +190,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
{noreply, State}
end;
handle_info(check_window, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
#state{paused = Paused, name = Name} = State,
StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
FinalState =
Expand All @@ -194,21 +216,128 @@ handle_info(check_window, State0) ->
end,
erlang:send_after(60 * 1000, self(), check_window),
{noreply, FinalState};
handle_info(persist_queue, State0) ->
#state{waiting = Queue} = State0,
write_state_to_file(State0),
smoosh_priority_queue:write_to_file(Queue),
Checkpoint =
config:get_integer(
"smoosh", "state_checkpoint_interval_in_sec", ?DEFAULT_CHECKPOINT_INTERVAL_IN_SEC
) * 1000,
erlang:send_after(Checkpoint, self(), persist_queue),
{noreply, State0};
handle_info(pause, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
{noreply, State#state{paused = true}};
handle_info(unpause, State0) ->
{ok, State} = code_change(nil, State0, nil),
State = maybe_open_queue(State0),
{noreply, maybe_start_compaction(State#state{paused = false})}.

terminate(_Reason, _State) ->
terminate(_Reason, #state{name = Name, waiting = Q}) ->
file:delete(active_file_name(Name)),
file:delete(starting_file_name(Name)),
if
Q =/= nil ->
smoosh_priority_queue:close(Q);
true ->
nil
end,
ok.

code_change(_OldVsn, #state{} = State, _Extra) ->
{ok, State}.
maybe_recover_state(#state{name = Name} = State) ->
Active = recover(active_file_name(Name)),
Starting = recover(starting_file_name(Name)),
DatabaseDir = config:get("couchdb", "database_dir"),
ViewDir = config:get("couchdb", "view_index_dir"),
Active1 = get_matching_compact_files(DatabaseDir, Active),
Active2 = get_matching_compact_files(ViewDir, Active),
Active3 = Active1 ++ Active2,
State#state{active = Active3, starting = Starting}.

get_matching_compact_files(Dir, Active) ->
MatchingFiles = filelib:fold_files(
Dir,
"^[a-zA-Z0-9_.-]*.compact$",
true,
(fun(FilePath, Acc) ->
FilePrefix = filename:rootname(FilePath, ".compact"),
case lists:keyfind(FilePrefix, 1, Active) of
false ->
Acc;
Tuple ->
[Tuple | Acc]
end
end),
[]
),
lists:reverse(MatchingFiles).

recover(FilePath) ->
case do_recover(FilePath) of
{ok, List} ->
List;
error ->
[]
end.

do_recover(FilePath) ->
case file:read_file(FilePath) of
{ok, Content} ->
<<Vsn, Binary/binary>> = Content,
try parse_state(Vsn, ?VSN, Binary) of
Term ->
{ok, Term}
catch
error:Reason ->
couch_log:error(
"~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
),
file:delete(FilePath),
error
end;
{error, enoent} ->
couch_log:notice(
"~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
),
error;
{error, Reason} ->
couch_log:error(
"~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
),
file:delete(FilePath),
error
end.

parse_state(1, ?VSN, Binary) ->
erlang:binary_to_term(Binary, [safe]);
parse_state(Vsn, ?VSN, _) ->
error({unsupported_version, Vsn}).

write_state_to_file(#state{name = Name, active = Active, starting = Starting}) ->
write_to_file(Active, active_file_name(Name)),
write_to_file(Starting, starting_file_name(Name)).

write_to_file(List, FileName) ->
OnDisk = <<?VSN, (erlang:term_to_binary(List, [compressed, {minor_version, 1}]))/binary>>,
TmpFileName = FileName ++ ".tmp",
file:delete(TmpFileName),
file:write_file(TmpFileName, OnDisk, [sync]),
file:delete(FileName),
file:rename(TmpFileName, FileName).

active_file_name(Name) ->
filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").

starting_file_name(Name) ->
filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").

% private functions.

maybe_open_queue(#state{opened = true} = State) ->
State;
maybe_open_queue(#state{opened = false, waiting = Queue} = State) ->
State#state{waiting = smoosh_priority_queue:open(Queue), opened = true}.

add_to_queue(Key, Priority, State) ->
#state{active = Active, waiting = Q} = State,
case lists:keymember(Key, 1, Active) of
Expand Down
Loading