Skip to content

Commit

Permalink
KAZOO-801: add ratio threshold (disk/data > 1.5) and min_data_size th…
Browse files Browse the repository at this point in the history
…reshold (1MB) when deciding whether to compact a db or not
  • Loading branch information
James Aimonetti committed Apr 6, 2013
1 parent 2fdab9d commit 6c962ad
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 33 deletions.
79 changes: 68 additions & 11 deletions whistle_apps/lib/whistle_couch-1.0.0/src/couch_compactor_fsm.erl
@@ -1,5 +1,5 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2012, 2600Hz
%%% @copyright (C) 2012-2013, 2600Hz
%%% @doc
%%%
%%% @end
Expand Down Expand Up @@ -67,14 +67,21 @@

-define(AUTOCOMPACTION_CHECK_TIMEOUT, whapps_config:get_integer(?CONFIG_CAT, <<"autocompaction_check">>, 60000)).

-define(MIN_RATIO, whapps_config:get_float(?CONFIG_CAT, <<"min_ratio">>, 1.5)).
-define(MIN_DATA, whapps_config:get_integer(?CONFIG_CAT, <<"min_data_size">>, 1048576)).

-define(SERVER, ?MODULE).

-define(HEUR_NONE, 'none').
-define(HEUR_RATIO, 'ratio').

-type req_job() :: 'req_compact' |
{'req_compact_node', ne_binary()} |
{'req_compact_db', ne_binary()} |
{'req_compact_db', ne_binary(), ne_binary()}.

-type not_compacting() :: {'error', 'compactor_down'}.
-type compactor_heuristic() :: ?HEUR_NONE | ?HEUR_RATIO.

-record(state, {
nodes :: ne_binaries()
Expand All @@ -92,6 +99,7 @@
,queued_jobs = queue:new() :: queue()
,current_job_pid :: pid()
,current_job_ref :: reference()
,current_job_heuristic = ?HEUR_NONE :: compactor_heuristic()
}).

