Skip to content

Commit

Permalink
KAZOO-2617: make archive generic and tweaking the maintenance routines
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anderson committed Jun 27, 2014
1 parent 08f9714 commit dc7fa5c
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 79 deletions.
72 changes: 11 additions & 61 deletions core/kazoo_modb-1.0.0/src/kazoo_modb.erl
Expand Up @@ -14,7 +14,9 @@
-export([open_doc/2, open_doc/3, open_doc/4]).
-export([save_doc/2, save_doc/3, save_doc/4]).
-export([get_modb/1, get_modb/2, get_modb/3]).
-export([maybe_archive_modb/1]).
-export([maybe_archive_modb/1
,archive_modb/1
]).
-export([refresh_views/1]).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -241,14 +243,19 @@ create_routines(AccountMODb) ->
,Routines
).

%%--------------------------------------------------------------------
%% @public
%% @doc
%%
%% @end
%%--------------------------------------------------------------------
-spec maybe_archive_modb(ne_binary()) -> 'ok'.
maybe_archive_modb(AccountMODb) ->
{Year, Month, _} = erlang:date(),

case should_archive(AccountMODb, Year, Month) of
'true' ->
lager:info("account modb ~s needs archiving", [AccountMODb]),
'ok' = archive_modb(AccountMODb),
'ok' = couch_util:archive(AccountMODb),
lager:info("account modb ~s archived, removing the db", [AccountMODb]),
Rm = couch_mgr:db_delete(AccountMODb),
lager:info("account modb ~s deleted: ~p", [AccountMODb, Rm]);
Expand All @@ -268,61 +275,4 @@ should_archive(AccountMODb, Year, Month) ->

-spec archive_modb(ne_binary()) -> 'ok'.
archive_modb(AccountMODb) ->
{'ok', DbInfo} = couch_mgr:db_info(AccountMODb),

MaxDocs = whapps_config:get_integer(?CONFIG_CAT, <<"max_concurrent_docs_to_archive">>, 500),
Filename = filename:join(["/tmp", <<AccountMODb/binary, ".archive.json">>]),
{'ok', File} = file:open(Filename, ['write']),

lager:debug("archiving to ~s", [Filename]),
archive_modb(AccountMODb, File, MaxDocs, wh_json:get_integer_value(<<"doc_count">>, DbInfo), 0),
file:close(File).

%% MaxDocs = The biggest set of docs to pull from Couch
%% N = The number of docs in the DB that haven't been archived
%% Pos = Which doc will the next query start from (the offset)
-spec archive_modb(ne_binary(), file:io_device(), pos_integer(), non_neg_integer(), non_neg_integer()) -> 'ok'.
archive_modb(AccountMODb, _File, _MaxDocs, 0, _Pos) ->
lager:debug("account modb ~s done with exportation", [AccountMODb]);
archive_modb(AccountMODb, File, MaxDocs, N, Pos) when N =< MaxDocs ->
lager:debug("fetching next ~b docs", [N]),
ViewOptions = [{'limit', N}
,{'skip', Pos}
,'include_docs'
],
case couch_mgr:all_docs(AccountMODb, ViewOptions) of
{'ok', []} -> lager:debug("no docs left after pos ~p, done", [Pos]);
{'ok', Docs} ->
'ok' = archive_modb_docs(File, Docs),
lager:debug("archived ~p docs, done here", [N]);
{'error', _E} ->
lager:debug("error ~p asking for ~p docs from pos ~p", [_E, N, Pos]),
timer:sleep(500),
archive_modb(AccountMODb, File, MaxDocs, N, Pos)
end;
archive_modb(AccountMODb, File, MaxDocs, N, Pos) ->
lager:debug("fetching next ~b docs", [MaxDocs]),
ViewOptions = [{'limit', MaxDocs}
,{'skip', Pos}
,'include_docs'
],
case couch_mgr:all_docs(AccountMODb, ViewOptions) of
{'ok', []} -> lager:debug("no docs left after pos ~p, done", [Pos]);
{'ok', Docs} ->
'ok' = archive_modb_docs(File, Docs),
lager:debug("archived ~p docs", [MaxDocs]),
archive_modb(AccountMODb, File, MaxDocs, N - MaxDocs, Pos + MaxDocs);
{'error', _E} ->
lager:debug("error ~p asking for ~p docs from pos ~p", [_E, N, Pos]),
timer:sleep(500),
archive_modb(AccountMODb, File, MaxDocs, N, Pos)
end.

