
Making doc_info and body info simpler, for use by couchstore

Simplifying the storage to a single body per document, with a content_meta type
that indicates whether the body is JSON or raw binary (and, if not JSON, why),
and whether it is compressed.

All persisted Erlang terms are now snappy-compressed; doc bodies must be
compressed in a separate step.

Removing couch_btree_nif, as we will be using a native updater in ep-engine.
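
For reference, the new on-disk convention for a JSON body looks roughly like
this (an illustrative sketch using the content_meta macros introduced in
couch_db.hrl below; variable names are made up):

    Json = ?JSON_ENCODE(EJsonBody),          % one body per doc, no separate binary
    Body = couch_compress:compress(Json),    % compressed in a separate step
    Meta = ?CONTENT_META_JSON bor ?CONTENT_META_SNAPPY_COMPRESSED.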

Change-Id: I6822bbc271861748049bca5ebac7ca0f5fe816d4
Reviewed-on: http://review.couchbase.org/13090
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Damien Katz <damien@couchbase.com>
1 parent 112ce49 · commit 0baa8d8370278c84793fbe469d12695994c774f1
Damien Katz committed Feb 7, 2012
Showing with 227 additions and 2,579 deletions.
  1. +0 −2 configure.ac
  2. +1 −2 src/couch_set_view/src/couch_set_view_group.erl
  3. +2 −0 src/couch_set_view/src/couch_set_view_updater.erl
  4. +1 −1 src/couch_set_view/src/couch_set_view_util.erl
  5. +0 −2 src/couchdb/Makefile.am
  6. +2 −4 src/couchdb/couch_btree.erl
  7. +4 −7 src/couchdb/couch_btree_copy.erl
  8. +0 −93 src/couchdb/couch_btree_nif.erl
  9. +2 −2 src/couchdb/couch_changes.erl
  10. +7 −86 src/couchdb/couch_compress.erl
  11. +23 −43 src/couchdb/couch_db.erl
  12. +14 −9 src/couchdb/couch_db.hrl
  13. +23 −33 src/couchdb/couch_db_updater.erl
  14. +82 −57 src/couchdb/couch_doc.erl
  15. +4 −21 src/couchdb/couch_file.erl
  16. +21 −26 src/couchdb/couch_httpd_db.erl
  17. +2 −2 src/couchdb/couch_query_servers.erl
  18. +2 −2 src/couchdb/couch_replication_manager.erl
  19. +9 −9 src/couchdb/couch_replicator.erl
  20. +1 −1 src/couchdb/couch_replicator_utils.erl
  21. +4 −5 src/couchdb/couch_view_group.erl
  22. +2 −2 src/couchdb/couch_view_merger.erl
  23. +2 −30 src/couchdb/priv/Makefile.am
  24. +0 −40 src/couchdb/priv/btree_nif/config_static.h
  25. +0 −913 src/couchdb/priv/btree_nif/couch_btree.c
  26. +0 −143 src/couchdb/priv/btree_nif/couch_btree.h
  27. +0 −605 src/couchdb/priv/btree_nif/couch_btree_nif.c
  28. +0 −80 src/couchdb/priv/btree_nif/couch_btree_nif.erl
  29. +0 −215 src/couchdb/priv/btree_nif/couch_file_read.c
  30. +0 −31 src/couchdb/priv/btree_nif/couch_nif_write.c
  31. +0 −90 src/couchdb/priv/btree_nif/win32/couch_btree_nif.vcxproj.tpl.in
  32. +0 −1 src/couchdb/priv/btree_nif/win32/msbuild.bat.tpl.in
  33. +4 −4 src/snappy/snappy.erl
  34. +3 −2 test/etap/010-file-basics.t
  35. +1 −2 test/etap/020-btree-basics.t
  36. +2 −3 test/etap/022-btree-copy.t
  37. +3 −3 test/etap/030-doc-from-json.t
  38. +4 −4 test/etap/031-doc-to-json.t
  39. +1 −2 test/etap/200-view-group-no-db-leaks.t
  40. +1 −2 test/etap/201-view-group-shutdown.t
configure.ac (2 changes)
@@ -562,8 +562,6 @@ if test x${IS_WINDOWS} = xTRUE; then
AC_CONFIG_FILES([src/couchdb/priv/icu_driver/win32/couch_icu_driver.vcxproj.tpl])
AC_CONFIG_FILES([src/couchdb/priv/couch_ejson_compare/win32/msbuild.bat.tpl])
AC_CONFIG_FILES([src/couchdb/priv/couch_ejson_compare/win32/couch_ejson_compare.vcxproj.tpl])
- AC_CONFIG_FILES([src/couchdb/priv/btree_nif/win32/msbuild.bat.tpl])
- AC_CONFIG_FILES([src/couchdb/priv/btree_nif/win32/couch_btree_nif.vcxproj.tpl])
AC_CONFIG_FILES([src/ejson/win32/msbuild.bat.tpl])
AC_CONFIG_FILES([src/ejson/win32/ejson.vcxproj.tpl])
AC_CONFIG_FILES([src/snappy/win32/msbuild.bat.tpl])
src/couch_set_view/src/couch_set_view_group.erl (3 changes)
@@ -1099,8 +1099,7 @@ init_group(Fd, #set_view_group{def_lang = Lang, views = Views} = Group, IndexHea
First, Rest)
end,
BtreeOptions = [
- {chunk_threshold, ?BTREE_CHUNK_THRESHOLD},
- {compression, snappy}
+ {chunk_threshold, ?BTREE_CHUNK_THRESHOLD}
],
{ok, IdBtree} = couch_btree:open(
IdBtreeState, Fd, [{reduce, IdTreeReduce} | BtreeOptions]),
src/couch_set_view/src/couch_set_view_updater.erl (2 changes)
@@ -42,6 +42,8 @@
deleted_ids = 0
}).
+
+-spec update(pid()|nil, #set_view_group{}, list) -> no_return().
update(Owner, Group, FileName) ->
#set_view_group{
set_name = SetName,
src/couch_set_view/src/couch_set_view_util.erl (2 changes)
@@ -138,7 +138,7 @@ get_ddoc_ids_with_sig(SetName, ViewGroupSig) ->
[], DDocList).
-design_doc_to_set_view_group(SetName, #doc{id = Id, json = {Fields}}) ->
+design_doc_to_set_view_group(SetName, #doc{id = Id, body = {Fields}}) ->
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{DesignOptions} = couch_util:get_value(<<"options">>, Fields, {[]}),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
src/couchdb/Makefile.am (2 changes)
@@ -34,7 +34,6 @@ source_files = \
couch_app.erl \
couch_auth_cache.erl \
couch_btree.erl \
- couch_btree_nif.erl \
couch_btree_copy.erl \
couch_changes.erl \
couch_compaction_daemon.erl \
@@ -113,7 +112,6 @@ compiled_files = \
couch_app.beam \
couch_auth_cache.beam \
couch_btree.beam \
- couch_btree_nif.beam \
couch_btree_copy.beam \
couch_changes.beam \
couch_compaction_daemon.beam \
src/couchdb/couch_btree.erl (6 changes)
@@ -43,8 +43,6 @@ set_options(Bt, [{less, Less}|Rest]) ->
set_options(Bt#btree{less=Less}, Rest);
set_options(Bt, [{reduce, Reduce}|Rest]) ->
set_options(Bt#btree{reduce=Reduce}, Rest);
-set_options(Bt, [{compression, Comp}|Rest]) ->
- set_options(Bt#btree{compression=Comp}, Rest);
set_options(Bt, [{chunk_threshold, Threshold}|Rest]) ->
set_options(Bt#btree{chunk_threshold = Threshold}, Rest).
@@ -390,14 +388,14 @@ get_node(#btree{fd = Fd}, NodePos) ->
{ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
{NodeType, NodeList}.
-write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
+write_node(#btree{fd = Fd} = Bt, NodeType, NodeList) ->
% split up nodes into smaller sizes
NodeListList = chunkify(Bt, NodeList),
% now write out each chunk and return the KeyPointer pairs for those nodes
ResultList = [
begin
{ok, Pointer, Size} = couch_file:append_term(
- Fd, {NodeType, ANodeList}, [{compression, Comp}]),
+ Fd, {NodeType, ANodeList}),
{LastKey, _} = lists:last(ANodeList),
SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
{LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
src/couchdb/couch_btree_copy.erl (11 changes)
@@ -21,7 +21,6 @@
fd,
before_kv_write = {fun(Item, Acc) -> {Item, Acc} end, []},
filter = fun(_) -> true end,
- compression = ?DEFAULT_COMPRESSION,
chunk_threshold,
nodes = dict:from_list([{1, []}]),
cur_level = 1,
@@ -80,8 +79,6 @@ apply_options([{filter, Fun} | Rest], Acc) ->
apply_options(Rest, Acc#acc{filter = Fun});
apply_options([override | Rest], Acc) ->
apply_options(Rest, Acc);
-apply_options([{compression, Comp} | Rest], Acc) ->
- apply_options(Rest, Acc#acc{compression = Comp});
apply_options([{chunk_threshold, Threshold} | Rest], Acc) ->
apply_options(Rest, Acc#acc{chunk_threshold = Threshold}).
@@ -106,12 +103,12 @@ before_leaf_write(#acc{before_kv_write = {Fun, UserAcc0}} = Acc, KVs) ->
{NewKVs, Acc#acc{before_kv_write = {Fun, NewUserAcc}}}.
-write_leaf(#acc{fd = Fd, compression = Comp}, Node, Red) ->
- {ok, Pos, Size} = couch_file:append_term(Fd, Node, [{compression, Comp}]),
+write_leaf(#acc{fd = Fd}, Node, Red) ->
+ {ok, Pos, Size} = couch_file:append_term(Fd, Node),
{ok, {Pos, Red, Size}}.
-write_kp_node(#acc{fd = Fd, btree = Bt, compression = Comp}, NodeList) ->
+write_kp_node(#acc{fd = Fd, btree = Bt}, NodeList) ->
{ChildrenReds, ChildrenSize} = lists:foldr(
fun({_Key, {_P, Red, Sz}}, {AccR, AccSz}) ->
{[Red | AccR], Sz + AccSz}
@@ -123,7 +120,7 @@ write_kp_node(#acc{fd = Fd, btree = Bt, compression = Comp}, NodeList) ->
couch_btree:final_reduce(Bt, {[], ChildrenReds})
end,
{ok, Pos, Size} = couch_file:append_term(
- Fd, {kp_node, NodeList}, [{compression, Comp}]),
+ Fd, {kp_node, NodeList}),
{ok, {Pos, Red, ChildrenSize + Size}}.
src/couchdb/couch_btree_nif.erl (93 changes)
@@ -1,93 +0,0 @@
--module(couch_btree_nif).
--include("couch_db.hrl").
--export([query_modify_raw/3, query_modify_raw_native/3, write_response/3, init/0]).
--on_load(init/0).
-
-init() ->
- LibDir = case couch_config:get("couchdb", "util_driver_dir") of
- undefined ->
- filename:join(couch_util:priv_dir(), "lib");
- LibDir0 ->
- LibDir0
- end,
- erlang:load_nif(filename:join([LibDir, ?MODULE]), 0).
-
-get_rq_info(#db{filepath=Filepath,
- local_docs_btree = LocalDocs,
- docinfo_by_id_btree = DocInfoById,
- docinfo_by_seq_btree = DocInfoBySeq}, #btree{root = Root} = WhichTree) ->
- case WhichTree of
- DocInfoById ->
- % raw ascii collation, by_id reduce
- {0, 2, Filepath, Root};
- LocalDocs ->
- % raw ascii collation, no reduce
- {0, 0, Filepath, Root};
- DocInfoBySeq ->
- % numeric term collation, by_seq reduce (count)
- {1, 1, Filepath, Root};
- _ -> unsupported_btree
- end.
-
-format_newroot({P, Rbin, S}) ->
- {P, binary_to_term(iolist_to_binary([<<131>>, Rbin])), S}.
-
-response_loop(Ref, #btree{assemble_kv=Assemble} = Bt, Root, QueryResults) ->
- receive
- {Ref, not_modified} ->
- {ok, QueryResults, Bt#btree{root = Root}};
- {Ref, {new_root, NewRoot}} ->
- {ok, QueryResults, Bt#btree{root = format_newroot(NewRoot)}};
- {Ref, {ok, Key, Value}} ->
- response_loop(Ref, Bt, Root, [{ok, Assemble(binary_to_term(Key), binary_to_term(Value))}
- | QueryResults]);
- {Ref, {not_found, Key}} ->
- response_loop(Ref, Bt, Root, [{not_found, {binary_to_term(Key), nil}} | QueryResults]);
- {Ref, {error, Err}} ->
- {error, Err}
- end.
-
-action_type(fetch) -> 0;
-action_type(insert) -> 1;
-action_type(remove) -> 2.
-
-format_action(Action) ->
- case Action of
- {T,K} ->
- {action_type(T), term_to_binary(K)};
- {T,K,V} ->
- {action_type(T), term_to_binary(K), term_to_binary(V)}
- end.
-
-query_modify_raw(Db, Bt, SortedActions) ->
- case couch_config:get("couchdb", "btree_implementation", "native") of
- "native" ->
- try query_modify_raw_native(Db, Bt, SortedActions)
- catch
- % Fall back onto the normal updater if we can't use the NIF
- _Type:_Error ->
- couch_btree:query_modify_raw(Bt, SortedActions)
- end;
- "erlang" ->
- couch_btree:query_modify_raw(Bt, SortedActions);
- _ ->
- {error, unknown_btree_implementation}
- end.
-
-query_modify_raw_native(Db, #btree{fd = Fd} = Bt, SortedActions) ->
- {CompareType, ReduceType, Filename, Root} = get_rq_info(Db, Bt),
- DoActions = [format_action(Act) || Act <- SortedActions],
-
- case do_native_modify(DoActions, Filename, CompareType,
- ReduceType, Root, Fd) of
- {ok, Ref} ->
- response_loop(Ref, Bt, Root, []);
- ok ->
- {ok, [], Bt}
- end.
-
-do_native_modify(_, _, _, _, _, _) ->
- erlang:nif_error(nif_not_loaded).
-
-write_response(_, _, _) ->
- erlang:nif_error(nif_not_loaded).
src/couchdb/couch_changes.erl (4 changes)
@@ -117,7 +117,7 @@ os_filter_fun(FilterName, Style, Req, Db) ->
DesignId = <<"_design/", DName/binary>>,
DDoc = couch_db_frontend:couch_doc_open(Db, DesignId, [ejson_body]),
% validate that the ddoc has the filter fun
- #doc{json={Props}} = DDoc,
+ #doc{body={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
fun(Db2, DocInfo) ->
{ok, Doc} =
@@ -184,7 +184,7 @@ filter_view(ViewName, _Style, Db) ->
DesignId = <<"_design/", DName/binary>>,
{ok, DDoc} = couch_db_frontend:open_doc(Db, DesignId, [ejson_body]),
% validate that the ddoc has the filter fun
- #doc{json={Props}} = DDoc,
+ #doc{body={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
fun(Db2, DocInfo) ->
{ok, Doc} =
src/couchdb/couch_compress.erl (93 changes)
@@ -12,94 +12,15 @@
-module(couch_compress).
--export([compress/2, compress_bin/2, decompress/1, is_compressed/1]).
--export([get_compression_method/0]).
+-export([compress/1, decompress/1]).
-include("couch_db.hrl").
-% binaries compressed with snappy have their first byte set to this value
--define(SNAPPY_PREFIX, 1).
-% binaries that are a result of an erlang:term_to_binary/1,2 call have this
-% value as their first byte
--define(TERM_PREFIX, 131).
+compress(Bin) ->
+ {ok, CompressedBin} = snappy:compress(Bin),
+ CompressedBin.
-% If a term's binary representation is smaller then this threshold, don't
-% even attempt to compress it.
--define(SNAPPY_COMPRESS_THRESHOLD, 64).
-
-
-get_compression_method() ->
- case couch_config:get("couchdb", "file_compression") of
- undefined ->
- ?DEFAULT_COMPRESSION;
- Method1 ->
- case string:tokens(Method1, "_") of
- [Method] ->
- list_to_existing_atom(Method);
- [Method, Level] ->
- {list_to_existing_atom(Method), list_to_integer(Level)}
- end
- end.
-
-
-compress_bin(Bin, none) ->
- Bin;
-compress_bin(Bin, snappy) ->
- case byte_size(Bin) < ?SNAPPY_COMPRESS_THRESHOLD of
- true ->
- Bin;
- false ->
- try
- {ok, CompressedBin} = snappy:compress(Bin),
- case byte_size(CompressedBin) < byte_size(Bin) of
- true ->
- <<?SNAPPY_PREFIX, CompressedBin/binary>>;
- false ->
- Bin
- end
- catch exit:snappy_nif_not_loaded ->
- Bin
- end
- end.
-
-
-compress(Term, none) ->
- ?term_to_bin(Term);
-compress(Term, {deflate, Level}) ->
- term_to_binary(Term, [{minor_version, 1}, {compressed, Level}]);
-compress(Term, snappy) ->
- Bin = ?term_to_bin(Term),
- case byte_size(Bin) < ?SNAPPY_COMPRESS_THRESHOLD of
- true ->
- Bin;
- false ->
- try
- {ok, CompressedBin} = snappy:compress(Bin),
- case byte_size(CompressedBin) < byte_size(Bin) of
- true ->
- <<?SNAPPY_PREFIX, CompressedBin/binary>>;
- false ->
- Bin
- end
- catch exit:snappy_nif_not_loaded ->
- Bin
- end
- end.
-
-
-decompress(<<?SNAPPY_PREFIX, Rest/binary>>) ->
- {ok, TermBin} = snappy:decompress(Rest),
- binary_to_term(TermBin);
-decompress(<<?TERM_PREFIX, _/binary>> = Bin) ->
- binary_to_term(Bin).
-
-
-is_compressed(<<?SNAPPY_PREFIX, _/binary>>) ->
- true;
-is_compressed(<<?TERM_PREFIX, _/binary>>) ->
- true;
-is_compressed(<<"{", _/binary>>) ->
- false;
-is_compressed(Term) when not is_binary(Term) ->
- false.
+decompress(Bin) ->
+ {ok, DecompressedBin} = snappy:decompress(Bin),
+ DecompressedBin.
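
With the configurable compression methods gone, the module is a thin wrapper
over the snappy NIF and deals only in binaries. A round trip is simply
(illustrative sketch; Term is any Erlang term):

    Bin = term_to_binary(Term),
    Bin = couch_compress:decompress(couch_compress:compress(Bin)).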
src/couchdb/couch_db.erl (66 changes)
@@ -172,7 +172,7 @@ apply_open_options2(Doc,[]) ->
apply_open_options2(Doc, [ejson_body | Rest]) ->
apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
apply_open_options2(Doc, [json_bin_body | Rest]) ->
-apply_open_options2(couch_doc:with_json_body(Doc), Rest);
+ apply_open_options2(couch_doc:with_json_body(Doc), Rest);
apply_open_options2(Doc,[_|Rest]) ->
apply_open_options2(Doc,Rest).
@@ -499,15 +499,16 @@ write_doc_bodies_retry_closed(Db, Docs) ->
write_doc_bodies(Db, Docs) ->
lists:map(
- fun(#doc{json = Body, binary = Binary} = Doc) ->
- Prepped = prep_doc_body_binary(Body, Binary),
+ fun(#doc{body = Body, content_meta = ContentMeta0} = Doc) ->
+ {Prepped, ContentMeta} = prep_doc_body_binary(Body, ContentMeta0),
case couch_file:append_binary_crc32(Db#db.fd, Prepped) of
{ok, BodyPtr, Size} ->
#doc_update_info{
id=Doc#doc.id,
rev=Doc#doc.rev,
deleted=Doc#doc.deleted,
body_ptr=BodyPtr,
+ content_meta=ContentMeta,
fd=Db#db.fd,
size=Size
};
@@ -517,30 +518,14 @@ write_doc_bodies(Db, Docs) ->
end,
Docs).
-prep_doc_body_binary(EJson, Binary) when is_tuple(EJson)->
- % convert ejson to json
- prep_doc_body_binary(?JSON_ENCODE(EJson), Binary);
-prep_doc_body_binary(Body, Binary) ->
- BodyCompressed = couch_compress:compress(Body, snappy),
- Size = iolist_size(BodyCompressed),
- case Binary of
- nil ->
- % we differentiate between a empty binary and no binary. Clients
- % might set a empty binary as the primary value.
- [<<0:1/integer, Size:31/integer>>, BodyCompressed];
- _ ->
- [<<1:1/integer, Size:31/integer>>, BodyCompressed, Binary]
- end.
-
-read_doc_body_binary(Iolist) ->
- {Len, Rest} = couch_util:split_iolist(Iolist, 4),
- case iolist_to_binary(Len) of
- <<0:1/integer, _Size:31/integer>> ->
- {couch_compress:decompress(iolist_to_binary(Rest)), nil};
- <<1:1/integer, Size:31/integer>> ->
- {Json, Binary} = couch_util:split_iolist(Rest, Size),
- {couch_compress:decompress(iolist_to_binary(Json)), Binary}
- end.
+prep_doc_body_binary(EJson, _ContentMeta) when is_tuple(EJson)->
+ % convert ejson to json binary, clear out ContentMeta, set to
+ % compressed json.
+ {couch_compress:compress(?JSON_ENCODE(EJson)),
+ ?CONTENT_META_JSON bor ?CONTENT_META_SNAPPY_COMPRESSED};
+prep_doc_body_binary(Body, ContentMeta) ->
+ % assume body is binary or iolist, and preserve ContentMeta
+ {Body, ContentMeta}.
enum_docs_since_reduce_to_count(Reds) ->
couch_btree:final_reduce(
@@ -646,14 +631,21 @@ handle_info(Msg, Db) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
[{ok, {_, BodyData}}] ->
- Doc = #doc{id=Id, json=BodyData},
+ Doc = #doc{id=Id, body=BodyData},
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
end;
-open_doc_int(Db, #doc_info{id=Id,deleted=IsDeleted,rev=RevInfo, body_ptr=Bp}=
- DocInfo, Options) ->
- Doc = make_doc(Db, Id, IsDeleted, Bp, RevInfo),
+open_doc_int(Db, #doc_info{id=Id,deleted=IsDeleted,rev=RevInfo, body_ptr=Bp,
+ content_meta=ContentMeta}=DocInfo, Options) ->
+ {ok, Body} = couch_file:pread_iolist(Db#db.fd, Bp),
+ Doc = #doc{
+ id = Id,
+ rev = RevInfo,
+ body = Body,
+ deleted = IsDeleted,
+ content_meta = ContentMeta
+ },
apply_open_options(
{ok, Doc#doc{meta=doc_meta_info(DocInfo, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
@@ -671,18 +663,6 @@ doc_meta_info(#doc_info{local_seq=Seq}, Options) ->
end.
-make_doc(#db{fd = Fd}, Id, Deleted, Bp, Rev) ->
- {ok, Body} = couch_file:pread_iolist(Fd, Bp),
- {Json, Binary} = read_doc_body_binary(Body),
- #doc{
- id = Id,
- rev = Rev,
- json = Json,
- binary = Binary,
- deleted = Deleted
- }.
-
-
increment_stat(#db{options = Options}, Stat) ->
case lists:member(sys_db, Options) of
true ->
src/couchdb/couch_db.hrl (23 changes)
@@ -53,13 +53,21 @@
-define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)).
+-define(CONTENT_META_JSON, 0).
+-define(CONTENT_META_INVALID_JSON, 1).
+-define(CONTENT_META_INVALID_JSON_KEY, 2).
+-define(CONTENT_META_NON_JSON_MODE, 3).
+
+-define(CONTENT_META_SNAPPY_COMPRESSED, (1 bsl 7)).
+
-record(doc_info,
{
id = <<"">>,
deleted = false,
local_seq,
rev = {0, <<>>},
body_ptr,
+ content_meta = 0, % should be 0-255 only.
size = 0
}).
@@ -69,6 +77,7 @@
deleted = false,
rev,
body_ptr,
+ content_meta = 0, % should be 0-255 only.
size = 0,
fd
}).
@@ -95,11 +104,9 @@
id = <<>>,
rev = {0, <<>>},
- % the json body object and metadata
- json = <<"{}">>,
-
- % the binary body, if not json
- binary = nil,
+ % the binary body
+ body = <<"{}">>,
+ content_meta = 0, % should be 0-255 only.
deleted = false,
@@ -126,7 +133,7 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(LATEST_DISK_VERSION, 8).
+-define(LATEST_DISK_VERSION, 9).
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
@@ -159,8 +166,7 @@
user_ctx = #user_ctx{},
waiting_delayed_commit = nil,
fsync_options = [],
- options = [],
- compression
+ options = []
}).
@@ -273,6 +279,5 @@
assemble_kv = fun(Key, Value) -> {Key, Value} end,
less = fun(A, B) -> A < B end,
reduce = nil,
- compression = ?DEFAULT_COMPRESSION,
chunk_threshold = 16#4ff
}).
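
Note how the flags compose: the snappy bit is the high bit of the single
content_meta byte, so it is combined with the content type via bor and tested
or cleared with band, as in this illustrative sketch:

    Meta = ?CONTENT_META_JSON bor ?CONTENT_META_SNAPPY_COMPRESSED,
    true = (Meta band ?CONTENT_META_SNAPPY_COMPRESSED) > 0,
    ?CONTENT_META_JSON = Meta band (bnot ?CONTENT_META_SNAPPY_COMPRESSED).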
src/couchdb/couch_db_updater.erl (56 changes)
@@ -64,9 +64,8 @@ handle_call(increment_update_seq, _From, Db) ->
couch_db_update_notifier:notify({updated, Db#db.name}),
{reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
- {ok, Ptr, _} = couch_file:append_term(
- Db#db.fd, NewSec, [{compression, Comp}]),
+handle_call({set_security, NewSec}, _From, Db) ->
+ {ok, Ptr, _} = couch_file:append_term(Db#db.fd, NewSec),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = notify_db_updated(Db2),
@@ -81,8 +80,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
docinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
- header = Header = #db_header{purge_seq=PurgeSeq},
- compression = Comp
+ header = Header = #db_header{purge_seq=PurgeSeq}
} = Db,
DocLookups = couch_btree:lookup(DocInfoByIdBTree,
[Id || {Id, _Rev} <- IdRevs]),
@@ -112,8 +110,7 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
[], SeqsToRemove),
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
[], IdsToRemove),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, IdRevsPurged, [{compression, Comp}]),
+ {ok, Pointer, _} = couch_file:append_term(Fd, IdRevsPurged),
Db2 = commit_data(
Db#db{
@@ -239,29 +236,31 @@ increment_filepath(FilePath) ->
string:join(lists:sublist(Tokens, length(Tokens) - 1) ++ [NumStr], ".").
btree_by_seq_split(#doc_info{id=Id, local_seq=Seq, rev=Rev,
- deleted=Deleted, body_ptr=Bp, size=Size}) ->
- {Seq, {Id, Rev, Bp, if Deleted -> 1; true -> 0 end, Size}}.
+ deleted=Deleted, body_ptr=Bp, content_meta=Meta, size=Size}) ->
+ {Seq, {Id, Rev, Bp, if Deleted -> 1; true -> 0 end, Meta, Size}}.
-btree_by_seq_join(Seq, {Id, Rev, Bp, Deleted, Size}) ->
+btree_by_seq_join(Seq, {Id, Rev, Bp, Deleted, Meta, Size}) ->
#doc_info{
id = Id,
local_seq = Seq,
rev = Rev,
deleted = (Deleted == 1),
+ content_meta = Meta,
body_ptr = Bp,
size = Size}.
btree_by_id_split(#doc_info{id=Id, local_seq=Seq, rev=Rev,
- deleted=Deleted, body_ptr=Bp, size=Size}) ->
- {Id, {Seq, Rev, Bp, if Deleted -> 1; true -> 0 end, Size}}.
+ deleted=Deleted, body_ptr=Bp, content_meta=Meta, size=Size}) ->
+ {Id, {Seq, Rev, Bp, if Deleted -> 1; true -> 0 end, Meta, Size}}.
-btree_by_id_join(Id, {Seq, Rev, Bp, Deleted, Size}) ->
+btree_by_id_join(Id, {Seq, Rev, Bp, Deleted, Meta, Size}) ->
#doc_info{
id = Id,
local_seq = Seq,
rev = Rev,
deleted = (Deleted == 1),
body_ptr = Bp,
+ content_meta = Meta,
size = Size}.
btree_by_id_reduce(reduce, DocInfos) ->
@@ -296,9 +295,6 @@ simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
simple_upgrade_record(Old, _New) ->
Old.
--define(OLD_DISK_VERSION_ERROR,
- "Database files from versions smaller than 0.10.0 are no longer supported").
-
init_db(DbName, Filepath, Fd, Header0, Options) ->
Header1 = simple_upgrade_record(Header0, #db_header{}),
Header =
@@ -316,20 +312,16 @@ init_db(DbName, Filepath, Fd, Header0, Options) ->
_ -> ok
end,
- Compression = couch_compress:get_compression_method(),
-
{ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_id_btree_state, Fd,
[{split, fun(X) -> btree_by_id_split(X) end},
{join, fun(X,Y) -> btree_by_id_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
- {compression, Compression}]),
+ {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
{ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
[{split, fun(X) -> btree_by_seq_split(X) end},
{join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
- {compression, Compression}]),
+ {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
{ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
- [{compression, Compression}]),
+ []),
case Header#db_header.security_ptr of
nil ->
Security = [],
@@ -358,8 +350,7 @@ init_db(DbName, Filepath, Fd, Header0, Options) ->
security_ptr = SecurityPtr,
instance_start_time = StartTime,
fsync_options = FsyncOptions,
- options = Options,
- compression = Compression
+ options = Options
}.
@@ -385,6 +376,7 @@ update_docs_int(Db, DocsList, NonRepDocs, FullCommit) ->
body_ptr=Bp,
deleted=Deleted,
size=Size,
+ content_meta=Meta,
fd=DocFd
} = DocUpdate,
if Fd /= DocFd ->
@@ -395,6 +387,7 @@ update_docs_int(Db, DocsList, NonRepDocs, FullCommit) ->
id=Id,
rev=Rev,
body_ptr=Bp,
+ content_meta=Meta,
deleted=Deleted,
local_seq=SeqAcc,
size=Size
@@ -405,7 +398,7 @@ update_docs_int(Db, DocsList, NonRepDocs, FullCommit) ->
{K, V} = btree_by_id_split(DocInfo),
[{fetch, K, nil}, {insert, K, V}]
end, NewDocInfos),
- {ok, OldInfos, DocInfoByIdBTree2} = couch_btree_nif:query_modify_raw(Db,
+ {ok, OldInfos, DocInfoByIdBTree2} = couch_btree:query_modify_raw(
DocInfoByIdBTree, RawKeys),
OldSeqs = [OldSeq || {ok, #doc_info{local_seq=OldSeq}} <- OldInfos],
@@ -414,7 +407,7 @@ update_docs_int(Db, DocsList, NonRepDocs, FullCommit) ->
{K, V} = btree_by_seq_split(DocInfo),
{insert, K, V}
end, NewDocInfos),
- {ok, [], DocInfoBySeqBTree2} = couch_btree_nif:query_modify_raw(Db, DocInfoBySeqBTree, RemoveBySeq ++ InsertBySeq),
+ {ok, [], DocInfoBySeqBTree2} = couch_btree:query_modify_raw(DocInfoBySeqBTree, RemoveBySeq ++ InsertBySeq),
{ok, Db2} = update_local_docs(Db, NonRepDocs),
@@ -432,7 +425,7 @@ update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
KVsAdd = [{Id, if is_tuple(Body) -> ?JSON_ENCODE(Body); true -> Body end} ||
- #doc{id=Id, deleted=false, json=Body} <- Docs],
+ #doc{id=Id, deleted=false, body=Body} <- Docs],
IdsRemove = [Id || #doc{id=Id, deleted=true} <- Docs],
{ok, Btree2} =
couch_btree:add_remove(Btree, KVsAdd, IdsRemove),
@@ -582,9 +575,7 @@ copy_compact(Db, NewDb0, Retry) ->
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
+ {ok, Ptr, _} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
true ->
NewDb4 = NewDb3
@@ -655,8 +646,7 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options),
NewDb2 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, PurgedIdsRevs, [{compression, NewDb#db.compression}]),
+ {ok, Pointer, _} = couch_file:append_term(Fd, PurgedIdsRevs),
NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
true ->
NewDb
src/couchdb/couch_doc.erl (139 changes)
@@ -14,7 +14,7 @@
-export([parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
-export([from_json_obj/1,to_json_obj/2,from_binary/3]).
--export([validate_docid/1]).
+-export([validate_docid/1,with_uncompressed_body/1]).
-export([with_ejson_body/1,with_json_body/1]).
-include("couch_db.hrl").
@@ -26,15 +26,19 @@ to_json_rev(0, _) ->
to_json_rev(Start, RevId) ->
[{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(RevId)])}].
-to_ejson_body(true, {Body}) ->
- to_ejson_body(false, {Body}) ++ [{<<"_deleted">>, true}];
-to_ejson_body(false, {Body}) ->
+to_ejson_body(true = _IsDeleted, ContentMeta, Body) ->
+ to_ejson_body(false, ContentMeta, Body) ++ [{<<"_deleted">>, true}];
+to_ejson_body(false, _ContentMeta, {Body}) ->
Body;
-to_ejson_body(false, <<"{}">>) ->
+to_ejson_body(false, ?CONTENT_META_JSON, <<"{}">>) ->
[];
-to_ejson_body(false, Body) when is_binary(Body) ->
+to_ejson_body(false, ?CONTENT_META_JSON, Body) ->
{R} = ?JSON_DECODE(Body),
- R.
+ R;
+to_ejson_body(false, ContentMeta, _Body)
+ when ContentMeta /= ?CONTENT_META_JSON ->
+ [].
+
revid_to_str(RevId) ->
?l2b(couch_util:to_hex(RevId)).
@@ -54,48 +58,69 @@ to_json_meta(Meta) ->
{<<"_local_seq">>, Seq}
end, Meta).
+revid_to_memcached_meta(<<_Cas:64, Expiration:32, Flags:32>>) ->
+ [{<<"$expiration">>, Expiration}, {<<"$flags">>, Flags}];
+revid_to_memcached_meta(_) ->
+ [].
-to_json_obj(Doc, Options) ->
- doc_to_json_obj(with_ejson_body(Doc), Options).
-
-doc_to_json_obj(#doc{id=Id,deleted=Del,json=Json,rev={Start, RevId},
- meta=Meta}, _Options)->
+content_meta_to_memcached_meta(?CONTENT_META_JSON) ->
+ [];
+content_meta_to_memcached_meta(?CONTENT_META_INVALID_JSON) ->
+ [{<<"$att_reason">>, <<"invalid_json">>}];
+content_meta_to_memcached_meta(?CONTENT_META_INVALID_JSON_KEY) ->
+ [{<<"$att_reason">>, <<"invalid_key">>}];
+content_meta_to_memcached_meta(?CONTENT_META_NON_JSON_MODE) ->
+ [{<<"$att_reason">>, <<"non-JSON mode">>}].
+
+to_memcached_meta(#doc{rev={_, RevId},content_meta=Meta}) ->
+ revid_to_memcached_meta(RevId) ++ content_meta_to_memcached_meta(Meta).
+
+to_json_obj(#doc{id=Id,deleted=Del,rev={Start, RevId},
+ meta=Meta}=Doc0, _Options)->
+ Doc = with_uncompressed_body(Doc0),
+ #doc{body=Body,content_meta=ContentMeta} = Doc,
{[{<<"_id">>, Id}]
++ to_json_rev(Start, RevId)
++ to_json_meta(Meta)
- ++ to_ejson_body(Del, Json)
+ ++ to_ejson_body(Del, ContentMeta, Body)
+ ++ to_memcached_meta(Doc)
}.
-mk_att_doc_from_binary(Id, Value, Reason) ->
- #doc{id=Id,
- meta = [att_reason, Reason],
- binary = Value}.
-
+mk_json_doc_from_binary(<<?LOCAL_DOC_PREFIX, _/binary>> = Id, Value) ->
+ case ejson:validate(Value, <<"_$">>) of
+ {ok, JsonBinary} ->
+ #doc{id=Id, body = JsonBinary};
+ Error ->
+ throw(Error)
+ end;
mk_json_doc_from_binary(Id, Value) ->
case ejson:validate(Value, <<"_$">>) of
- {error, invalid_json} ->
- mk_att_doc_from_binary(Id, Value, <<"invalid_json">>);
- {error, private_field_set} ->
- mk_att_doc_from_binary(Id, Value, <<"invalid_key">>);
- {error, garbage_after_value} ->
- mk_att_doc_from_binary(Id, Value, <<"invalid_json">>);
- {ok, Json} ->
- #doc{id=Id, % now add in new meta
- json=Json
- }
+ {error, invalid_json} ->
+ #doc{id=Id, body = Value,
+ content_meta = ?CONTENT_META_INVALID_JSON};
+ {error, private_field_set} ->
+ #doc{id=Id, body = Value,
+ content_meta = ?CONTENT_META_INVALID_JSON_KEY};
+ {error, garbage_after_value} ->
+ #doc{id=Id, body = Value,
+ content_meta = ?CONTENT_META_INVALID_JSON};
+ {ok, JsonBinary} ->
+ #doc{id=Id, body = couch_compress:compress(JsonBinary),
+ content_meta = ?CONTENT_META_JSON bor ?CONTENT_META_SNAPPY_COMPRESSED}
end.
from_binary(Id, Value, WantJson) ->
case WantJson of
- true ->
- mk_json_doc_from_binary(Id, Value);
- _ ->
- mk_att_doc_from_binary(Id, Value, <<"non-JSON mode">>)
+ true ->
+ mk_json_doc_from_binary(Id, Value);
+ _ ->
+ #doc{id=Id, body = Value,
+ content_meta = ?CONTENT_META_NON_JSON_MODE}
end.
from_json_obj({Props}) ->
- transfer_fields(Props, #doc{json=[]});
+ transfer_fields(Props, #doc{body=[]});
from_json_obj(_Other) ->
throw({bad_request, "Document must be a JSON object"}).
@@ -141,9 +166,9 @@ validate_docid(Id) ->
?LOG_DEBUG("Document id is not a string: ~p", [Id]),
throw({bad_request, <<"Document id must be a string">>}).
-transfer_fields([], #doc{json=Fields}=Doc) ->
+transfer_fields([], #doc{body=Fields}=Doc) ->
% convert fields back to json object
- Doc#doc{json={lists:reverse(Fields)}};
+ Doc#doc{body=?JSON_ENCODE({lists:reverse(Fields)})};
transfer_fields([{<<"_id">>, Id} | Rest], Doc) ->
validate_docid(Id),
@@ -173,43 +198,43 @@ transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
% special fields for replication documents
transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
- #doc{json=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{json=[Field|Fields]});
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
- #doc{json=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{json=[Field|Fields]});
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
transfer_fields([{<<"_replication_id">>, _} = Field | Rest],
- #doc{json=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{json=[Field|Fields]});
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
% unknown special field
transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
throw({doc_validation,
?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
-transfer_fields([Field | Rest], #doc{json=Fields}=Doc) ->
- transfer_fields(Rest, Doc#doc{json=[Field|Fields]}).
+transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
with_ejson_body(Doc) ->
Uncompressed = with_uncompressed_body(Doc),
- #doc{json = Body} = Uncompressed,
- Uncompressed#doc{json = {to_ejson_body(false, Body)}}.
+ #doc{body = Body, content_meta=Meta} = Uncompressed,
+ Uncompressed#doc{body = {to_ejson_body(false, Meta, Body)}}.
with_json_body(Doc) ->
case with_uncompressed_body(Doc) of
- #doc{json = Body} = Doc2 when is_binary(Body) ->
- Doc2;
- #doc{json = Body} = Doc2 when is_tuple(Body)->
- Doc2#doc{json = ?JSON_ENCODE(Body)}
+ #doc{body = Body} = Doc2 when is_tuple(Body)->
+ Doc2#doc{body = ?JSON_ENCODE(Body)};
+ #doc{} = Doc2 ->
+ Doc2
end.
-with_uncompressed_body(#doc{json = Body} = Doc) when is_binary(Body) ->
- case couch_compress:is_compressed(Body) of
- true ->
- Doc#doc{json = couch_compress:decompress(Body)};
- false ->
- Doc
- end;
-with_uncompressed_body(#doc{json = {_}} = Doc) ->
+
+with_uncompressed_body(#doc{body = {_}} = Doc) ->
+ Doc;
+with_uncompressed_body(#doc{body = Body, content_meta = Meta} = Doc)
+ when (Meta band ?CONTENT_META_SNAPPY_COMPRESSED) > 0 ->
+ NewMeta = Meta band (bnot ?CONTENT_META_SNAPPY_COMPRESSED),
+ Doc#doc{body = couch_compress:decompress(Body), content_meta = NewMeta};
+with_uncompressed_body(Doc) ->
Doc.
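
Callers that previously matched on #doc.json now go through these helpers
explicitly; for example, to get the EJson body of a design doc (sketch,
mirroring the couch_query_servers change below):

    #doc{body = {Props}} = couch_doc:with_ejson_body(Doc),
    Language = couch_util:get_value(<<"language">>, Props, <<"javascript">>).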
src/couchdb/couch_file.erl (25 changes)
@@ -26,9 +26,9 @@
% public API
-export([open/1, open/2, close/1, bytes/1, flush/1, sync/1, truncate/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2,rename/2]).
--export([append_binary/2, append_binary_crc32/2,set_close_after/2]).
+-export([append_binary/2, append_binary_crc32/2, set_close_after/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
--export([append_term/2, append_term/3, append_term_crc32/2, append_term_crc32/3]).
+-export([append_term/2]).
-export([write_header/2, read_header/1,only_snapshot_reads/1]).
-export([delete/2, delete/3, init_delete_dir/1,get_delete_dir/1]).
@@ -67,18 +67,7 @@ open(Filepath, Options) ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- append_term(Fd, Term, []).
-
-append_term(Fd, Term, Options) ->
- Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
- append_binary(Fd, couch_compress:compress(Term, Comp)).
-
-append_term_crc32(Fd, Term) ->
- append_term_crc32(Fd, Term, []).
-
-append_term_crc32(Fd, Term, Options) ->
- Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
- append_binary_crc32(Fd, couch_compress:compress(Term, Comp)).
+ append_binary_crc32(Fd, couch_compress:compress(?term_to_bin(Term))).
%%----------------------------------------------------------------------
%% Purpose: To append an Erlang binary to the end of the file.
@@ -115,7 +104,7 @@ assemble_file_chunk(Bin, Crc32) ->
pread_term(Fd, Pos) ->
case pread_binary(Fd, Pos) of
{ok, Bin} ->
- {ok, couch_compress:decompress(Bin)};
+ {ok, binary_to_term(couch_compress:decompress(Bin))};
Else ->
Else
end.
@@ -453,12 +442,6 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(heart, File) ->
{noreply, File};
-handle_info({append_bin_btnif, Comp, Rsrc, TermBin}, #file{writer = W, eof = Pos} = File) ->
- Bin = assemble_file_chunk(couch_compress:compress_bin(TermBin, Comp)),
- Size = calculate_total_read_len(Pos rem ?SIZE_BLOCK, iolist_size(Bin)),
- ok = couch_btree_nif:write_response(Rsrc, Pos, Size),
- W ! {chunk, Bin},
- {noreply, File#file{eof = Pos + Size}};
handle_info({'EXIT', _, normal}, Fd) ->
{noreply, Fd};
handle_info({'EXIT', Pid, Reason}, #file{writer = Pid} = Fd) ->
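
Since append_term now always snappy-compresses the external term format (and
pread_term reverses it), the pair forms a simple round trip with no options to
thread through (illustrative sketch, assuming Fd is an open couch_file):

    {ok, Pos, _Size} = couch_file:append_term(Fd, {kv_node, []}),
    {ok, {kv_node, []}} = couch_file:pread_term(Fd, Pos).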
src/couchdb/couch_httpd_db.erl (47 changes)
@@ -218,7 +218,7 @@ db_req(#httpd{method='POST',
true ->
Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req));
false ->
- Doc = #doc{binary=couch_httpd:body(Req)}
+ Doc = #doc{body=couch_httpd:body(Req)}
end,
Doc2 = case Doc#doc.id of
<<"">> ->
@@ -563,10 +563,10 @@ db_doc_req(#httpd{method='DELETE',db_frontend=DbFrontend}=Req, Db, DocId) ->
case couch_httpd:qs_value(Req, "rev") of
undefined ->
update_doc(Req, Db, DocId,
- couch_doc_from_req(Req, DocId, {[{<<"_deleted">>,true}]}));
+ couch_doc_from_req(DocId, {[{<<"_deleted">>,true}]}));
Rev ->
update_doc(Req, Db, DocId,
- couch_doc_from_req(Req, DocId,
+ couch_doc_from_req(DocId,
{[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}))
end;
@@ -575,7 +575,7 @@ db_doc_req(#httpd{method = 'GET',
#doc_query_args{
options = Options
} = parse_doc_query(Req),
- Doc = DbFrontend:couch_doc_open(Db, DocId, Options),
+ Doc = DbFrontend:couch_doc_open(Db, DocId, [ejson_body | Options]),
send_doc(Req, Doc, Options);
@@ -585,37 +585,32 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
RespHeaders = [{"Location", Loc}],
case couch_httpd:is_ctype(Req, "application/json") of
true ->
- Body = couch_httpd:json_body(Req);
+ Body = couch_httpd:json_body(Req),
+ Doc = couch_doc_from_req(DocId, Body);
false ->
- Body = couch_doc:from_binary(DocId, couch_httpd:body(Req), true)
+ Body = couch_httpd:body(Req),
+ Doc = couch_doc:from_binary(DocId, Body, false)
end,
- Doc = couch_doc_from_req(Req, DocId, Body),
update_doc(Req, Db, DocId, Doc, RespHeaders);
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT").
-send_doc(Req, Doc, Options) ->
- case Doc#doc.meta of
- [] ->
- send_doc_efficiently(Req, Doc, [], Options);
+send_doc(Req, Doc0, Options) ->
+ Doc = couch_doc:with_uncompressed_body(Doc0),
+ case Doc#doc.content_meta == ?CONTENT_META_JSON of
+ true ->
+ send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options));
_ ->
- send_doc_efficiently(Req, Doc, [], Options)
+ Headers = [
+ {"Content-Type", "application/content-stream"},
+ {"Cache-Control", "must-revalidate"}
+ ],
+ send_response(Req, 200, Headers, Doc#doc.body)
end.
-send_doc_efficiently(Req,
- #doc{binary=nil} = Doc, Headers, Options) ->
- send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
-send_doc_efficiently(Req,
- #doc{binary=Binary}, Headers, _Options) ->
- Headers2 = Headers ++ [
- {"Content-Type", "application/content-stream"},
- {"Cache-Control", "must-revalidate"}
- ],
- send_response(Req, 200, Headers2, Binary).
-
update_doc_result_to_json({{Id, Rev}, Error}) ->
{_Code, Err, Msg} = couch_httpd:error_info(Error),
{[{id, Id}, {rev, couch_doc:rev_to_str(Rev)},
@@ -649,10 +644,10 @@ update_doc(Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers) ->
{ok, true},
{id, DocId}]}).
-couch_doc_from_req(_Req, DocId, #doc{} = Doc) ->
+couch_doc_from_req(DocId, #doc{} = Doc) ->
Doc#doc{id=DocId};
-couch_doc_from_req(Req, DocId, Json) ->
- couch_doc_from_req(Req, DocId, couch_doc:from_json_obj(Json)).
+couch_doc_from_req(DocId, {_}=EJson) ->
+ couch_doc_from_req(DocId, couch_doc:from_json_obj(EJson)).
parse_doc_query(Req) ->
src/couchdb/couch_query_servers.erl (4 changes)
@@ -330,7 +330,7 @@ terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
ok.
handle_call({get_proc, DDoc1, DDocKey}, From, Server) ->
- #doc{json = {Props}} = DDoc = couch_doc:with_ejson_body(DDoc1),
+ #doc{body = {Props}} = DDoc = couch_doc:with_ejson_body(DDoc1),
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
case lang_proc(Lang, Server, fun(Procs) ->
% find a proc in the set that has the DDoc
@@ -431,7 +431,7 @@ service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
end.
% todo get rid of duplication
-service_waiting({{#doc{json={Props}}=DDoc, DDocKey}, From}, Server) ->
+service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
case lang_proc(Lang, Server, fun(Procs) ->
% find a proc in the set that has the DDoc
src/couchdb/couch_replication_manager.erl (4 changes)
@@ -515,7 +515,7 @@ update_rep_doc(RepDocId, KVs) ->
couch_db:close(RepDb)
end.
-update_rep_doc(RepDb, #doc{json = {RepDocBody}} = RepDoc, KVs) ->
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
NewRepDocBody = lists:foldl(
fun({<<"_replication_state">> = K, State} = KV, Body) ->
case get_value(K, Body) of
@@ -537,7 +537,7 @@ update_rep_doc(RepDb, #doc{json = {RepDocBody}} = RepDoc, KVs) ->
_ ->
% Might not succeed - when the replication doc is deleted right
% before this update (not an error, ignore).
- couch_db:update_doc(RepDb, RepDoc#doc{json = {NewRepDocBody}}, [])
+ couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
end.
src/couchdb/couch_replicator.erl (18 changes)
@@ -535,7 +535,7 @@ init_state(Rep) ->
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
- #doc{json={CheckpointHistory}} = SourceLog,
+ #doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
rep_details = Rep,
source_name = couch_api_wrap:db_uri(Source),
@@ -580,12 +580,12 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
{error, <<"not_found">>} ->
fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId, json = {[]}} | Acc]);
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId, body = {[]}} | Acc]);
{ok, Doc} when LogId =:= NewId ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
{ok, Doc} ->
- MigratedLog = #doc{id = NewId, json = Doc#doc.json},
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
end.
@@ -745,9 +745,9 @@ do_checkpoint(State) ->
RandBin = <<Rand:32/integer>>,
try
SrcRev = update_checkpoint(
- Source, SourceLog#doc{json = NewRepHistory, rev={1, RandBin}}, source),
+ Source, SourceLog#doc{body = NewRepHistory, rev={1, RandBin}}, source),
TgtRev = update_checkpoint(
- Target, TargetLog#doc{json = NewRepHistory, rev={1, RandBin}}, target),
+ Target, TargetLog#doc{body = NewRepHistory, rev={1, RandBin}}, target),
SourceCurSeq = source_cur_seq(State),
NewState = State#rep_state{
source_seq = SourceCurSeq,
@@ -782,7 +782,7 @@ update_checkpoint(Db, Doc, DbType) ->
" checkpoint document: ", (to_binary(Reason))/binary>>})
end.
-update_checkpoint(Db, #doc{id = LogId, json = LogBody, rev = Rev} = Doc) ->
+update_checkpoint(Db, #doc{id = LogId, body = LogBody, rev = Rev} = Doc) ->
try
case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of
ok ->
@@ -792,7 +792,7 @@ update_checkpoint(Db, #doc{id = LogId, json = LogBody, rev = Rev} = Doc) ->
end
catch throw:conflict ->
case (catch couch_api_wrap:open_doc(Db, LogId, [ejson_body])) of
- {ok, #doc{json = LogBody, rev = Rev}} ->
+ {ok, #doc{body = LogBody, rev = Rev}} ->
% This means that we were able to update successfully the
% checkpoint doc in a previous attempt but we got a connection
% error (timeout for e.g.) before receiving the success response.
@@ -841,8 +841,8 @@ commit_to_both(Source, Target) ->
compare_replication_logs(SrcDoc, TgtDoc) ->
- #doc{json={RepRecProps}} = SrcDoc,
- #doc{json={RepRecPropsTgt}} = TgtDoc,
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
case get_value(<<"session_id">>, RepRecProps) ==
get_value(<<"session_id">>, RepRecPropsTgt) of
true ->
src/couchdb/couch_replicator_utils.erl (2 changes)
@@ -120,7 +120,7 @@ filter_code(Filter, Source, UserCtx) ->
try
Body = case (catch couch_api_wrap:open_doc(
Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
- {ok, #doc{json = Body0}} ->
+ {ok, #doc{body = Body0}} ->
Body0;
DocError ->
DocErrorMsg = io_lib:format(
src/couchdb/couch_view_group.erl (9 changes)
@@ -585,7 +585,7 @@ sum_btree_sizes(Size1, Size2) ->
Size1 + Size2.
% maybe move to another module
-design_doc_to_view_group(#doc{id=Id,json={Fields}}) ->
+design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{DesignOptions} = couch_util:get_value(<<"options">>, Fields, {[]}),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
@@ -638,7 +638,7 @@ init_group(Db, Fd, #group{views=Views}=Group, nil) ->
init_group(Db, Fd, Group,
#index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
+init_group(_Db, Fd, #group{def_lang=Lang,views=Views}=
Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
@@ -648,7 +648,7 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
end,
ViewStates2 = lists:map(StateUpdate, ViewStates),
{ok, IdBtree} = couch_btree:open(
- IdBtreeState, Fd, [{compression, Db#db.compression}]),
+ IdBtreeState, Fd, []),
Views2 = lists:zipwith(
fun({BTState, USeq, PSeq}, #view{reduce_funs=RedFuns,options=Options}=View) ->
FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
@@ -674,8 +674,7 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
Less = fun(A,B) -> A < B end
end,
{ok, Btree} = couch_btree:open(BTState, Fd,
- [{less, Less}, {reduce, ReduceFun},
- {compression, Db#db.compression}]
+ [{less, Less}, {reduce, ReduceFun}]
),
View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq}
end,
src/couchdb/couch_view_merger.erl (4 changes)
@@ -200,7 +200,7 @@ http_index_folder_req_details(#simple_index_spec{
view_details(nil, <<"_all_docs">>) ->
{<<"raw">>, map, nil};
-view_details(#doc{json = DDoc}, ViewName) ->
+view_details(#doc{body = DDoc}, ViewName) ->
{Props} = DDoc,
{ViewDef} = get_nested_json_value(DDoc, [<<"views">>, ViewName]),
{ViewOptions} = get_value(<<"options">>, ViewDef, {[]}),
@@ -215,7 +215,7 @@ view_details(#doc{json = DDoc}, ViewName) ->
{Collation, ViewType, Lang}.
-reduce_function(#doc{json = DDoc}, ViewName) ->
+reduce_function(#doc{body = DDoc}, ViewName) ->
{ViewDef} = get_nested_json_value(DDoc, [<<"views">>, ViewName]),
get_value(<<"reduce">>, ViewDef).
src/couchdb/priv/Makefile.am (32 changes)
@@ -36,12 +36,6 @@ EJSON_COMPARE_DIR = couch_ejson_compare/win32
EJSON_COMPARE_MSBUILD = $(EJSON_COMPARE_DIR)/msbuild.bat
EJSON_COMPARE_VCPROJ = $(EJSON_COMPARE_DIR)/couch_ejson_compare.vcxproj
COUCH_EJSON_COMPARE = $(EJSON_COMPARE_DIR)/couch_ejson_compare.dll
-BTREE_NIF_DIR = btree_nif/win32
-BTREE_NIF_MSBUILD = $(BTREE_NIF_DIR)/msbuild.bat
-BTREE_NIF_VCPROJ = $(BTREE_NIF_DIR)/couch_btree_nif.vcxproj
-COUCH_BTREE_NIF = $(BTREE_NIF_DIR)/release/couch_btree_nif.dll
-WIN_ERL_INTERFACE_INCLUDE = $(BTREE_NIF_DIR)/win_erl_interface_include.tmp
-WIN_ERL_INTERFACE_LIB = $(BTREE_NIF_DIR)/win_erl_interface_lib.tmp
else
ICU_LOCAL_FLAGS = $(ICU_LOCAL_CFLAGS) $(ICU_LOCAL_LDFLAGS)
@@ -50,22 +44,11 @@ ICU_LOCAL_LIBS=-licuuc -licudata -licui18n
couchprivlib_LTLIBRARIES = couch_icu_driver.la
if USE_OTP_NIFS
couchprivlib_LTLIBRARIES += couch_ejson_compare.la
-couchprivlib_LTLIBRARIES += couch_btree_nif.la
couch_ejson_compare_la_SOURCES = couch_ejson_compare/couch_ejson_compare.c
couch_ejson_compare_la_CFLAGS = -D_BSD_SOURCE $(ICU_LOCAL_FLAGS)
couch_ejson_compare_la_LDFLAGS = -module -avoid-version $(ICU_LOCAL_FLAGS)
couch_ejson_compare_la_LIBADD = $(ICU_LOCAL_LIBS)
-couch_btree_nif_la_SOURCES = btree_nif/couch_btree_nif.c \
- btree_nif/couch_btree.c \
- btree_nif/couch_file_read.c \
- btree_nif/couch_nif_write.c \
- $(SNAPPY_SRC)/snappy-c.cc \
- $(SNAPPY_SRC)/snappy.cc \
- $(SNAPPY_SRC)/snappy-sinksource.cc \
- $(SNAPPY_SRC)/snappy-stubs-internal.cc
-couch_btree_nif_la_CFLAGS = $(ERL_INTERFACE_DIR_INCLUDE)
-couch_btree_nif_la_LDFLAGS = -module -avoid-version $(erlinterfaceldflags)
endif
couch_icu_driver_la_SOURCES = icu_driver/couch_icu_driver.c
@@ -79,7 +62,7 @@ couchjs_msbuild = couch_js/win32/msbuild.bat
couchjs_vcproj = couch_js/win32/couchjs.vcxproj
WIN_JS_INCLUDE = couch_js/win32/win_js_include.tmp
WIN_JS_LIB_DIR = couch_js/win32/win_js_dir.tmp
-couchpriv_DATA = stat_descriptions.cfg $(COUCH_ICU_DRIVER) $(COUCH_EJSON_COMPARE) $(COUCH_BTREE_NIF)
+couchpriv_DATA = stat_descriptions.cfg $(COUCH_ICU_DRIVER)
endif
COUCHJS_SRCS = \
@@ -143,18 +126,7 @@ $(WIN_ICU_LIB_DIR):$(ICU_LIB_DIR)
$(WIN_ERL_INCLUDE):$(ERLANG_INCLUDE)
echo $< | sed -e "s|^/cygdrive/\([a-zA-Z]\)|\1:|" > $@
-$(COUCH_BTREE_NIF) : $(couch_btree_nif_la_SOURCES) $(BTREE_NIF_MSBUILD) $(BTREE_NIF_VCPROJ)
- $(MKDIR_P) "$(couchprivlibdir)" || true
- (cd $(BTREE_NIF_DIR) && ./msbuild.bat)
- cp $(COUCH_BTREE_NIF) $(couchprivlibdir)
-
-$(BTREE_NIF_MSBUILD) : $(BTREE_NIF_MSBUILD).tpl
- sed -e "s|%msbuild_dir%|$(msbuild_dir)|" \
- -e "s|%msbuild_name%|$(msbuild_name)|" \
- -e "s|^/cygdrive/\([a-zA-Z]\)|\1:|" \
- < $< > $@
-
-$(BTREE_NIF_VCPROJ) : $(BTREE_NIF_VCPROJ).tpl $(WIN_ERL_INCLUDE) $(WIN_ERL_INTERFACE_INCLUDE) $(WIN_ERL_INTERFACE_LIB)
+ $(WIN_ERL_INTERFACE_INCLUDE) $(WIN_ERL_INTERFACE_LIB)
sed -e "s|%ERLANG_INCLUDE%|`cat $(WIN_ERL_INCLUDE)`|" \
-e "s|%ERL_INTERFACE_DIR_INCLUDE%|`cat $(WIN_ERL_INTERFACE_INCLUDE)`|" \
-e "s|%ERL_INTERFACE_DIR_LIB%|`cat $(WIN_ERL_INTERFACE_LIB)`|" \
src/couchdb/priv/btree_nif/config_static.h (40 changes)
@@ -1,40 +0,0 @@
-#pragma once
-
-#ifndef _CONFIG_STATIC_H
-#define _CONFIG_STATIC_H
-
-#ifdef WIN32
-#define __WIN32__
-// Including SDKDDKVer.h defines the highest available Windows platform.
-
-// If you wish to build your application for a previous Windows platform, include WinSDKVer.h and
-// set the _WIN32_WINNT macro to the platform you wish to support before including SDKDDKVer.h.
-
-#include <SDKDDKVer.h>
-
-#define WIN32_LEAN_AND_MEAN // Exclude rarely-used stuff from Windows headers
-#define MAXPATHLEN 1024
-#include <windows.h>
-#include <stdio.h>
-#include <stdint.h>
-
-#define inline __inline
-
-#define ssize_t long
-#define off_t long
-
-static inline ssize_t pread(int fd, void *buf, size_t count, off_t offset)
-{
- off_t pos = lseek(fd, offset, SEEK_SET);
- if (pos < 0) {
- return pos;
- }
- return read(fd, buf, count);
-}
-#else
-#define HAVE_SYS_PARAM_H 1
-#define HAVE_UNISTD_H 1
-#define HAVE_NETINET_IN_H 1
-
-#endif
-#endif // _CONFIG_STATIC_H
src/couchdb/priv/btree_nif/couch_btree.c (913 changes)
@@ -1,913 +0,0 @@
-#include "config_static.h"
-
-#include <stdlib.h>
-#include <string.h>
-
-#include "couch_btree.h"
-#include "ei.h"
-
-#define CHUNK_THRESHOLD 1279
-//#define DBG(...) fprintf(stderr, __VA_ARGS__)
-#define DBG(...)
-
-eterm_buf empty_root = {
- // {kv_node, []}
- "\x68\x02\x64\x00\x07kv_node\x6A",
- 13
-};
-
-int flush_mr(couchfile_modify_result *res);
-
-int find_first_gteq(char* buf, int pos, void* key, compare_info* lu, int at_least)
-{
- int list_arity, inner_arity;
- int list_pos = 0, cmp_val;
- off_t pair_pos = 0;
- if(ei_decode_list_header(buf, &pos, &list_arity) < 0)
- {
- return ERROR_PARSE;
- }
- while(list_pos < list_arity)
- {
- //{<<"key", some other term}
- pair_pos = pos; //Save pos of kv/kp pair tuple
-
- if(ei_decode_tuple_header(buf, &pos, &inner_arity) < 0)
- {
- return ERROR_PARSE;
- }
-
- lu->last_cmp_key = (*lu->from_ext)(lu, buf,pos);
- cmp_val = (*lu->compare) (lu->last_cmp_key, key);
- lu->last_cmp_val = cmp_val;
- lu->list_pos = list_pos;
- if(cmp_val >= 0 && list_pos >= at_least)
- {
- break;
- }
- ei_skip_term(buf, &pos); //skip over the key
- ei_skip_term(buf, &pos); //skip over the value
- list_pos++;
- }
- return pair_pos;
-}
-
-int maybe_flush(couchfile_modify_result *mr)
-{
- if(mr->modified && mr->node_len > CHUNK_THRESHOLD)
- {
- return flush_mr(mr);
- }
- return 0;
-}
-
-
-void free_nodelist(nodelist* nl)
-{
- while(nl != NULL)
- {
- nodelist* next = nl->next;
- if(nl->value.mem != NULL)
- {
- free(nl->value.mem);
- }
- free(nl);
- nl = next;
- }
-}
-
-nodelist* make_nodelist()
-{
- nodelist* r = malloc(sizeof(nodelist));
- if(!r)
- return NULL;
- r->next = NULL;
- r->value.mem = NULL;
- return r;
-}
-
-couchfile_modify_result *make_modres(couchfile_modify_request *rq) {
- couchfile_modify_result *res = malloc(sizeof(couchfile_modify_result));
- if(!res)
- return NULL;
- res->values = make_nodelist();
- if(!res->values)
- {
- free(res);
- return NULL;
- }
- res->values_end = res->values;
- res->pointers = make_nodelist();
- if(!res->pointers)
- {
- free(res);
- return NULL;
- }
- res->pointers_end = res->pointers;
- res->node_len = 0;
- res->count = 0;
- res->modified = 0;
- res->error_state = 0;
- res->rq = rq;
- return res;
-}
-
-void term_to_buf(eterm_buf *dst, char* buf, int *pos)
-{
- int start = *pos;
- ei_skip_term(buf, pos);
- dst->buf = buf + start;
- dst->size = *pos - start;
-}
-
-void free_modres(couchfile_modify_result* mr)
-{
- free_nodelist(mr->values);
- free_nodelist(mr->pointers);
- free(mr);
-}
-
-int mr_push_action(couchfile_modify_action *act, couchfile_modify_result *dst)
-{
- nodelist* n = make_nodelist();
- //For ACTION_INSERT
- couchfile_leaf_value* lv = malloc(sizeof(couchfile_leaf_value) +
- act->key->size + act->value->size + 2);
- if(!lv)
- goto fail;
- //Allocate space for {K,V} term
- lv->term.buf = ((char*)lv) + sizeof(couchfile_leaf_value);
- lv->term.size = (act->key->size + act->value->size + 2);
- //tuple of arity 2
- lv->term.buf[0] = 104;
- lv->term.buf[1] = 2;
- //copy terms from the action
- memcpy(lv->term.buf + 2, act->key->buf, act->key->size);
- memcpy(lv->term.buf + 2 + act->key->size,
- act->value->buf, act->value->size);
-
- if(!n)
- goto fail;
- dst->values_end->next = n;
- dst->values_end = n;
- n->value.leaf = lv;
-
- dst->node_len += lv->term.size;
- dst->count++;
- return maybe_flush(dst);
-fail:
- if(lv)
- free(lv);
- return ERROR_ALLOC_FAIL;
-}
-
-int mr_push_pointerinfo(couchfile_pointer_info* ptr, couchfile_modify_result* dst)
-{
- nodelist* pel = make_nodelist();
- if(!pel)
- goto fail;
- pel->value.pointer = ptr;
- dst->values_end->next = pel;
- dst->values_end = pel;
-
- //max len of {key,{pointer, reduce_value, subtreesize}}
- dst->node_len += ptr->key.size + ptr->reduce_value.size + 24;
- dst->count++;
- return maybe_flush(dst);
-fail:
- return ERROR_ALLOC_FAIL;
-}
-
-int mr_push_kv_range(char* buf, int pos, int bound, int end, couchfile_modify_result *dst)
-{
- int current = 0;
- int term_begin_pos;
- int errcode = 0;
- ei_decode_list_header(buf, &pos, NULL);
- while(current < end && errcode == 0)
- {
- term_begin_pos = pos;
- ei_skip_term(buf, &pos);
- if(current >= bound)
- {
- nodelist* n = make_nodelist();
- //Parse KV pair into a leaf_value
- couchfile_leaf_value *lv = malloc(sizeof(couchfile_leaf_value));
- if(!lv)
- {
- errcode = ERROR_ALLOC_FAIL;
- break;
- }
-
- lv->term.buf = buf+term_begin_pos;
- lv->term.size = pos-term_begin_pos;
-
- if(!n)
- {
- errcode = ERROR_ALLOC_FAIL;
- free(lv);
- break;
- }
-
- dst->values_end->next = n;
- dst->values_end = n;
- n->value.leaf = lv;
-
- dst->node_len += lv->term.size;
- dst->count++;
- errcode = maybe_flush(dst);
- }
- current++;
- }
- return errcode;
-}
-
-couchfile_pointer_info* read_pointer(char* buf, int pos)
-{
-//Parse KP pair into a couchfile_pointer_info {K, {ptr, reduce_value, subtreesize}}
- couchfile_pointer_info *p = malloc(sizeof(couchfile_pointer_info));
- if(!p)
- return NULL;
- p->writerq_resource = NULL;
- //DBG("%u,%u,%u,%u\n", buf[pos+0], buf[pos+1], buf[pos+2], buf[pos+3]);
- ei_decode_tuple_header(buf, &pos, NULL); //arity 2
- term_to_buf(&p->key, buf, &pos);
- ei_decode_tuple_header(buf, &pos, NULL); //arity 3
- ei_decode_ulonglong(buf, &pos, (unsigned long long*) &p->pointer);
- term_to_buf(&p->reduce_value, buf, &pos);
- ei_decode_ulonglong(buf, &pos, (unsigned long long*) &p->subtreesize);
-
- return p;
-}
-
-int mr_push_kp_range(char* buf, int pos, int bound, int end, couchfile_modify_result *dst)
-{
- couchfile_pointer_info *read= NULL;
- int current = 0;
- DBG("Moving items %d - %d into result.\r\n", bound, end);
-
- ei_decode_list_header(buf, &pos, NULL);
- while(current < end)
- {
- if(current >= bound)
- {
- DBG(" .... %d\r\n", current);
- read = read_pointer(buf,pos);
- if(!read)
- return ERROR_ALLOC_FAIL;
- mr_push_pointerinfo(read, dst);
- }
- ei_skip_term(buf, &pos);
- current++;
- }
- return 0;
-}
-
-inline void append_buf(void* dst, int *dstpos, void* src, int len) {
- char* tmp = (char*)dst;
- memcpy(tmp + *dstpos, src, len);
- *dstpos += len;
-}
-
-int wait_pointer(couchfile_modify_request* rq, couchfile_pointer_info *ptr)
-{
- int ret = 0;
- btreenif_state *state = rq->globalstate;
-
- if(ptr->writerq_resource == NULL)
- return 0;
- enif_mutex_lock(state->writer_cond.mtx);
-
- while(ptr->pointer == 0)
- {
- enif_cond_wait(state->writer_cond.cond, state->writer_cond.mtx);
- if(ptr->pointer == 0 &&
- !enif_send(rq->caller_env, &rq->writer, state->check_env, state->atom_heart))
- {
- //The writer process has died
- ret = ERROR_WRITER_DEAD;
- break;
- }
- enif_clear_env(state->check_env);
- }
-
- if(ptr->pointer != 0)
- {
- enif_release_resource(ptr->writerq_resource);
- }
-
- enif_mutex_unlock(state->writer_cond.mtx);
-
- ptr->writerq_resource = NULL;
- return ret;
-}
-
-//Write the current contents of the values list to disk as a node
-//and add the resulting pointer to the pointers list.
-int flush_mr(couchfile_modify_result *res)
-{
- int nbufpos = 0;
- long long subtreesize = 0;
- eterm_buf reduce_value;
- //default reduce value []
-
- int reduced = 0;
- int errcode = 0;
- nif_writerq *wrq = NULL;
- char *nodebuf = NULL;
- nodelist* i = NULL;
- eterm_buf last_key;
- nodelist* pel = NULL;
- couchfile_pointer_info* ptr = NULL;
-
- reduce_value.buf = "\x6A"; //NIL_EXT
- reduce_value.size = 1;
-
- if(res->values_end == res->values || !res->modified)
- {
- //Empty
- return 0;
- }
-
- res->node_len += 19; //tuple header and node type tuple, list header and tail
- wrq = nif_writerq_alloc(res->node_len);
- nodebuf = wrq->buf;
-
- //External term header; tuple header arity 2;
- ei_encode_version(nodebuf, &nbufpos);
- ei_encode_tuple_header(nodebuf, &nbufpos, 2);
- switch(res->node_type)
- {
- case KV_NODE:
- ei_encode_atom_len(nodebuf, &nbufpos, "kv_node", 7);
- if(res->rq->reduce)
- {
- (*res->rq->reduce)(&reduce_value, res->values->next, res->count);
- reduced = 1;
- }
- break;
- case KP_NODE:
- ei_encode_atom_len(nodebuf, &nbufpos, "kp_node", 7);
- if(res->rq->rereduce)
- {
- (*res->rq->rereduce)(&reduce_value, res->values->next, res->count);
- reduced = 1;
- }
- break;
- }
-
- if(reduced && reduce_value.buf == NULL)
- {
- errcode = ERROR_ALLOC_FAIL;
- goto cleanup;
- }
-
- ei_encode_list_header(nodebuf, &nbufpos, res->count);
-
- i = res->values->next;
-
- while(i != NULL)
- {
- if(res->node_type == KV_NODE) //writing value in a kv_node
- {
- append_buf(nodebuf, &nbufpos, i->value.leaf->term.buf, i->value.leaf->term.size);
- if(i->next == NULL)
- {
- int pos = 0;
- term_to_buf(&last_key, i->value.leaf->term.buf+2, &pos);
- }
- }
- else if (res->node_type == KP_NODE) //writing value in a kp_node
- {
- if(wait_pointer(res->rq, i->value.pointer) < 0)
- {
- errcode = ERROR_WRITER_DEAD;
- goto cleanup;
- }
- subtreesize += i->value.pointer->subtreesize;
- ei_encode_tuple_header(nodebuf, &nbufpos, 2); //tuple arity 2
- append_buf(nodebuf, &nbufpos, i->value.pointer->key.buf, i->value.pointer->key.size);
- ei_encode_tuple_header(nodebuf, &nbufpos, 3); //tuple arity 3
- //pointer
- // v- between 2 and 10 bytes (ERL_SMALL_INTEGER_EXT to ERL_SMALL_BIG_EXT/8)
- ei_encode_ulonglong(nodebuf, &nbufpos, i->value.pointer->pointer);
- //reduce_value
- append_buf(nodebuf, &nbufpos, i->value.pointer->reduce_value.buf,
- i->value.pointer->reduce_value.size);
- //subtreesize
- // v- between 2 and 10 bytes (ERL_SMALL_INTEGER_EXT to ERL_SMALL_BIG_EXT/8)
- ei_encode_ulonglong(nodebuf, &nbufpos, i->value.pointer->subtreesize);
- if(i->next == NULL)
- {
- last_key = i->value.pointer->key;
- }
- }
- i = i->next;
- }
-
- //NIL_EXT (list tail)
- ei_encode_empty_list(nodebuf, &nbufpos);
-
- ptr = malloc(sizeof(couchfile_pointer_info) +
- last_key.size + reduce_value.size);
- if(!ptr)
- {
- errcode = ERROR_ALLOC_FAIL;
- goto cleanup;
- }
- ptr->pointer = 0;
-
- nif_write(res->rq, ptr, wrq, nbufpos);
-
- ptr->key.buf = ((char*)ptr) + sizeof(couchfile_pointer_info);
- ptr->reduce_value.buf = ((char*)ptr) + sizeof(couchfile_pointer_info) + last_key.size;
-
- ptr->key.size = last_key.size;
- ptr->reduce_value.size = reduce_value.size;
-
- memcpy(ptr->key.buf, last_key.buf, last_key.size);
- memcpy(ptr->reduce_value.buf, reduce_value.buf, reduce_value.size);
-
- ptr->subtreesize = subtreesize;
-
- pel = make_nodelist();
- if(!pel)
- {
- errcode = ERROR_ALLOC_FAIL;
- free(ptr);
- goto cleanup;
- }
-
- pel->value.pointer = ptr;
- res->pointers_end->next = pel;
- res->pointers_end = pel;
-
- res->node_len = 0;
- res->count = 0;
-
- res->values_end = res->values;
- free_nodelist(res->values->next);
- res->values->next = NULL;
-cleanup:
-
- if(errcode < 0)
- {
- enif_release_resource(wrq);
- }
-
- if(reduced)
- {
- free(reduce_value.buf);
- }
- return errcode;
-}
-
-//Move this node's pointers list to dst node's values list.
-int mr_move_pointers(couchfile_modify_result *src, couchfile_modify_result *dst)
-{
- int errcode = 0;
- nodelist *ptr = NULL;
- nodelist *next = NULL;
- if(src->pointers_end == src->pointers)
- {
- return 0;
- }
-
- ptr = src->pointers->next;
- next = ptr;
- while(ptr != NULL && errcode == 0)
- {
- //max on disk len of a pointer node
- dst->node_len += ptr->value.pointer->key.size +
- ptr->value.pointer->reduce_value.size + 24;
- dst->count++;
-
- next = ptr->next;
- ptr->next = NULL;
-
- dst->values_end->next = ptr;
- dst->values_end = ptr;
- errcode = maybe_flush(dst);
- ptr = next;
- }
-
- src->pointers->next = next;
-