From dc7fa5c02b7de845612aa4e756937dc9bb8c9700 Mon Sep 17 00:00:00 2001 From: karl anderson Date: Thu, 26 Jun 2014 20:38:53 -0400 Subject: [PATCH] KAZOO-2617: make archive generic and tweaking the maintenance routines --- core/kazoo_modb-1.0.0/src/kazoo_modb.erl | 72 +++--------------- .../src/kazoo_modb_maintenance.erl | 61 ++++++++++----- core/whistle_couch-1.0.0/src/couch_util.erl | 74 +++++++++++++++++++ .../src/whistle_couch_maintenance.erl | 9 +++ 4 files changed, 137 insertions(+), 79 deletions(-) diff --git a/core/kazoo_modb-1.0.0/src/kazoo_modb.erl b/core/kazoo_modb-1.0.0/src/kazoo_modb.erl index 85d9fee4cce..e3b8d833967 100644 --- a/core/kazoo_modb-1.0.0/src/kazoo_modb.erl +++ b/core/kazoo_modb-1.0.0/src/kazoo_modb.erl @@ -14,7 +14,9 @@ -export([open_doc/2, open_doc/3, open_doc/4]). -export([save_doc/2, save_doc/3, save_doc/4]). -export([get_modb/1, get_modb/2, get_modb/3]). --export([maybe_archive_modb/1]). +-export([maybe_archive_modb/1 + ,archive_modb/1 + ]). -export([refresh_views/1]). %%-------------------------------------------------------------------- @@ -241,14 +243,19 @@ create_routines(AccountMODb) -> ,Routines ). +%%-------------------------------------------------------------------- +%% @public +%% @doc +%% +%% @end +%%-------------------------------------------------------------------- -spec maybe_archive_modb(ne_binary()) -> 'ok'. maybe_archive_modb(AccountMODb) -> {Year, Month, _} = erlang:date(), - case should_archive(AccountMODb, Year, Month) of 'true' -> lager:info("account modb ~s needs archiving", [AccountMODb]), - 'ok' = archive_modb(AccountMODb), + 'ok' = couch_util:archive(AccountMODb), lager:info("account modb ~s archived, removing the db", [AccountMODb]), Rm = couch_mgr:db_delete(AccountMODb), lager:info("account modb ~s deleted: ~p", [AccountMODb, Rm]); @@ -268,61 +275,4 @@ should_archive(AccountMODb, Year, Month) -> -spec archive_modb(ne_binary()) -> 'ok'. archive_modb(AccountMODb) -> - {'ok', DbInfo} = couch_mgr:db_info(AccountMODb), - - MaxDocs = whapps_config:get_integer(?CONFIG_CAT, <<"max_concurrent_docs_to_archive">>, 500), - Filename = filename:join(["/tmp", <>]), - {'ok', File} = file:open(Filename, ['write']), - - lager:debug("archiving to ~s", [Filename]), - archive_modb(AccountMODb, File, MaxDocs, wh_json:get_integer_value(<<"doc_count">>, DbInfo), 0), - file:close(File). - -%% MaxDocs = The biggest set of docs to pull from Couch -%% N = The number of docs in the DB that haven't been archived -%% Pos = Which doc will the next query start from (the offset) --spec archive_modb(ne_binary(), file:io_device(), pos_integer(), non_neg_integer(), non_neg_integer()) -> 'ok'. -archive_modb(AccountMODb, _File, _MaxDocs, 0, _Pos) -> - lager:debug("account modb ~s done with exportation", [AccountMODb]); -archive_modb(AccountMODb, File, MaxDocs, N, Pos) when N =< MaxDocs -> - lager:debug("fetching next ~b docs", [N]), - ViewOptions = [{'limit', N} - ,{'skip', Pos} - ,'include_docs' - ], - case couch_mgr:all_docs(AccountMODb, ViewOptions) of - {'ok', []} -> lager:debug("no docs left after pos ~p, done", [Pos]); - {'ok', Docs} -> - 'ok' = archive_modb_docs(File, Docs), - lager:debug("archived ~p docs, done here", [N]); - {'error', _E} -> - lager:debug("error ~p asking for ~p docs from pos ~p", [_E, N, Pos]), - timer:sleep(500), - archive_modb(AccountMODb, File, MaxDocs, N, Pos) - end; -archive_modb(AccountMODb, File, MaxDocs, N, Pos) -> - lager:debug("fetching next ~b docs", [MaxDocs]), - ViewOptions = [{'limit', MaxDocs} - ,{'skip', Pos} - ,'include_docs' - ], - case couch_mgr:all_docs(AccountMODb, ViewOptions) of - {'ok', []} -> lager:debug("no docs left after pos ~p, done", [Pos]); - {'ok', Docs} -> - 'ok' = archive_modb_docs(File, Docs), - lager:debug("archived ~p docs", [MaxDocs]), - archive_modb(AccountMODb, File, MaxDocs, N - MaxDocs, Pos + MaxDocs); - {'error', _E} -> - lager:debug("error ~p asking for ~p docs from pos ~p", [_E, N, Pos]), - timer:sleep(500), - archive_modb(AccountMODb, File, MaxDocs, N, Pos) - end. - --spec archive_modb_docs(file:io_device(), wh_json:objects()) -> 'ok'. -archive_modb_docs(File, Docs) -> - [archive_modb_doc(File, wh_json:get_value(<<"doc">>, D)) || D <- Docs], - 'ok'. - --spec archive_modb_doc(file:io_device(), wh_json:object()) -> 'ok'. -archive_modb_doc(File, Doc) -> - 'ok' = file:write(File, [wh_json:encode(Doc), $\n]). + 'ok' = couch_util:archive(AccountMODb). diff --git a/core/kazoo_modb-1.0.0/src/kazoo_modb_maintenance.erl b/core/kazoo_modb-1.0.0/src/kazoo_modb_maintenance.erl index 5bff12fa8f5..9a0ee9716f8 100644 --- a/core/kazoo_modb-1.0.0/src/kazoo_modb_maintenance.erl +++ b/core/kazoo_modb-1.0.0/src/kazoo_modb_maintenance.erl @@ -7,12 +7,17 @@ -module(kazoo_modb_maintenance). -export([delete_modbs/1 - ,archive_modbs/0, archive_modbs/1 + ,archive_modbs/0 + ,archive_modbs/1 ]). -export([verify_rollups/0 ,verify_rollups/1 ]). -export([fix_rollup/1]). +-export([rollup_accounts/0 + ,rollup_account/1 + ,rollup_account/2 + ]). -include("kazoo_modb.hrl"). @@ -20,7 +25,6 @@ -spec delete_modbs(ne_binary() | wh_year(), ne_binary() | wh_month()) -> 'ok' | 'no_return'. delete_modbs(Period) -> Regex = <<"(2[0-9]{3})(0[1-9]|1[0-2])">>, - case re:run(Period, Regex, [{'capture', 'all', 'binary'}]) of {'match', [_Full, Year, Month]} -> delete_modbs(Year, Month); @@ -58,8 +62,8 @@ should_delete(AccountModb, Months) -> ((ModYear * 12) + ModMonth) =< Months. -spec delete_modb(ne_binary()) -> 'ok'. -delete_modb(AccountModb) -> - io:format(" deleting '~s'~n", [AccountModb]), +delete_modb(<<_:42/binary, "-", _:4/binary, _:2/binary>> = AccountModb) -> + kazoo_modb:archive_modbs(AccountModb), _Deleted = couch_mgr:db_delete(AccountModb), io:format(" deleted: ~p~n", [_Deleted]), timer:sleep(5000). @@ -75,10 +79,9 @@ archive_modbs(AccountId) -> do_archive_modbs(MODbs, AccountId) -> wh_util:put_callid(?MODULE), _ = [kazoo_modb:maybe_archive_modb(MODb) || MODb <- MODbs], - Keep = whapps_config:get_integer(?CONFIG_CAT, <<"active_modbs">>, 6), From = case AccountId =:= 'undefined' of 'true' -> <<"all">>; 'false' -> AccountId end, - io:format("archived and removed ~s MODbs more than ~b months old~n", [From, Keep]), + io:format("archived ~s MODbs more than ~b months old~n", [From, Keep]), 'no_return'. -spec verify_rollups() -> 'ok'. @@ -89,7 +92,7 @@ verify_rollups() -> Total = erlang:length(Accounts), lists:foldr( fun(AccountDb, Current) -> - io:format("\e[46mverify rollup accounts (~p/~p) '~s'\e[0m~n" + io:format("verify rollup accounts (~p/~p) '~s'~n" ,[Current, Total, AccountDb]), verify_rollups(AccountDb), Current+1 @@ -104,28 +107,25 @@ verify_rollups(Account) -> verify_rollups(Account, Year, Month) -> AccountId = wh_util:format_account_id(Account, 'raw'), - {PYear, PMonth} = kazoo_modb_util:prev_year_month(Year, Month), - Balance = wht_util:previous_balance(Account - ,wh_util:to_binary(PYear) - ,wh_util:pad_month(PMonth)), - io:format("account ~s trying... @ ~p-~p ~n" - ,[AccountId, Year, Month]), case kazoo_modb:open_doc(Account, <<"monthly_rollup">>, Year, Month) of {'ok', JObj} -> + Balance = wht_util:previous_balance(Account + ,wh_util:to_binary(Year) + ,wh_util:pad_month(Month)), RollupBalance = rollup_balance(JObj), case Balance =:= RollupBalance of 'true' -> - io:format("account ~s \e[35m@ ~p-~p OK!\e[0m balance/rollup ~p/~p~n" - ,[AccountId, Year, Month, Balance, RollupBalance]), + io:format(" account ~s rollup for ~p-~p confirmed~n", [AccountId, Year, Month]), + {PYear, PMonth} = kazoo_modb_util:prev_year_month(Year, Month), verify_rollups(Account, PYear, PMonth); 'false' -> - io:format("\e[41maccount ~s has a discrepancy! rollup/balance ~p/~p \e[0m ~n" + io:format(" account ~s has a discrepancy! rollup/balance ~p/~p~n" ,[AccountId, RollupBalance, Balance]) end; {'error', 'not_found'} -> - io:format("\e[33maccount ~s : no modb @ ~p-~p \e[0m ~n", [AccountId, Year, Month]); + io:format(" account ~s : no modb @ ~p-~p~n", [AccountId, Year, Month]); Else -> - io:format("\e[41maccount ~s: error getting monthly rollup ~p \e[0m ~n", [AccountId, Else]) + io:format(" account ~s: error getting monthly rollup ~p~n", [AccountId, Else]) end. -spec fix_rollup(ne_binary()) -> 'ok'. @@ -141,6 +141,30 @@ fix_rollup(Account) -> lager:debug("rolling up ~p credits to ~s", [Balance, AccountMODb]), wht_util:rollup(AccountMODb, Balance). +-spec rollup_accounts() -> any(). +rollup_accounts() -> + Accounts = whapps_util:get_all_accounts(), + Total = length(Accounts), + lists:foldr(fun(AccountDb, Current) -> + io:format("rollup accounts (~p/~p) '~s'~n" + ,[Current, Total, AccountDb]), + _ = rollup_account(AccountDb), + Current + 1 + end, 1, Accounts). + +-spec rollup_account(ne_binary()) -> 'ok'. +rollup_account(Account) -> + AccountId = wh_util:format_account_id(Account, 'raw'), + Balance = wht_util:get_balance_from_account(AccountId, []), + rollup_account(AccountId, Balance). + +-spec rollup_account(ne_binary(), ne_binary()) -> 'ok'. +rollup_account(AccountId, Balance) -> + AccountMODb = kazoo_modb:get_modb(AccountId), + lager:debug("rolling up ~p credits to ~s" + ,[Balance, AccountMODb]), + wht_util:rollup(AccountMODb, Balance). + -spec rollup_balance(wh_json:object()) -> integer(). rollup_balance(JObj) -> Balance = wh_json:get_integer_value(<<"pvt_amount">>, JObj, 0), @@ -148,3 +172,4 @@ rollup_balance(JObj) -> <<"credit">> -> Balance; <<"debit">> -> Balance * -1 end. + diff --git a/core/whistle_couch-1.0.0/src/couch_util.erl b/core/whistle_couch-1.0.0/src/couch_util.erl index bcfef3f4052..14e0b4c881c 100644 --- a/core/whistle_couch-1.0.0/src/couch_util.erl +++ b/core/whistle_couch-1.0.0/src/couch_util.erl @@ -8,6 +8,10 @@ %%%----------------------------------------------------------------------------- -module(couch_util). +-export([archive/1 + ,archive/2 + ]). + -export([get_new_connection/4 ,get_db/2 ,server_url/1 @@ -74,6 +78,76 @@ -type ddoc() :: ne_binary() | 'all_docs' | 'design_docs'. +%%------------------------------------------------------------------------------ +%% @public +%% @doc +%% +%% @end +%%------------------------------------------------------------------------------ +-spec archive(ne_binary()) -> 'ok'. +archive(Db) -> + Folder = whapps_config:get(?CONFIG_CAT, <<"default_archive_folder">>, <<"/tmp">>), + archive(Db, filename:join([<>])). + +-spec archive(ne_binary(), ne_binary()) -> 'ok'. +archive(Db, Filename) -> + {'ok', DbInfo} = couch_mgr:db_info(Db), + {'ok', File} = file:open(Filename, ['write']), + 'ok' = file:write(File, <<"[">>), + io:format("archiving to ~s~n", [Filename]), + MaxDocs = whapps_config:get_integer(?CONFIG_CAT, <<"max_concurrent_docs_to_archive">>, 500), + archive(Db, File, MaxDocs, wh_json:get_integer_value(<<"doc_count">>, DbInfo), 0), + 'ok' = file:write(File, <<"]">>), + file:close(File). + +%% MaxDocs = The biggest set of docs to pull from Couch +%% N = The number of docs in the DB that haven't been archived +%% Pos = Which doc will the next query start from (the offset) +-spec archive(ne_binary(), file:io_device(), pos_integer(), non_neg_integer(), non_neg_integer()) -> 'ok'. +archive(Db, _File, _MaxDocs, 0, _Pos) -> + io:format(" archive ~s complete~n", [Db]); +archive(Db, File, MaxDocs, N, Pos) when N =< MaxDocs -> + ViewOptions = [{'limit', N} + ,{'skip', Pos} + ,'include_docs' + ], + case couch_mgr:all_docs(Db, ViewOptions) of + {'ok', []} -> io:format(" no docs left after pos ~p~n", [Pos]); + {'ok', Docs} -> + 'ok' = archive_docs(File, Docs), + io:format(" archived ~p docs~n", [N]); + {'error', _E} -> + io:format(" error ~p asking for ~p docs from pos ~p~n" + ,[_E, N, Pos]), + timer:sleep(500), + archive(Db, File, MaxDocs, N, Pos) + end; +archive(Db, File, MaxDocs, N, Pos) -> + ViewOptions = [{'limit', MaxDocs} + ,{'skip', Pos} + ,'include_docs' + ], + case couch_mgr:all_docs(Db, ViewOptions) of + {'ok', []} -> io:format(" no docs left after pos ~p~n", [Pos]); + {'ok', Docs} -> + 'ok' = archive_docs(File, Docs), + io:format(" archived ~p docs~n", [MaxDocs]), + archive(Db, File, MaxDocs, N - MaxDocs, Pos + MaxDocs); + {'error', _E} -> + io:format(" error ~p asking for ~p docs from pos ~p~n" + ,[_E, N, Pos]), + timer:sleep(500), + archive(Db, File, MaxDocs, N, Pos) + end. + +-spec archive_docs(file:io_device(), wh_json:objects()) -> 'ok'. +archive_docs(_, []) -> 'ok'; +archive_docs(File, [Doc]) -> + 'ok' = file:write(File, [wh_json:encode(Doc), $\n]); +archive_docs(File, [Doc|Docs]) -> + 'ok' = file:write(File, [wh_json:encode(Doc), $,, $\n]), + archive_docs(File, Docs). + %%------------------------------------------------------------------------------ %% @public %% @doc How many documents are chunked when doing a bulk save diff --git a/core/whistle_couch-1.0.0/src/whistle_couch_maintenance.erl b/core/whistle_couch-1.0.0/src/whistle_couch_maintenance.erl index 305b9ba22f1..835d17473e4 100644 --- a/core/whistle_couch-1.0.0/src/whistle_couch_maintenance.erl +++ b/core/whistle_couch-1.0.0/src/whistle_couch_maintenance.erl @@ -24,6 +24,9 @@ ,test_connection/0 ,test_admin_connection/0 + + ,archive/1 + ,archive/2 ]). -export([change_api_url/2, change_api_url/3]). @@ -64,6 +67,12 @@ test_connection() -> test_admin_connection() -> wh_couch_connections:test_admin_conn(). +archive(Db) -> + couch_util:archive(Db). + +archive(Db, Filename) -> + couch_util:archive(Db, Filename). + %% %% Change app's urls in bulk %%