Skip to content

Commit

Permalink
WHISTLE-844: update crossbar_doc to take an options proplist on save …
Browse files Browse the repository at this point in the history
…to passthrough to whistle_couch, and to configure whether to send the doc update to AMQP, and update cb_rates to just save the docs via crossbar_doc and not do the chunking
  • Loading branch information
James Aimonetti committed Jan 28, 2012
1 parent 35ee94b commit c8025e3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 48 deletions.
89 changes: 57 additions & 32 deletions whistle_apps/apps/crossbar/src/crossbar_doc.erl
Expand Up @@ -8,9 +8,16 @@
%%%-------------------------------------------------------------------
-module(crossbar_doc).

-export([load/2, load_from_file/2, load_merge/3, load_view/3, load_view/4, load_attachment/3, load_docs/2]).
-export([save/1, delete/1, delete/2, save_attachment/4, save_attachment/5, delete_attachment/3]).
-export([ensure_saved/1]).
-export([load/2, load_from_file/2, load_merge/3
,load_view/3, load_view/4
,load_attachment/3, load_docs/2
]).
-export([save/1, save/2
,delete/1, delete/2
,save_attachment/4, save_attachment/5
,delete_attachment/3
]).
-export([ensure_saved/1, ensure_saved/2]).
-export([public_fields/1, private_fields/1, is_private_key/1]).
-export([rev_to_etag/1, current_doc_vsn/0]).