%%%===================================================================
Expand Down Expand Up @@ -226,6 +234,7 @@ ready('compact', State) ->
,admin_conn='undefined'
,current_node='undefined'
,current_db='undefined'
,current_job_heuristic=?HEUR_RATIO
}};
ready({'compact_node', N}, State) ->
gen_fsm:send_event(self(), 'compact'),
Expand All @@ -234,6 +243,7 @@ ready({'compact_node', N}, State) ->
,admin_conn='undefined'
,current_node=N
,current_db='undefined'
,current_job_heuristic=?HEUR_RATIO
}};
ready({'compact_db', D}, State) ->
[N|Ns] = get_nodes(D),
Expand All @@ -244,6 +254,7 @@ ready({'compact_db', D}, State) ->
,admin_conn='undefined'
,current_node=N
,current_db=D
,current_job_heuristic=?HEUR_NONE
}};
ready({'compact_db', N, D}, State) ->
gen_fsm:send_event(self(), {'compact_db', N, D}),
Expand All @@ -253,6 +264,7 @@ ready({'compact_db', N, D}, State) ->
,admin_conn='undefined'
,current_node=N
,current_db=D
,current_job_heuristic=?HEUR_NONE
}};
ready('next_job', #state{queued_jobs=Jobs}=State) ->
case queue:out(Jobs) of
Expand Down Expand Up @@ -455,10 +467,14 @@ compact({'compact', N}, #state{admin_conn=AdminConn}=State) ->
compact({'compact', N, D}, #state{conn=Conn
,admin_conn=AdminConn
,dbs=[]
,current_job_heuristic=Heur
}=State) ->
case couch_util:db_exists(Conn, encode_db(D)) of
Encoded = encode_db(D),
case couch_util:db_exists(Conn, Encoded) andalso
should_compact(Conn, Encoded, Heur)
of
'false' ->
lager:debug("db ~s not found on ~s", [D, N]),
lager:debug("db ~s not found on ~s OR heuristic not met", [D, N]),
gen_fsm:send_event(self(), 'compact'),
{'next_state', 'compact', State#state{current_db='undefined'}};
'true' ->
Expand All @@ -474,10 +490,14 @@ compact({'compact', N, D}, #state{conn=Conn
compact({'compact', N, D}, #state{conn=Conn
,admin_conn=AdminConn
,dbs=[Db|Dbs]
,current_job_heuristic=Heur
}=State) ->
case couch_util:db_exists(Conn, encode_db(D)) of
Encoded = encode_db(D),
case couch_util:db_exists(Conn, Encoded) andalso
should_compact(Conn, Encoded, Heur)
of
'false' ->
lager:debug("db ~s not found on ~s", [D, N]),
lager:debug("db ~s not found on ~s OR heuristic not met", [D, N]),
gen_fsm:send_event(self(), {'compact', N, Db}),
{'next_state', 'compact', State#state{dbs=Dbs
,current_db=Db
Expand All @@ -498,10 +518,14 @@ compact({'compact_db', N, D}, #state{conn=Conn
,nodes=[]
,current_job_pid=P
,current_job_ref=Ref
,current_job_heuristic=Heur
}=State) ->
case couch_util:db_exists(Conn, encode_db(D)) of
Encoded = encode_db(D),
case couch_util:db_exists(Conn, Encoded) andalso
should_compact(Conn, Encoded, Heur)
of
'false' ->
lager:debug("db ~s not found on ~s", [D, N]),
lager:debug("db ~s not found on ~s OR heuristic not met", [D, N]),
maybe_send_update(P, Ref, 'job_finished'),
gen_fsm:send_event(self(), 'next_job'),
{'next_state', 'ready', State#state{conn='undefined'
Expand All @@ -523,10 +547,14 @@ compact({'compact_db', N, D}, #state{conn=Conn
compact({'compact_db', N, D}, #state{conn=Conn
,admin_conn=AdminConn
,nodes=[Node|Ns]
,current_job_heuristic=Heur
}=State) ->
case couch_util:db_exists(Conn, encode_db(D)) of
Encoded = encode_db(D),
case couch_util:db_exists(Conn, Encoded) andalso
should_compact(Conn, Encoded, Heur)
of
'false' ->
lager:debug("db ~s not found on ~s", [D, N]),
lager:debug("db ~s not found on ~s OR heuristic not met", [D, N]),
gen_fsm:send_event(self(), {'compact_db', Node, D}),
{'next_state', 'compact', State#state{nodes=Ns
,current_node=Node
Expand Down Expand Up @@ -960,7 +988,8 @@ get_ports(Node, Cookie) ->
get_ports(Node) ->
try {get_port(Node, ["chttpd", "port"], fun wh_couch_connections:get_port/0)
,get_port(Node, ["httpd", "port"], fun wh_couch_connections:get_admin_port/0)
} of
}
of
Ports -> Ports
catch
_E:_R ->
Expand All @@ -986,7 +1015,7 @@ maybe_start_auto_compaction_job() ->
(catch wh_couch_connections:test_admin_conn())
of
{'ok', _} ->
gen_fsm:send_event(self(), 'compact');
gen_fsm:send_event_after(?AUTOCOMPACTION_CHECK_TIMEOUT, 'compact');
_ ->
erlang:send_after(?AUTOCOMPACTION_CHECK_TIMEOUT, self(), '$maybe_start_auto_compaction_job'),
'ok'
Expand Down Expand Up @@ -1031,3 +1060,31 @@ compact_automatically(Boolean) ->
],
wh_cache:store_local(?WH_COUCH_CACHE, <<"compact_automatically">>, Boolean, CacheProps).

should_compact(_Conn, _Encoded, ?HEUR_NONE) -> 'true';
should_compact(Conn, Encoded, ?HEUR_RATIO) ->
case couch_util:db_info(Conn, Encoded) of
{'ok', Info} -> should_compact_ratio(Info);
{'error', _E} ->
lager:debug("failed to lookup info: ~p", [_E]),
'true' % be pessimistic and compact the db
end.

should_compact_ratio(Info) ->
Disk = wh_json:get_integer_value(<<"disk_size">>, Info),
Data = wh_json:get_integer_value([<<"other">>, <<"data_size">>], Info),
min_data_met(Data, ?MIN_DATA) andalso min_ratio_met(Data, Disk, ?MIN_RATIO).

min_data_met(Data, Min) when Data > Min ->
'true';
min_data_met(_Data, _Min) ->
lager:debug("data size ~b is under min_data_size threshold ~b", [_Data, _Min]),
'false'.

min_ratio_met(Data, Disk, MinRatio) ->
case Data / Disk of
R when R > MinRatio ->
'true';
_R ->
lager:debug("ratio ~p is under min threshold ~p", [_R, MinRatio]),
'false'
end.
44 changes: 22 additions & 22 deletions whistle_apps/lib/whistle_couch-1.0.0/src/couch_util.erl
Expand Up @@ -87,7 +87,7 @@ max_bulk_insert() -> ?MAX_BULK_INSERT.
get_new_connection(Host, Port, "", "") ->
get_new_conn(Host, Port, ?IBROWSE_OPTS);
get_new_connection(Host, Port, User, Pass) ->
get_new_conn(Host, Port, [{basic_auth, {User, Pass}} | ?IBROWSE_OPTS]).
get_new_conn(Host, Port, [{'basic_auth', {User, Pass}} | ?IBROWSE_OPTS]).

-spec get_new_conn(nonempty_string() | ne_binary(), pos_integer(), wh_proplist()) -> server().
get_new_conn(Host, Port, Opts) ->
Expand All @@ -105,13 +105,13 @@ server_info(#server{}=Conn) -> couchbeam:server_info(Conn).

-spec server_url(server()) -> ne_binary().
server_url(#server{host=Host, port=Port, options=Options}) ->
UserPass = case props:get_value(basic_auth, Options) of
undefined -> <<>>;
UserPass = case props:get_value('basic_auth', Options) of
'undefined' -> <<>>;
{U, P} -> list_to_binary([U, <<":">>, P])
end,
Protocol = case wh_util:is_true(props:get_value(is_ssl, Options)) of
false -> <<"http">>;
true -> <<"https">>
Protocol = case wh_util:is_true(props:get_value('is_ssl', Options)) of
'false' -> <<"http">>;
'true' -> <<"https">>
end,

list_to_binary([Protocol, <<"://">>, UserPass
Expand All @@ -138,15 +138,15 @@ db_create(#server{}=Conn, DbName) ->
-spec db_create(server(), ne_binary(), db_create_options()) -> boolean().
db_create(#server{}=Conn, DbName, Options) ->
case couchbeam:create_db(Conn, wh_util:to_list(DbName), [], Options) of
{'error', _} -> false;
{'ok', _} -> true
{'error', _} -> 'false';
{'ok', _} -> 'true'
end.

-spec db_delete(server(), ne_binary()) -> boolean().
db_delete(#server{}=Conn, DbName) ->
case couchbeam:delete_db(Conn, wh_util:to_list(DbName)) of
{'error', _} -> false;
{'ok', _} -> true
{'error', _} -> 'false';
{'ok', _} -> 'true'
end.

-spec db_replicate(server(), wh_json:object() | wh_proplist()) ->
Expand Down Expand Up @@ -194,8 +194,8 @@ do_db_view_cleanup(#db{}=Db) ->
-spec design_compact(server(), ne_binary(), ne_binary()) -> boolean().
design_compact(#server{}=Conn, DbName, Design) ->
case couchbeam:compact(get_db(Conn, DbName), Design) of
{'error', _E} -> false;
ok -> true
{'error', _E} -> 'false';
'ok' -> 'true'
end.

-spec design_info(server(), ne_binary(), ne_binary()) ->
Expand Down Expand Up @@ -293,12 +293,12 @@ do_get_design_info(#db{}=Db, Design) ->
couchbeam_error().
open_cache_doc(#server{}=Conn, DbName, DocId, Options) ->
case wh_cache:peek_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}) of
{ok, _}=Ok -> Ok;
{error, not_found} ->
{'ok', _}=Ok -> Ok;
{'error', 'not_found'} ->
case open_doc(Conn, DbName, DocId, Options) of
{error, _}=E -> E;
{ok, JObj}=Ok ->
CacheProps = [{origin, {db, DbName, DocId}}],
{'error', _}=E -> E;
{'ok', JObj}=Ok ->
CacheProps = [{'origin', {'db', DbName, DocId}}],
wh_cache:store_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}, JObj, CacheProps),
Ok
end
Expand All @@ -312,8 +312,8 @@ flush_cache_docs() -> wh_cache:flush_local(?WH_COUCH_CACHE).
flush_cache_docs(DbName) ->
Filter = fun({?MODULE, _, DbName1, _}=K, _) when DbName1 =:= DbName ->
wh_cache:erase_local(?WH_COUCH_CACHE, K),
true;
(_, _) -> false
'true';
(_, _) -> 'false'
end,
wh_cache:filter_local(?WH_COUCH_CACHE, Filter).

Expand Down Expand Up @@ -343,8 +343,8 @@ save_docs(#server{}=Conn, DbName, Docs, Options) ->
couchbeam_error().
lookup_doc_rev(#server{}=Conn, DbName, DocId) ->
case do_fetch_rev(get_db(Conn, DbName), DocId) of
?NE_BINARY = Rev -> {ok, Rev};
{error, _}=E -> E
?NE_BINARY = Rev -> {'ok', Rev};
{'error', _}=E -> E
end.

-spec ensure_saved(server(), ne_binary(), wh_json:object(), wh_proplist()) ->
Expand Down Expand Up @@ -394,7 +394,7 @@ do_delete_docs(#db{}=Db, Docs) ->
prepare_doc_for_del(Doc) ->
wh_json:from_list([{<<"_id">>, wh_json:get_value(<<"_id">>, Doc)}
,{<<"_rev">>, wh_json:get_value(<<"_rev">>, Doc)}
,{<<"_deleted">>, true}
,{<<"_deleted">>, 'true'}
]).

-spec do_ensure_saved(couchbeam_db(), wh_json:object(), wh_proplist()) ->
Expand Down

0 comments on commit 6c962ad

Please sign in to comment.