Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

WHISTLE-1403: query the supervisor for the compactor pid, and conveni…

…ence queries for if the compactor is running and what its status is
  • Loading branch information...
commit 9f41d0bc75cb2d0ee9f15785db27d9b26c746de8 1 parent 790c292
@jamesaimonetti jamesaimonetti authored
View
84 lib/whistle_couch-1.0.0/src/couch_compactor.erl
@@ -8,7 +8,13 @@
%%%-------------------------------------------------------------------
-module(couch_compactor).
--export([start_link/0, init/1]).
+-export([start_link/0
+ ,init/1
+ ]).
+
+-export([is_compactor_running/0
+ ,status/0
+ ]).
-export([compact_all/0
,compact_node/1
@@ -17,7 +23,7 @@
]).
%% internal API functions
--export([compact_shard/3]).
+-export([compact_shard/4]).
-include_lib("whistle_couch/include/wh_couch.hrl").
-define(SLEEP_BETWEEN_COMPACTION, 60000).
@@ -45,6 +51,27 @@ init(Parent) ->
proc_lib:init_ack(Parent, ignore)
end.
+-spec is_compactor_running/0 :: () -> boolean().
+is_compactor_running() ->
+ P = whistle_couch_sup:compactor_pid(),
+ is_pid(P) andalso is_process_alive(P).
+
+-spec status/0 :: () -> {'ok', wh_json:json_strings()} |
+ {'error', 'not_running' | 'timeout'}.
+status() ->
+ case is_compactor_running() of
+ false -> {'error', 'not_running'};
+ true ->
+ Self = self(),
+ P = whistle_couch_sup:compactor_pid(),
+ P ! {status, Self},
+ receive
+ Msg -> Msg
+ after 5000 ->
+ {error, timeout}
+ end
+ end.
+
-spec compact_all/0 :: () -> 'done'.
compact_all() ->
lager:debug("compacting all nodes"),
@@ -109,24 +136,41 @@ compact_node_db(NodeBin, DB, Conn, AdminConn) ->
DesignDocs = try get_db_design_docs(Conn, DBEncoded)
catch _:_ -> []
end,
- compact_node_shards(Shards, AdminConn, DesignDocs),
+ compact_node_shards(NodeBin, Shards, AdminConn, DesignDocs),
ok
end.
--spec compact_node_shards/3 :: ([ne_binary(),...], server(), [ne_binary(),...] | []) -> 'ok'.
-compact_node_shards(Shards, AdminConn, DesignDocs) ->
+-spec compact_node_shards/4 :: (ne_binary(), [ne_binary(),...], server(), wh_json:json_strings()) -> 'ok'.
+compact_node_shards(NodeBin, Shards, AdminConn, DesignDocs) ->
case catch(lists:split(?MAX_COMPACTING_SHARDS, Shards)) of
{'EXIT', _} ->
- compact_shards(Shards, AdminConn, DesignDocs),
+ compact_shards(NodeBin, Shards, AdminConn, DesignDocs),
+ wait_for_inter_compaction_timeout(NodeBin),
ok;
{Compact, Remaining} ->
- compact_shards(Compact, AdminConn, DesignDocs),
- compact_node_shards(Remaining, AdminConn, DesignDocs)
+ compact_shards(NodeBin, Compact, AdminConn, DesignDocs),
+ wait_for_inter_compaction_timeout(NodeBin),
+ compact_node_shards(NodeBin, Remaining, AdminConn, DesignDocs)
+ end.
+
+wait_for_inter_compaction_timeout(NodeBin) ->
+ wait_for_inter_compaction_timeout(
+ NodeBin
+ ,couch_config:fetch(<<"sleep_between_compaction">>, ?SLEEP_BETWEEN_COMPACTION)
+ ).
+wait_for_inter_compaction_timeout(NodeBin, Timeout) ->
+ Start = erlang:now(),
+ receive
+ {status, Srv} ->
+ Srv ! [{status, inter_compaction_timeout, Timeout}, {node, NodeBin}],
+ wait_for_inter_compaction_timeout(NodeBin, Timeout - wh_util:elapsed_ms(Start))
+ after Timeout ->
+ ok
end.
--spec compact_shards/3 :: ([ne_binary(),...], server(), [ne_binary(),...] | []) -> 'ok'.
-compact_shards(Shards, AdminConn, DesignDocs) ->
- Pids = [spawn_monitor(?MODULE, compact_shard, [AdminConn, Shard, DesignDocs])
+-spec compact_shards/4 :: (ne_binary(), [ne_binary(),...], server(), [ne_binary(),...] | []) -> 'ok'.
+compact_shards(NodeBin, Shards, AdminConn, DesignDocs) ->
+ Pids = [spawn_monitor(?MODULE, compact_shard, [NodeBin, AdminConn, Shard, DesignDocs])
|| Shard <- Shards
],
@@ -134,16 +178,16 @@ compact_shards(Shards, AdminConn, DesignDocs) ->
couch_config:fetch(<<"max_wait_for_compaction_pid">>, ?MAX_WAIT_FOR_COMPACTION_PID)
),
- _ = [receive {'DOWN', Ref, process, Pid, _} -> ok
- after MaxWait ->
- lager:debug("tired of waiting on ~p(~p), moving on", [Pid, Ref])
- end || {Pid,Ref} <- Pids
- ],
- ok = timer:sleep(couch_config:fetch(<<"sleep_between_compaction">>, ?SLEEP_BETWEEN_COMPACTION)).
+ [receive {'DOWN', Ref, process, Pid, _} -> ok;
+ {status, Srv} -> Srv ! [{node, NodeBin}, {shards, Shards}]
+ after MaxWait ->
+ lager:debug("tired of waiting on ~p(~p), moving on", [Pid, Ref])
+ end || {Pid,Ref} <- Pids
+ ].
--spec compact_shard/3 :: (server(), ne_binary(), [ne_binary(),...] | []) -> 'ok'.
-compact_shard(AdminConn, Shard, DesignDocs) ->
- put(callid, ?LOG_SYSTEM_ID),
+-spec compact_shard/4 :: (ne_binary(), server(), ne_binary(), [ne_binary(),...] | []) -> 'ok'.
+compact_shard(NodeBin, AdminConn, Shard, DesignDocs) ->
+ put(callid, NodeBin),
lager:debug("compacting shard ~s", [Shard]),
wait_for_compaction(AdminConn, Shard),
couch_util:db_compact(AdminConn, Shard),
View
11 lib/whistle_couch-1.0.0/src/whistle_couch_sup.erl
@@ -11,8 +11,10 @@
-include_lib("wh_couch.hrl").
--export([start_link/0]).
--export([init/1]).
+-export([start_link/0
+ ,init/1
+ ,compactor_pid/0
+ ]).
-define(CHILD(Name, Type), fun(N, cache) -> {N, {wh_cache, start_link, [N]}, permanent, 5000, worker, [wh_cache]};
(N, T) -> {N, {N, start_link, []}, permanent, 5000, T, [N]} end(Name, Type)).
@@ -37,6 +39,11 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+-spec compactor_pid/0 :: () -> pid() | 'undefined'.
+compactor_pid() ->
+ [Pid] = [P || {couch_compactor, P, worker, _} <- supervisor:which_children(whistle_couch_sup)],
+ Pid.
+
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
Please sign in to comment.
Something went wrong with that request. Please try again.