-spec archive_modb_docs(file:io_device(), wh_json:objects()) -> 'ok'.
archive_modb_docs(File, Docs) ->
[archive_modb_doc(File, wh_json:get_value(<<"doc">>, D)) || D <- Docs],
'ok'.

-spec archive_modb_doc(file:io_device(), wh_json:object()) -> 'ok'.
archive_modb_doc(File, Doc) ->
'ok' = file:write(File, [wh_json:encode(Doc), $\n]).
'ok' = couch_util:archive(AccountMODb).
61 changes: 43 additions & 18 deletions core/kazoo_modb-1.0.0/src/kazoo_modb_maintenance.erl
Expand Up @@ -7,20 +7,24 @@
-module(kazoo_modb_maintenance).

-export([delete_modbs/1
,archive_modbs/0, archive_modbs/1
,archive_modbs/0
,archive_modbs/1
]).
-export([verify_rollups/0
,verify_rollups/1
]).
-export([fix_rollup/1]).
-export([rollup_accounts/0
,rollup_account/1
,rollup_account/2
]).

-include("kazoo_modb.hrl").

-spec delete_modbs(ne_binary()) -> 'ok' | 'no_return'.
-spec delete_modbs(ne_binary() | wh_year(), ne_binary() | wh_month()) -> 'ok' | 'no_return'.
delete_modbs(Period) ->
Regex = <<"(2[0-9]{3})(0[1-9]|1[0-2])">>,

case re:run(Period, Regex, [{'capture', 'all', 'binary'}]) of
{'match', [_Full, Year, Month]} ->
delete_modbs(Year, Month);
Expand Down Expand Up @@ -58,8 +62,8 @@ should_delete(AccountModb, Months) ->
((ModYear * 12) + ModMonth) =< Months.

-spec delete_modb(ne_binary()) -> 'ok'.
delete_modb(AccountModb) ->
io:format(" deleting '~s'~n", [AccountModb]),
delete_modb(<<_:42/binary, "-", _:4/binary, _:2/binary>> = AccountModb) ->
kazoo_modb:archive_modbs(AccountModb),
_Deleted = couch_mgr:db_delete(AccountModb),
io:format(" deleted: ~p~n", [_Deleted]),
timer:sleep(5000).
Expand All @@ -75,10 +79,9 @@ archive_modbs(AccountId) ->
do_archive_modbs(MODbs, AccountId) ->
wh_util:put_callid(?MODULE),
_ = [kazoo_modb:maybe_archive_modb(MODb) || MODb <- MODbs],

Keep = whapps_config:get_integer(?CONFIG_CAT, <<"active_modbs">>, 6),
From = case AccountId =:= 'undefined' of 'true' -> <<"all">>; 'false' -> AccountId end,
io:format("archived and removed ~s MODbs more than ~b months old~n", [From, Keep]),
io:format("archived ~s MODbs more than ~b months old~n", [From, Keep]),
'no_return'.

