Skip to content

Commit

Permalink
KAZOO-1445: correct bug in whistle_couch cache flush
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anderson committed Sep 25, 2013
1 parent 14eacc2 commit 7ec2577
Showing 1 changed file with 89 additions and 76 deletions.
165 changes: 89 additions & 76 deletions core/whistle_couch-1.0.0/src/couch_util.erl
Expand Up @@ -332,12 +332,12 @@ cache_db_doc(DbName, DocId, Doc) ->
wh_cache:store_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}, Doc).

-spec flush_cache_doc(server(), ne_binary(), ne_binary(), wh_proplist()) -> 'ok'.
flush_cache_doc(#server{}=Conn, DbName, DocId, _Options) ->
wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, Conn, DbName, DocId}).
flush_cache_doc(_, DbName, DocId, _Options) ->
wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}).

flush_cache_docs() -> wh_cache:flush_local(?WH_COUCH_CACHE).
flush_cache_docs(DbName) ->
Filter = fun({?MODULE, _, DbName1, _}=K, _) when DbName1 =:= DbName ->
Filter = fun({?MODULE, DbName1, _}=K, _) when DbName1 =:= DbName ->
wh_cache:erase_local(?WH_COUCH_CACHE, K),
'true';
(_, _) -> 'false'
Expand Down Expand Up @@ -429,7 +429,9 @@ prepare_doc_for_del(Doc) ->
couchbeam_error().
do_ensure_saved(#db{}=Db, Doc, Opts) ->
case do_save_doc(Db, Doc, Opts) of
{'ok', _}=Saved -> Saved;
{'ok', JObj}=Ok ->
_ = maybe_publish_doc(Db, Doc, JObj),
Ok;
{'error', 'conflict'} ->
case do_fetch_rev(Db, wh_json:get_value(<<"_id">>, Doc, <<>>)) of
?NE_BINARY = Rev ->
Expand Down Expand Up @@ -459,10 +461,7 @@ do_save_doc(#db{}=Db, Docs, Options) when is_list(Docs) ->
do_save_doc(#db{}=Db, Doc, Options) ->
case ?RETRY_504(couchbeam:save_doc(Db, maybe_set_docid(Doc), Options)) of
{'ok', JObj}=Ok ->
_ = case couch_mgr:change_notice() of
'true' -> spawn(fun() -> publish_doc(Db, JObj) end);
'false' -> 'ok'
end,
_ = maybe_publish_doc(Db, Doc, JObj),
Ok;
Else -> Else
end.
Expand All @@ -486,31 +485,23 @@ maybe_set_docid(Doc) ->
do_save_docs(#db{}=Db, Docs, Options, Acc) ->
case catch(lists:split(?MAX_BULK_INSERT, Docs)) of
{'EXIT', _} ->
case ?RETRY_504(couchbeam:save_docs(Db, [maybe_set_docid(D) || D <- Docs], Options)) of
{'ok', Res} ->
JObjs = Res++Acc,
_ = case couch_mgr:change_notice() of
'true' -> publish_doc_change(Db, Docs, JObjs);
'false' -> 'ok'
end,
{'ok', JObjs};
PreparedDocs = [maybe_set_docid(D) || D <- Docs],
case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of
{'ok', JObjs} ->
_ = maybe_publish_docs(Db, PreparedDocs, JObjs),
{'ok', JObjs ++ Acc};
{'error', _}=E -> E
end;
{Save, Cont} ->
case ?RETRY_504(couchbeam:save_docs(Db, [maybe_set_docid(D) || D <- Save], Options)) of
{'ok', Res} -> do_save_docs(Db, Cont, Options, Res++Acc);
PreparedDocs = [maybe_set_docid(D) || D <- Save],
case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of
{'ok', JObjs} ->
_ = maybe_publish_docs(Db, PreparedDocs, JObjs),
do_save_docs(Db, Cont, Options, JObjs ++ Acc);
{'error', _}=E -> E
end
end.

publish_doc_change(Db, Docs, JObjs) ->
spawn(fun() ->
case lists:any(fun(Doc) -> wh_json:is_true(<<"_deleted">>, Doc) end, Docs) of
'true' -> publish_doc('deleted', Db, JObjs);
'false' -> [publish_doc(Db, JObj) || JObj <- JObjs]
end
end).

%% Attachment-related functions ------------------------------------------------
-spec fetch_attachment(server(), ne_binary(), ne_binary(), ne_binary()) ->
{'ok', binary()} |
Expand Down Expand Up @@ -568,14 +559,24 @@ do_stream_attachment(#db{}=Db, DocId, AName, Caller) ->
{'ok', wh_json:object()} |
couchbeam_error().
do_put_attachment(#db{}=Db, DocId, AName, Contents, Options) ->
?RETRY_504(couchbeam:put_attachment(Db, DocId, AName, Contents, Options)).
case ?RETRY_504(couchbeam:put_attachment(Db, DocId, AName, Contents, Options)) of
{'ok', JObj}=Ok ->
maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), JObj),
Ok;
Else -> Else
end.

-spec do_del_attachment(couchbeam_db(), ne_binary(), ne_binary(), wh_proplist()) ->
{'ok', wh_json:object()} |
couchbeam_error().
do_del_attachment(#db{}=Db, DocId, AName, Options) ->
Doc = wh_util:to_binary(http_uri:encode(wh_util:to_list(DocId))),
?RETRY_504(couchbeam:delete_attachment(Db, Doc, AName, Options)).
case ?RETRY_504(couchbeam:delete_attachment(Db, Doc, AName, Options)) of
{'ok', JObj}=Ok ->
maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), JObj),
Ok;
Else -> Else
end.

%% Helpers for getting Couchbeam records ---------------------------------------

Expand Down Expand Up @@ -650,43 +651,61 @@ retry504s(Fun, Cnt) ->
OK -> OK
end.

-spec publish_doc(couchbeam_db(), wh_json:object()) -> 'ok'.
publish_doc(Db, Doc) ->
Action = case wh_json:is_true(<<"pvt_deleted">>, Doc) of
'true' -> 'deleted';
'false' ->
case wh_json:get_value(<<"_rev">>, Doc) of
<<"1-", _/binary>> -> 'created';
_Else -> 'edited'
end
end,
publish_doc(Action, Db, Doc).

-spec publish_doc(wapi_conf:action(), couchbeam_db(), wh_json:object() | wh_json:objects()) -> 'ok'.
publish_doc(Action, #db{name=DbName}, Doc) -> publish_docs(Action, wh_util:to_binary(DbName), Doc).
maybe_publish_docs(#db{name=DbName}=Db, Docs, JObjs) ->
case couch_mgr:change_notice() of
'true' ->
spawn(fun() ->
[wh_cache:erase_local(?WH_COUCH_CACHE
,{?MODULE, DbName, doc_id(Doc)})
|| Doc <- Docs
],
[publish_doc(Db, Doc, JObj)
|| {Doc, JObj} <- lists:zip(Docs, JObjs)
,should_publish_doc(Doc)
]
end);
'false' -> 'ok'
end.

publish_docs(Action, Db, Doc) when not is_list(Doc) ->
publish_docs(Action, Db, [Doc]);
publish_docs(Action, Db, Docs) ->
_ = [publish(Action, Db, Doc) || Doc <- Docs],
'ok'.
-spec maybe_publish_doc(couchbeam_db(), wh_json:object(), wh_json:object()) -> 'ok'.
maybe_publish_doc(#db{name=DbName}=Db, Doc, JObj) ->
wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, doc_id(Doc)}),
case couch_mgr:change_notice()
andalso should_publish_doc(Doc)
of
'true' -> spawn(fun() -> publish_doc(Db, Doc, JObj) end);
'false' -> 'ok'
end.

-spec publish(wapi_conf:action(), ne_binary(), wh_json:object()) ->
'ok' |
{'error', 'pool_full' | 'poolboy_fault'}.
publish(Action, Db, Doc) ->
should_publish_doc(Doc) ->
case doc_id(Doc) of
'undefined' -> 'ok';
<<"_design/", _/binary>> = _D -> 'ok';
Id -> publish(Action, Db, Doc, Id)
'undefined' -> 'false';
<<"_design/", _/binary>> = _D -> 'false';
_Else -> 'true'
end.

-spec publish_doc(couchbeam_db(), wh_json:object(), wh_json:object()) -> 'ok'.
publish_doc(#db{name=DbName}, Doc, JObj) ->
case wh_json:is_true(<<"pvt_deleted">>, Doc)
orelse wh_json:is_true(<<"_deleted">>, Doc)
of
'true' -> publish('deleted', DbName, Doc);
'false' ->
case wh_json:get_value(<<"_rev">>, JObj) of
<<"1-", _/binary>> ->
publish('created', DbName, JObj);
_Else ->
publish('edited', DbName, JObj)
end
end.

publish(Action, Db, Doc, Id) ->
Type = doc_type(Db, Doc),
publish(Action, Db, Doc) ->
Type = doc_type(Doc),
Id = doc_id(Doc),
Props =
[{<<"ID">>, Id}
,{<<"Type">>, Type}
,{<<"Database">>, Db}
,{<<"Database">>, wh_util:to_binary(Db)}
,{<<"Rev">>, doc_rev(Doc)}
,{<<"Account-ID">>, doc_acct_id(Db, Doc)}
,{<<"Date-Modified">>, wh_json:get_binary_value(<<"pvt_created">>, Doc)}
Expand All @@ -698,27 +717,21 @@ publish(Action, Db, Doc, Id) ->
whapps_util:amqp_pool_send(Props, Fun).

doc_rev(Doc) ->
case wh_json:get_value(<<"_rev">>, Doc) of
'undefined' -> wh_json:get_value(<<"rev">>, Doc);
Rev -> Rev
end.
wh_json:get_first_defined([<<"_rev">>, <<"rev">>]
,Doc
,<<"undefined">>).

doc_id(Doc) ->
case wh_json:get_ne_value(<<"_id">>, Doc) of
'undefined' -> wh_json:get_value(<<"id">>, Doc);
Id -> Id
end.
wh_json:get_first_defined([<<"_id">>, <<"id">>]
,Doc
,<<"undefined">>).

doc_type(Db, Doc) ->
case wh_json:get_value(<<"pvt_type">>, Doc) of
'undefined' ->
{'ok', D} = couch_mgr:open_cache_doc(Db, doc_id(Doc)),
wh_json:get_binary_value(<<"pvt_type">>, D, <<"undefined">>);
T -> wh_util:to_binary(T)
end.
doc_type(Doc) ->
wh_json:get_value(<<"pvt_type">>, Doc, <<"undefined">>).

doc_acct_id(Db, Doc) ->
case wh_json:get_value(<<"pvt_account_id">>, Doc) of
'undefined' -> wh_util:format_account_id(Db, 'raw');
Id -> Id
end.
wh_json:get_value(<<"pvt_account_id">>
,Doc
,wh_util:format_account_id(Db, 'raw')
).

0 comments on commit 7ec2577

Please sign in to comment.