Skip to content

Commit

Permalink
CBD-426 Index headers are stored in a termless format
Browse files Browse the repository at this point in the history
The index headers are no longer written as serialized Erlang
terms. Instead, they are now written in a format that is easy
to parse from any environment/language.

Change-Id: Ief5cd900d2d1f87fc6db1e42ce3be1c0b02648b7
Reviewed-on: http://review.couchbase.org/19332
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
fdmanana committed Aug 10, 2012
1 parent fc3a87e commit 8aae876
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 56 deletions.
1 change: 1 addition & 0 deletions src/couch_set_view/Makefile.am
Expand Up @@ -38,6 +38,7 @@ test_files = \
test/00-prepare.t \
test/01-load.t \
test/01-collation.t \
test/01-headers.t \
test/02-old-index-cleanup.t \
test/03-db-compaction-file-leaks.t \
test/04-handle-db-deletes.t \
Expand Down
34 changes: 17 additions & 17 deletions src/couch_set_view/include/couch_set_view.hrl
Expand Up @@ -57,7 +57,7 @@
-type bitmask() :: non_neg_integer().
-type bitmap() :: non_neg_integer().
-type update_seq() :: non_neg_integer().
-type btree_state() :: 'nil' | tuple().
-type btree_state() :: 'nil' | binary().
-type partition_seq() :: {partition_id(), update_seq()}.
% Manipulate via ordsets or orddict, keep it ordered by partition id.
-type partition_seqs() :: ordsets:ordset(partition_seq()).
Expand Down Expand Up @@ -113,7 +113,7 @@
-record(set_view_index_header, {
version = ?LATEST_COUCH_SET_VIEW_HEADER_VERSION :: non_neg_integer(),
% Maximum number of partitions this set view supports, 0 means not yet defined.
num_partitions = nil :: 'nil' | non_neg_integer(),
num_partitions = 0 :: non_neg_integer(),
% active partitions bitmap
abitmask = 0 :: bitmask(),
% passive partitions bitmap
Expand Down Expand Up @@ -175,21 +175,21 @@
}).

