WHISTLE-884: handle uploading of rate docs by chunking them into groups of 2000
James Aimonetti committed Jan 28, 2012
1 parent dda8aaa commit 682803a
Showing 1 changed file with 124 additions and 29 deletions.
153 changes: 124 additions & 29 deletions whistle_apps/apps/crossbar/src/modules/cb_rates.erl
@@ -24,7 +24,10 @@
-define(PVT_TYPE, <<"rate">>).
-define(CB_LIST, <<"rates/crossbar_listing">>).

-define(UPLOAD_MIME_TYPES, [<<"text/csv">>, <<"text/comma-separated-values">>]).
-define(UPLOAD_MIME_TYPES, ["text/csv", "text/comma-separated-values"]).

%% Throttle how many docs we bulk insert to BigCouch
-define(MAX_BULK_INSERT, 2000).

%%%===================================================================
%%% API
@@ -56,9 +59,9 @@ start_link() ->
%% @end
%%--------------------------------------------------------------------
init(_) ->
couch_mgr:db_create(?WH_RATES_DB),
couch_mgr:revise_doc_from_file(?WH_RATES_DB, crossbar, "views/rates.json"),
bind_to_crossbar(),
_ = couch_mgr:db_create(?WH_RATES_DB),
_ = couch_mgr:revise_doc_from_file(?WH_RATES_DB, crossbar, "views/rates.json"),
_ = bind_to_crossbar(),
{ok, ok}.

%%--------------------------------------------------------------------
@@ -128,9 +131,8 @@ handle_info({binding_fired, Pid, <<"v1_resource.validate.rates">>, [RD, Context
_ = crossbar_util:put_reqid(Context),
crossbar_util:binding_heartbeat(Pid),

?LOG("validating against db: ~s", [?WH_RATES_DB]),
Context1 = validate(Params, Context#cb_context{db_name=?WH_RATES_DB}),
?LOG("returning true and modified context"),

Pid ! {binding_result, true, [RD, Context1, Params]}
end),
{noreply, State};
@@ -139,13 +141,28 @@ handle_info({binding_fired, Pid, <<"v1_resource.execute.post.rates">>, [RD, Cont
spawn(fun() ->
_ = crossbar_util:put_reqid(Context),
crossbar_util:binding_heartbeat(Pid),
case Params of
[] ->
Pid ! {binding_result, true, [RD, Context, Params]};
[_] ->
Context1 = crossbar_doc:save(Context),
Pid ! {binding_result, true, [RD, Context1, Params]}
end
Context1 =
case Context#cb_context.doc of
{0, _} ->
?LOG("no rates to save"),
crossbar_util:response_invalid_data(wh_json:new(), Context);
{Count, Rates} when Count < 10 ->
?LOG("only ~b rates to try saving", [Count]),
crossbar_doc:save(Context#cb_context{doc=Rates});
{Count, Rates} ->
?LOG("there are ~b rates to save, responding with a 202 and saving in a spawn", [Count]),
_ = spawn(fun() ->
_ = crossbar_util:put_reqid(Context),
Now = erlang:now(),
_ = save_bulk_rates(Count, Rates),
Elapsed = timer:now_diff(erlang:now(), Now),
?LOG("it took ~b micro, ~b milli, ~b sec to save ~b rates", [Elapsed, Elapsed div 1000, Elapsed div 1000000, Count])
end),
crossbar_util:response_202(list_to_binary(["saving ", wh_util:to_list(Count), " rates from the uploaded document"]), Context);
_Rate ->
crossbar_doc:save(Context)
end,
Pid ! {binding_result, true, [RD, Context1, Params]}
end),
{noreply, State};

@@ -214,7 +231,7 @@ code_change(_OldVsn, State, _Extra) ->
%% for the keys we need to consume.
%% @end
%%--------------------------------------------------------------------
-spec bind_to_crossbar/0 :: () -> no_return().
-spec bind_to_crossbar/0 :: () -> 'ok' | {'error', 'exists'}.
bind_to_crossbar() ->
_ = crossbar_bindings:bind(<<"v1_resource.allowed_methods.rates">>),
_ = crossbar_bindings:bind(<<"v1_resource.resource_exists.rates">>),
@@ -285,13 +302,10 @@ content_types_accepted(_, Context) -> Context.
%%--------------------------------------------------------------------
-spec validate/2 :: (path_tokens(), #cb_context{}) -> #cb_context{}.
validate([], #cb_context{req_verb = <<"get">>}=Context) ->
?LOG("getting a list of rates"),
summary(Context);
validate([], #cb_context{req_verb = <<"put">>}=Context) ->
?LOG("putting a rate doc"),
create(Context);
validate([], #cb_context{req_verb = <<"post">>}=Context) ->
?LOG("checking uploaded file for valid csv"),
check_uploaded_file(Context);
validate([Id], #cb_context{req_verb = <<"get">>}=Context) ->
read(Id, Context);
@@ -314,12 +328,10 @@ create(#cb_context{req_data=Data}=Context) ->
{fail, Errors} ->
crossbar_util:response_invalid_data(Errors, Context);
{pass, JObj} ->
{JObj1, _} = lists:foldr(fun(F, {J, C}) ->
{F(J, C), C}
end, {JObj, Context}, ?PVT_FUNS),
{JObj1, _} = add_pvt_fields(JObj, Context),
Context#cb_context{doc=JObj1, resp_status=success}
end.

%%--------------------------------------------------------------------
%% @private
%% @doc
@@ -385,24 +397,107 @@ check_uploaded_file(#cb_context{req_files=[{_, File}|_]}=Context) ->
normalize_view_results(JObj, Acc) ->
[wh_json:get_value(<<"value">>, JObj)|Acc].

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Add any pvt_* fields to the json object
%% @end
%%--------------------------------------------------------------------
add_pvt_fields(JObjs) when is_list(JObjs) ->
[add_pvt_fields(JObj) || JObj <- JObjs];
add_pvt_fields(JObj) ->
{JObj1, _} = add_pvt_fields(JObj, undefined),
JObj1.

add_pvt_fields(JObjs, Context) when is_list(JObjs) ->
[add_pvt_fields(JObj, Context) || JObj <- JObjs];
add_pvt_fields(JObj, Context) ->
lists:foldr(fun(F, {J, C}) ->
{F(J, C), C}
end, {JObj, Context}, ?PVT_FUNS).

%%--------------------------------------------------------------------
%% @private
%% @doc
%% These are the pvt funs that add the necessary pvt fields to every
%% instance
%% @end
%%--------------------------------------------------------------------
-spec add_pvt_type/2 :: (wh_json:json_object(), #cb_context{}) -> wh_json:json_object().
-spec add_pvt_type/2 :: (wh_json:json_object(), #cb_context{} | 'undefined') -> wh_json:json_object().
add_pvt_type(JObj, _) ->
wh_json:set_value(<<"pvt_type">>, ?PVT_TYPE, JObj).

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert the file, based on content-type, to rate documents
%% @end
%%--------------------------------------------------------------------
-spec convert_file/3 :: (nonempty_string(), ne_binary(), #cb_context{}) -> #cb_context{}.
convert_file("text/csv", FileContents, Context) ->
{ok, Rates} = csv_to_rates(FileContents),
Context#cb_context{doc=Rates}.

{ok, {Count, Rates}} = csv_to_rates(FileContents),
case Count of
0 -> crossbar_util:response(error, <<"no usable rates found">>, 400, [], Context);
_ -> Context#cb_context{resp_status=success, doc={Count, Rates}}
end;
convert_file(ContentType, _, Context) ->
crossbar_util:response(error, list_to_binary(["unknown content type: ", ContentType]), Context).

-spec csv_to_rates/1 :: (ne_binary()) -> {'ok', {integer(), wh_json:json_objects()}}.
csv_to_rates(CSV) ->
ecsv:process_csv_binary_with(CSV, fun process_row/2).

process_row(Row, Acc) ->
?LOG("row: ~p", [Row]),
ecsv:process_csv_binary_with(CSV, fun process_row/2, {0, []}).

-spec process_row/2 :: ([string(),...], {integer(), wh_json:json_objects()}) -> {integer(), wh_json:json_objects()}.
process_row([Prefix, ISO, Desc, InternalCost, Rate], {Cnt, RateDocs}=Acc) ->
case catch wh_util:to_integer(Prefix) of
{'EXIT', _} -> Acc;
_ ->
Prefix1 = wh_util:to_binary(Prefix),
ISO1 = strip_quotes(wh_util:to_binary(ISO)),
Desc1 = strip_quotes(wh_util:to_binary(Desc)),
InternalCost1 = wh_util:to_binary(InternalCost),
Rate1 = wh_util:to_binary(Rate),

%% The idea here: a more expensive rate has a higher CostF, which is
%% subtracted from the weight, giving it a lower weight number and
%% making it more likely to be used
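%% e.g. a 3-byte prefix with an internal cost of 0.05 yields
%% CostF = trunc(0.05 * 100) = 5 and weight = 3 * 10 - 5 = 25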
CostF = trunc(wh_util:to_float(InternalCost) * 100),

{Cnt+1, [add_pvt_fields(
wh_json:from_list([{<<"_id">>, list_to_binary([ISO1, "-", Prefix])}
,{<<"prefix">>, <<"+", Prefix1/binary>>}
,{<<"iso_country_code">>, ISO1}
,{<<"description">>, Desc1}
,{<<"rate_name">>, Desc1}
,{<<"rate_cost">>, Rate1}
,{<<"rate_increment">>, 60}
,{<<"rate_minimum">>, 60}
,{<<"rate_surcharge">>, 0}
,{<<"internal_rate_cost">>, InternalCost1}
,{<<"weight">>, byte_size(Prefix1) * 10 - CostF}

,{<<"options">>, []}
,{<<"routes">>, [<<"^\\+", (wh_util:to_binary(Prefix1))/binary, "(\\d*)$">>]}
]))
| RateDocs]}
end;
process_row(_, Acc) ->
Acc.

-spec strip_quotes/1 :: (ne_binary()) -> ne_binary().
strip_quotes(Bin) ->
binary:replace(Bin, [<<"\"">>, <<"\'">>], <<>>, [global]).

-spec save_bulk_rates/2 :: (pos_integer(), wh_json:json_objects()) -> no_return().
save_bulk_rates(Count, Rates) ->
save_bulk_rates(Count, Rates, ?MAX_BULK_INSERT, []).

%% lacking a lists:splice, we chunk it up ourselves
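%% accumulate rates, flushing a bulk save to BigCouch each time the
%% countdown reaches zero, then once more for whatever remains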
-spec save_bulk_rates/4 :: (pos_integer(), wh_json:json_objects(), non_neg_integer(), wh_json:json_objects()) -> no_return().
save_bulk_rates(_, [], _, Acc) ->
couch_mgr:save_docs(?WH_RATES_DB, Acc);
save_bulk_rates(Count, Rates, 0, Acc) ->
couch_mgr:save_docs(?WH_RATES_DB, Acc),
save_bulk_rates(Count, Rates, ?MAX_BULK_INSERT, []);
save_bulk_rates(Count, [R|Rates], Left, Acc) ->
save_bulk_rates(Count, Rates, Left-1, [R|Acc]).
