Skip to content

Commit

Permalink
WHISTLE-250: j5_acctmgr listens for config changes to the sip service…
Browse files Browse the repository at this point in the history
… doc for its account and updates totals accordingly
  • Loading branch information
James Aimonetti committed Nov 17, 2011
1 parent 171afc7 commit 51a88de
Showing 1 changed file with 109 additions and 63 deletions.
172 changes: 109 additions & 63 deletions whistle_apps/apps/jonny5/src/j5_acctmgr.erl
Expand Up @@ -52,8 +52,10 @@
-spec start_link/1 :: (ne_binary()) -> {'ok', pid()} | 'ignore' | {'error', term()}.
start_link(AcctID) ->
%% why are we receiving messages for account IDs we don't bind to?
gen_listener:start_link(?MODULE, [{bindings, [{self, []}, {jonny5, [{account_id, AcctID}]}]}
,{conf, [{db, AcctID}, {doc_type, <<"sip_service">>}]} % bind to config changes for this account.pvt_type
gen_listener:start_link(?MODULE, [{bindings, [{self, []}
,{jonny5, [{account_id, AcctID}]}
,{conf, [{doc_id, AcctID}, {doc_type, <<"sip_service">>}]}
]}
,{responders, [{ {?MODULE, handle_call_event}, [{<<"call_event">>, <<"*">>} % call events
,{<<"call_detail">>, <<"*">>} % and CDR
]
Expand Down Expand Up @@ -142,27 +144,46 @@ init([AcctID]) ->

StartTime = wh_util:current_tstamp(),

put(callid, AcctID),
case get_trunks_available(AcctID, account) of
{error, not_found} ->
?LOG_SYS("No account found for ~s", [AcctID]),
{stop, no_account};
{undefined, undefined, undefined, account} ->
{TwoWay, Inbound, Prepay, ts} = get_trunks_available(AcctID, ts),
?LOG_SYS("Init for ts ~s complete", [AcctID]),

{ok, Rev} = couch_mgr:lookup_doc_rev(<<"ts">>, AcctID),

{ok, #state{prepay=try_update_value(Prepay, 0.0)
,two_way=try_update_value(TwoWay, 0)
,inbound=try_update_value(Inbound, 0)
,max_two_way=try_update_value(TwoWay, 0)
,max_inbound=try_update_value(Inbound, 0)
,acct_rev=Rev, acct_id=AcctID, acct_type=ts
,start_time=StartTime, sync_ref=SyncRef
}};
{TwoWay, Inbound, Prepay, account} ->
?LOG_SYS("Init for account ~s complete", [AcctID]),
{ok, #state{prepay=Prepay
,two_way=TwoWay, inbound=Inbound
,max_two_way=TwoWay, max_inbound=Inbound

{ok, #state{prepay=try_update_value(Prepay, 0.0)
,two_way=try_update_value(TwoWay, 0)
,inbound=try_update_value(Inbound, 0)
,max_two_way=try_update_value(TwoWay, 0)
,max_inbound=try_update_value(Inbound, 0)
,acct_id=AcctID, acct_type=account
,start_time=StartTime, sync_ref=SyncRef
}};
{TwoWay, Inbound, Prepay, ts} ->
?LOG_SYS("Init for ts ~s complete", [AcctID]),
couch_mgr:add_change_handler(<<"ts">>, AcctID),

{ok, Rev} = couch_mgr:lookup_doc_rev(<<"ts">>, AcctID),

{ok, #state{prepay=Prepay
,two_way=TwoWay, inbound=Inbound
,max_two_way=TwoWay, max_inbound=Inbound
{ok, #state{prepay=try_update_value(Prepay, 0.0)
,two_way=try_update_value(TwoWay, 0)
,inbound=try_update_value(Inbound, 0)
,max_two_way=try_update_value(TwoWay, 0)
,max_inbound=try_update_value(Inbound, 0)
,acct_rev=Rev, acct_id=AcctID, acct_type=ts
,start_time=StartTime, sync_ref=SyncRef
}}
Expand Down Expand Up @@ -245,20 +266,65 @@ handle_call({authz, JObj, outbound}, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(refresh, #state{acct_type=AcctType, acct_id=AcctID, max_two_way=_OldTwo, max_inbound=_OldIn}=State) ->
handle_cast(refresh, #state{acct_type=AcctType, acct_id=AcctID, max_two_way=OldTwo, max_inbound=OldIn, prepay=OldPrepay}=State) ->
case catch(get_trunks_available(AcctID, AcctType)) of
{MaxTwo, MaxIn, _Prepay, AcctType} ->
?LOG("Updating max two to ~b (from ~b), max inbound to ~b (from ~b)", [MaxTwo, _OldTwo, MaxIn, _OldIn]),
{noreply, State#state{max_two_way=MaxTwo, max_inbound=MaxIn}};
{Trunks, InboundTrunks, Prepay, _} ->
?LOG("Maybe changing max two way from ~b to ~p", [OldTwo, Trunks]),
?LOG("Maybe changing max inbound from ~b to ~p", [OldIn, InboundTrunks]),
?LOG("Maybe changing prepay from ~b to ~p", [OldPrepay, Prepay]),
{noreply, State#state{max_two_way=try_update_value(Trunks, OldTwo)
,max_inbound=try_update_value(InboundTrunks, OldIn)
,prepay=try_update_value(Prepay, OldPrepay)
}};
_E ->
?LOG("Failed to refresh: ~p", [_E]),
{noreply, State}
end;

handle_cast({conf_change, EvtName, JObj}, State) ->
io:format("Evt: ~s~n", [EvtName]),
[io:format("KV: ~p~n", [KV]) || KV <- wh_json:to_proplist(JObj)],
{noreply, State};
handle_cast({conf_change, <<"doc_edited">>, JObj}, #state{acct_id=AcctID, acct_type=AcctType
,max_two_way=MTW, max_inbound=MI, prepay=P
,two_way=_TW, inbound=_I
,trunks_in_use=Dict
}=State) ->
ConfAcctID = wh_json:get_value(<<"ID">>, JObj, <<"missing">>),
Doc = wh_json:get_value(<<"Doc">>, JObj),

{Trunks, InboundTrunks, Prepay, _} = case AcctType of
account when ConfAcctID =:= AcctID ->
case get_account_values(Doc) of
{undefined, undefined, undefined, account} ->
get_ts_values(Doc);
Levels -> Levels
end;
ts when ConfAcctID =:= AcctID ->
get_ts_values(Doc);
_ ->
?LOG("No change necessary"),
?LOG("Conf acct id: ~s", [ConfAcctID]),
{MTW, MI, P}
end,

NMTW = try_update_value(Trunks, MTW),
NMI = try_update_value(InboundTrunks,MI),

{NTWIU, NTIIU} = dict:fold(fun(_CallID, two_way, {Two, In}) ->
{Two-1, In};
(_CallID, inbound, {Two, In}) ->
{Two, In-1};
(_, _, Acc) -> Acc
end, {NMTW, NMI}, Dict),

?LOG("changing max two way from ~b to ~p", [MTW, NMTW]),
?LOG("changing max inbound from ~b to ~p", [MI, NMI]),
?LOG("changing two-way in use from ~b to ~b", [_TW, NTWIU]),
?LOG("changing inbound in use from ~b to ~b", [_I, NTIIU]),
?LOG("Maybe changing prepay from ~p to ~p", [P, Prepay]),
{noreply, State#state{max_two_way=NMTW
,max_inbound=NMI
,two_way=NTWIU
,inbound=NTIIU
,prepay=try_update_value(Prepay, P)
}, hibernate};

handle_cast({j5_msg, <<"sync_req">>, JObj}, State) ->
spawn(fun() -> send_levels_resp(JObj, State, fun wapi_jonny5:publish_sync_resp/2) end),
Expand Down Expand Up @@ -351,24 +417,6 @@ handle_info({timeout, SyncRef, sync}, #state{start_time=StartTime, sync_ref=Sync
end),
{noreply, State#state{sync_ref=erlang:start_timer(?SYNC_TIMER + sync_fudge(), self(), sync)}};

handle_info({document_changes, AcctID, Changes}, #state{acct_rev=Rev, acct_id=AcctID, acct_type=AcctType}=State) ->
?LOG_SYS("change to account ~s to be processed", [AcctID]),
State1 = lists:foldl(fun(Prop, State0) ->
case props:get_value(<<"rev">>, Prop) of
undefined -> State0;
Rev -> State0;
_NewRev ->
?LOG_SYS("Updating account ~s from ~s to ~s", [AcctID, Rev, _NewRev]),
{Two, In, _, _} = get_trunks_available(AcctID, AcctType),
State0#state{max_two_way=Two, max_inbound=In}
end
end, State, Changes),
{noreply, State1, hibernate};

handle_info({document_deleted, DocID}, State) ->
?LOG_SYS("account ~s deleted, going down", [DocID]),
{stop, normal, State};

handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};

Expand Down Expand Up @@ -409,17 +457,16 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

-spec get_trunks_available/2 :: (AcctID, Type) -> {error, not_found} | {non_neg_integer(), non_neg_integer(), float(), account | ts} when
AcctID :: binary(),
Type :: account | ts.
-spec get_trunks_available/2 :: (ne_binary(), 'account' | 'ts') -> {'error', 'not_found'} |
{non_neg_integer(), non_neg_integer(), float(), 'account' | 'ts'}.
get_trunks_available(AcctID, account) ->
case couch_mgr:get_results(whapps_util:get_db_name(AcctID, encoded), <<"limits/crossbar_listing">>, [{<<"include_docs">>, true}]) of
{ok, []} ->
?LOG("No results from view, trying account doc"),
get_trunks_available_from_account_doc(AcctID);
?LOG("No results from view, trying ts doc"),
get_trunks_available(AcctID, ts);
{error, not_found} ->
?LOG("Error loading view, trying account doc"),
get_trunks_available_from_account_doc(AcctID);
?LOG("Error loading view, trying ts doc"),
get_trunks_available(AcctID, ts);
{ok, [JObj|_]} ->
?LOG("View result retrieved"),
get_account_values(JObj)
Expand All @@ -430,32 +477,26 @@ get_trunks_available(AcctID, ts) ->
?LOG_SYS("No account found in ts: ~s", [AcctID]),
E;
{ok, JObj} ->
Acct = wh_json:get_value(<<"account">>, JObj, ?EMPTY_JSON_OBJECT),
Credits = wh_json:get_value(<<"credits">>, Acct, ?EMPTY_JSON_OBJECT),

Trunks = wh_util:to_integer(wh_json:get_value(<<"trunks">>, Acct, 0)),
InboundTrunks = wh_util:to_integer(wh_json:get_value(<<"inbound_trunks">>, Acct, 0)),
Prepay = wh_util:to_float(wh_json:get_value(<<"prepay">>, Credits, 0.0)),
%% Balance = ?DOLLARS_TO_UNITS(),
?LOG_SYS("Found trunk levels for ~s: ~b two way, ~b inbound, and $ ~p prepay", [AcctID, Trunks, InboundTrunks, Prepay]),
{Trunks, InboundTrunks, Prepay, ts}
get_ts_values(JObj)
end.

get_trunks_available_from_account_doc(AcctID) ->
case couch_mgr:open_doc(whapps_util:get_db_name(AcctID, encoded), AcctID) of
{error, not_found} ->
?LOG_SYS("Account ~s not found, trying ts", [AcctID]),
get_trunks_available(AcctID, ts);
{ok, JObj} ->
get_account_values(JObj)
end.
get_ts_values(JObj) ->
Acct = wh_json:get_value(<<"account">>, JObj, wh_json:new()),
Credits = wh_json:get_value(<<"credits">>, Acct, wh_json:new()),

Trunks = wh_json:get_integer_value(<<"trunks">>, Acct),
InboundTrunks = wh_json:get_integer_value(<<"inbound_trunks">>, Acct),
Prepay = wh_json:get_float_value(<<"prepay">>, Credits),
%% Balance = ?DOLLARS_TO_UNITS(),
?LOG_SYS("Found ts trunk levels: ~p two way, ~p inbound, and $ ~p prepay", [Trunks, InboundTrunks, Prepay]),
{Trunks, InboundTrunks, Prepay, ts}.

get_account_values(JObj) ->
Trunks = wh_util:to_integer(wh_json:get_value(<<"trunks">>, JObj, 0)),
InboundTrunks = wh_util:to_integer(wh_json:get_value(<<"inbound_trunks">>, JObj, 0)),
Prepay = wh_util:to_float(wh_json:get_value(<<"prepay">>, JObj, 0.0)),
Trunks = wh_json:get_integer_value(<<"trunks">>, JObj),
InboundTrunks = wh_json:get_integer_value(<<"inbound_trunks">>, JObj),
Prepay = wh_json:get_float_value(<<"prepay">>, JObj),
%% Balance = ?DOLLARS_TO_UNITS(),
?LOG_SYS("Found trunk levels: ~b two way, ~b inbound, and $ ~p prepay", [Trunks, InboundTrunks, Prepay]),
?LOG_SYS("Found trunk levels: ~p two way, ~p inbound, and $ ~p prepay", [Trunks, InboundTrunks, Prepay]),
{Trunks, InboundTrunks, Prepay, account}.

-spec try_inbound_then_twoway/2 :: (CallID, State) -> {{boolean(), proplist()}, #state{}} when
Expand Down Expand Up @@ -601,3 +642,8 @@ send_levels_resp(JObj, #state{two_way=Two, inbound=In, trunks_in_use=Dict, acct_
-spec sync_fudge/0 :: () -> 1..?SYNC_TIMER.
sync_fudge() ->
crypto:rand_uniform(1, ?SYNC_TIMER).

try_update_value(undefined, Default) ->
Default;
try_update_value(New, _) ->
New.

0 comments on commit 51a88de

Please sign in to comment.