Permalink
Browse files

WHISTLE-1217: clean up and start of the crawler

  • Loading branch information...
1 parent 5937f94 commit b9ebf7611c1144444b2de77e63ad57c703773fd9 @k-anderson k-anderson committed Aug 13, 2012
@@ -27,6 +27,9 @@
-type wh_deeplist() :: [term() | wh_deeplist()].
+-type wh_std_return() :: {'ok', _} | {'error', _}.
+-type wh_std_return(Thing) :: {'ok', Thing} | {'error', _}.
+
%% non-empty binary
-define(NE_BINARY, <<_:8,_/binary>>).
-type ne_binary() :: <<_:8,_:_*8>>.
@@ -5,6 +5,9 @@
"cascade_quantities": {
"map": "function (doc) {if (doc.pvt_type != 'service' || doc.pvt_deleted || !doc.quantities) return;for (var key in doc.quantities) {var obj = doc.quantities[key];for (var prop in obj) {if (obj[prop] < 1) continue;for (var idx in doc.pvt_tree) {emit([doc.pvt_tree[idx] || doc._id, key, prop], obj[prop]);}}}}",
"reduce": "_sum"
+ },
+ "dirty": {
+ "map": "function (doc) {if (doc.pvt_type != 'service' || doc.pvt_deleted || !doc.pvt_dirty) return; emit(doc.pvt_modified, null); }"
}
}
}
@@ -16,7 +16,7 @@
}).
-record(wh_service_updates, {bt_subscriptions = []
- ,billing_id
+ ,account_id
,bt_customer
}).
@@ -26,27 +26,35 @@
%%
%% @end
%%--------------------------------------------------------------------
-sync(Items, BillingId) ->
- Customer = fetch_or_create_customer(BillingId),
- sync(wh_service_items:to_list(Items), BillingId, #wh_service_updates{bt_customer=Customer}).
+sync(Items, AccountId) ->
+ Customer = fetch_or_create_customer(AccountId),
+ sync(wh_service_items:to_list(Items), AccountId, #wh_service_updates{bt_customer=Customer}).
-sync([], _BillingId, #wh_service_updates{bt_subscriptions=Subscriptions}) ->
+sync([], _AccountId, #wh_service_updates{bt_subscriptions=Subscriptions}) ->
_ = [braintree_subscription:update(Subscription)
|| #wh_service_update{bt_subscription=Subscription} <- Subscriptions
],
ok;
-sync([ServiceItem|ServiceItems], BillingId, Updates) ->
+sync([ServiceItem|ServiceItems], AccountId, Updates) ->
case braintree_plan_addon_id(ServiceItem) of
- {undefined, _} -> sync(ServiceItems, BillingId, Updates);
- {_, undefined} -> sync(ServiceItems, BillingId, Updates);
+ {undefined, _} ->
+ lager:debug("service item had no plan id: ~p", [ServiceItem]),
+ sync(ServiceItems, AccountId, Updates);
+ {_, undefined} ->
+ lager:debug("service item had no add on id: ~p", [ServiceItem]),
+ sync(ServiceItems, AccountId, Updates);
{PlanId, AddOnId}->
Quantity = wh_service_item:quantity(ServiceItem),
Routines = [fun(S) -> braintree_subscription:update_addon_quantity(S, AddOnId, Quantity) end
+ ,fun(S) ->
+ Rate = wh_service_item:rate(ServiceItem),
+ braintree_subscription:update_addon_amount(S, AddOnId, Rate)
+ end
,fun(S) -> handle_single_discount(ServiceItem, S) end
,fun(S) -> handle_cumulative_discounts(ServiceItem, S) end
],
Subscription = lists:foldl(fun(F, S) -> F(S) end, fetch_or_create_subscription(PlanId, Updates), Routines),
- sync(ServiceItems, BillingId, update_subscriptions(PlanId, Subscription, Updates))
+ sync(ServiceItems, AccountId, update_subscriptions(PlanId, Subscription, Updates))
end.
%%--------------------------------------------------------------------
@@ -96,14 +104,14 @@ handle_cumulative_discounts(ServiceItem, Subscription) ->
%% @end
%%--------------------------------------------------------------------
-spec fetch_or_create_customer/1 :: (ne_binary()) -> braintree_customer:customer().
-fetch_or_create_customer(BillingId) ->
- lager:debug("requesting braintree customer ~s", [BillingId]),
- try braintree_customer:find(BillingId) of
+fetch_or_create_customer(AccountId) ->
+ lager:debug("requesting braintree customer ~s", [AccountId]),
+ try braintree_customer:find(AccountId) of
Customer -> Customer
catch
throw:{not_found, _} ->
- lager:debug("creating new braintree customer ~s", [BillingId]),
- braintree_customer:create(BillingId)
+ lager:debug("creating new braintree customer ~s", [AccountId]),
+ braintree_customer:create(AccountId)
end.
%%--------------------------------------------------------------------
@@ -1,69 +0,0 @@
-%%%-------------------------------------------------------------------
-%%% @copyright (C) 2012, VoIP, INC
-%%% @doc
-%%%
-%%% @end
-%%% @contributors
-%%%-------------------------------------------------------------------
--module(wh_service_invoice).
-
--include_lib("whistle_services/src/whistle_services.hrl").
-
--export([sync/1]).
-
--record(wh_invoice, {account_id = undefined
- ,account_db = undefined
- ,billing_id = undefined
- ,service_items = wh_service_items:empty() :: wh_service_items:items()
- ,service_plans = wh_service_plans:empty() :: wh_service_plans:plans()
- ,services = wh_services:empty()
- }).
-
-%%--------------------------------------------------------------------
-%% @public
-%% @doc
-%% Create a collection of billable items and envoke the bookkeeper
-%% to actualize any billing changes.
-%% @end
-%%--------------------------------------------------------------------
--spec sync/1 :: (ne_binary()) -> #wh_invoice{}.
-sync(Account) ->
- #wh_invoice{billing_id=BillingId, service_items=ServiceItems} = create(Account),
- wh_bookkeeper_braintree:sync(ServiceItems, BillingId).
-
-%%--------------------------------------------------------------------
-%% @public
-%% @doc
-%% Create a new invoice for a given account. An invoice is a collection
-%% of service plans (provided by multiple vendors) applied to the
-%% services the account is currently consuming to generate a collection
-%% of items.
-%% @end
-%%--------------------------------------------------------------------
--spec create/1 :: (ne_binary()) -> #wh_invoice{}.
-create(Account) ->
- Routines = [fun(I) -> I#wh_invoice{account_id=wh_util:format_account_id(Account, raw)
- ,account_db=wh_util:format_account_id(Account, encoded)}
- end
- ,fun(#wh_invoice{account_db=AccountDb, account_id=AccountId}=I) ->
- case couch_mgr:open_doc(AccountDb, AccountId) of
- {ok, JObj} ->
- BillingId = wh_json:get_ne_value(<<"billing_id">>, JObj, AccountId),
- lager:debug("using billing id ~s for account ~s", [BillingId, AccountId]),
- I#wh_invoice{billing_id=BillingId};
- {error, _R} ->
- lager:debug("unable to open account definition for ~s (using account as billing id): ~p", [AccountId, _R]),
- I#wh_invoice{billing_id=AccountId}
- end
- end
- ,fun(#wh_invoice{account_id=AccountId}=I) ->
- I#wh_invoice{services=wh_services:fetch(AccountId)}
- end
- ,fun(#wh_invoice{billing_id=BillingId}=I) ->
- I#wh_invoice{service_plans=wh_service_plans:fetch(BillingId)}
- end
- ,fun(#wh_invoice{service_plans=ServicePlans, services=Services}=I) ->
- I#wh_invoice{service_items=wh_service_plans:create_items(Services, ServicePlans)}
- end
- ],
- lists:foldl(fun(F, I) -> F(I) end, #wh_invoice{}, Routines).
@@ -7,4 +7,201 @@
%%%-------------------------------------------------------------------
-module(wh_service_invoices).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([run/0]).
+-export([init/1
+ ,handle_call/3
+ ,handle_cast/2
+ ,handle_info/2
+ ,terminate/2
+ ,code_change/3
+ ]).
+
-include_lib("whistle_services/src/whistle_services.hrl").
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+run() ->
+ maybe_sync_service().
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+ {ok, #state{}}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ {reply, {error, not_implemented}, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ lager:debug("whistle service invoices terminating: ~p", [_Reason]).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec maybe_sync_service/0 :: () -> wh_json:json_objects().
+maybe_sync_service() ->
+ ViewOptions = [{limit, 1}
+ ,include_docs
+ ],
+ case couch_mgr:get_results(?WH_SERVICES_DB, <<"services/dirty">>, ViewOptions) of
+ {error, _}=E -> E;
+ {ok, [JObj]} -> bump_modified(wh_json:get_value(<<"doc">>, JObj));
+ {ok, _} -> {error, no_dirty_services}
+ end.
+
+-spec bump_modified/1 :: (wh_json:json_object()) -> _.
+bump_modified(JObj) ->
+ AccountId = wh_json:get_value(<<"pvt_account_id">>, JObj),
+ UpdatedJObj = wh_json:set_value(<<"pvt_modified">>, wh_util:current_tstamp(), JObj),
+ case couch_mgr:save_doc(?WH_SERVICES_DB, UpdatedJObj) of
+ {error, _}=E ->
+ %% If we conflict or cant save the doc with a new modified timestamp
+ %% then another process is probably handling it, move on
+ E;
+ {ok, NewJObj} ->
+ %% If we can change the timestamp then (since the view requires the
+ %% modified time to be x mins in the past) we have gain exclusive
+ %% control for x mins.... good luck!
+ AccountId = wh_json:get_value(<<"pvt_account_id">>, NewJObj),
+ maybe_follow_billing_id(AccountId, NewJObj)
+ end.
+
+-spec maybe_follow_billing_id/2 :: (ne_binary(), wh_json:json_object()) -> wh_std_return().
+maybe_follow_billing_id(AccountId, JObj) ->
+ case get_billing_id(AccountId, JObj) of
+ AccountId -> sync_services(AccountId, JObj);
+ BillingId -> follow_billing_id(AccountId, BillingId)
+ end.
+
+-spec follow_billing_id/2 :: (ne_binary(), ne_binary()) -> wh_std_return().
+follow_billing_id(AccountId, BillingId) ->
+ case mark_dirty(BillingId) of
+ {ok, _} -> mark_clean(AccountId);
+ {error, _R}=E ->
+ lager:debug("unable to mark billing services ~s dirty: ~p", [BillingId, _R]),
+ E
+ end.
+
+-spec sync_services/2 :: (ne_binary(), wh_json:json_object()) -> wh_std_return().
+sync_services(AccountId, ServiceJObj) ->
+ ServiceItems = wh_service_plans:create_items(ServiceJObj),
+ %% TODO: support other bookkeepers...
+ try wh_bookkeeper_braintree:sync(ServiceItems, AccountId) of
+ _ -> mark_clean(AccountId)
+ catch
+ _E:R ->
+ lager:debug("unable to sync services(~p): ~p", [_E, R]),
+ {error, R}
+ end.
+
+-spec get_billing_id/2 :: (ne_binary(), wh_json:json_object()) -> ne_binary().
+get_billing_id(AccountId, JObj) ->
+ case wh_json:is_true(<<"pvt_reseller">>, JObj) of
+ true -> AccountId;
+ false -> wh_json:get_ne_value(<<"billing_id">>, JObj, AccountId)
+ end.
+
+-spec mark_dirty/1 :: (ne_binary()) -> wh_std_return().
+mark_dirty(AccountId) ->
+ case couch_mgr:open_doc(?WH_SERVICES_DB, AccountId) of
+ {error, _}=E -> E;
+ {ok, JObj} -> couch_mgr:save_doc(?WH_SERVICES_DB, wh_json:set_value(<<"pvt_dirty">>, true, JObj))
+ end.
+
+-spec mark_clean/1 :: (ne_binary()) -> wh_std_return().
+mark_clean(AccountId) ->
+ case couch_mgr:open_doc(?WH_SERVICES_DB, AccountId) of
+ {error, _}=E -> E;
+ {ok, JObj} -> couch_mgr:save_doc(?WH_SERVICES_DB, wh_json:set_value(<<"pvt_dirty">>, false, JObj))
+ end.
Oops, something went wrong.

0 comments on commit b9ebf76

Please sign in to comment.