-spec verify_rollups() -> 'ok'.
Expand All @@ -89,7 +92,7 @@ verify_rollups() ->
Total = erlang:length(Accounts),
lists:foldr(
fun(AccountDb, Current) ->
io:format("\e[46mverify rollup accounts (~p/~p) '~s'\e[0m~n"
io:format("verify rollup accounts (~p/~p) '~s'~n"
,[Current, Total, AccountDb]),
verify_rollups(AccountDb),
Current+1
Expand All @@ -104,28 +107,25 @@ verify_rollups(Account) ->

verify_rollups(Account, Year, Month) ->
AccountId = wh_util:format_account_id(Account, 'raw'),
{PYear, PMonth} = kazoo_modb_util:prev_year_month(Year, Month),
Balance = wht_util:previous_balance(Account
,wh_util:to_binary(PYear)
,wh_util:pad_month(PMonth)),
io:format("account ~s trying... @ ~p-~p ~n"
,[AccountId, Year, Month]),
case kazoo_modb:open_doc(Account, <<"monthly_rollup">>, Year, Month) of
{'ok', JObj} ->
Balance = wht_util:previous_balance(Account
,wh_util:to_binary(Year)
,wh_util:pad_month(Month)),
RollupBalance = rollup_balance(JObj),
case Balance =:= RollupBalance of
'true' ->
io:format("account ~s \e[35m@ ~p-~p OK!\e[0m balance/rollup ~p/~p~n"
,[AccountId, Year, Month, Balance, RollupBalance]),
io:format(" account ~s rollup for ~p-~p confirmed~n", [AccountId, Year, Month]),
{PYear, PMonth} = kazoo_modb_util:prev_year_month(Year, Month),
verify_rollups(Account, PYear, PMonth);
'false' ->
io:format("\e[41maccount ~s has a discrepancy! rollup/balance ~p/~p \e[0m ~n"
io:format(" account ~s has a discrepancy! rollup/balance ~p/~p~n"
,[AccountId, RollupBalance, Balance])
end;
{'error', 'not_found'} ->
io:format("\e[33maccount ~s : no modb @ ~p-~p \e[0m ~n", [AccountId, Year, Month]);
io:format(" account ~s : no modb @ ~p-~p~n", [AccountId, Year, Month]);
Else ->
io:format("\e[41maccount ~s: error getting monthly rollup ~p \e[0m ~n", [AccountId, Else])
io:format(" account ~s: error getting monthly rollup ~p~n", [AccountId, Else])
end.

-spec fix_rollup(ne_binary()) -> 'ok'.
Expand All @@ -141,10 +141,35 @@ fix_rollup(Account) ->
lager:debug("rolling up ~p credits to ~s", [Balance, AccountMODb]),
wht_util:rollup(AccountMODb, Balance).

-spec rollup_accounts() -> any().
rollup_accounts() ->
Accounts = whapps_util:get_all_accounts(),
Total = length(Accounts),
lists:foldr(fun(AccountDb, Current) ->
io:format("rollup accounts (~p/~p) '~s'~n"
,[Current, Total, AccountDb]),
_ = rollup_account(AccountDb),
Current + 1
end, 1, Accounts).

-spec rollup_account(ne_binary()) -> 'ok'.
rollup_account(Account) ->
AccountId = wh_util:format_account_id(Account, 'raw'),
Balance = wht_util:get_balance_from_account(AccountId, []),
rollup_account(AccountId, Balance).

-spec rollup_account(ne_binary(), ne_binary()) -> 'ok'.
rollup_account(AccountId, Balance) ->
AccountMODb = kazoo_modb:get_modb(AccountId),
lager:debug("rolling up ~p credits to ~s"
,[Balance, AccountMODb]),
wht_util:rollup(AccountMODb, Balance).

-spec rollup_balance(wh_json:object()) -> integer().
rollup_balance(JObj) ->
Balance = wh_json:get_integer_value(<<"pvt_amount">>, JObj, 0),
case wh_json:get_value(<<"pvt_type">>, JObj) of
<<"credit">> -> Balance;
<<"debit">> -> Balance * -1
end.

74 changes: 74 additions & 0 deletions core/whistle_couch-1.0.0/src/couch_util.erl
Expand Up @@ -8,6 +8,10 @@
%%%-----------------------------------------------------------------------------
-module(couch_util).

-export([archive/1
,archive/2
]).

-export([get_new_connection/4
,get_db/2
,server_url/1
Expand Down Expand Up @@ -74,6 +78,76 @@

-type ddoc() :: ne_binary() | 'all_docs' | 'design_docs'.

%%------------------------------------------------------------------------------
%% @public
%% @doc
%%
%% @end
%%------------------------------------------------------------------------------
-spec archive(ne_binary()) -> 'ok'.
archive(Db) ->
Folder = whapps_config:get(?CONFIG_CAT, <<"default_archive_folder">>, <<"/tmp">>),
archive(Db, filename:join([<<Folder/binary, "/", Db/binary, ".json">>])).

-spec archive(ne_binary(), ne_binary()) -> 'ok'.
archive(Db, Filename) ->
{'ok', DbInfo} = couch_mgr:db_info(Db),
{'ok', File} = file:open(Filename, ['write']),
'ok' = file:write(File, <<"[">>),
io:format("archiving to ~s~n", [Filename]),
MaxDocs = whapps_config:get_integer(?CONFIG_CAT, <<"max_concurrent_docs_to_archive">>, 500),
archive(Db, File, MaxDocs, wh_json:get_integer_value(<<"doc_count">>, DbInfo), 0),
'ok' = file:write(File, <<"]">>),
file:close(File).

%% MaxDocs = The biggest set of docs to pull from Couch
%% N = The number of docs in the DB that haven't been archived
%% Pos = Which doc will the next query start from (the offset)
-spec archive(ne_binary(), file:io_device(), pos_integer(), non_neg_integer(), non_neg_integer()) -> 'ok'.
archive(Db, _File, _MaxDocs, 0, _Pos) ->
io:format(" archive ~s complete~n", [Db]);
archive(Db, File, MaxDocs, N, Pos) when N =< MaxDocs ->
ViewOptions = [{'limit', N}
,{'skip', Pos}
,'include_docs'
],
case couch_mgr:all_docs(Db, ViewOptions) of
{'ok', []} -> io:format(" no docs left after pos ~p~n", [Pos]);
{'ok', Docs} ->
'ok' = archive_docs(File, Docs),
io:format(" archived ~p docs~n", [N]);
{'error', _E} ->
io:format(" error ~p asking for ~p docs from pos ~p~n"
,[_E, N, Pos]),
timer:sleep(500),
archive(Db, File, MaxDocs, N, Pos)
end;
archive(Db, File, MaxDocs, N, Pos) ->
ViewOptions = [{'limit', MaxDocs}
,{'skip', Pos}
,'include_docs'
],
case couch_mgr:all_docs(Db, ViewOptions) of
{'ok', []} -> io:format(" no docs left after pos ~p~n", [Pos]);
{'ok', Docs} ->
'ok' = archive_docs(File, Docs),
io:format(" archived ~p docs~n", [MaxDocs]),
archive(Db, File, MaxDocs, N - MaxDocs, Pos + MaxDocs);
{'error', _E} ->
io:format(" error ~p asking for ~p docs from pos ~p~n"
,[_E, N, Pos]),
timer:sleep(500),
archive(Db, File, MaxDocs, N, Pos)
end.

-spec archive_docs(file:io_device(), wh_json:objects()) -> 'ok'.
archive_docs(_, []) -> 'ok';
archive_docs(File, [Doc]) ->
'ok' = file:write(File, [wh_json:encode(Doc), $\n]);
archive_docs(File, [Doc|Docs]) ->
'ok' = file:write(File, [wh_json:encode(Doc), $,, $\n]),
archive_docs(File, Docs).

%%------------------------------------------------------------------------------
%% @public
%% @doc How many documents are chunked when doing a bulk save
Expand Down
9 changes: 9 additions & 0 deletions core/whistle_couch-1.0.0/src/whistle_couch_maintenance.erl
Expand Up @@ -24,6 +24,9 @@

,test_connection/0
,test_admin_connection/0

,archive/1
,archive/2
]).

-export([change_api_url/2, change_api_url/3]).
Expand Down Expand Up @@ -64,6 +67,12 @@ test_connection() ->
test_admin_connection() ->
wh_couch_connections:test_admin_conn().

archive(Db) ->
couch_util:archive(Db).

archive(Db, Filename) ->
couch_util:archive(Db, Filename).

%%
%% Change app's urls in bulk
%%
Expand Down

0 comments on commit dc7fa5c

Please sign in to comment.