Skip to content

Commit

Permalink
WHISTLE-201: Fleshed out couch compactor for peeking behind the veil …
Browse files Browse the repository at this point in the history
…of a proxy
  • Loading branch information
James Aimonetti committed Jul 9, 2011
1 parent d2a99a7 commit afe53ec
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 75 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -11,5 +11,6 @@ whistle_apps/apps/*/ebin/*.app
whistle_apps/apps/*/log/*
lib/whistle*/ebin/*.beam
lib/whistle*/ebin/*.app
lib/whistle*/log/*.sasl
utils/*/ebin/*.beam
utils/*/ebin/*.app
Empty file.
25 changes: 25 additions & 0 deletions lib/whistle_couch-1.0.0/conf/vm.args
@@ -0,0 +1,25 @@
# Each node in the system must have a unique name. We recommend running with
# -name and a fqdn
-name whistle_couch

# Inter-node communication requires each node to have the same cookie
# Comment this line out if you use ~/.erlang.cookie to manage your cookie
# file
-setcookie W41stl3@mqP

# Tell SASL not to log progress reports, and log SASL errors to a file
-sasl errlog_type error
-sasl sasl_error_logger '{file, "log/error_log.sasl"}'
-boot start_sasl

# Use kernel poll functionality if supported by emulator
+K true

# Start a pool of asynchronous IO threads
+A 8

# Comment this line out if you want the Erlang shell
+Bd

# Limit the size of error reports
-riak_err term_max_size 8192 fmt_max_bytes 9000
4 changes: 2 additions & 2 deletions lib/whistle_couch-1.0.0/include/wh_couch.hrl
Expand Up @@ -3,7 +3,7 @@
-include_lib("whistle/include/wh_log.hrl").

-record(design_data, {
db_name = <<>> :: binary()
db_name = <<>> :: binary() %% the actual DB name, encoded (/ -> %2f)
,design_name = <<>> :: binary()
,shards = [] :: list(binary()) | []
,disk_size = 0 :: non_neg_integer()
Expand All @@ -12,7 +12,7 @@
,admin_conn = #server{} :: #server{}
}).
-record(db_data, {
db_name = <<>> :: binary()
db_name = <<>> :: binary() %% the shard name
,disk_size = 0 :: non_neg_integer()
,data_size = 0 :: non_neg_integer()
,conn = #server{} :: #server{}
Expand Down
Empty file.
124 changes: 67 additions & 57 deletions lib/whistle_couch-1.0.0/src/couch_compactor.erl
Expand Up @@ -13,7 +13,7 @@
-include("wh_couch.hrl").

%% API
-export([start_link/0, get_ratios/0, force_compaction/2, compact_db/1]).
-export([start_link/0, force_compaction/0, force_compaction/2, compact_db/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -35,14 +35,16 @@
%% Starts the compactor as a gen_server linked to the calling process and
%% registers it locally under ?SERVER. init/1 is invoked with an empty list.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% Synchronously sends the `get_ratios` call to the compactor server (?SERVER).
%% The `infinity` timeout means there is no client-side timeout: the caller
%% blocks until the server replies.
-spec(get_ratios/0 :: () -> list(#db_data{}) | []).
get_ratios() ->
gen_server:call(?SERVER, get_ratios, infinity).

%% Asynchronously asks the compactor server (?SERVER) to run a compaction pass
%% by casting {force_compaction, MDS, CT}; the cast returns ok immediately.
%% MDS and CT are forwarded untouched to the server.
%% NOTE(review): presumably MDS is a minimum data size and CT a compaction
%% threshold - verify against compact/3.
%% NOTE(review): two specs for force_compaction/2 follow (the parenthesised
%% form and the `when` form); they look like the before/after lines of the
%% same change - confirm only one is meant to remain.
-spec(force_compaction/2 :: (MDS :: integer(), CT :: integer()) -> ok).
-spec force_compaction/2 :: (MDS, CT) -> ok when
MDS :: integer(),
CT :: integer().
force_compaction(MDS, CT) ->
gen_server:cast(?SERVER, {force_compaction, MDS, CT}).

%% Convenience form of force_compaction/2 that uses 1 for both the MDS and
%% CT arguments. Asynchronous: returns ok as soon as the request is cast.
-spec force_compaction/0 :: () -> ok.
force_compaction() ->
    force_compaction(1, 1).

-spec compact_db/1 :: (DBName) -> ok when
DBName :: binary().
compact_db(DBName) ->
Expand All @@ -67,23 +69,7 @@ init([]) ->
?LOG_SYS("Started compactor"),
{ok, ok, ?TIMEOUT}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% The compactor exposes no synchronous operations, so every call is
%% answered immediately with {error, not_implemented}. Replying (rather
%% than returning noreply without ever calling gen_server:reply/2) keeps
%% callers from blocking until their call timeout expires.
%%
%% @spec handle_call(Request, From, State) ->
%%                                   {reply, {error, not_implemented}, State}
%% @end
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, {error, not_implemented}, State}.

%%--------------------------------------------------------------------
Expand All @@ -100,7 +86,7 @@ handle_cast({force_compaction, MDS, CT}, State) ->
spawn(fun() -> compact_nodes(MDS, CT) end),
{noreply, State};
handle_cast({compact_db, DBName}, State) ->
%% spawn(fun() -> compact_a_db(DBName) end),
spawn(fun() -> compact_a_db(DBName) end),
{noreply, State}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -154,27 +140,27 @@ code_change(_OldVsn, State, _Extra) ->
MDS :: pos_integer(),
CT :: pos_integer().
%% Runs compaction for every node listed in the <<"nodes">> database.
%% Fetches all docs via couch_mgr:admin_all_docs/1, collects per-node data
%% with get_node_data/1 (using each doc's <<"id">> value), and hands every
%% collected item to compact/3 together with MDS and CT.
%% When there are no nodes, or the lookup fails, it only logs.
compact_nodes(MDS, CT) ->
%% rpc:call(couch@hostname, mochiweb_socket_server, get, [couch_httpd, port])
case couch_mgr:admin_all_docs(<<"nodes">>) of
{ok, []} ->
?LOG_SYS("No Nodes to compact");
{ok, Nodes} ->
NodesData = [ get_node_data(wh_json:get_value(<<"id">>, Node)) || Node <- Nodes ],
%% NOTE(review): the next line compacts every item sequentially in this
%% process, while the two lines after it reset the callid and spawn one
%% process per node. They look like the before/after versions of the same
%% step - confirm only one is meant to remain.
[ compact(D, MDS, CT) || NodeData <- NodesData, D <- NodeData ];
put(callid, undefined),
[ spawn(fun() -> [ compact(D, MDS, CT) || D <- NodeData ] end) || NodeData <- NodesData ];
{error, _E} ->
?LOG_SYS("Failed to lookup nodes: ~p", [_E])
end.

%% Collects the database and design-doc data for one node.
%% Node is expected to look like <<"name@host">>: the host part is used to
%% open a regular and an admin connection (get_conns/5), which are then
%% passed to get_dbs_and_designs/2, whose result is returned.
-spec get_node_data/1 :: (Node) -> [#db_data{} | #design_data{},...] | [] when
Node :: binary().
get_node_data(Node) ->
%% Store the node name in the process dictionary under `callid`
%% (presumably picked up by the logging macros - verify in wh_log.hrl).
put(callid, Node),
%% Crashes with badmatch unless the split yields exactly two parts.
[_Name, H] = binary:split(Node, <<"@">>),
Host = whistle_util:to_list(H),
?LOG_SYS("Trying to contact host ~s (node ~s)", [Host, _Name]),

{User,Pass} = couch_mgr:get_creds(),
%% NOTE(review): Port and AdminPort are bound from couch_mgr here and then
%% matched against the result of get_ports/1 two lines below; these look like
%% the before/after versions of the same step - confirm which is intended.
Port = couch_mgr:get_port(),
AdminPort = couch_mgr:get_admin_port(),
%% NOTE(review): the node name comes from a database document and is turned
%% into an atom; presumably the `true` flag permits creating new atoms -
%% verify in whistle_util.
{Port,AdminPort} = get_ports(whistle_util:to_atom(Node, true)),

{Conn, AdminConn} = get_conns(Host, Port, User, Pass, AdminPort),
get_dbs_and_designs(Conn, AdminConn).
Expand All @@ -189,6 +175,35 @@ get_conns(Host, Port, User, Pass, AdminPort) ->
{couch_util:get_new_connection(Host, Port, User, Pass),
couch_util:get_new_connection(Host, AdminPort, User, Pass)}.

%% Determines the {Port, AdminPort} pair to use when talking to Node.
%%
%% Sets the cookie for Node (from couch_mgr:get_node_cookie/0) and pings it.
%% If the node answers, each port is read from the node's own configuration
%% over RPC ("chttpd"/"port" for the regular port, "httpd"/"port" for the
%% admin port). Whenever a value cannot be obtained - the node does not
%% answer the ping, the RPC fails, or the key is not set - the matching
%% couch_mgr port is used instead.
-spec get_ports/1 :: (Node) -> {integer(), integer()} when
      Node :: atom().
get_ports(Node) ->
    erlang:set_cookie(Node, couch_mgr:get_node_cookie()),
    get_ports(Node, net_adm:ping(Node)).

get_ports(Node, pong) ->
    ?LOG_SYS("Trying to find ports from node ~s", [Node]),
    Port = case get_remote_port(Node, "chttpd") of
               {ok, P} ->
                   ?LOG_SYS("Got port ~s", [P]),
                   whistle_util:to_integer(P);
               error ->
                   ?LOG_SYS("Failed to get port from RPC"),
                   couch_mgr:get_port()
           end,
    AdminPort = case get_remote_port(Node, "httpd") of
                    {ok, AP} ->
                        ?LOG_SYS("Got admin port ~s", [AP]),
                        whistle_util:to_integer(AP);
                    error ->
                        ?LOG_SYS("Failed to get admin port from RPC"),
                        couch_mgr:get_admin_port()
                end,
    {Port, AdminPort};
get_ports(_Node, pang) ->
    ?LOG_SYS("Using same ports as couch_mgr"),
    {couch_mgr:get_port(), couch_mgr:get_admin_port()}.

%% Reads the "port" key of config section Section on Node via RPC.
%% Returns {ok, RawValue} on success and `error` when the RPC fails or the
%% remote side reports `undefined` (couch_config:get/2 is expected to return
%% `undefined` for an unset key), so that an unset key falls back to the
%% couch_mgr port instead of being passed to whistle_util:to_integer/1.
get_remote_port(Node, Section) ->
    case rpc:call(Node, couch_config, get, [Section, "port"]) of
        {badrpc, _Reason} -> error;
        undefined -> error;
        RawPort -> {ok, RawPort}
    end.

-spec get_dbs_and_designs/2 :: (Conn, AdminConn) -> [#db_data{} | #design_data{},...] | [] when
Conn :: #server{},
AdminConn :: #server{}.
Expand Down Expand Up @@ -242,6 +257,9 @@ get_design_docs(Conn, AdminConn, DBData) ->
lists:foldr(fun({DBName, DesignID}, Acc) ->
case get_design_data(Conn, DBName, DesignID) of
{error, failed} -> Acc;
{ok, _ErrCode, _, _Resp} ->
?LOG_SYS("Skipping ~s:~s, ~s: ~s", [DBName, DesignID, _ErrCode, _Resp]),
Acc;
{ok, DDocData} ->
DataSize = wh_json:get_value([<<"view_index">>, <<"data_size">>], DDocData, -1),
DiskSize = wh_json:get_value([<<"view_index">>, <<"disk_size">>], DDocData, -1),
Expand Down Expand Up @@ -348,32 +366,24 @@ find_shards(DBName, [#db_data{db_name=Shard}|DBs], Acc) ->
_ -> find_shards(DBName, DBs, [Shard | Acc])
end.

%% compact_a_db(DBName) ->
%% ?LOG_SYS("Compacting ~s", [DBName]),
%% {ok, ShardDBs} = couch_mgr:admin_db_info(),
%% ?LOG_SYS("Found shards: ~b", [length(ShardDBs)]),
%% DBShards = [create_db_data(binary:replace(Shard, <<"/">>, <<"%2f">>, [global]))
%% || Shard <- ShardDBs, binary:match(Shard, DBName) =/= nomatch],
%% case DBShards of
%% [] ->
%% ?LOG_SYS("Sad, no shards matched ~s", [DBName]),
%% sad_face;
%% L ->
%% ?LOG_SYS("Found ~b shards related", [length(L)]),
%% DBandDDocs = get_db_design_docs(DBName),
%% ?LOG_SYS("DB and design docs: ~b", [length(DBandDDocs)]),

%% Compactable = lists:foldr(fun({Name, DesignID}, L0) ->
%% {ok, DDocData} = get_design_data(Name, DesignID),

%% DataSize = wh_json:get_value([<<"view_index">>, <<"data_size">>], DDocData, -1),
%% DiskSize = wh_json:get_value([<<"view_index">>, <<"disk_size">>], DDocData, -1),

%% ?LOG_SYS("design info for ~s:~s: Dataset: ~b Disksize: ~b", [DesignID, Name, DataSize, DiskSize]),

%% [ #design_data{db_name=Name, design_name=DesignID, shards=find_shards(Name, L)
%% ,disk_size=DiskSize, data_size=DataSize}
%% | L0]
%% end, L, DBandDDocs),
%% [compact(D, 1, 1) || D <- Compactable]
%% end.
%% Compacts a single database on every node listed in <<"nodes">>.
%%
%% Gathers each node's data with get_node_data/1, then spawns one process
%% per node that runs compact/3 (with 1 for both tuning arguments) on the
%% items is_the_db/2 accepts for DBName. Any "/" in DBName is encoded as
%% "%2f" before matching. When there are no nodes, or the lookup fails,
%% it only logs.
compact_a_db(DBName) ->
    ?LOG_SYS("Compacting ~s", [DBName]),

    case couch_mgr:admin_all_docs(<<"nodes">>) of
        {ok, []} ->
            ?LOG_SYS("No Nodes to compact ~s", [DBName]);
        {ok, Nodes} ->
            NodesData = [ get_node_data(wh_json:get_value(<<"id">>, Node)) || Node <- Nodes ],
            %% Encode once up front; the result is the same for every item
            %% on every node, so there is no need to redo it per element.
            EncodedName = binary:replace(DBName, <<"/">>, <<"%2f">>, [global]),
            [ spawn(fun() ->
                            [ compact(D, 1, 1) || D <- NodeData, is_the_db(EncodedName, D) ]
                    end)
              || NodeData <- NodesData ];
        {error, _E} ->
            ?LOG_SYS("Failed to lookup nodes: ~p", [_E])
    end.

%% Tells whether a #db_data{} or #design_data{} record belongs to DBName,
%% i.e. whether DBName occurs anywhere inside the record's db_name field.
%% Any other term is never considered a match.
is_the_db(DBName, #db_data{db_name=Name}) ->
    db_name_contains(Name, DBName);
is_the_db(DBName, #design_data{db_name=Name}) ->
    db_name_contains(Name, DBName);
is_the_db(_DBName, _Other) ->
    false.

%% Logs the comparison and returns true when DBName is found within Name.
db_name_contains(Name, DBName) ->
    ?LOG_SYS("Does ~s match ~s?", [Name, DBName]),
    case binary:match(Name, DBName) of
        nomatch -> false;
        _Found -> true
    end.

0 comments on commit afe53ec

Please sign in to comment.