Skip to content

Commit

Permalink
v3.06: execute view cleanup against 5984 vs 5986
Browse files Browse the repository at this point in the history
  • Loading branch information
James Aimonetti committed Feb 3, 2014
1 parent 399bb1d commit 014a971
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
48 changes: 27 additions & 21 deletions core/whistle_couch-1.0.0/src/couch_compactor_fsm.erl
Expand Up @@ -33,7 +33,7 @@
]).

%% Internal
-export([compact_shard/3
-export([compact_shard/4
,rebuild_design_docs/3
]).

Expand Down Expand Up @@ -753,37 +753,39 @@ compact({'compact', N, D, [], _}, #state{dbs=[Db|Dbs]}=State) ->
{'next_state', 'compact', State#state{dbs=Dbs}};

compact({'compact', N, D, Ss, DDs}, #state{admin_conn=AdminConn
,conn=Conn
,dbs=[]
}=State) ->
try lists:split(?MAX_COMPACTING_SHARDS, Ss) of
{Compact, Shards} ->
lager:debug("compacting ~b shards for ~s on ~s", [?MAX_COMPACTING_SHARDS, D, N]),
ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs),
{'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef
,next_compaction_msg={'compact', N, D, Shards, DDs}
}}
catch
'error':'badarg' ->
lager:debug("compacting last of the shards for ~s on ~s", [D, N]),
ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs),
{'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef
,next_compaction_msg='compact'
}}
end;
compact({'compact', N, D, Ss, DDs}, #state{admin_conn=AdminConn
,conn=Conn
,dbs=[Db|Dbs]
}=State) ->
try lists:split(?MAX_COMPACTING_SHARDS, Ss) of
{Compact, Shards} ->
lager:debug("compacting ~b shards for ~s on ~s", [?MAX_COMPACTING_SHARDS, D, N]),
ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs),
{'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef
,next_compaction_msg={'compact', N, D, Shards, DDs}
}}
catch
'error':'badarg' ->
lager:debug("compacting last of the shards for ~s on ~s", [D, N]),
ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs),
{'next_state', 'compact', State#state{dbs=Dbs
,shards_pid_ref=ShardsPidRef
,next_compaction_msg={'compact', N, Db}
Expand Down Expand Up @@ -835,17 +837,19 @@ compact({'compact_db', N, D, [], _}, #state{nodes=[Node|Ns]}=State) ->
,nodes=Ns
,dbs=[D]
}};
compact({'compact_db', N, D, Ss, DDs}, #state{admin_conn=AdminConn}=State) ->
compact({'compact_db', N, D, Ss, DDs}, #state{admin_conn=AdminConn
,conn=Conn
}=State) ->
lager:debug("compacting shards for db '~s' on node '~s'", [D, N]),
try lists:split(?MAX_COMPACTING_SHARDS, Ss) of
{Compact, Shards} ->
ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs),
{'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef
,next_compaction_msg={'compact_db', N, D, Shards, DDs}
}}
catch
'error':'badarg' ->
ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs),
ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs),
{'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef
,next_compaction_msg={'rebuild_views', N, D, DDs}
}}
Expand Down Expand Up @@ -1223,11 +1227,13 @@ rebuild_view(Conn, D, DD, View) ->
'ok' = timer:sleep(?SLEEP_BETWEEN_VIEWS)
end.

-spec compact_shards(server(), list(), list(), list()) -> pid_ref().
compact_shards(AdminConn, Node, Ss, DDs) ->
-spec compact_shards(server(), server(), list(), list(), list()) -> pid_ref().
compact_shards(Conn, AdminConn, Node, Ss, DDs) ->
PR = spawn_monitor(fun() ->
put('callid', Node),
Ps = [spawn_monitor(?MODULE, 'compact_shard', [AdminConn, Shard, DDs]) || Shard <- Ss],
Ps = [spawn_monitor(?MODULE, 'compact_shard', [Conn, AdminConn, Shard, DDs])
|| Shard <- Ss
],
lager:debug("shard compaction pids: ~p", [Ps]),
wait_for_pids(?MAX_WAIT_FOR_COMPACTION_PIDS, Ps)
end),
Expand All @@ -1250,36 +1256,36 @@ wait_for_pids(MaxWait, [{P,Ref}|Ps]) ->
wait_for_pids(MaxWait, Ps)
end.

-spec compact_shard(server(), ne_binary(), ne_binaries()) -> 'ok'.
compact_shard(AdminConn, S, DDs) ->
-spec compact_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'.
compact_shard(Conn, AdminConn, S, DDs) ->
put('callid', 'compact_shard'),

wait_for_compaction(AdminConn, S),

case get_db_disk_and_data(AdminConn, S) of
'undefined' ->
lager:debug("beginning compacting shard"),
start_compacting_shard(AdminConn, S, DDs);
start_compacting_shard(Conn, AdminConn, S, DDs);
'not_found' -> 'ok';
{BeforeDisk, BeforeData} ->
lager:debug("beginning compacting shard: ~p disk/~p data", [BeforeDisk, BeforeData]),
start_compacting_shard(AdminConn, S, DDs)
start_compacting_shard(Conn, AdminConn, S, DDs)
end.

-spec start_compacting_shard(server(), ne_binary(), ne_binaries()) -> 'ok'.
start_compacting_shard(AdminConn, S, DDs) ->
-spec start_compacting_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'.
start_compacting_shard(Conn, AdminConn, S, DDs) ->
case couch_util:db_compact(AdminConn, S) of
'true' -> continue_compacting_shard(AdminConn, S, DDs);
'true' -> continue_compacting_shard(Conn, AdminConn, S, DDs);
'false' -> lager:debug("compaction of shard failed, skipping")
end.

-spec continue_compacting_shard(server(), ne_binary(), ne_binaries()) -> 'ok'.
continue_compacting_shard(AdminConn, S, DDs) ->
-spec continue_compacting_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'.
continue_compacting_shard(Conn, AdminConn, S, DDs) ->
wait_for_compaction(AdminConn, S),

%% cleans up old view indexes
lager:debug("db view cleanup starting"),
couch_util:db_view_cleanup(AdminConn, S),
couch_util:db_view_cleanup(Conn, S),

wait_for_compaction(AdminConn, S),

Expand Down
2 changes: 1 addition & 1 deletion core/whistle_couch-1.0.0/src/couch_util.erl
Expand Up @@ -89,7 +89,7 @@ max_bulk_insert() -> ?MAX_BULK_INSERT.
%%------------------------------------------------------------------------------
-spec get_new_connection(nonempty_string() | ne_binary(), pos_integer(), string(), string()) ->
server() |
{'error', 'timeout'}.
{'error', 'timeout' | 'ehostunreach' | term()}.
get_new_connection(Host, Port, "", "") ->
get_new_conn(Host, Port, ?IBROWSE_OPTS);
get_new_connection(Host, Port, User, Pass) ->
Expand Down

0 comments on commit 014a971

Please sign in to comment.