-record(set_view_group, {
sig = nil :: 'nil' | binary(),
fd = nil :: 'nil' | pid(),
set_name = <<>> :: binary(),
name = <<>> :: binary(),
design_options = [] :: [any()],
views = [] :: [#set_view{}],
id_btree = nil :: 'nil' | #btree{},
ref_counter = nil :: 'nil' | pid(),
index_header = nil :: 'nil' | #set_view_index_header{},
db_set = nil :: 'nil' | pid(),
type = main :: set_view_group_type(),
replica_group = nil :: 'nil' | #set_view_group{},
replica_pid = nil :: 'nil' | pid(),
debug_info = nil :: #set_view_debug_info{} | 'nil',
filepath = "" :: string()
sig = binary:copy(<<0>>, 16) :: <<_:128>>,
fd = nil :: 'nil' | pid(),
set_name = <<>> :: binary(),
name = <<>> :: binary(),
design_options = [] :: [any()],
views = [] :: [#set_view{}],
id_btree = nil :: 'nil' | #btree{},
ref_counter = nil :: 'nil' | pid(),
index_header = #set_view_index_header{} :: #set_view_index_header{},
db_set = nil :: 'nil' | pid(),
type = main :: set_view_group_type(),
replica_group = nil :: 'nil' | #set_view_group{},
replica_pid = nil :: 'nil' | pid(),
debug_info = nil :: #set_view_debug_info{} | 'nil',
filepath = "" :: string()
}).

-record(set_view_updater_result, {
Expand Down
4 changes: 2 additions & 2 deletions src/couch_set_view/src/couch_set_view_compactor.erl
Expand Up @@ -198,8 +198,8 @@ maybe_retry_compact(CompactResult0, StartTime, LogFilePath, LogOffsetStart, Owne
type = Type,
fd = Fd
} = NewGroup,
DiskHeader = couch_set_view_util:make_disk_header(NewGroup),
ok = couch_file:write_header(Fd, DiskHeader),
HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
ok = couch_file:write_header_bin(Fd, HeaderBin),
ok = couch_file:sync(Fd),
CompactResult = CompactResult0#set_view_compactor_result{
compact_time = timer:now_diff(os:timestamp(), StartTime) / 1000000
Expand Down
42 changes: 25 additions & 17 deletions src/couch_set_view/src/couch_set_view_group.erl
Expand Up @@ -48,7 +48,7 @@
-define(group_id(State), (State#state.group)#set_view_group.name).
-define(db_set(State), (State#state.group)#set_view_group.db_set).
-define(is_defined(State),
is_integer(((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions)).
(((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions > 0)).
-define(replicas_on_transfer(State),
((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
-define(have_pending_transition(State),
Expand Down Expand Up @@ -317,7 +317,7 @@ do_init({_, SetName, _} = InitArgs) ->
ReplicaParts = get_replica_partitions(ReplicaPid)
end,
ViewCount = length(Group#set_view_group.views),
case is_integer(Header#set_view_index_header.num_partitions) of
case Header#set_view_index_header.num_partitions > 0 of
false ->
DbSet = nil,
?LOG_INFO("Started undefined ~s set view group `~s`, group `~s`,"
Expand Down Expand Up @@ -1266,9 +1266,16 @@ prepare_group({RootDir, SetName, #set_view_group{sig = Sig, type = Type} = Group
% this can happen if we missed a purge
{ok, reset_file(Fd, Group)};
true ->
case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
case (catch couch_file:read_header_bin(Fd)) of
{ok, HeaderBin} ->
HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
_ ->
HeaderSig = <<>>,
HeaderBin = <<>>
end,
case HeaderSig == Sig of
true ->
HeaderInfo = couch_set_view_util:header_bin_to_term(HeaderBin),
{ok, init_group(Fd, Group, HeaderInfo)};
_ ->
% this happens on a new file
Expand Down Expand Up @@ -1540,27 +1547,28 @@ reset_group(#set_view_group{views = Views} = Group) ->
Views2 = [View#set_view{btree = nil} || View <- Views],
Group#set_view_group{
fd = nil,
index_header = nil,
index_header = #set_view_index_header{},
id_btree = nil,
views = Views2
}.


-spec reset_file(pid(), #set_view_group{}) -> #set_view_group{}.
reset_file(Fd, #set_view_group{sig = Sig, index_header = Header} = Group) ->
reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
init_group(Fd, reset_group(Group), Header).
EmptyHeader = Header#set_view_index_header{
view_states = [nil || _ <- Views],
id_btree_state = nil
},
EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
ok = couch_file:write_header_bin(Fd, EmptyHeaderBin),
init_group(Fd, reset_group(EmptyGroup), EmptyHeader).


-spec init_group(pid(),
#set_view_group{},
'nil' | #set_view_index_header{}) -> #set_view_group{}.
init_group(Fd, #set_view_group{views = Views} = Group, nil) ->
EmptyHeader = #set_view_index_header{
view_states = [nil || _ <- Views]
},
init_group(Fd, Group, EmptyHeader);
#set_view_index_header{}) -> #set_view_group{}.
init_group(Fd, Group, IndexHeader) ->
#set_view_group{
views = Views0,
Expand Down Expand Up @@ -1665,8 +1673,8 @@ init_group(Fd, Group, IndexHeader) ->

-spec commit_header(#set_view_group{}) -> 'ok'.
commit_header(Group) ->
Header = couch_set_view_util:make_disk_header(Group),
ok = couch_file:write_header(Group#set_view_group.fd, Header),
HeaderBin = couch_set_view_util:group_to_header_bin(Group),
ok = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
ok = couch_file:sync(Group#set_view_group.fd).

-spec filter_out_bitmask_partitions(ordsets:ordset(partition_id()),
Expand Down
4 changes: 2 additions & 2 deletions src/couch_set_view/src/couch_set_view_updater.erl
Expand Up @@ -955,8 +955,8 @@ checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group}, DoFsync)


write_header(#set_view_group{fd = Fd} = Group, DoFsync) ->
DiskHeader = couch_set_view_util:make_disk_header(Group),
ok = couch_file:write_header(Fd, DiskHeader),
HeaderBin = couch_set_view_util:group_to_header_bin(Group),
ok = couch_file:write_header_bin(Fd, HeaderBin),
case DoFsync of
true ->
ok = couch_file:sync(Fd);
Expand Down
190 changes: 172 additions & 18 deletions src/couch_set_view/src/couch_set_view_util.erl
Expand Up @@ -18,14 +18,14 @@
-export([make_key_options/1]).
-export([design_doc_to_set_view_group/2, get_ddoc_ids_with_sig/2]).
-export([open_raw_read_fd/1, close_raw_read_fd/1]).
-export([make_disk_header/1]).
-export([compute_indexed_bitmap/1, cleanup_group/1]).
-export([missing_changes_count/2]).
-export([is_group_empty/1]).
-export([new_sort_file_path/1, delete_sort_files/1]).
-export([encode_key_docid/2, decode_key_docid/1, split_key_docid/1]).
-export([parse_values/1, parse_reductions/1, parse_view_id_keys/1]).
-export([split_set_db_name/1]).
-export([group_to_header_bin/1, header_bin_sig/1, header_bin_to_term/1]).


-include("couch_db.hrl").
Expand Down Expand Up @@ -268,23 +268,6 @@ close_raw_read_fd(#set_view_group{fd = FilePid}) ->
end.


-spec make_disk_header(#set_view_group{}) ->
{Signature::binary(), #set_view_index_header{}}.
make_disk_header(Group) ->
#set_view_group{
sig = Sig,
id_btree = IdBtree,
views = Views,
index_header = Header
} = Group,
ViewStates = [couch_btree:get_state(V#set_view.btree) || V <- Views],
Header2 = Header#set_view_index_header{
id_btree_state = couch_btree:get_state(IdBtree),
view_states = ViewStates
},
{Sig, Header2}.


-spec compute_indexed_bitmap(#set_view_group{}) -> bitmap().
compute_indexed_bitmap(#set_view_group{id_btree = IdBtree, views = Views}) ->
compute_indexed_bitmap(IdBtree, Views).
Expand Down Expand Up @@ -419,3 +402,174 @@ split_set_db_name(DbName) ->
_ ->
error
end.


% Serializes the index header of a set view group into its binary on-disk
% form: the group signature followed by the compressed header body.
%
% Body layout, before compression:
%   version                        8 bits
%   num_partitions                 16 bits
%   abitmask, pbitmask, cbitmask   ?MAX_NUM_PARTITIONS bits each
%   seqs                           16 bit count + seqs_to_bin/2 entries
%   id_btree_state                 btree_state_to_bin/1 encoding
%   view_states                    8 bit count + one btree_state_to_bin/1
%                                  encoding per view
%   has_replica                    bool_to_bin/1 encoding
%   replicas_on_transfer           16 bit count + partitions_to_bin/2 entries
%   pending_transition             pending_trans_to_bin/1 encoding
%   unindexable_seqs               16 bit count + seqs_to_bin/2 entries
%
% NOTE(review): the list counts are written into fixed width fields without
% any range check, so an oversized list would not be rejected here - confirm
% callers can never exceed those widths.
-spec group_to_header_bin(#set_view_group{}) -> binary().
group_to_header_bin(#set_view_group{index_header = Header, sig = Sig}) ->
    #set_view_index_header{
        version = Version,
        num_partitions = NumParts,
        abitmask = Abitmask,
        pbitmask = Pbitmask,
        cbitmask = Cbitmask,
        seqs = Seqs,
        id_btree_state = IdBtreeState,
        view_states = ViewBtreeStates,
        has_replica = HasReplica,
        replicas_on_transfer = RepsOnTransfer,
        pending_transition = PendingTrans,
        unindexable_seqs = Unindexable
    } = Header,
    % View states are encoded first, so a too large view btree state is
    % reported before anything else is serialized.
    ViewStatesBin = << <<(btree_state_to_bin(St))/binary>> || St <- ViewBtreeStates >>,
    SeqsCount = length(Seqs),
    SeqsBin = seqs_to_bin(Seqs, <<>>),
    IdStateBin = btree_state_to_bin(IdBtreeState),
    ViewStatesCount = length(ViewBtreeStates),
    ReplicaFlagBin = bool_to_bin(HasReplica),
    TransferCount = length(RepsOnTransfer),
    TransferBin = partitions_to_bin(RepsOnTransfer, <<>>),
    PendingTransBin = pending_trans_to_bin(PendingTrans),
    UnindexableCount = length(Unindexable),
    UnindexableBin = seqs_to_bin(Unindexable, <<>>),
    Body = <<
        Version:8,
        NumParts:16,
        Abitmask:?MAX_NUM_PARTITIONS,
        Pbitmask:?MAX_NUM_PARTITIONS,
        Cbitmask:?MAX_NUM_PARTITIONS,
        SeqsCount:16, SeqsBin/binary,
        IdStateBin/binary,
        ViewStatesCount:8, ViewStatesBin/binary,
        ReplicaFlagBin/binary,
        TransferCount:16, TransferBin/binary,
        PendingTransBin/binary,
        UnindexableCount:16, UnindexableBin/binary
    >>,
    % The signature stays uncompressed so that header_bin_sig/1 can read it
    % without decoding the rest of the header.
    CompressedBody = couch_compress:compress(Body),
    <<Sig/binary, CompressedBody/binary>>.


% Extracts the group signature from a binary index header. The signature
% is an md5 digest, so it always occupies the first 16 bytes; whatever
% follows it is ignored here.
-spec header_bin_sig(binary()) -> binary().
header_bin_sig(<<Signature:16/bytes, _HeaderBody/bytes>>) ->
    Signature.


% Decodes a binary index header, in the layout written by
% group_to_header_bin/1, into a #set_view_index_header{} record.
%
% The leading 16 byte signature is skipped and the remainder is
% decompressed and parsed field by field. Any deviation from the expected
% layout, including trailing bytes after the last field, makes one of the
% matches below fail.
-spec header_bin_to_term(binary()) -> #set_view_index_header{}.
header_bin_to_term(HeaderBin) ->
    <<_Sig:16/binary, CompressedBody/binary>> = HeaderBin,
    <<
        Version:8,
        NumParts:16,
        Abitmask:?MAX_NUM_PARTITIONS,
        Pbitmask:?MAX_NUM_PARTITIONS,
        Cbitmask:?MAX_NUM_PARTITIONS,
        SeqsCount:16,
        AfterFixed/binary
    >> = couch_compress:decompress(CompressedBody),
    {Seqs, AfterSeqs} = bin_to_seqs(SeqsCount, AfterFixed, []),
    <<
        IdStateSize:16,
        IdStateBin:IdStateSize/binary,
        ViewStatesCount:8,
        AfterIdState/binary
    >> = AfterSeqs,
    {ViewStates, AfterViews} =
        bin_to_view_states(ViewStatesCount, AfterIdState, []),
    <<ReplicaFlag:8, TransferCount:16, AfterFlag/binary>> = AfterViews,
    {ReplicasOnTransfer, AfterTransfer} =
        bin_to_partitions(TransferCount, AfterFlag, []),
    {PendingTrans, AfterPending} = bin_to_pending_trans(AfterTransfer),
    <<UnindexableCount:16, UnindexableBin/binary>> = AfterPending,
    % Nothing may follow the unindexable seqs, hence the <<>> match.
    {Unindexable, <<>>} = bin_to_seqs(UnindexableCount, UnindexableBin, []),
    % A zero sized btree state is how a nil state is written to disk.
    IdBtreeState = case IdStateBin of
        <<>> -> nil;
        _ -> IdStateBin
    end,
    % Only 0 and 1 are valid flag values, anything else is a case_clause.
    HasReplica = case ReplicaFlag of
        0 -> false;
        1 -> true
    end,
    #set_view_index_header{
        version = Version,
        num_partitions = NumParts,
        abitmask = Abitmask,
        pbitmask = Pbitmask,
        cbitmask = Cbitmask,
        seqs = Seqs,
        id_btree_state = IdBtreeState,
        view_states = ViewStates,
        has_replica = HasReplica,
        replicas_on_transfer = ReplicasOnTransfer,
        pending_transition = PendingTrans,
        unindexable_seqs = Unindexable
    }.


% Encodes a btree state as a 16 bit size followed by the state bytes.
% A nil state is written as a zero size with no payload. States of
% 65536 bytes or more do not fit the size field and are rejected with a
% {too_large_btree_state, Size} throw.
btree_state_to_bin(nil) ->
    <<0:16>>;
btree_state_to_bin(BinState) ->
    case byte_size(BinState) of
        Size when Size < (1 bsl 16) ->
            <<Size:16, BinState/binary>>;
        Size ->
            throw({too_large_btree_state, Size})
    end.


% Encodes a boolean as a single byte: 0 for false, 1 for true.
bool_to_bin(false) ->
    <<0>>;
bool_to_bin(true) ->
    <<1>>.


% Appends every {PartitionId, Seq} pair of the list to Acc, in list order,
% using 16 bits for the partition id and 48 bits for the sequence number.
seqs_to_bin(Seqs, Acc) ->
    lists:foldl(
        fun({PartId, Seq}, Bin) ->
            <<Bin/binary, PartId:16, Seq:48>>
        end,
        Acc, Seqs).


% Appends every partition id of the list to Acc, in list order, using
% 16 bits per id.
partitions_to_bin(Partitions, Acc) ->
    lists:foldl(
        fun(PartId, Bin) ->
            <<Bin/binary, PartId:16>>
        end,
        Acc, Partitions).


% Encodes a pending transition as two consecutive partition lists, active
% first and passive second, each prefixed by a 16 bit element count.
% No pending transition (nil) is written as two empty lists.
pending_trans_to_bin(nil) ->
    <<0:16, 0:16>>;
pending_trans_to_bin(#set_view_transition{active = Active, passive = Passive}) ->
    ActiveCount = length(Active),
    ActiveBin = partitions_to_bin(Active, <<>>),
    PassiveCount = length(Passive),
    PassiveBin = partitions_to_bin(Passive, <<>>),
    <<ActiveCount:16, ActiveBin/binary, PassiveCount:16, PassiveBin/binary>>.


% Decodes a pending transition from the front of a binary and returns
% {Transition, Remainder}. Two empty partition lists decode to nil, which
% is how pending_trans_to_bin/1 writes the absence of a transition.
bin_to_pending_trans(<<ActiveCount:16, AfterCount/binary>>) ->
    {Active, AfterActive} = bin_to_partitions(ActiveCount, AfterCount, []),
    <<PassiveCount:16, PassiveTail/binary>> = AfterActive,
    {Passive, Remainder} = bin_to_partitions(PassiveCount, PassiveTail, []),
    Transition = case {Active, Passive} of
        {[], []} ->
            nil;
        _ ->
            #set_view_transition{active = Active, passive = Passive}
    end,
    {Transition, Remainder}.


% Reads the given number of <<PartitionId:16, Seq:48>> entries from the
% front of a binary. Returns {Pairs, Remainder}, with the pairs in the
% same order as they appear in the binary.
bin_to_seqs(0, Remainder, Pairs) ->
    {lists:reverse(Pairs), Remainder};
bin_to_seqs(Left, <<PartId:16, Seq:48, Tail/binary>>, Pairs) ->
    Pair = {PartId, Seq},
    bin_to_seqs(Left - 1, Tail, [Pair | Pairs]).


% Reads the given number of view btree states from the front of a binary.
% Each state is a 16 bit size followed by that many bytes; a zero size
% stands for a nil state. Returns {States, Remainder}, with the states in
% the same order as they appear in the binary.
bin_to_view_states(0, Remainder, States) ->
    {lists:reverse(States), Remainder};
bin_to_view_states(Left, <<0:16, Tail/binary>>, States) ->
    bin_to_view_states(Left - 1, Tail, [nil | States]);
bin_to_view_states(Left, <<Size:16, State:Size/binary, Tail/binary>>, States) ->
    bin_to_view_states(Left - 1, Tail, [State | States]).


% Reads the given number of 16 bit partition ids from the front of a
% binary. Returns {PartitionIds, Remainder}, with the ids in the same
% order as they appear in the binary.
bin_to_partitions(0, Remainder, PartIds) ->
    {lists:reverse(PartIds), Remainder};
bin_to_partitions(Left, <<PartId:16, Tail/binary>>, PartIds) ->
    bin_to_partitions(Left - 1, Tail, [PartId | PartIds]).

0 comments on commit 8aae876

Please sign in to comment.