% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_db).
-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
-export([set_revs_limit/2,get_revs_limit/1]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
-export([set_security/2,get_security/1]).
-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
-export([check_is_admin/1, check_is_reader/1, get_doc_count/1]).
-export([reopen/1, make_doc/5]).
-include("couch_db.hrl").
start_link(DbName, Filepath, Options) ->
case open_db_file(Filepath, Options) of
{ok, Fd} ->
{ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
Filepath, Fd, Options}, []),
unlink(Fd),
gen_server:call(UpdaterPid, get_db);
Else ->
Else
end.
open_db_file(Filepath, Options) ->
case couch_file:open(Filepath, Options) of
{ok, Fd} ->
{ok, Fd};
{error, enoent} ->
% couldn't find the file. Is there a compact version? This can happen
% if the server crashed during the file switch.
case couch_file:open(Filepath ++ ".compact") of
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
ok = couch_file:sync(Filepath),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
end;
Error ->
Error
end.
create(DbName, Options) ->
couch_server:create(DbName, Options).
% This is for opening a database for internal purposes such as the
% replicator or the view indexer. It never throws a reader error.
open_int(DbName, Options) ->
couch_server:open(DbName, Options).
% This should be called any time an HTTP request opens the database.
% It ensures that the HTTP userCtx is a valid reader.
open(DbName, Options) ->
case couch_server:open(DbName, Options) of
{ok, Db} ->
try
check_is_reader(Db),
{ok, Db}
catch
throw:Error ->
close(Db),
throw(Error)
end;
Else -> Else
end.
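% A usage sketch (hypothetical; <<"somedb">> and UserCtx are illustrative,
% not defined in this module):
%
%   {ok, Db} = couch_db:open(<<"somedb">>, [{user_ctx, UserCtx}]),
%   try
%       {ok, _Info} = couch_db:get_db_info(Db)
%   after
%       couch_db:close(Db)
%   end
%
% open/2 throws {unauthorized, _} when UserCtx is not a reader; open_int/2
% performs no such check.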
reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
{ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
case NewFd =:= Fd of
true ->
{ok, NewDb#db{user_ctx = UserCtx}};
false ->
erlang:demonitor(OldRef),
NewRef = erlang:monitor(process, NewFd),
{ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
end.
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
ensure_full_commit(Db, RequiredSeq) ->
#db{main_pid=Pid, instance_start_time=StartTime} = Db,
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
close(#db{fd_monitor=Ref}) ->
erlang:demonitor(Ref).
open_ref_counted(MainPid, OpenedPid) ->
gen_server:call(MainPid, {open_ref_count, OpenedPid}).
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
case erlang:process_info(Db#db.fd, monitored_by) of
undefined ->
true;
{monitored_by, Pids} ->
(Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= []
end;
is_idle(_Db) ->
false.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
start_compact(#db{main_pid=Pid}) ->
{ok, _} = gen_server:call(Pid, start_compact),
ok.
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
{ok, Result}.
open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
open_doc(Db, Id, Options) ->
increment_stat(Db, {couchdb, database_reads}),
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
apply_open_options({ok, Doc},Options);
false ->
{not_found, deleted}
end;
Else ->
apply_open_options(Else,Options)
end.
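% A usage sketch (the doc id and options are illustrative):
%
%   case couch_db:open_doc(Db, <<"docA">>, [conflicts]) of
%       {ok, Doc} -> Doc;
%       {not_found, _Reason} -> nil
%   end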
apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
Else.
apply_open_options2(Doc,[]) ->
{ok, Doc};
apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
[{atts_since, PossibleAncestors}|Rest]) ->
RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
apply_open_options2(Doc#doc{atts=[A#att{data=
if AttPos>RevPos -> Data; true -> stub end}
|| #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
apply_open_options2(Doc,[_|Rest]) ->
apply_open_options2(Doc,Rest).
find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
0;
find_ancestor_rev_pos(_DocRevs, []) ->
0;
find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
case lists:member({RevPos, RevId}, AttsSinceRevs) of
true ->
RevPos;
false ->
find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
end.
open_doc_revs(Db, Id, Revs, Options) ->
increment_stat(Db, {couchdb, database_reads}),
[{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
{ok, [apply_open_options(Result, Options) || Result <- Results]}.
% The result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
% If no revs are missing for an Id, it is omitted from the results.
get_missing_revs(Db, IdRevsList) ->
Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
{ok, find_missing(IdRevsList, Results)}.
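% For example (a sketch; the id and revs are hypothetical), a call such as
%   get_missing_revs(Db, [{<<"docA">>, [{2, RevX}]}])
% where {2, RevX} is not in docA's rev tree yields
%   {ok, [{<<"docA">>, [{2, RevX}], PossibleAncestors}]}
% while ids whose revs are all present are omitted from the list.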
find_missing([], []) ->
[];
find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
MissingRevs ->
#doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
% Find the revs that are possible parents of this rev
PossibleAncestors =
lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
% this leaf is a "possible ancestor" of the missing
% revs if this LeafPos is less than any of the missing revs
case lists:any(fun({MissingPos, _}) ->
LeafPos < MissingPos end, MissingRevs) of
true ->
[{LeafPos, LeafRevId} | Acc];
false ->
Acc
end
end, [], LeafRevs),
[{Id, MissingRevs, PossibleAncestors} |
find_missing(RestIdRevs, RestLookupInfo)]
end;
find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
[{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
{ok, DocInfo} ->
{ok, couch_doc:to_doc_info(DocInfo)};
Else ->
Else
end.
% returns {ok, DocInfo} or not_found
get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
get_full_doc_infos(Db, Ids) ->
couch_btree:lookup(Db#db.id_tree, Ids).
increment_update_seq(#db{main_pid=Pid}) ->
gen_server:call(Pid, increment_update_seq).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
Seq.
get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
PurgeSeq.
get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
{ok, []};
get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
couch_file:pread_term(Fd, PurgedPointer).
get_doc_count(Db) ->
% the by-id reduce value is either the older {Count, DelCount, DataSize}
% 3-tuple or the newer {Count, DelCount, Conflicts, DataSize} 4-tuple
case couch_btree:full_reduce(Db#db.id_tree) of
{ok, {OCount, _, _}} ->
{ok, OCount};
{ok, {NCount, _, _, _}} ->
{ok, NCount}
end.
get_db_info(Db) ->
#db{fd=Fd,
header=#db_header{disk_version=DiskVersion},
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
id_tree=FullDocBtree,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
{ok, {Count, DelCount, Conflicts, DataSize}} =
case couch_btree:full_reduce(FullDocBtree) of
{ok, {_,_,_,_}} = New -> New;
{ok, {C,De,Da}} -> {ok, {C,De,0,Da}}
end,
InfoList = [
{db_name, Name},
{doc_count, Count},
{doc_del_count, DelCount},
{conflicts_count, Conflicts},
{update_seq, SeqNum},
{purge_seq, couch_db:get_purge_seq(Db)},
{compact_running, Compactor/=nil},
{disk_size, Size},
{other, {[{data_size, DataSize}]}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq}
],
{ok, InfoList}.
get_design_docs(#db{name = <<"shards/", _/binary>> = ShardName}) ->
{_, Ref} = spawn_monitor(fun() ->
exit(fabric:design_docs(mem3:dbname(ShardName)))
end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
get_design_docs(#db{id_tree=Btree}=Db) ->
{ok, _, Docs} = couch_view:fold(
#view{btree=Btree},
fun(#full_doc_info{deleted = true}, _Reds, AccDocs) ->
{ok, AccDocs};
(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
{ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
{ok, [Doc | AccDocs]};
(_, _Reds, AccDocs) ->
{stop, AccDocs}
end,
[], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
{ok, Docs}.
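% The admin/reader checks below lean on the list-subtraction idiom:
% X -- Y =:= X exactly when no element of Y occurs in X. A sketch with
% illustrative values:
%
%   > [<<"_admin">>, <<"ops">>] -- [<<"ops">>].
%   [<<"_admin">>]                  % overlap: the user has one of the roles
%   > [<<"_admin">>, <<"ops">>] -- [<<"dev">>].
%   [<<"_admin">>, <<"ops">>]       % unchanged: the user has none of them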
check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
{Admins} = get_admins(Db),
AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
case AdminRoles -- Roles of
AdminRoles -> % same list, not an admin role
case AdminNames -- [Name] of
AdminNames -> % same names, not an admin
throw({unauthorized, <<"You are not a db or server admin.">>});
_ ->
ok
end;
_ ->
ok
end.
check_is_reader(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
case (catch check_is_admin(Db)) of
ok -> ok;
_ ->
{Readers} = get_readers(Db),
ReaderRoles = couch_util:get_value(<<"roles">>, Readers,[]),
WithAdminRoles = [<<"_admin">> | ReaderRoles],
ReaderNames = couch_util:get_value(<<"names">>, Readers,[]),
case ReaderRoles ++ ReaderNames of
[] -> ok; % no readers == public access
_Else ->
case WithAdminRoles -- Roles of
WithAdminRoles -> % same list, not a reader role
case ReaderNames -- [Name] of
ReaderNames -> % same names, not a reader
?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
throw({unauthorized, <<"You are not authorized to access this db.">>});
_ ->
ok
end;
_ ->
ok
end
end
end.
get_admins(#db{security=SecProps}) ->
couch_util:get_value(<<"admins">>, SecProps, {[]}).
get_readers(#db{security=SecProps}) ->
couch_util:get_value(<<"readers">>, SecProps, {[]}).
get_security(#db{security=SecProps}) ->
{SecProps}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
ok = validate_security_object(NewSecProps),
ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
{ok, _} = ensure_full_commit(Db),
ok;
set_security(_, _) ->
throw(bad_request).
validate_security_object(SecProps) ->
Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}),
ok = validate_names_and_roles(Admins),
ok = validate_names_and_roles(Readers),
ok.
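% A well-formed security object passed to set_security/2 looks like this
% (a sketch; the names and roles shown are examples only):
%
%   {[
%       {<<"admins">>,  {[{<<"names">>, [<<"bob">>]},
%                         {<<"roles">>, [<<"ops">>]}]}},
%       {<<"readers">>, {[{<<"names">>, []},
%                         {<<"roles">>, [<<"accounting">>]}]}}
%   ]}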
% validate user input
validate_names_and_roles({Props}) when is_list(Props) ->
case couch_util:get_value(<<"names">>,Props,[]) of
Ns when is_list(Ns) ->
[throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
Ns;
_ -> throw("names must be a JSON list of strings")
end,
case couch_util:get_value(<<"roles">>,Props,[]) of
Rs when is_list(Rs) ->
[throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
Rs;
_ -> throw("roles must be a JSON list of strings")
end,
ok.
get_revs_limit(#db{revs_limit=Limit}) ->
Limit.
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
name(#db{name=Name}) ->
Name.
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
update_doc(Db, Doc, Options, UpdateType) ->
case update_docs(Db, [Doc], Options, UpdateType) of
{ok, [{ok, NewRev}]} ->
{ok, NewRev};
{ok, [{{_Id, _Rev}, Error}]} ->
throw(Error);
{ok, [Error]} ->
throw(Error);
{ok, []} ->
% replication success
{Pos, [RevId | _]} = Doc#doc.revs,
{ok, {Pos, RevId}}
end.
update_docs(Db, Docs) ->
update_docs(Db, Docs, []).
% group_alike_docs sorts the documents by id and groups them into sublist
% buckets, one bucket per id:
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
group_alike_docs(Sorted, []).
group_alike_docs([], Buckets) ->
lists:reverse(Buckets);
group_alike_docs([Doc|Rest], []) ->
group_alike_docs(Rest, [[Doc]]);
group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
[#doc{id=BucketId}|_] = Bucket,
case Doc#doc.id == BucketId of
true ->
% add to existing bucket
group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
false ->
% add to new bucket
group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
end.
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
catch check_is_admin(Db);
validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
ValidationFuns = load_validation_funs(Db),
validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun);
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
ok;
validate_doc_update(Db, Doc, GetDiskDocFun) ->
DiskDoc = GetDiskDocFun(),
JsonCtx = couch_util:json_user_ctx(Db),
SecObj = get_security(Db),
try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
ok -> ok;
Error -> throw(Error)
end || Fun <- Db#db.validate_doc_funs],
ok
catch
throw:Error ->
Error
end.
% To be safe, spawn a middleman process here: if collecting the design docs
% crashes, only the middleman dies, and the result (or the failure reason)
% comes back to the caller in the 'DOWN' message.
load_validation_funs(#db{main_pid = Pid} = Db) ->
{_, Ref} = spawn_monitor(fun() ->
{ok, DesignDocs} = get_design_docs(Db),
exit({ok, lists:flatmap(fun(DesignDoc) ->
case couch_doc:get_validate_doc_fun(DesignDoc) of
nil ->
[];
Fun ->
[Fun]
end
end, DesignDocs)})
end),
receive
{'DOWN', Ref, _, _, {ok, Funs}} ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs;
{'DOWN', Ref, _, _, Reason} ->
?LOG_ERROR("could not load validation funs ~p", [Reason]),
throw(internal_server_error)
end.
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
[PrevRev|_] ->
case dict:find({RevStart, PrevRev}, LeafRevsDict) of
{ok, {Deleted, DiskSp, DiskRevs}} ->
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
{validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
false ->
LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
{validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
end;
error when AllowConflict ->
couch_doc:merge_stubs(Doc, #doc{}), % will throw an error if
% the doc has attachment stubs
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
error ->
{conflict, Doc}
end;
[] ->
% new doc, and we have existing revs.
% reuse existing deleted doc
if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
true ->
{conflict, Doc}
end
end.
prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
AccFatalErrors) ->
{AccPrepped, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
[#doc{id=Id}|_]=DocBucket,
% no existing revs are known
{PreppedBucket, AccErrors3} = lists:foldl(
fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case Revs of
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccBucket], AccErrors2};
Error ->
{AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
end;
_ ->
% old revs specified but none exist, a conflict
{AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
{#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
{PreppedBucket, AccErrors3} = lists:foldl(
fun(Doc, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
{[Doc2 | Docs2Acc], AccErrors2};
{Error, #doc{id=Id,revs=Revs}} ->
% Record the error
{Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3).
update_docs(Db, Docs, Options) ->
update_docs(Db, Docs, Options, interactive_edit).
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
Errors2 = [{{Id, {Pos, Rev}}, Error} ||
{#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
{lists:reverse(AccPrepped), lists:reverse(Errors2)};
prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
fun(Doc, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccPrepped2], AccErrors2};
Error ->
{AccPrepped2, [{Doc, Error} | AccErrors2]}
end
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
NewTree
end,
OldTree, Bucket),
Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
{ValidatedBucket, AccErrors3} =
lists:foldl(
fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
case dict:find({Pos, RevId}, LeafRevsFullDict) of
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
end,
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = LoadPrevRevFun(),
Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
GetDiskDocFun = fun() -> DiskDoc end;
false ->
Doc2 = Doc,
GetDiskDocFun = LoadPrevRevFun
end,
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
{[Doc2 | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
_ ->
% this doc isn't a leaf, or it already exists in the tree.
% ignore it, but consider it a success.
{AccValidated, AccErrors2}
end
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
[ValidatedBucket | AccPrepped], AccErrors3)
end.
new_revid(#doc{body=Body,revs={OldStart,OldRevs},
atts=Atts,deleted=Deleted}) ->
case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
Atts2 when length(Atts) =/= length(Atts2) ->
% We must have old style non-md5 attachments
?l2b(integer_to_list(couch_util:rand32()));
Atts2 ->
OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
end.
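% A consequence worth noting: for docs whose attachments all carry md5s, the
% new rev id is a deterministic md5 of the doc contents, so identical edits
% on top of the same parent produce identical rev ids. A sketch (the fields
% shown are illustrative):
%
%   RevId = couch_db:new_revid(#doc{body = {[{<<"a">>, 1}]},
%                                   revs = {1, [ParentRevId]},
%                                   atts = [], deleted = false})
%   % RevId is a 16-byte md5 binary, stable for these inputs.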
new_revs([], OutBuckets, IdRevsAcc) ->
{lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
{NewBucket, IdRevsAcc3} = lists:mapfoldl(
fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
NewRevId = new_revid(Doc),
{Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
[{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
check_dup_atts(#doc{atts=Atts}=Doc) ->
Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
check_dup_atts2(Atts2),
Doc.
check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
throw({bad_request, <<"Duplicate attachments">>});
check_dup_atts2([_ | Rest]) ->
check_dup_atts2(Rest);
check_dup_atts2(_) ->
ok.
update_docs(Db, Docs, Options, replicated_changes) ->
increment_stat(Db, {couchdb, database_writes}),
DocBuckets = group_alike_docs(Docs),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
(#doc{atts=Atts}) ->
Atts /= []
end, Docs) of
true ->
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocs = get_full_doc_infos(Db, Ids),
{DocBuckets2, DocErrors} =
prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
false ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
update_docs(Db, Docs, Options, interactive_edit) ->
increment_stat(Db, {couchdb, database_writes}),
AllOrNothing = lists:member(all_or_nothing, Options),
% Go ahead and generate the new revision ids for the documents.
% Separate out the NonRep (_local) documents from the rest of the documents.
{Docs2, NonRepDocs} = lists:foldl(
fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsAcc, [Doc | NonRepDocsAcc]};
Id->
{[Doc | DocsAcc], NonRepDocsAcc}
end
end, {[], []}, Docs),
DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
true;
(#doc{atts=Atts}) ->
Atts /= []
end, Docs2) of
true ->
% lookup the doc by id and get the most recent
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
{DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
% strip out any empty buckets
DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
false ->
PreCommitFailures = [],
DocBuckets2 = DocBuckets
end,
if (AllOrNothing) and (PreCommitFailures /= []) ->
{aborted, lists:map(
fun({{Id,{Pos, [RevId|_]}}, Error}) ->
{{Id, {Pos, RevId}}, Error};
({{Id,{0, []}}, Error}) ->
{{Id, {0, <<>>}}, Error}
end, PreCommitFailures)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
fun(#doc{id=Id,revs={Pos, RevIds}}) ->
{ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
Result
end, Docs)}
end.
% Returns the first document available on disk along the given rev path,
% skipping unflushed docs and missing revs. The input list is a full rev
% path for the doc.
make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
nil;
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) ->
Revs = [Rev || {Rev, _} <- DocPath],
make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
set_commit_option(Options) ->
CommitSettings = {
[true || O <- Options, O==full_commit orelse O==delay_commit],
couch_config:get("couchdb", "delayed_commits", "false")
},
case CommitSettings of
{[true], _} ->
Options; % user requested explicit commit setting, do not change it
{_, "true"} ->
Options; % delayed commits are enabled, do nothing
{_, "false"} ->
[full_commit|Options];
{_, Else} ->
?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p",
[Else]),
[full_commit|Options]
end.
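% Consequences of the defaults (a sketch, assuming "delayed_commits" is
% unset, so the "false" default above applies):
%
%   set_commit_option([])             -> [full_commit]
%   set_commit_option([delay_commit]) -> [delay_commit]   % left unchanged
%   set_commit_option([full_commit])  -> [full_commit]    % left unchanged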
collect_results(Pid, MRef, ResultsAcc) ->
receive
{result, Pid, Result} ->
collect_results(Pid, MRef, [Result | ResultsAcc]);
{done, Pid} ->
{ok, ResultsAcc};
{retry, Pid} ->
retry;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
MRef = erlang:monitor(process, Pid),
try
Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
end
after
erlang:demonitor(MRef, [flush])
end.
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
% already committed to disk, do not set a new revpos
Att;
(Att) ->
Att#att{revpos=RevPos+1}
end, Atts)}.
doc_flush_atts(Doc, Fd) ->
Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
check_md5(_NewSig, <<>>) -> ok;
check_md5(Sig, Sig) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
% already written to our file, nothing to write
Att;
flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
disk_len=InDiskLen} = Att) ->
{NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
check_md5(IdentityMd5, InMd5),
Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
with_stream(Fd, Att, fun(OutputStream) ->
couch_stream:write(OutputStream, Data)
end);
flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
with_stream(Fd, Att, fun(OutputStream) ->
% Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment.
Fun(4096,
% WriterFun({Length, Binary}, State)
% WriterFun({0, _Footers}, State)
% Called with Length == 0 on the last call.
% WriterFun returns NewState.
fun({0, Footers}, _) ->
F = mochiweb_headers:from_binary(Footers),
case mochiweb_headers:get_value("Content-MD5", F) of
undefined ->
ok;
Md5 ->
{md5, base64:decode(Md5)}
end;
({_Length, Chunk}, _) ->
couch_stream:write(OutputStream, Chunk)
end, ok)
end);
flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
with_stream(Fd, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, AttLen)
end).
% From RFC 2616 3.6.1 - Chunked Transfer Coding
%
% In other words, the origin server is willing to accept
% the possibility that the trailer fields might be silently
% discarded along the path to the client.
%
% I take this to mean that if "Trailers: Content-MD5\r\n"
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
{ok, OutputStream} = case (Enc =:= identity) andalso
couch_util:compressible_att_type(Type) of
true ->
CompLevel = list_to_integer(
couch_config:get("attachments", "compression_level", "0")
),
couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
_ ->
couch_stream:open(Fd)
end,
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
md5_in_footer -> FooterMd5;
_ -> InMd5
end;
_ ->
InMd5
end,
{StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
{Same, Same} ->
{Len, IdentityLen, identity};
_ ->
{Len, IdentityLen, gzip}
end;
gzip ->
case {Att#att.att_len, Att#att.disk_len} of
{AL, DL} when AL =:= undefined orelse DL =:= undefined ->
% Compressed attachment uploaded through the standalone API.
{Len, Len, gzip};
{AL, DL} ->
% This case is used for efficient push-replication, where a
% compressed attachment is located in the body of a multipart
% content-type request.
{AL, DL, gzip}
end
end,
Att#att{
data={Fd,StreamInfo},
att_len=AttLen,
disk_len=DiskLen,
md5=Md5,
encoding=NewEnc
}.
write_streamed_attachment(_Stream, _F, 0) ->
ok;
write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
Bin = read_next_chunk(F, LenLeft),
ok = couch_stream:write(Stream, Bin),
write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
read_next_chunk(F, _) when is_function(F, 0) ->
F();
read_next_chunk(F, LenLeft) when is_function(F, 1) ->
F(lists:min([LenLeft, 16#2000])).
enum_docs_since_reduce_to_count(Reds) ->
couch_btree:final_reduce(
fun couch_db_updater:btree_by_seq_reduce/2, Reds).
enum_docs_reduce_to_count(Reds) ->
case couch_btree:final_reduce(
fun couch_db_updater:btree_by_id_reduce/2, Reds) of
{C1, _, _} -> C1;
{C2, _, _, _} -> C2
end.
changes_since(Db, StartSeq, Fun, Acc) ->
changes_since(Db, StartSeq, Fun, [], Acc).
changes_since(Db, StartSeq, Fun, Options, Acc) ->
Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
case FullDocInfo of
#full_doc_info{} ->
DocInfo = couch_doc:to_doc_info(FullDocInfo);
#doc_info{} ->
DocInfo = FullDocInfo
end,
Fun(DocInfo, Acc2)
end,
{ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper,
Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]),
{ok, AccOut}.
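% A usage sketch (Since and the accumulator shape are assumptions for
% illustration): collect the ids of all docs changed after Since:
%
%   CollectIds = fun(DocInfo, Acc) ->
%       {ok, [DocInfo#doc_info.id | Acc]}
%   end,
%   {ok, Ids} = couch_db:changes_since(Db, Since, CollectIds, [])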
count_changes_since(Db, SinceSeq) ->
{ok, Changes} =
couch_btree:fold_reduce(Db#db.seq_tree,
fun(_SeqStart, PartialReds, 0) ->
{ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
{ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options) ->
{ok, LastReduce, OutAcc} = couch_view:fold(
#view{btree=Db#db.id_tree}, InFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
%%% Internal functions %%%
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
{ok, #full_doc_info{rev_tree=RevTree}} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
{couch_key_tree:get_all_leafs(RevTree), []};
_ ->
case lists:member(latest, Options) of
true ->
couch_key_tree:get_key_leafs(RevTree, Revs);
false ->
couch_key_tree:get(RevTree, Revs)
end
end,
FoundResults =
lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
case Value of
?REV_MISSING ->
% we have the rev in our list but know nothing about it
{{not_found, missing}, {Pos, Rev}};
#leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
{ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
end
end, FoundRevs),
Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
{ok, Results};
not_found when Revs == all ->
{ok, []};
not_found ->
{ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
end
end,
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
case couch_btree:lookup(Db#db.local_tree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
{ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
[not_found] ->
{not_found, missing}
end;
open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
#rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
{ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
#doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
DocInfo = couch_doc:to_doc_info(FullDocInfo),
{[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
{ok, FullDocInfo} ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
end.
doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
case lists:member(revs_info, Options) of
false -> [];
true ->
{[{Pos, RevPath}],[]} =
couch_key_tree:get_full_key_paths(RevTree, [Rev]),
[{revs_info, Pos, lists:map(
fun({Rev1, #leaf{deleted=true}}) ->
{Rev1, deleted};
({Rev1, #leaf{deleted=false}}) ->
{Rev1, available};
({Rev1, ?REV_MISSING}) ->
{Rev1, missing}
end, RevPath)}]
end ++
case lists:member(conflicts, Options) of
false -> [];
true ->
case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
[] -> [];
ConflictRevs -> [{conflicts, ConflictRevs}]
end
end ++
case lists:member(deleted_conflicts, Options) of
false -> [];
true ->
case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
[] -> [];
DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
end
end ++
case lists:member(local_seq, Options) of
false -> [];
true -> [{local_seq, Seq}]
end.
read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
% 09 UPGRADE CODE
couch_stream:old_read_term(Fd, OldStreamPointer);
read_doc(#db{fd=Fd}, Pos) ->
couch_file:pread_term(Fd, Pos).
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
[Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
{Start - length(RevIds) + 1, Tree}.
doc_to_tree_simple(Doc, [RevId]) ->
[{RevId, Doc, []}];
doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
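% For instance (a sketch), a doc with revs = {3, [RevC, RevB, RevA]} (newest
% first) becomes the single-path stem
%   {1, {RevA, ?REV_MISSING, [{RevB, ?REV_MISSING, [{RevC, Doc, []}]}]}}
% where only the requested rev carries the doc itself and every ancestor is
% a ?REV_MISSING stub.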
make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
{BodyData, Atts} =
case Bp of
nil ->
{[], []};
_ ->
{ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
{BodyData0,
lists:map(
fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
#att{name=Name,
type=Type,
att_len=AttLen,
disk_len=DiskLen,
md5=Md5,
revpos=RevPos,
data={Fd,Sp},
encoding=
case Enc of
true ->
% 0110 UPGRADE CODE
gzip;
false ->
% 0110 UPGRADE CODE
identity;
_ ->
Enc
end
};
({Name,Type,Sp,AttLen,RevPos,Md5}) ->
#att{name=Name,
type=Type,
att_len=AttLen,
disk_len=AttLen,
md5=Md5,
revpos=RevPos,
data={Fd,Sp}};
({Name,{Type,Sp,AttLen}}) ->
#att{name=Name,
type=Type,
att_len=AttLen,
disk_len=AttLen,
md5= <<>>,
revpos=0,
data={Fd,Sp}}
end, Atts0)}
end,
#doc{
id = Id,
revs = RevisionPath,
body = BodyData,
atts = Atts,
deleted = Deleted
}.
increment_stat(#db{is_sys_db = true}, _Stat) ->
ok;
increment_stat(#db{}, Stat) ->
couch_stats_collector:increment(Stat).