diff --git a/core/whistle_couch-1.0.0/src/couch_compactor_fsm.erl b/core/whistle_couch-1.0.0/src/couch_compactor_fsm.erl index b17f5c518e4..d57cef32291 100644 --- a/core/whistle_couch-1.0.0/src/couch_compactor_fsm.erl +++ b/core/whistle_couch-1.0.0/src/couch_compactor_fsm.erl @@ -33,7 +33,7 @@ ]). %% Internal --export([compact_shard/3 +-export([compact_shard/4 ,rebuild_design_docs/3 ]). @@ -753,37 +753,39 @@ compact({'compact', N, D, [], _}, #state{dbs=[Db|Dbs]}=State) -> {'next_state', 'compact', State#state{dbs=Dbs}}; compact({'compact', N, D, Ss, DDs}, #state{admin_conn=AdminConn + ,conn=Conn ,dbs=[] }=State) -> try lists:split(?MAX_COMPACTING_SHARDS, Ss) of {Compact, Shards} -> lager:debug("compacting ~b shards for ~s on ~s", [?MAX_COMPACTING_SHARDS, D, N]), - ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs), {'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef ,next_compaction_msg={'compact', N, D, Shards, DDs} }} catch 'error':'badarg' -> lager:debug("compacting last of the shards for ~s on ~s", [D, N]), - ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs), {'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef ,next_compaction_msg='compact' }} end; compact({'compact', N, D, Ss, DDs}, #state{admin_conn=AdminConn + ,conn=Conn ,dbs=[Db|Dbs] }=State) -> try lists:split(?MAX_COMPACTING_SHARDS, Ss) of {Compact, Shards} -> lager:debug("compacting ~b shards for ~s on ~s", [?MAX_COMPACTING_SHARDS, D, N]), - ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs), {'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef ,next_compaction_msg={'compact', N, D, Shards, DDs} }} catch 'error':'badarg' -> lager:debug("compacting last of the shards for ~s on ~s", [D, N]), - ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs), {'next_state', 'compact', State#state{dbs=Dbs ,shards_pid_ref=ShardsPidRef ,next_compaction_msg={'compact', N, Db} @@ -835,17 +837,19 @@ compact({'compact_db', N, D, [], _}, #state{nodes=[Node|Ns]}=State) -> ,nodes=Ns ,dbs=[D] }}; -compact({'compact_db', N, D, Ss, DDs}, #state{admin_conn=AdminConn}=State) -> +compact({'compact_db', N, D, Ss, DDs}, #state{admin_conn=AdminConn + ,conn=Conn + }=State) -> lager:debug("compacting shards for db '~s' on node '~s'", [D, N]), try lists:split(?MAX_COMPACTING_SHARDS, Ss) of {Compact, Shards} -> - ShardsPidRef = compact_shards(AdminConn, N, Compact, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Compact, DDs), {'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef ,next_compaction_msg={'compact_db', N, D, Shards, DDs} }} catch 'error':'badarg' -> - ShardsPidRef = compact_shards(AdminConn, N, Ss, DDs), + ShardsPidRef = compact_shards(Conn, AdminConn, N, Ss, DDs), {'next_state', 'compact', State#state{shards_pid_ref=ShardsPidRef ,next_compaction_msg={'rebuild_views', N, D, DDs} }} @@ -1223,11 +1227,13 @@ rebuild_view(Conn, D, DD, View) -> 'ok' = timer:sleep(?SLEEP_BETWEEN_VIEWS) end. --spec compact_shards(server(), list(), list(), list()) -> pid_ref(). -compact_shards(AdminConn, Node, Ss, DDs) -> +-spec compact_shards(server(), server(), list(), list(), list()) -> pid_ref(). +compact_shards(Conn, AdminConn, Node, Ss, DDs) -> PR = spawn_monitor(fun() -> put('callid', Node), - Ps = [spawn_monitor(?MODULE, 'compact_shard', [AdminConn, Shard, DDs]) || Shard <- Ss], + Ps = [spawn_monitor(?MODULE, 'compact_shard', [Conn, AdminConn, Shard, DDs]) + || Shard <- Ss + ], lager:debug("shard compaction pids: ~p", [Ps]), wait_for_pids(?MAX_WAIT_FOR_COMPACTION_PIDS, Ps) end), @@ -1250,8 +1256,8 @@ wait_for_pids(MaxWait, [{P,Ref}|Ps]) -> wait_for_pids(MaxWait, Ps) end. --spec compact_shard(server(), ne_binary(), ne_binaries()) -> 'ok'. -compact_shard(AdminConn, S, DDs) -> +-spec compact_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'. +compact_shard(Conn, AdminConn, S, DDs) -> put('callid', 'compact_shard'), wait_for_compaction(AdminConn, S), @@ -1259,27 +1265,27 @@ compact_shard(AdminConn, S, DDs) -> case get_db_disk_and_data(AdminConn, S) of 'undefined' -> lager:debug("beginning compacting shard"), - start_compacting_shard(AdminConn, S, DDs); + start_compacting_shard(Conn, AdminConn, S, DDs); 'not_found' -> 'ok'; {BeforeDisk, BeforeData} -> lager:debug("beginning compacting shard: ~p disk/~p data", [BeforeDisk, BeforeData]), - start_compacting_shard(AdminConn, S, DDs) + start_compacting_shard(Conn, AdminConn, S, DDs) end. --spec start_compacting_shard(server(), ne_binary(), ne_binaries()) -> 'ok'. -start_compacting_shard(AdminConn, S, DDs) -> +-spec start_compacting_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'. +start_compacting_shard(Conn, AdminConn, S, DDs) -> case couch_util:db_compact(AdminConn, S) of - 'true' -> continue_compacting_shard(AdminConn, S, DDs); + 'true' -> continue_compacting_shard(Conn, AdminConn, S, DDs); 'false' -> lager:debug("compaction of shard failed, skipping") end. --spec continue_compacting_shard(server(), ne_binary(), ne_binaries()) -> 'ok'. -continue_compacting_shard(AdminConn, S, DDs) -> +-spec continue_compacting_shard(server(), server(), ne_binary(), ne_binaries()) -> 'ok'. +continue_compacting_shard(Conn, AdminConn, S, DDs) -> wait_for_compaction(AdminConn, S), %% cleans up old view indexes lager:debug("db view cleanup starting"), - couch_util:db_view_cleanup(AdminConn, S), + couch_util:db_view_cleanup(Conn, S), wait_for_compaction(AdminConn, S), diff --git a/core/whistle_couch-1.0.0/src/couch_util.erl b/core/whistle_couch-1.0.0/src/couch_util.erl index 56d7518b23c..2c5411be93c 100644 --- a/core/whistle_couch-1.0.0/src/couch_util.erl +++ b/core/whistle_couch-1.0.0/src/couch_util.erl @@ -89,7 +89,7 @@ max_bulk_insert() -> ?MAX_BULK_INSERT. %%------------------------------------------------------------------------------ -spec get_new_connection(nonempty_string() | ne_binary(), pos_integer(), string(), string()) -> server() | - {'error', 'timeout'}. + {'error', 'timeout' | 'ehostunreach' | term()}. get_new_connection(Host, Port, "", "") -> get_new_conn(Host, Port, ?IBROWSE_OPTS); get_new_connection(Host, Port, User, Pass) ->