Expand Down Expand Up @@ -253,12 +260,16 @@ load_attachment(DocId, AName, #cb_context{db_name=DB}=Context) ->
%% @end
%%--------------------------------------------------------------------
-spec save/1 :: (#cb_context{}) -> #cb_context{}.
save(#cb_context{db_name=undefined}=Context) ->
-spec save/2 :: (#cb_context{}, proplist()) -> #cb_context{}.
save(#cb_context{}=Context) ->
save(Context, []).

save(#cb_context{db_name=undefined}=Context, _) ->
?LOG("DB undefined, cannot save"),
crossbar_util:response_db_missing(Context);
save(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Context) ->
save(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Context, Options) ->
JObj0 = update_pvt_parameters(JObj, Context),
case couch_mgr:save_doc(DB, JObj0) of
case couch_mgr:save_doc(DB, JObj0, Options) of
{error, db_not_reachable} ->
?LOG("failed to save json: db not reachable"),
crossbar_util:response_datastore_timeout(Context);
Expand All @@ -267,7 +278,7 @@ save(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Conte
crossbar_util:response_conflicting_docs(Context);
{ok, JObj1} when Verb =:= <<"put">> ->
?LOG("saved a put request, setting location headers"),
send_document_change(created, DB, JObj1),
send_document_change(created, DB, JObj1, Options),
Context#cb_context{doc=JObj1
,resp_status=success
,resp_headers=[{"Location", wh_json:get_value(<<"_id">>, JObj1)} | RespHs]
Expand All @@ -276,7 +287,7 @@ save(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Conte
};
{ok, JObj2} ->
?LOG("saved json doc"),
send_document_change(edited, DB, JObj2),
send_document_change(edited, DB, JObj2, Options),
Context#cb_context{doc=JObj2
,resp_status=success
,resp_data=public_fields(JObj2)
Expand All @@ -297,18 +308,22 @@ save(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Conte
%% @end
%%--------------------------------------------------------------------
-spec ensure_saved/1 :: (#cb_context{}) -> #cb_context{}.
ensure_saved(#cb_context{db_name=undefined}=Context) ->
-spec ensure_saved/2 :: (#cb_context{}, proplist()) -> #cb_context{}.
ensure_saved(#cb_context{}=Context) ->
ensure_saved(Context, []).

ensure_saved(#cb_context{db_name=undefined}=Context, _) ->
?LOG("DB undefined, cannot ensure save"),
crossbar_util:response_db_missing(Context);
ensure_saved(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Context) ->
ensure_saved(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespHs}=Context, Options) ->
JObj0 = update_pvt_parameters(JObj, Context),
case couch_mgr:ensure_saved(DB, JObj0) of
case couch_mgr:ensure_saved(DB, JObj0, Options) of
{error, db_not_reachable} ->
?LOG("Failed to save json: db not reachable"),
crossbar_util:response_datastore_timeout(Context);
{ok, JObj1} when Verb =:= <<"put">> ->
?LOG("Saved a put request, setting location headers"),
send_document_change(created, DB, JObj1),
send_document_change(created, DB, JObj1, Options),
Context#cb_context{doc=JObj1
,resp_status=success
,resp_headers=[{"Location", wh_json:get_value(<<"_id">>, JObj1)} | RespHs]
Expand All @@ -317,7 +332,7 @@ ensure_saved(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespH
};
{ok, JObj2} ->
?LOG("Saved json doc"),
send_document_change(edited, DB, JObj2),
send_document_change(edited, DB, JObj2, Options),
Context#cb_context{doc=JObj2
,resp_status=success
,resp_data=public_fields(JObj2)
Expand All @@ -328,27 +343,37 @@ ensure_saved(#cb_context{db_name=DB, doc=JObj, req_verb=Verb, resp_headers=RespH
Context
end.

-spec send_document_change/3 :: (wapi_conf:conf_action(), ne_binary(), wh_json:json_object() | wh_json:json_objects()) -> pid().
send_document_change(Action, Db, Docs) when is_list(Docs) ->
[send_document_change(Action, Db, Doc) || Doc <- Docs];
send_document_change(Action, Db, Doc) ->
-spec send_document_change/3 :: (wapi_conf:conf_action(), ne_binary(), wh_json:json_object() | wh_json:json_objects()) -> 'ok' | pid().
-spec send_document_change/4 :: (wapi_conf:conf_action(), ne_binary(), wh_json:json_object() | wh_json:json_objects(), proplist()) -> 'ok' | pid().
send_document_change(Action, Db, Docs) ->
send_document_change(Action, Db, Docs, []).

send_document_change(Action, Db, Docs, Options) when is_list(Docs) ->
[send_document_change(Action, Db, Doc, Options) || Doc <- Docs];
send_document_change(Action, Db, Doc, Options) ->
CallID = get(callid),
spawn(fun() ->
put(callid, CallID),
case wh_json:get_value(<<"_id">>, Doc) of
undefined ->
Id = wh_json:get_value(<<"id">>, Doc),
case wh_json:get_value(<<"error">>, Doc) of

case props:get_value(publish_doc, Options, true) of
true ->
spawn(fun() ->
put(callid, CallID),
case wh_json:get_value(<<"_id">>, Doc) of
undefined ->
{ok, Doc1} = couch_mgr:open_doc(Db, Id),
publish_doc(Action, Db, Doc1, Id);
_E ->
ok
end;
Id ->
publish_doc(Action, Db, Doc, Id)
end
end).
Id = wh_json:get_value(<<"id">>, Doc),
case wh_json:get_value(<<"error">>, Doc) of
undefined ->
{ok, Doc1} = couch_mgr:open_doc(Db, Id),
publish_doc(Action, Db, Doc1, Id);
_E ->
ok
end;
Id ->
publish_doc(Action, Db, Doc, Id)
end
end);
false ->
ok
end.

publish_doc(Action, Db, Doc, Id) ->
Type = wh_json:get_binary_value(<<"pvt_type">>, Doc, <<"undefined">>),
Expand Down
26 changes: 10 additions & 16 deletions whistle_apps/apps/crossbar/src/modules/cb_rates.erl
Expand Up @@ -26,9 +26,6 @@

-define(UPLOAD_MIME_TYPES, ["text/csv", "text/comma-separated-values"]).

%% Throttle how many docs we bulk insert to BigCouch
-define(MAX_BULK_INSERT, 2000).

%%%===================================================================
%%% API
%%%===================================================================
Expand Down Expand Up @@ -131,7 +128,13 @@ handle_info({binding_fired, Pid, <<"v1_resource.validate.rates">>, [RD, Context
_ = crossbar_util:put_reqid(Context),
crossbar_util:binding_heartbeat(Pid),

Context1 = validate(Params, Context#cb_context{db_name=?WH_RATES_DB}),
Context1 = try
validate(Params, Context#cb_context{db_name=?WH_RATES_DB})
catch
_E:R ->
?LOG("failed to validate: ~p:~p", [_E, R]),
crossbar_util:response(fatal, <<"exception encountered">>, 500, wh_json:new(), Context)
end,

Pid ! {binding_result, true, [RD, Context1, Params]}
end),
Expand All @@ -154,7 +157,7 @@ handle_info({binding_fired, Pid, <<"v1_resource.execute.post.rates">>, [RD, Cont
_ = spawn(fun() ->
_ = crossbar_util:put_reqid(Context),
Now = erlang:now(),
_ = save_bulk_rates(Rates),
crossbar_doc:save(Context#cb_context{doc=Rates}, [{publish_doc, false}]),
Elapsed = timer:now_diff(erlang:now(), Now),
?LOG("it took ~b micro, ~b milli, ~b sec to save ~b rates", [Elapsed, Elapsed div 1000, Elapsed div 1000000, Count])
end),
Expand Down Expand Up @@ -380,7 +383,8 @@ summary(Context) ->
%% @end
%%--------------------------------------------------------------------
-spec check_uploaded_file/1 :: (#cb_context{}) -> #cb_context{}.
check_uploaded_file(#cb_context{req_files=[{_, File}|_]}=Context) ->
check_uploaded_file(#cb_context{req_files=[{_Name, File}|_]}=Context) ->
?LOG("checking file ~s", [_Name]),
case wh_json:get_value(<<"contents">>, File) of
undefined ->
Context#cb_context{resp_status=error};
Expand Down Expand Up @@ -493,16 +497,6 @@ process_row(_, Acc) ->
strip_quotes(Bin) ->
binary:replace(Bin, [<<"\"">>, <<"\'">>], <<>>, [global]).

-spec save_bulk_rates/1 :: (wh_json:json_objects()) -> no_return().
save_bulk_rates(Rates) ->
case catch(lists:split(?MAX_BULK_INSERT, Rates)) of
{'EXIT', _} ->
couch_mgr:save_docs(?WH_RATES_DB, Rates);
{Save, Cont} ->
spawn(fun() -> couch_mgr:save_docs(?WH_RATES_DB, Save) end),
save_bulk_rates(Cont)
end.

-spec constrain_weight/1 :: (integer()) -> 1..100.
constrain_weight(X) when X =< 0 -> 1;
constrain_weight(X) when X >= 100 -> 100;
Expand Down

0 comments on commit c8025e3

Please sign in to comment.