Skip to content

Commit

Permalink
aggressively ensure the account definition aggregate exits and is in …
Browse files Browse the repository at this point in the history
…sync
  • Loading branch information
k-anderson committed Oct 20, 2011
1 parent 2079acb commit bdcb494
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 17 deletions.
28 changes: 23 additions & 5 deletions whistle_apps/apps/crossbar/src/modules/cb_accounts.erl
Expand Up @@ -159,13 +159,30 @@ handle_info({binding_fired, Pid, <<"v1_resource.execute.post.accounts">>, [RD, C
end),
{noreply, State};

handle_info({binding_fired, Pid, <<"v1_resource.execute.post.accounts">>, [RD, Context | [AccountId]=Params]}, State) ->
handle_info({binding_fired, Pid, <<"v1_resource.execute.post.accounts">>, [RD, #cb_context{doc=Doc}=Context | [AccountId]=Params]}, State) ->
spawn(fun() ->
crossbar_util:put_reqid(Context),
crossbar_util:binding_heartbeat(Pid),
Context1 = crossbar_doc:save(Context#cb_context{db_name=whapps_util:get_db_name(AccountId, encoded)}),
whapps_util:replicate_from_account(whapps_util:get_db_name(AccountId, unencoded), ?AGG_DB, ?AGG_FILTER),
Pid ! {binding_result, true, [RD, Context1, Params]}
%% this just got messy
%% since we are not replicating, the accounts rev and the account rev on
%% this doc can drift.... so set it to account save, then set it to
%% accounts for the final operation... good times
AccountDb = whapps_util:get_db_name(AccountId, encoded),
AccountsRev = wh_json:get_value(<<"_rev">>, Doc, <<>>),
case couch_mgr:lookup_doc_rev(AccountDb, AccountId) of
{ok, Rev} ->
case crossbar_doc:save(Context#cb_context{db_name=AccountDb
,doc=wh_json:set_value(<<"_rev">>, Rev, Doc)
}) of
#cb_context{resp_status=success, doc=Doc1}=Context1 ->
couch_mgr:ensure_saved(?AGG_DB, wh_json:set_value(<<"_rev">>, AccountsRev, Doc1)),
Pid ! {binding_result, true, [RD, Context1, Params]};
Else ->
Pid ! {binding_result, true, [RD, Else, Params]}
end;
_ ->
Pid ! {binding_result, true, [RD, crossbar_util:response_conflicting_docs(Context), Params]}
end
end),
{noreply, State};

Expand Down Expand Up @@ -703,7 +720,8 @@ create_new_account_db(#cb_context{doc=Doc}=Context) ->
#cb_context{resp_status=success}=Context1 ->
_ = crossbar_bindings:map(<<"account.created">>, Db),
couch_mgr:revise_docs_from_folder(Db, crossbar, "account"),
whapps_util:replicate_from_account(whapps_util:get_db_name(Db, unencoded), ?AGG_DB, ?AGG_FILTER),
couch_mgr:revise_doc_from_file(Db, crossbar, ?MAINTENANCE_VIEW_FILE),
couch_mgr:ensure_saved(?AGG_DB, Context1#cb_context.doc),
Context1;
Else ->
?LOG_SYS("Other PUT resp: ~s: ~p~n", [Else#cb_context.resp_status, Else#cb_context.doc]),
Expand Down
46 changes: 34 additions & 12 deletions whistle_apps/apps/crossbar/src/modules/cb_shared_auth.erl
Expand Up @@ -319,10 +319,12 @@ validate([], #cb_context{auth_doc=JObj, req_verb = <<"get">>}=Context, _) ->
Context#cb_context{resp_status=success
,resp_data={struct, [{<<"account">>, Account}
,{<<"user">>, User}]}};
{error, _} ->
{error, R} ->
?LOG("failed to get user for response ~p", [R]),
crossbar_util:response_db_fatal(Context)
end;
{error, _} ->
{error, R} ->
?LOG("failed to get account for response ~p", [R]),
crossbar_util:response_db_fatal(Context)
end;
validate(_, Context, _) ->
Expand Down Expand Up @@ -423,32 +425,50 @@ import_missing_data(RemoteData) ->
AccountId :: undefined | binary(),
Account :: undefined | json_object().
import_missing_account(undefined, _) ->
?LOG("shared auth reply did not define an account id"),
false;
import_missing_account(_, undefined) ->
?LOG("shared auth reply did not define an account definition"),
false;
import_missing_account(AccountId, Account) ->
%% check if the acount datbase exists
Db = whapps_util:get_db_name(AccountId, encoded),
case couch_mgr:db_exists(Db) of
%% if the account database exists make sure it has the account
%% definition, because when couch is acting up it can skip this
true ->
?LOG("remote account ~s alread exists locally", [AccountId]),
case couch_mgr:lookup_doc_rev(?AGG_DB, AccountId) of
{error, not_found} ->
?LOG("failed to locate account definition, forcing ~p creation", [AccountId]),
whapps_util:replicate_from_account(whapps_util:get_db_name(AccountId, unencoded), ?AGG_DB, ?AGG_FILTER);
?LOG("remote account db ~s alread exists locally", [AccountId]),
%% make sure the account definition is in the account, if not
%% use the one we got from shared auth
Event = <<"v1_resource.execute.post.accounts">>,
Doc = case couch_mgr:open_doc(Db, AccountId) of
{error, not_found} ->
?LOG("missing local account definition, creating from shared auth response"),
wh_json:delete_key(<<"_rev">>, Account);
{ok, JObj} ->
?LOG("account definition exists locally"),
JObj
end,
Payload = [undefined, #cb_context{doc=Doc, db_name=Db}, [[AccountId]]],
case crossbar_bindings:fold(Event, Payload) of
[_, #cb_context{resp_status=success} | _] ->
?LOG("udpated account definition"),
true;
_ ->
ok
end,
true;
?LOG("could not update account definition"),
false
end;
false ->
?LOG("remote account db ~s does not exist locally, creating", [AccountId]),
Event = <<"v1_resource.execute.put.accounts">>,
Doc = wh_json:delete_key(<<"_rev">>, Account),
Payload = [undefined, #cb_context{doc=Doc}, [[]]],
case crossbar_bindings:fold(Event, Payload) of
[_, #cb_context{resp_status=success} | _] ->
?LOG("imported account ~s", [AccountId]),
?LOG("imported account"),
true;
_ ->
?LOG("could not import account ~s", [AccountId]),
?LOG("could not import account"),
false
end
end.
Expand All @@ -465,8 +485,10 @@ import_missing_account(AccountId, Account) ->
UserId :: undefined | binary(),
User :: undefined | json_object().
import_missing_user(_, undefined, _) ->
?LOG("shared auth reply did not define an user id"),
false;
import_missing_user(_, _, undefined) ->
?LOG("shared auth reply did not define an user object"),
false;
import_missing_user(AccountId, UserId, User) ->
Db = whapps_util:get_db_name(AccountId, encoded),
Expand Down

0 comments on commit bdcb494

Please sign in to comment.