diff --git a/ecallmgr/src/ecallmgr_amqp_pool_worker.erl b/ecallmgr/src/ecallmgr_amqp_pool_worker.erl index baff362a76b..68fd6cf6e06 100644 --- a/ecallmgr/src/ecallmgr_amqp_pool_worker.erl +++ b/ecallmgr/src/ecallmgr_amqp_pool_worker.erl @@ -164,6 +164,21 @@ handle_info({'DOWN', Ref, process, Pid, _Info}, #state{status=busy, ref=Ref, par ecallmgr_amqp_pool:worker_free(Parent, self(), 0), {noreply, #state{}}; +handle_info({timeout, ReqRef, req_timeout}, #state{status=busy, from=From, parent=Parent, ref=Ref + ,start=Start, req_ref=ReqRef + }) -> + ?LOG("request took too long, timing out caller"), + Elapsed = timer:now_diff(erlang:now(), Start), + ?LOG("received response after ~b ms, returning to pool ~p", [Elapsed div 1000, Parent]), + + erlang:demonitor(Ref, [flush]), + erlang:cancel_timer(ReqRef), + + gen_server:reply(From, {error, timeout}), + + ecallmgr_amqp_pool:worker_free(Parent, self(), Elapsed), + {noreply, #state{}}; + handle_info(req_timeout, #state{status=busy, from=From, parent=Parent, ref=Ref ,start=Start, req_ref=ReqRef }) -> diff --git a/lib/whistle-1.0.0/src/gen_listener.erl b/lib/whistle-1.0.0/src/gen_listener.erl index bb045de6730..2d8c7cd18fc 100644 --- a/lib/whistle-1.0.0/src/gen_listener.erl +++ b/lib/whistle-1.0.0/src/gen_listener.erl @@ -334,7 +334,7 @@ handle_info({amqp_host_down, _H}=Down, #state{bindings=Bindings, params=Params}= {ok, Q} -> Self = self(), _ = erlang:send_after(?TIMEOUT_RETRY_CONN, Self, is_consuming), - spawn(fun() -> [ add_binding(Self, Type, BindProps) || {Type, BindProps} <- Bindings ] end), + proc_lib:spawn(fun() -> [ add_binding(Self, Type, BindProps) || {Type, BindProps} <- Bindings ] end), {noreply, State#state{queue=Q, is_consuming=false}, hibernate}; {error, _} -> ?LOG("failed to start amqp, waiting another second"), @@ -411,17 +411,17 @@ process_req(#state{queue=Queue, responders=Responders, module=Module, module_sta case Props1 of ignore -> ignore; _Else -> - spawn_link(fun() -> _ = wh_util:put_callid(JObj), process_req(Props1, Responders, JObj) end) + proc_lib:spawn_link(fun() -> _ = wh_util:put_callid(JObj), process_req(Props1, Responders, JObj) end) end. -spec process_req/3 :: (wh_proplist(), responders(), wh_json:json_object()) -> 'ok'. process_req(Props, Responders, JObj) -> Key = wh_util:get_event_type(JObj), - Handlers = [spawn_monitor(fun() -> - _ = wh_util:put_callid(JObj), - Responder:Fun(JObj, Props) - end) + Handlers = [proc_lib:spawn_monitor(fun() -> + _ = wh_util:put_callid(JObj), + Responder:Fun(JObj, Props) + end) || {Evt, {Responder, Fun}} <- Responders, maybe_event_matches_key(Key, Evt) ], @@ -464,7 +464,7 @@ start_amqp(Props) -> stop_amqp(<<>>, _) -> ok; stop_amqp(Q, Bindings) -> Self = self(), - spawn(fun() -> [ gen_listener:rm_binding(Self, Type, Prop) || {Type, Prop} <- Bindings] end), + proc_lib:spawn(fun() -> [ gen_listener:rm_binding(Self, Type, Prop) || {Type, Prop} <- Bindings] end), amqp_util:queue_delete(Q). -spec set_qos/1 :: ('undefined' | non_neg_integer()) -> 'ok'. diff --git a/lib/whistle_couch-1.0.0/src/couch_compactor.erl b/lib/whistle_couch-1.0.0/src/couch_compactor.erl index f6319dd736b..599f81d9765 100644 --- a/lib/whistle_couch-1.0.0/src/couch_compactor.erl +++ b/lib/whistle_couch-1.0.0/src/couch_compactor.erl @@ -25,7 +25,7 @@ start_link() -> init(Parent) -> case {couch_config:fetch(compact_automatically), couch_config:fetch(conflict_strategy)} of - {true, undefined} -> + {true, null} -> ?LOG_SYS("just compacting"), proc_lib:init_ack(Parent, {ok, self()}), compact_all(); @@ -36,7 +36,7 @@ init(Parent) -> {false, _Strategy} -> ?LOG_SYS("auto-compaction not enabled"), proc_lib:init_ack(Parent, ignore); - {undefined, _Strategy} -> + {null, _Strategy} -> ?LOG_SYS("auto-compaction not enabled"), proc_lib:init_ack(Parent, ignore), couch_config:store(compact_automatically, false) diff --git a/lib/whistle_couch-1.0.0/src/couch_config.erl b/lib/whistle_couch-1.0.0/src/couch_config.erl index 3582f1fc515..77bca8fcf10 100644 --- a/lib/whistle_couch-1.0.0/src/couch_config.erl +++ b/lib/whistle_couch-1.0.0/src/couch_config.erl @@ -14,7 +14,7 @@ %%%------------------------------------------------------------------- -module(couch_config). --export([start_link/0]). +-export([start_link/0, ready/0]). -export([load_config/1, write_config/1]). -export([fetch/1, fetch/2]). -export([store/2, store/3]). @@ -33,11 +33,11 @@ start_link() -> load_config(Path) -> ?LOG("loading ~s", [Path]), case file:consult(Path) of - {ok, Startup} -> - _ = [cache_from_file(T) || T <- Startup], - ok; - {error, enoent}=E -> - E + {ok, Startup} -> + _ = [cache_from_file(T) || T <- Startup], + ok; + {error, enoent}=E -> + E end. %% convert 3..n-tuples to 2 tuples with the value being (3..n)-1 tuples @@ -53,9 +53,12 @@ cache_from_file(T) when is_tuple(T) -> -spec write_config/1 :: (file:name()) -> 'ok' | {'error', file:posix() | 'badarg' | 'terminated' | 'system_limit'}. write_config(Path) -> Contents = lists:foldl(fun(I, Acc) -> [io_lib:format("~p.~n", [I]) | Acc] end - , "", whapps_config:get_all_kvs(?CONFIG_CAT)), + , "", whapps_config:get_all_kvs(?CONFIG_CAT)), file:write_file(Path, Contents). +ready() -> + whapps_config:couch_ready(). + fetch(Key) -> fetch(Key, undefined). @@ -66,8 +69,8 @@ fetch(Key, Default) -> fetch(Key, Default, Cache) -> case wh_cache:fetch_local(Cache, {?MODULE, Key}) of - {error, not_found} -> Default; - {ok, V} -> V + {error, not_found} -> Default; + {ok, V} -> V end. -spec store/2 :: (term(), term()) -> 'ok'. diff --git a/lib/whistle_couch-1.0.0/src/couch_mgr.erl b/lib/whistle_couch-1.0.0/src/couch_mgr.erl index 6c8eac221f8..a610044ef20 100755 --- a/lib/whistle_couch-1.0.0/src/couch_mgr.erl +++ b/lib/whistle_couch-1.0.0/src/couch_mgr.erl @@ -390,8 +390,8 @@ ensure_saved(DbName, Doc, Options) -> save_doc(DbName, Doc, Opts) -> couch_util:save_doc(get_conn(), DbName, Doc, Opts). --spec save_docs/2 :: (ne_binary(), wh_json:json_objects()) -> {'ok', wh_json:json_objects()} | {'error', atom()}. --spec save_docs/3 :: (ne_binary(), wh_json:json_objects(), proplist()) -> {'ok', wh_json:json_objects()} | {'error', atom()}. +-spec save_docs/2 :: (ne_binary(), wh_json:json_objects()) -> {'ok', wh_json:json_objects()}. +-spec save_docs/3 :: (ne_binary(), wh_json:json_objects(), proplist()) -> {'ok', wh_json:json_objects()}. save_docs(DbName, Docs) when is_list(Docs) -> save_docs(DbName, Docs, []). save_docs(DbName, Docs, Opts) when is_list(Docs) -> @@ -513,7 +513,14 @@ get_creds() -> gen_server:call(?SERVER, get_creds). get_conn() -> - gen_server:call(?SERVER, get_conn). + case whereis(?SERVER) of + Srv when is_pid(Srv) -> + gen_server:call(?SERVER, get_conn); + _E -> + ?LOG("no server by the name of ~s", [?SERVER]), + ST = erlang:get_stacktrace(), + ?LOG_STACKTRACE(ST) + end. get_admin_conn() -> gen_server:call(?SERVER, get_admin_conn). @@ -749,6 +756,8 @@ init_state() -> init_state_from_config(undefined) -> init_state_from_config({"localhost", ?DEFAULT_PORT, "", "", ?DEFAULT_ADMIN_PORT}); +init_state_from_config(null) -> + init_state_from_config({"localhost", ?DEFAULT_PORT, "", "", ?DEFAULT_ADMIN_PORT}); init_state_from_config(H) when not is_tuple(H) -> init_state_from_config({H, ?DEFAULT_PORT, "", "", ?DEFAULT_ADMIN_PORT}); init_state_from_config({H, Port}) -> @@ -762,6 +771,7 @@ init_state_from_config({H, Port, User, Pass, AdminPort}) -> AdminConn = couch_util:get_new_connection(H, wh_util:to_integer(AdminPort), User, Pass), ?LOG_SYS("returning state record"), + couch_config:ready(), #state{connection=Conn ,admin_connection=AdminConn ,host={H, wh_util:to_integer(Port), wh_util:to_integer(AdminPort)} diff --git a/whistle_apps/apps/jonny5/src/dth_blacklist.erl b/whistle_apps/apps/jonny5/src/dth_blacklist.erl deleted file mode 100644 index 52a2afa7f09..00000000000 --- a/whistle_apps/apps/jonny5/src/dth_blacklist.erl +++ /dev/null @@ -1,192 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author James Aimonetti -%%% @copyright (C) 2011, VoIP INC -%%% @doc -%%% Query DTH whapps for their blacklist -%%% @end -%%% Created : 30 Aug 2011 by James Aimonetti -%%%------------------------------------------------------------------- --module(dth_blacklist). - --behaviour(gen_listener). - -%% API --export([start_link/0, is_blacklisted/2, handle_req/2, stop/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, handle_event/2 - ,terminate/2, code_change/3]). - --include("jonny5.hrl"). --include_lib("dth/include/dth_amqp.hrl"). - --define(RESPONDERS, [ - {?MODULE, [{<<"dth">>, <<"blacklist_resp">>}]} - ]). --define(BINDINGS, [ - {self, []} - ]). - --define(SERVER, ?MODULE). --define(BLACKLIST_UPDATE_TIMER, 5000). - --record(state, { - blacklist = dict:new() :: dict() %% {AccountID, Reason} - }). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- -start_link() -> - gen_listener:start_link(?MODULE, [{responders, ?RESPONDERS} - ,{bindings, ?BINDINGS} - ], []). - -stop(Srv) -> - gen_listener:stop(Srv). - --spec is_blacklisted/2 :: (Srv, AccountID) -> 'false' | 'true' | {'true', binary()} when - Srv :: pid(), - AccountID :: binary(). -is_blacklisted(Srv, AccountID) -> - gen_listener:call(Srv, {is_blacklisted, AccountID}). - --spec handle_req/2 :: (JObj, Props) -> 'ok' when - JObj :: wh_json:json_object(), - Props :: proplist(). -handle_req(JObj, Props) -> - true = dth_api:blacklist_resp_v(JObj), - Srv = props:get_value(server, Props), - ?LOG_SYS("Sending blacklist to ~p", [Srv]), - Accounts = wh_json:get_value(<<"Accounts">>, JObj, []), - gen_listener:cast(Srv, {blacklist, Accounts}). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([]) -> - ?LOG_SYS("DTH blacklist server started"), - true = is_reference(erlang:send_after(?BLACKLIST_UPDATE_TIMER, self(), update_blacklist)), - {ok, #state{}}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, State) -> -%% {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_call({is_blacklisted, AccountID}, _From, #state{blacklist=BL}=State) -> - try - Reason = dict:fetch(AccountID, BL), - {reply, {true, Reason}, State} - catch - _:_ -> - {reply, false, State} - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_cast({blacklist, {struct, Accounts}}, State) -> - ?LOG_SYS("Updating blacklist with ~p", [Accounts]), - {noreply, State#state{blacklist=dict:from_list(Accounts)}}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_info(update_blacklist, State) -> - true = is_reference(erlang:send_after(?BLACKLIST_UPDATE_TIMER, self(), update_blacklist)), - Self = self(), - spawn(fun() -> request_blacklist(Self) end), - {noreply, State}; -handle_info(_Info, State) -> - ?LOG_SYS("Unhandled message: ~p", [_Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling AMQP event objects -%% -%% @spec handle_event(JObj, State) -> {reply, Props} -%% @end -%%-------------------------------------------------------------------- -handle_event(_, _) -> - {reply, [{server, self()}]}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -request_blacklist(Srv) -> - Queue = gen_listener:queue_name(Srv), - Prop = wh_api:default_headers(Queue, <<"dth">>, <<"blacklist_req">>, ?APP_NAME, ?APP_VERSION), - {ok, JSON} = dth_api:blacklist_req(Prop), - ?LOG_SYS("Sending request for blacklist: ~s", [JSON]), - amqp_util:callmgr_publish(JSON, <<"application/json">>, ?KEY_DTH_BLACKLIST_REQ). diff --git a/whistle_apps/apps/jonny5/src/j5_acctmgr.erl b/whistle_apps/apps/jonny5/src/j5_acctmgr.erl index 8291e0dfe62..18470a59f1c 100644 --- a/whistle_apps/apps/jonny5/src/j5_acctmgr.erl +++ b/whistle_apps/apps/jonny5/src/j5_acctmgr.erl @@ -14,11 +14,11 @@ -export([start_link/1, authz_trunk/3, known_calls/1, status/1, refresh/1]). -export([handle_call_event/2, handle_conf_change/2 - ,handle_authz_win/2, handle_money_msg/2]). + ,handle_authz_win/2, handle_money_msg/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, handle_event/2 - ,terminate/2, code_change/3]). + ,terminate/2, code_change/3]). -include("jonny5.hrl"). @@ -26,19 +26,18 @@ -define(SYNC_TIMER, 60000). -record(state, { - acct_id = <<>> :: binary() + acct_id = <<>> :: binary() ,acct_rev = <<>> :: binary() - ,acct_type = 'account' :: 'account' | 'ts' - ,max_two_way = 0 :: non_neg_integer() + ,max_two_way = 0 :: non_neg_integer() ,max_inbound = 0 :: non_neg_integer() - ,two_way = 0 :: non_neg_integer() + ,two_way = 0 :: non_neg_integer() ,inbound = 0 :: non_neg_integer() ,prepay = 0 :: non_neg_integer() %% in UNITS, not dollars ,trunks_in_use = dict:new() :: dict() %% {CallID, {Type :: inbound | twoway | prepay, CallMonitor :: pid()}} - ,start_time = 1 :: pos_integer() + ,start_time = 1 :: pos_integer() ,sync_ref :: reference() - ,ledger_db = <<>> :: binary() %% where to write credits/debits - }). + ,ledger_db = <<>> :: binary() %% where to write credits/debits + }). %%%=================================================================== %%% API @@ -55,22 +54,22 @@ start_link(AcctID) -> %% why are we receiving messages for account IDs we don't bind to? gen_listener:start_link(?MODULE, [{bindings, [{self, []} - ,{money, [{account_id, AcctID}]} - ,{conf, [{doc_id, AcctID}, {doc_type, <<"sip_service">>}]} - ]} - ,{responders, [{ {?MODULE, handle_call_event}, [{<<"call_event">>, <<"*">>} % call events - ,{<<"call_detail">>, <<"*">>} % and CDR - ] - } - ,{ {?MODULE, handle_money_msg}, [{<<"transaction">>, <<"credit">>} - ,{<<"transaction">>, <<"debit">>} - ,{<<"transaction">>, <<"balance_req">>} - ] - } - ,{ {?MODULE, handle_conf_change}, [{<<"configuration">>, <<"*">>}]} - ,{ {?MODULE, handle_authz_win}, [{<<"dialplan">>, <<"authz_win">>}] } % won the authz - ]} - ], [AcctID]). + ,{money, [{account_id, AcctID}]} + ,{conf, [{doc_id, AcctID}, {doc_type, <<"sip_service">>}]} + ]} + ,{responders, [{ {?MODULE, handle_call_event}, [{<<"call_event">>, <<"*">>} % call events + ,{<<"call_detail">>, <<"*">>} % and CDR + ] + } + ,{ {?MODULE, handle_money_msg}, [{<<"transaction">>, <<"credit">>} + ,{<<"transaction">>, <<"debit">>} + ,{<<"transaction">>, <<"balance_req">>} + ] + } + ,{ {?MODULE, handle_conf_change}, [{<<"configuration">>, <<"*">>}]} + ,{ {?MODULE, handle_authz_win}, [{<<"dialplan">>, <<"authz_win">>}] } % won the authz + ]} + ], [AcctID]). -spec status/1 :: (pid()) -> wh_json:json_object(). status(Srv) -> @@ -87,36 +86,36 @@ authz_trunk(Pid, JObj, CallDir) when is_pid(Pid) -> authz_trunk(AcctID, JObj, CallDir) -> case j5_util:fetch_account_handler(AcctID) of - {ok, AcctPID} -> - case erlang:is_process_alive(AcctPID) of - true -> - ?LOG_SYS("Account(~s) AuthZ proc ~p found", [AcctID, AcctPID]), - j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir); - false -> - ?LOG_SYS("Account(~s) AuthZ proc ~p not alive", [AcctID, AcctPID]), - {ok, AcctPID} = jonny5_acct_sup:start_proc(AcctID), - j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir) - end; - {error, not_found} -> - ?LOG_SYS("No AuthZ proc for account ~s, starting", [AcctID]), - try - {ok, AcctPID} = jonny5_acct_sup:start_proc(AcctID), - j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir) - catch - E:R -> - ST = erlang:get_stacktrace(), - ?LOG_SYS("Error: ~p: ~p", [E, R]), - _ = [ ?LOG_SYS("Stacktrace: ~p", [ST1]) || ST1 <- ST], - {false, []} - end + {ok, AcctPID} -> + case erlang:is_process_alive(AcctPID) of + true -> + ?LOG_SYS("Account(~s) AuthZ proc ~p found", [AcctID, AcctPID]), + j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir); + false -> + ?LOG_SYS("Account(~s) AuthZ proc ~p not alive", [AcctID, AcctPID]), + {ok, AcctPID} = jonny5_acct_sup:start_proc(AcctID), + j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir) + end; + {error, not_found} -> + ?LOG_SYS("No AuthZ proc for account ~s, starting", [AcctID]), + try + {ok, AcctPID} = jonny5_acct_sup:start_proc(AcctID), + j5_acctmgr:authz_trunk(AcctPID, JObj, CallDir) + catch + E:R -> + ST = erlang:get_stacktrace(), + ?LOG_SYS("Error: ~p: ~p", [E, R]), + _ = [ ?LOG_SYS("Stacktrace: ~p", [ST1]) || ST1 <- ST], + {false, []} + end end. known_calls(Pid) when is_pid(Pid) -> gen_server:call(Pid, known_calls); known_calls(AcctID) when is_binary(AcctID) -> case j5_util:fetch_account_handler(AcctID) of - {error, _}=E -> E; - {ok, AcctPid} when is_pid(AcctPid) -> known_calls(AcctPid) + {error, _}=E -> E; + {ok, AcctPid} when is_pid(AcctPid) -> known_calls(AcctPid) end. handle_call_event(JObj, Props) -> @@ -157,55 +156,18 @@ init([AcctID]) -> StartTime = wh_util:current_tstamp(), put(callid, AcctID), - case get_trunks_available(AcctID, account) of - {undefined, undefined, _, account} -> - {TwoWay, Inbound, Prepay, ts} = get_trunks_available(AcctID, ts), - ?LOG_SYS("Init for ts ~s complete", [AcctID]), - - {ok, Rev} = couch_mgr:lookup_doc_rev(<<"ts">>, AcctID), - - LedgerDB = wh_util:format_account_id(AcctID, encoded), - couch_mgr:db_create(LedgerDB), - - {ok, #state{prepay=try_update_value(Prepay, 0) - ,two_way=try_update_value(TwoWay, 0) - ,inbound=try_update_value(Inbound, 0) - ,max_two_way=try_update_value(TwoWay, 0) - ,max_inbound=try_update_value(Inbound, 0) - ,acct_rev=Rev, acct_id=AcctID, acct_type=ts - ,start_time=StartTime, sync_ref=SyncRef - ,ledger_db=LedgerDB - }}; - {TwoWay, Inbound, Prepay, account} -> - ?LOG_SYS("Init for account ~s complete", [AcctID]), - - {ok, #state{prepay=try_update_value(Prepay, 0) - ,two_way=try_update_value(TwoWay, 0) - ,inbound=try_update_value(Inbound, 0) - ,max_two_way=try_update_value(TwoWay, 0) - ,max_inbound=try_update_value(Inbound, 0) - ,acct_id=AcctID, acct_type=account - ,start_time=StartTime, sync_ref=SyncRef - ,ledger_db=wh_util:format_account_id(AcctID, encoded) - }}; - {TwoWay, Inbound, Prepay, ts} -> - ?LOG_SYS("Init for ts ~s complete", [AcctID]), - - {ok, Rev} = couch_mgr:lookup_doc_rev(<<"ts">>, AcctID), - - LedgerDB = wh_util:format_account_id(AcctID, encoded), - couch_mgr:db_create(LedgerDB), - - {ok, #state{prepay=try_update_value(Prepay, 0) - ,two_way=try_update_value(TwoWay, 0) - ,inbound=try_update_value(Inbound, 0) - ,max_two_way=try_update_value(TwoWay, 0) - ,max_inbound=try_update_value(Inbound, 0) - ,acct_rev=Rev, acct_id=AcctID, acct_type=ts - ,start_time=StartTime, sync_ref=SyncRef - ,ledger_db=LedgerDB - }} - end. + {TwoWay, Inbound, Prepay} = get_trunks_available(AcctID), + ?LOG_SYS("Init for account ~s complete", [AcctID]), + + {ok, #state{prepay=try_update_value(Prepay, 0) + ,two_way=try_update_value(TwoWay, 0) + ,inbound=try_update_value(Inbound, 0) + ,max_two_way=try_update_value(TwoWay, 0) + ,max_inbound=try_update_value(Inbound, 0) + ,acct_id=AcctID + ,start_time=StartTime, sync_ref=SyncRef + ,ledger_db=wh_util:format_account_id(AcctID, encoded) + }}. %%-------------------------------------------------------------------- %% @private @@ -222,16 +184,16 @@ init([AcctID]) -> %% @end %%-------------------------------------------------------------------- handle_call(status, _, #state{max_two_way=MaxTwo, max_inbound=MaxIn - ,two_way=Two, inbound=In, trunks_in_use=Dict - ,prepay=Prepay, acct_id=Acct}=State) -> + ,two_way=Two, inbound=In, trunks_in_use=Dict + ,prepay=Prepay, acct_id=Acct}=State) -> {reply, wh_json:from_list([{<<"max_two_way">>, MaxTwo} - ,{<<"max_inbound">>, MaxIn} - ,{<<"two_way">>, Two} - ,{<<"inbound">>, In} - ,{<<"prepay">>, wapi_money:units_to_dollars(Prepay)} - ,{<<"account_id">>, Acct} - ,{<<"trunks">>, trunks_to_json(Dict)} - ]), State}; + ,{<<"max_inbound">>, MaxIn} + ,{<<"two_way">>, Two} + ,{<<"inbound">>, In} + ,{<<"prepay">>, wapi_money:units_to_dollars(Prepay)} + ,{<<"account_id">>, Acct} + ,{<<"trunks">>, trunks_to_json(Dict)} + ]), State}; handle_call(known_calls, _, #state{trunks_in_use=Dict}=State) -> {reply, dict:to_list(Dict), State}; @@ -243,18 +205,18 @@ handle_call({authz, JObj, inbound}, _From, #state{two_way=T,inbound=I,prepay=P}= ?LOG(CallID, "Trunks available: Two: ~b In: ~b Pre: ~b Per-min: ~b", [T, I, P, wapi_money:default_per_min_charge()]), ToDID = case binary:split(wh_json:get_value(<<"To">>, JObj), <<"@">>) of - [<<"nouser">>, _] -> - [RUser, _] = binary:split(wh_json:get_value(<<"Request">>, JObj, <<"nouser">>), <<"@">>), - wnm_util:to_e164(RUser); - [ToUser, _] -> wnm_util:to_e164(ToUser) - end, + [<<"nouser">>, _] -> + [RUser, _] = binary:split(wh_json:get_value(<<"Request">>, JObj, <<"nouser">>), <<"@">>), + wnm_util:to_e164(RUser); + [ToUser, _] -> wnm_util:to_e164(ToUser) + end, ?LOG("ToDID: ~s", [ToDID]), {Resp, State1} = case is_us48(ToDID) of - true -> try_inbound_then_twoway(CallID, State); - false -> try_prepay(CallID, State, wapi_money:default_per_min_charge()) - end, + true -> try_inbound_then_twoway(CallID, State); + false -> try_prepay(CallID, State, wapi_money:default_per_min_charge()) + end, {reply, Resp, State1, hibernate}; handle_call({authz, JObj, outbound}, _From, #state{two_way=T,prepay=P}=State) -> @@ -263,21 +225,21 @@ handle_call({authz, JObj, outbound}, _From, #state{two_way=T,prepay=P}=State) -> ?LOG(CallID, "Trunks available: Two: ~b Pre: ~b Per-min: ~b", [T, P, wapi_money:default_per_min_charge()]), ToDID = case binary:split(wh_json:get_value(<<"To">>, JObj), <<"@">>) of - [<<"nouser">>, _] -> - [RUser, _] = binary:split(wh_json:get_value(<<"Request">>, JObj, <<"nouser">>), <<"@">>), - wnm_util:to_e164(RUser); - [ToUser, _] -> wnm_util:to_e164(ToUser) - end, + [<<"nouser">>, _] -> + [RUser, _] = binary:split(wh_json:get_value(<<"Request">>, JObj, <<"nouser">>), <<"@">>), + wnm_util:to_e164(RUser); + [ToUser, _] -> wnm_util:to_e164(ToUser) + end, ?LOG("ToDID: ~s", [ToDID]), {Resp, State1} = case {erlang:byte_size(ToDID) > 6, is_us48(ToDID)} of - {true, true} -> try_twoway_then_prepay(CallID, State); - {true, false} -> try_prepay(CallID, State, wapi_money:default_per_min_charge()); - {false, _} -> - ?LOG(CallID, "Auto-authz call to internal-seeming extension: ~s", [ToDID]), - {{true, [{<<"Trunk-Type">>, <<"internal">>}]}, State} - end, + {true, true} -> try_twoway_then_prepay(CallID, State); + {true, false} -> try_prepay(CallID, State, wapi_money:default_per_min_charge()); + {false, _} -> + ?LOG(CallID, "Auto-authz call to internal-seeming extension: ~s", [ToDID]), + {{true, [{<<"Trunk-Type">>, <<"internal">>}]}, State} + end, {reply, Resp, State1, hibernate}. %%-------------------------------------------------------------------- @@ -291,22 +253,22 @@ handle_call({authz, JObj, outbound}, _From, #state{two_way=T,prepay=P}=State) -> %% @end %%-------------------------------------------------------------------- handle_cast({money, <<"balance_req">>, JObj}, #state{max_two_way=MaxTwoWay, max_inbound=MaxInbound - ,two_way=TwoWay, inbound=Inbound, prepay=Prepay - ,acct_id=AcctId, trunks_in_use=Dict - }=State) -> + ,two_way=TwoWay, inbound=Inbound, prepay=Prepay + ,acct_id=AcctId, trunks_in_use=Dict + }=State) -> SrvId = wh_json:get_value(<<"Server-ID">>, JObj), ?LOG("Sending balance resp to ~s", [SrvId]), wapi_money:publish_balance_resp(SrvId, [ - {<<"Max-Two-Way">>, MaxTwoWay} - ,{<<"Two-Way">>, TwoWay} - ,{<<"Max-Inbound">>, MaxInbound} - ,{<<"Inbound">>, Inbound} - ,{<<"Prepay">>, wapi_money:units_to_dollars(Prepay)} - ,{<<"Account-ID">>, AcctId} - ,{<<"Trunks">>, trunks_to_json(Dict)} - ,{<<"Node">>, wh_util:to_binary(node())} - | wh_api:default_headers(?MODULE, ?APP_VERSION) - ]), + {<<"Max-Two-Way">>, MaxTwoWay} + ,{<<"Two-Way">>, TwoWay} + ,{<<"Max-Inbound">>, MaxInbound} + ,{<<"Inbound">>, Inbound} + ,{<<"Prepay">>, wapi_money:units_to_dollars(Prepay)} + ,{<<"Account-ID">>, AcctId} + ,{<<"Trunks">>, trunks_to_json(Dict)} + ,{<<"Node">>, wh_util:to_binary(node())} + | wh_api:default_headers(?MODULE, ?APP_VERSION) + ]), {noreply, State}; handle_cast({money, _Evt, _JObj}, #state{prepay=Prepay, acct_id=AcctId}=State) -> @@ -322,28 +284,28 @@ handle_cast({money, _Evt, _JObj}, #state{prepay=Prepay, acct_id=AcctId}=State) - handle_cast({authz_win, JObj}, #state{trunks_in_use=Dict}=State) -> spawn(fun() -> - ?LOG("Authz won!"), + ?LOG("Authz won!"), - CID = wh_json:get_value(<<"Call-ID">>, JObj), + CID = wh_json:get_value(<<"Call-ID">>, JObj), - [Pid] = [ P || {CallID,{_,P}} <- dict:to_list(Dict), CallID =:= CID], - ?LOG("Sending authz_win to ~p", [Pid]), - j5_call_monitor:authz_won(Pid) - end), + [Pid] = [ P || {CallID,{_,P}} <- dict:to_list(Dict), CallID =:= CID], + ?LOG("Sending authz_win to ~p", [Pid]), + j5_call_monitor:authz_won(Pid) + end), {noreply, State}; -handle_cast(refresh, #state{acct_type=AcctType, acct_id=AcctID, max_two_way=OldTwo, max_inbound=OldIn, prepay=OldPrepay}=State) -> - case catch(get_trunks_available(AcctID, AcctType)) of - {Trunks, InboundTrunks, Prepay, _} -> - ?LOG("Maybe changing max two way from ~b to ~p", [OldTwo, Trunks]), - ?LOG("Maybe changing max inbound from ~b to ~p", [OldIn, InboundTrunks]), - ?LOG("Maybe changing prepay from ~b to ~p", [OldPrepay, Prepay]), - {noreply, State#state{max_two_way=try_update_value(Trunks, OldTwo) - ,max_inbound=try_update_value(InboundTrunks, OldIn) - ,prepay=try_update_value(Prepay, OldPrepay) - }}; - _E -> - ?LOG("Failed to refresh: ~p", [_E]), - {noreply, State} +handle_cast(refresh, #state{acct_id=AcctID, max_two_way=OldTwo, max_inbound=OldIn, prepay=OldPrepay}=State) -> + case catch get_trunks_available(AcctID) of + {Trunks, InboundTrunks, Prepay} -> + ?LOG("Maybe changing max two way from ~b to ~p", [OldTwo, Trunks]), + ?LOG("Maybe changing max inbound from ~b to ~p", [OldIn, InboundTrunks]), + ?LOG("Maybe changing prepay from ~b to ~p", [OldPrepay, Prepay]), + {noreply, State#state{max_two_way=try_update_value(Trunks, OldTwo) + ,max_inbound=try_update_value(InboundTrunks, OldIn) + ,prepay=try_update_value(Prepay, OldPrepay) + }}; + _E -> + ?LOG("Failed to refresh: ~p", [_E]), + {noreply, State} end; handle_cast({conf_change, <<"doc_deleted">>, _JObj}, State) -> @@ -351,44 +313,42 @@ handle_cast({conf_change, <<"doc_deleted">>, _JObj}, State) -> {stop, normal, State}; handle_cast({conf_change, <<"doc_created">>, JObj}, State) -> handle_cast({conf_change, <<"doc_edited">>, JObj}, State); -handle_cast({conf_change, <<"doc_edited">>, JObj}, #state{acct_id=AcctID, acct_type=AcctType - ,max_two_way=MTW, max_inbound=MI, prepay=P - ,two_way=_TW, inbound=_I - ,trunks_in_use=Dict - }=State) -> +handle_cast({conf_change, <<"doc_edited">>, JObj}, #state{acct_id=AcctID + ,max_two_way=MTW, max_inbound=MI, prepay=P + ,two_way=_TW, inbound=_I + ,trunks_in_use=Dict + }=State) -> ConfAcctID = wh_json:get_value(<<"ID">>, JObj, <<"missing">>), Doc = wh_json:get_value(<<"Doc">>, JObj), - {Trunks, InboundTrunks, Prepay, _} = case AcctType of - account when ConfAcctID =:= AcctID -> - case get_account_values(AcctID, Doc) of - {undefined, undefined, _, account} -> - get_ts_values(AcctID, Doc); - Levels -> Levels - end; - ts when ConfAcctID =:= AcctID -> - get_ts_values(AcctID, Doc); - _ -> - ?LOG("No change necessary"), - ?LOG("Conf acct id: ~s", [ConfAcctID]), - {MTW, MI, P} - end, + {Trunks, InboundTrunks, Prepay} = case ConfAcctID =:= AcctID of + true -> + case get_account_values(AcctID, Doc) of + {undefined, undefined, _} -> + get_ts_values(AcctID, Doc); + Levels -> Levels + end; + false -> + ?LOG("No change necessary"), + ?LOG("Conf acct id: ~s", [ConfAcctID]), + {MTW, MI, P} + end, NMTW = try_update_value(Trunks, MTW), NMI = try_update_value(InboundTrunks,MI), {NTWIU, NTIIU} = dict:fold(fun(_CallID, {twoway, MonPid}, {Two, In}=Acc) -> - case erlang:is_process_alive(MonPid) of - true -> {Two-1, In}; - false -> Acc - end; - (_CallID, {inbound, MonPid}, {Two, In}=Acc) -> - case erlang:is_process_alive(MonPid) of - true -> {Two, In-1}; - false -> Acc - end; - (_, _, Acc) -> Acc %% ignore per-min - end, {NMTW, NMI}, Dict), + case erlang:is_process_alive(MonPid) of + true -> {Two-1, In}; + false -> Acc + end; + (_CallID, {inbound, MonPid}, {Two, In}=Acc) -> + case erlang:is_process_alive(MonPid) of + true -> {Two, In-1}; + false -> Acc + end; + (_, _, Acc) -> Acc %% ignore per-min + end, {NMTW, NMI}, Dict), Dict1 = dict:filter(fun(_CallID, {_, Pid}) -> erlang:is_process_alive(Pid) end, Dict), @@ -399,12 +359,12 @@ handle_cast({conf_change, <<"doc_edited">>, JObj}, #state{acct_id=AcctID, acct_t ?LOG("Maybe changing prepay from ~b to ~p", [P, Prepay]), {noreply, State#state{max_two_way=NMTW - ,max_inbound=NMI - ,two_way=NTWIU - ,inbound=NTIIU - ,prepay=try_update_value(Prepay, P) - ,trunks_in_use=Dict1 - }, hibernate}; + ,max_inbound=NMI + ,two_way=NTWIU + ,inbound=NTIIU + ,prepay=try_update_value(Prepay, P) + ,trunks_in_use=Dict1 + }, hibernate}; handle_cast(Req, State) -> ?LOG("Failed cast request: ~p", [Req]), @@ -420,28 +380,28 @@ handle_cast(Req, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_info({timeout, SyncRef, sync}, #state{sync_ref=SyncRef, acct_id=AcctID, acct_type=AcctType - ,max_two_way=Two, max_inbound=In, prepay=Pre - }=State) -> +handle_info({timeout, SyncRef, sync}, #state{sync_ref=SyncRef, acct_id=AcctID + ,max_two_way=Two, max_inbound=In, prepay=Pre + }=State) -> ?LOG_SYS("Syncing with DB"), - {NewTwo, NewIn, NewPre, _} = get_trunks_available(AcctID, AcctType), + {NewTwo, NewIn, NewPre} = get_trunks_available(AcctID), ?LOG("Old Maxs: two: ~p, in: ~p, prepay: ~p", [Two, In, Pre]), ?LOG("New Possible Maxs: two: ~p, in: ~p, prepay: ~p", [NewTwo, NewIn, NewPre]), {noreply, State#state{sync_ref=erlang:start_timer(?SYNC_TIMER + sync_fudge(), self(), sync) - ,max_two_way=try_update_value(NewTwo, Two) - ,max_inbound=try_update_value(NewIn, In) - ,prepay=try_update_value(NewPre, Pre) - }}; + ,max_two_way=try_update_value(NewTwo, Two) + ,max_inbound=try_update_value(NewIn, In) + ,prepay=try_update_value(NewPre, Pre) + }}; handle_info({'DOWN', _Ref, process, Pid, Reason}, #state{two_way=T, inbound=I, trunks_in_use=Dict}=State) -> ?LOG("Pid ~p down: ~p, checking for call monitor proc", [Pid, Reason]), case unmonitor_call(Pid, Dict) of - {twoway, Dict1} -> ?LOG("Was two-way trunk, adding 1 to ~b", [T]), {noreply, State#state{two_way=T+1, trunks_in_use=Dict1}}; - {inbound, Dict1} -> ?LOG("Was inbound trunk, adding 1 to ~b", [I]), {noreply, State#state{inbound=T+1, trunks_in_use=Dict1}}; - {per_min, Dict1} -> ?LOG("Was prepay trunk"), {noreply, State#state{trunks_in_use=Dict1}}; - _ -> ?LOG("Ignoring down proc"), {noreply, State} + {twoway, Dict1} -> ?LOG("Was two-way trunk, adding 1 to ~b", [T]), {noreply, State#state{two_way=T+1, trunks_in_use=Dict1}}; + {inbound, Dict1} -> ?LOG("Was inbound trunk, adding 1 to ~b", [I]), {noreply, State#state{inbound=T+1, trunks_in_use=Dict1}}; + {per_min, Dict1} -> ?LOG("Was prepay trunk"), {noreply, State#state{trunks_in_use=Dict1}}; + _ -> ?LOG("Ignoring down proc"), {noreply, State} end; handle_info(#'basic.consume_ok'{}, State) -> @@ -483,30 +443,27 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== --spec get_trunks_available/2 :: (ne_binary(), 'account' | 'ts') -> {'undefined' | non_neg_integer() - ,'undefined' | non_neg_integer() - ,integer() - ,'account' | 'ts' - }. -get_trunks_available(AcctID, account) -> - case couch_mgr:get_results(wh_util:format_account_id(AcctID, encoded), <<"limits/crossbar_listing">>, [{<<"include_docs">>, true}]) of - {ok, []} -> - ?LOG("No results from view, trying ts doc"), - get_trunks_available(AcctID, ts); - {error, not_found} -> - ?LOG("Error loading view, trying ts doc"), - get_trunks_available(AcctID, ts); - {ok, [JObj|_]} -> - ?LOG("View result retrieved"), - get_account_values(AcctID, JObj) - end; -get_trunks_available(AcctID, ts) -> - case couch_mgr:open_doc(<<"ts">>, AcctID) of - {error, not_found} -> - ?LOG_SYS("No account found in ts: ~s", [AcctID]), - {0,0,j5_util:current_usage(AcctID),account}; - {ok, JObj} -> - get_ts_values(AcctID, JObj) +-spec get_trunks_available/1 :: (ne_binary()) -> {'undefined' | non_neg_integer() + ,'undefined' | non_neg_integer() + ,integer() + }. +get_trunks_available(AcctID) -> + AcctDB = wh_util:format_account_id(AcctID, encoded), + case couch_mgr:get_results(AcctDB, <<"limits/crossbar_listing">>, [{<<"include_docs">>, true}]) of + {ok, [JObj|_]} -> + ?LOG("View result retrieved"), + get_account_values(AcctID, JObj); + _ -> + case couch_mgr:get_results(AcctDB, <<"trunkstore/crossbar_listing">>, [{<<"reduce">>, false} + ,{<<"include_docs">>, true} + ]) of + {ok, [JObj|_]} -> + ?LOG("ts view result retrieved"), + get_ts_values(AcctID, JObj); + _ -> + {ok, JObj} = couch_mgr:open_doc(<<"ts">>, AcctID), + get_ts_values(AcctID, JObj) + end end. get_ts_values(AcctID, JObj) -> @@ -518,7 +475,7 @@ get_ts_values(AcctID, JObj) -> Prepay = j5_util:current_usage(AcctID), ?LOG_SYS("Found ts trunk levels: ~p two way, ~p inbound, and $ ~p prepay", [Trunks, InboundTrunks, Prepay]), - {Trunks, InboundTrunks, Prepay, ts}. + {Trunks, InboundTrunks, Prepay}. get_account_values(AcctID, JObj) -> Trunks = wh_json:get_integer_value(<<"trunks">>, JObj), @@ -526,26 +483,26 @@ get_account_values(AcctID, JObj) -> Prepay = j5_util:current_usage(AcctID), ?LOG_SYS("Found trunk levels: ~p two way, ~p inbound, and $ ~p prepay", [Trunks, InboundTrunks, Prepay]), - {Trunks, InboundTrunks, Prepay, account}. + {Trunks, InboundTrunks, Prepay}. -spec try_inbound_then_twoway/2 :: (ne_binary(), #state{}) -> {{boolean(), proplist()}, #state{}}. try_inbound_then_twoway(CallID, State) -> case try_inbound(CallID, State) of - {{true, _}, _}=Resp -> - ?LOG_END(CallID, "Inbound call authorized with inbound trunk", []), - Resp; - {{false, _}, State2} -> - try_twoway_then_prepay(CallID, State2) + {{true, _}, _}=Resp -> + ?LOG_END(CallID, "Inbound call authorized with inbound trunk", []), + Resp; + {{false, _}, State2} -> + try_twoway_then_prepay(CallID, State2) end. -spec try_twoway_then_prepay/2 :: (ne_binary(), #state{}) -> {{boolean(), proplist()}, #state{}}. try_twoway_then_prepay(CallID, State) -> case try_twoway(CallID, State) of - {{true, _}, _}=Resp -> - ?LOG_END(CallID, "Authorized using a two-way trunk", []), - Resp; - {{false, _}, State2} -> - try_prepay(CallID, State2, wapi_money:default_per_min_charge()) + {{true, _}, _}=Resp -> + ?LOG_END(CallID, "Authorized using a two-way trunk", []), + Resp; + {{false, _}, State2} -> + try_prepay(CallID, State2, wapi_money:default_per_min_charge()) end. -spec try_twoway/2 :: (ne_binary(), #state{}) -> {{boolean(), proplist()}, #state{}}. @@ -582,35 +539,35 @@ try_prepay(CallID, #state{prepay=Pre, acct_id=AcctId}=State, PerMinCharge) when %% Alert admins of the situation whapps_util:alert(<<"alert">>, ["Source: ~s(~p)~n" - ,"Alert: Insufficient prepay to authorize the call.~n" - ,"Call-ID: ~s~n" - ,"Account-ID: ~s~n" - ,"Current Prepay Balance: ~p~n" - ] - ,[?MODULE, ?LINE, CallID, AcctId, wapi_money:units_to_dollars(Pre)]), + ,"Alert: Insufficient prepay to authorize the call.~n" + ,"Call-ID: ~s~n" + ,"Account-ID: ~s~n" + ,"Current Prepay Balance: ~p~n" + ] + ,[?MODULE, ?LINE, CallID, AcctId, wapi_money:units_to_dollars(Pre)]), case whapps_config:get_is_true(<<"jonny5">>, <<"authz_on_no_prepay">>, true) of - true -> - ?LOG("authz_on_no_prepay set to true, authz the call"), - {{true, [{<<"Trunk-Type">>, <<"pre_min">>}]}}; - false -> - ?LOG("authz_on_no_prepay set to false, denying the call"), - {{false, [{<<"Error">>, <<"Insufficient Funds">>}]}, State} + true -> + ?LOG("authz_on_no_prepay set to true, authz the call"), + {{true, [{<<"Trunk-Type">>, <<"pre_min">>}]}}; + false -> + ?LOG("authz_on_no_prepay set to false, denying the call"), + {{false, [{<<"Error">>, <<"Insufficient Funds">>}]}, State} end; try_prepay(CallID, #state{acct_id=AcctId, prepay=Prepay, trunks_in_use=Dict, ledger_db=LedgerDB}=State, PerMinCharge) -> case jonny5_listener:is_blacklisted(AcctId) of - {true, Reason} -> - ?LOG_SYS(CallID, "Authz false for per_min: ~s", [Reason]), - {{false, [{<<"Error">>, Reason}]}, State}; - false -> - PrepayLeft = Prepay - PerMinCharge, - ?LOG_SYS(CallID, "Authz a per_min trunk; ~b prepay left, ~b charged up-front", [PrepayLeft, PerMinCharge]), - {ok, Pid} = monitor_call(CallID, LedgerDB, per_min, PerMinCharge), - erlang:monitor(process, Pid), - - {{true, [{<<"Trunk-Type">>, <<"per_min">>}]} - ,State#state{trunks_in_use=dict:store(CallID, {per_min, Pid}, Dict), prepay=PrepayLeft} - } + {true, Reason} -> + ?LOG_SYS(CallID, "Authz false for per_min: ~s", [Reason]), + {{false, [{<<"Error">>, Reason}]}, State}; + false -> + PrepayLeft = Prepay - PerMinCharge, + ?LOG_SYS(CallID, "Authz a per_min trunk; ~b prepay left, ~b charged up-front", [PrepayLeft, PerMinCharge]), + {ok, Pid} = monitor_call(CallID, LedgerDB, per_min, PerMinCharge), + erlang:monitor(process, Pid), + + {{true, [{<<"Trunk-Type">>, <<"per_min">>}]} + ,State#state{trunks_in_use=dict:store(CallID, {per_min, Pid}, Dict), prepay=PrepayLeft} + } end. -spec monitor_call/3 :: (ne_binary(), ne_binary(), call_types()) -> {'ok', pid()}. @@ -624,11 +581,11 @@ monitor_call(CallID, LedgerDB, CallType, Debit) -> -spec unmonitor_call/2 :: (pid(), dict()) -> {call_types() | 'ignore', dict()}. unmonitor_call(Pid, Dict) -> dict:fold(fun(CallId, {Type, MonPid}, {_, Dict0}) when MonPid =:= Pid -> - ?LOG(CallId, "Found monitor pid: ~p for trunk of type ~s", [Pid, Type]), - {Type, Dict0}; - (CallId, V, {Type, Dict0}) -> - {Type, dict:store(CallId, V, Dict0)} - end, {ignore, dict:new()}, Dict). + ?LOG(CallId, "Found monitor pid: ~p for trunk of type ~s", [Pid, Type]), + {Type, Dict0}; + (CallId, V, {Type, Dict0}) -> + {Type, dict:store(CallId, V, Dict0)} + end, {ignore, dict:new()}, Dict). %% Match +1XXXYYYZZZZ as US-48; all others are not is_us48(<<"+1", Rest/binary>>) when erlang:byte_size(Rest) =:= 10 -> true; diff --git a/whistle_apps/apps/jonny5/src/j5_dth_blacklist.erl b/whistle_apps/apps/jonny5/src/j5_dth_blacklist.erl index ba037398a81..a3c5f7682a3 100644 --- a/whistle_apps/apps/jonny5/src/j5_dth_blacklist.erl +++ b/whistle_apps/apps/jonny5/src/j5_dth_blacklist.erl @@ -15,24 +15,23 @@ %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, handle_event/2 - ,terminate/2, code_change/3]). + ,terminate/2, code_change/3]). -include("jonny5.hrl"). --include_lib("dth/include/dth_amqp.hrl"). -define(RESPONDERS, [ - {?MODULE, [{<<"dth">>, <<"blacklist_resp">>}]} - ]). + {?MODULE, [{<<"dth">>, <<"blacklist_resp">>}]} + ]). -define(BINDINGS, [ - {self, []} - ]). + {self, []} + ]). -define(SERVER, ?MODULE). -define(BLACKLIST_UPDATE_TIMER, 5000). -record(state, { - blacklist = dict:new() :: dict() %% {AccountID, Reason} - }). + blacklist = dict:new() :: dict() %% {AccountID, Reason} + }). %%%=================================================================== %%% API @@ -47,8 +46,8 @@ %%-------------------------------------------------------------------- start_link() -> gen_listener:start_link(?MODULE, [{responders, ?RESPONDERS} - ,{bindings, ?BINDINGS} - ], []). + ,{bindings, ?BINDINGS} + ], []). stop(Srv) -> gen_listener:stop(Srv). @@ -105,11 +104,11 @@ init([]) -> %%-------------------------------------------------------------------- handle_call({is_blacklisted, AccountID}, _From, #state{blacklist=BL}=State) -> try - Reason = dict:fetch(AccountID, BL), - {reply, {true, Reason}, State} + Reason = dict:fetch(AccountID, BL), + {reply, {true, Reason}, State} catch - _:_ -> - {reply, false, State} + _:_ -> + {reply, false, State} end. %%-------------------------------------------------------------------- @@ -184,9 +183,5 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -request_blacklist(Srv) -> - Queue = gen_listener:queue_name(Srv), - Prop = wh_api:default_headers(Queue, <<"dth">>, <<"blacklist_req">>, ?APP_NAME, ?APP_VERSION), - {ok, JSON} = dth_api:blacklist_req(Prop), - ?LOG_SYS("Sending request for blacklist: ~s", [JSON]), - amqp_util:callmgr_publish(JSON, <<"application/json">>, ?KEY_DTH_BLACKLIST_REQ). +request_blacklist(_Srv) -> + ok. diff --git a/whistle_apps/apps/notify/src/notify_first_occurrence.erl b/whistle_apps/apps/notify/src/notify_first_occurrence.erl index b308edf8d91..5bb6a9cbb03 100644 --- a/whistle_apps/apps/notify/src/notify_first_occurrence.erl +++ b/whistle_apps/apps/notify/src/notify_first_occurrence.erl @@ -52,7 +52,7 @@ init() -> %%-------------------------------------------------------------------- -spec start_crawler/0 :: () -> {ok, pid()}. start_crawler() -> - {ok, spawn_link(fun crawler_loop/0)}. + {ok, proc_lib:spawn_link(fun crawler_loop/0)}. %%-------------------------------------------------------------------- %% @public @@ -209,7 +209,7 @@ crawler_loop() -> end, erlang:send_after(30000, self(), wakeup), flush(), - erlang:hibernate(?MODULE, crawler_loop, []). + proc_lib:hibernate(?MODULE, crawler_loop, []). %%-------------------------------------------------------------------- %% @private diff --git a/whistle_apps/src/whapps_config.erl b/whistle_apps/src/whapps_config.erl index 161698715af..7c1cdd73bce 100644 --- a/whistle_apps/src/whapps_config.erl +++ b/whistle_apps/src/whapps_config.erl @@ -20,7 +20,7 @@ -export([get_non_empty/2, get_non_empty/3, get_non_empty/4]). -export([set/3, set/4, set_default/3]). --export([flush/0, import/1]). +-export([flush/0, import/1, couch_ready/0]). -type config_category() :: ne_binary() | nonempty_string() | atom(). -type config_key() :: ne_binary() | nonempty_string() | atom(). @@ -338,6 +338,7 @@ fetch_category(Category, Cache) -> %%----------------------------------------------------------------------------- -spec fetch_db_config/2 :: (ne_binary(), pid()) -> {'ok', wh_json:json_object()} | {'error', 'not_found'}. fetch_db_config(Category, Cache) -> + ?LOG("fetch db config for ~s", [Category]), case couch_mgr:open_doc(?WH_CONFIG_DB, Category) of {ok, JObj}=Ok -> wh_cache:store_local(Cache, {?MODULE, Category}, JObj), @@ -394,7 +395,6 @@ config_terms_to_json(Terms) -> -spec do_set/4 :: (config_category(), config_key(), term(), ne_binary()) -> {'ok', wh_json:json_object()}. do_set(Category, Key, Value, Node) -> {ok, Cache} = whistle_apps_sup:config_cache_proc(), - UpdateFun = fun(J) -> NodeConfig = wh_json:get_value(Node, J, wh_json:new()), wh_json:set_value(Key, Value, NodeConfig) @@ -409,8 +409,12 @@ do_set(Category, Key, Value, Node) -> %% @end %%----------------------------------------------------------------------------- -spec update_category_node/4 :: (ne_binary(), ne_binary(), fun((wh_json:json_object()) -> wh_json:json_object()) , pid()) -> {'ok', wh_json:json_object()}. -update_category_node(Category, Node, UpdateFun , Cache) -> - case is_pid(whereis(couch_mgr)) andalso couch_mgr:open_doc(?WH_CONFIG_DB, Category) of +update_category_node(Category, Node, UpdateFun, Cache) -> + DBReady = case wh_cache:fetch_local(Cache, {?MODULE, couch_mgr_ready}) of + {ok, true} -> true; + _ -> false + end, + case DBReady andalso couch_mgr:open_doc(?WH_CONFIG_DB, Category) of {ok, JObj} -> case wh_json:set_value(Node, UpdateFun(JObj), JObj) of JObj -> {ok, JObj}; @@ -488,3 +492,11 @@ category_to_file(<<"crossbar.shared_auth">>) -> [code:lib_dir(crossbar, priv), "/shared_auth/shared_auth.config"]; category_to_file(_) -> undefined. + +couch_ready() -> + case whereis(couch_mgr) =:= self() of + true -> + {ok, Cache} = whistle_apps_sup:config_cache_proc(), + wh_cache:store_local(Cache, {?MODULE, couch_mgr_ready}, true); + false -> ok + end. diff --git a/whistle_apps/src/whapps_controller.erl b/whistle_apps/src/whapps_controller.erl index 5850e4f4ef8..d1b4a1906df 100644 --- a/whistle_apps/src/whapps_controller.erl +++ b/whistle_apps/src/whapps_controller.erl @@ -72,7 +72,7 @@ running_apps() -> initialize_whapps() -> couch_mgr:db_create(?WH_CONFIG_DB), case whapps_config:get(?MODULE, <<"cookie">>) of - undefined -> ok; + null -> ok; Cookie -> ?LOG("changing the erlang cookie to ~s", [Cookie]), erlang:set_cookie(node(), wh_util:to_atom(Cookie, true)) diff --git a/whistle_apps/src/whapps_util.erl b/whistle_apps/src/whapps_util.erl index 4b584108e4f..f7488cd7ae3 100644 --- a/whistle_apps/src/whapps_util.erl +++ b/whistle_apps/src/whapps_util.erl @@ -269,10 +269,11 @@ should_alert_system_admin(AlertLevel) -> SystemLevel = whapps_config:get(<<"alerts">>, <<"system_admin_level">>, <<"debug">>), case alert_level_to_integer(SystemLevel) of 0 -> undefined; + null -> undefined; L when L =< AlertLevel -> case whapps_config:get(<<"alerts">>, <<"system_admin_email">>) of - undefined -> - undefined; + undefined -> undefined; + null -> undefined; Email when is_binary(Email) -> Email; Emails when is_list(Emails) -> @@ -293,6 +294,8 @@ should_alert_system_admin(AlertLevel) -> AccountId :: undefined | binary(). should_alert_account_admin(_, undefined) -> undefined; +should_alert_account_admin(_, null) -> + undefined; should_alert_account_admin(AlertLevel, AccountId) -> AccountDb = wh_util:format_account_id(AccountId, encoded), case couch_mgr:open_doc(AccountDb, AccountId) of @@ -300,6 +303,7 @@ should_alert_account_admin(AlertLevel, AccountId) -> AdminLevel = wh_json:get_value([<<"alerts">>, <<"level">>], JObj), case alert_level_to_integer(AdminLevel) of 0 -> undefined; + null -> undefined; L when L =< AlertLevel -> case wh_json:get_value([<<"alerts">>, <<"email">>], JObj) of undefined -> @@ -340,6 +344,10 @@ alert_level_to_integer(<<"debug">>) -> 1; alert_level_to_integer(<<_/binary>>) -> 0; +alert_level_to_integer(undefined) -> + 5; +alert_level_to_integer(null) -> + 5; alert_level_to_integer(Level) -> alert_level_to_integer(wh_util:to_binary(Level)).