Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #13 from basho/az608-use-less-memory

More sane buffer conversion with better memory usage
  • Loading branch information...
commit 545cabf25aca73920b641d8437bad6667d9bfadc 2 parents f99596a + adfe3d7
@rzezeski rzezeski authored
View
4 include/merge_index.hrl
@@ -1,7 +1,3 @@
--define(PRINT(Var), error_logger:info_msg("DEBUG: ~p:~p~n~p~n ~p~n", [?MODULE, ?LINE, ??Var, Var])).
--define(TIMEON, erlang:put(debug_timer, [now()|case erlang:get(debug_timer) == undefined of true -> []; false -> erlang:get(debug_timer) end])).
--define(TIMEOFF(Var, Count), io:format("~s :: ~p @ ~10.2fms (~10.2f total) : ~p : ~p~n", [string:copies(" ", length(erlang:get(debug_timer))), Count, (timer:now_diff(now(), hd(erlang:get(debug_timer)))/1000/(Count+1)),(timer:now_diff(now(), hd(erlang:get(debug_timer)))/1000), ??Var, Var]), erlang:put(debug_timer, tl(erlang:get(debug_timer)))).
-
-record(segment,{root,
offsets_table,
size}).
View
10 src/mi_buffer.erl
@@ -26,6 +26,8 @@
-export([
new/1,
filename/1,
+ id/1,
+ exists/1,
close_filehandle/1,
delete/1,
filesize/1,
@@ -73,8 +75,12 @@ open_inner(FH, Table, Filename) ->
ok
end.
-filename(Buffer) ->
- Buffer#buffer.filename.
+filename(Buffer) -> Buffer#buffer.filename.
+
+id(#buffer{filename=Filename}) -> id(Filename);
+id(Filename) -> list_to_integer(tl(filename:extension(Filename))).
+
+exists(#buffer{table=Table}) -> ets:info(Table) /= undefined.
delete(Buffer=#buffer{table=Table, filename=Filename}) ->
ets:delete(Table),
View
157 src/mi_buffer_converter.erl
@@ -20,110 +20,32 @@
%% under the License.
%%
%% -------------------------------------------------------------------
-
-%% This module is a supervisor and gen_sever rolled into one. The
-%% supervisor's sole purpose is to monitor the one gen_server it owns,
-%% and attempt a couple of restarts, or blow up if restarts happen too
-%% fast.
-%%
-%% The supervisor is started by mi_server, and the gen_server alerts
-%% mi_server to its presence. mi_server talks directly to the worker
-%% using the convert/3 function. mi_server is linked to the supervisor,
-%% so it gets an EXIT message only after the gen_server has reached its
-%% max restart limit.
-module(mi_buffer_converter).
-%-behaviour(supervisor). % not actually conflicting...
--behaviour(gen_server).
-
%% API
--export([start_link/2, convert/3]).
+-export([start_link/3, convert/3]).
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%% private callbacks
--export([start_worker/2]).
-
--record(state, {mi_server, mi_root}).
+%% callbacks
+-export([convert/4]).
%%====================================================================
%% API
%%====================================================================
-%%--------------------------------------------------------------------
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-%%--------------------------------------------------------------------
-start_link(MIServerPid, MIServerRoot) ->
- supervisor:start_link(
- ?MODULE, [supervisor, MIServerPid, MIServerRoot]).
-%% @private
-start_worker(MIServerPid, MIServerRoot) ->
- gen_server:start_link(
- ?MODULE, [gen_server, MIServerPid, MIServerRoot], []).
+start_link(Server, Root, Buffer) ->
+ proc_lib:start_link(?MODULE, convert, [self(), Server, Root, Buffer], 1000).
-convert(undefined, _Root, _Buffer) ->
- %% mi_server tried to convert a buffer before
- %% it had a registered converter: ignore
- ok;
-convert(Converter, Root, Buffer) ->
- gen_server:cast(Converter, {convert, Root, Buffer}).
+convert(Server, Root, Buffer) ->
+ mi_buffer_converter_sup:start_child(Server, Root, Buffer).
%%====================================================================
-%% gen_server callbacks
+%% Callbacks
%%====================================================================
-%%--------------------------------------------------------------------
-%% Func: init(Args) -> {ok, {SupFlags, [ChildSpec]}} |
-%% ignore |
-%% {error, Reason}
-%% Description: Whenever a supervisor is started using
-%% supervisor:start_link/[2,3], this function is called by the new process
-%% to find out about restart strategy, maximum restart frequency and child
-%% specifications.
-%%--------------------------------------------------------------------
-init([supervisor, MIServerPid, MIServerRoot]) ->
- AChild = {buffer_converter_worker,
- {mi_buffer_converter,start_worker,
- [MIServerPid, MIServerRoot]},
- permanent,2000,worker,[mi_buffer_converter]},
- {ok,{{one_for_all,10,1}, [AChild]}};
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%%--------------------------------------------------------------------
-init([gen_server, MIServerPid, MIServerRoot]) ->
- mi_server:register_buffer_converter(MIServerPid, self()),
- {ok, #state{mi_server=MIServerPid, mi_root=MIServerRoot}}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%%--------------------------------------------------------------------
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%%--------------------------------------------------------------------
-handle_cast({convert, Root, Buffer}, #state{mi_root=Root}=State) ->
- %% Calculate the segment filename, open the segment, and convert.
+convert(Parent, Server, Root, Buffer) ->
+ proc_lib:init_ack(Parent, {ok, self()}),
try
- SNum = mi_server:get_id_number(mi_buffer:filename(Buffer)),
+ SNum = mi_buffer:id(Buffer),
SName = filename:join(Root, "segment." ++ integer_to_list(SNum)),
case mi_server:has_deleteme_flag(SName) of
@@ -136,44 +58,21 @@ handle_cast({convert, Root, Buffer}, #state{mi_root=Root}=State) ->
end,
SegmentWO = mi_segment:open_write(SName),
mi_segment:from_buffer(Buffer, SegmentWO),
- mi_server:buffer_to_segment(State#state.mi_server, Buffer, SegmentWO),
- {noreply, State}
+ mi_server:buffer_to_segment(Server, Buffer, SegmentWO),
+ exit(normal)
catch
- error:badarg ->
- lager:warning("`convert` attempted to work with a"
- " nonexistent buffer, probably because"
- " drop was called"),
- {noreply, State}
- end;
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%%--------------------------------------------------------------------
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
+ error:Reason ->
+ case mi_buffer:exists(Buffer) of
+ false ->
+ lager:warning("conversion for buffer ~p failed, probably"
+ " because the buffer has been dropped ~p",
+ [mi_buffer:filename(Buffer),
+ erlang:get_stacktrace()]),
+ exit(normal);
+ true ->
+ lager:error("conversion for buffer ~p failed with trace ~p",
+ [mi_buffer:filename(Buffer),
+ erlang:get_stacktrace()]),
+ exit({error, Reason})
+ end
+ end.
View
37 src/mi_buffer_converter_sup.erl
@@ -0,0 +1,37 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(mi_buffer_converter_sup).
+-behaviour(supervisor).
+
+-export([init/1,
+ start_link/0,
+ start_child/3]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(Server, Root, Buffer) ->
+ supervisor:start_child(mi_buffer_converter_sup, [Server, Root, Buffer]).
+
+init([]) ->
+ Spec = {ignored,
+ {mi_buffer_converter, start_link, []},
+ transient, 1000, worker, [mi_buffer_converter]},
+ {ok, {{simple_one_for_one, 10, 1}, [Spec]}}.
View
6 src/mi_scheduler.erl
@@ -81,11 +81,11 @@ handle_call({schedule_compaction, Pid}, _From, #state { queue = Q } = State) ->
end;
handle_call(Event, _From, State) ->
- ?PRINT({unhandled_call, Event}),
+ lager:error("unhandled_call ~p", [Event]),
{reply, ok, State}.
handle_cast(Msg, State) ->
- ?PRINT({unhandled_cast, Msg}),
+ lager:error("unhandled_cast ~p", [Msg]),
{noreply, State}.
handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) ->
@@ -106,7 +106,7 @@ handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State)
{noreply, NewState};
handle_info(Info, State) ->
- ?PRINT({unhandled_info, Info}),
+ lager:error("unhandled_info ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
View
137 src/mi_server.erl
@@ -29,14 +29,12 @@
compact/1,
drop/1,
fold/3,
- get_id_number/1,
has_deleteme_flag/1,
index/2,
info/4,
is_empty/1,
lookup/5,
range/7,
- register_buffer_converter/2,
set_deleteme_flag/1,
start_link/1,
stop/1,
@@ -59,9 +57,10 @@
buffers,
next_id,
is_compacting,
- buffer_converter,
lookup_range_pids,
- buffer_rollover_size
+ buffer_rollover_size,
+ converter,
+ to_convert
}).
-record(stream_range, {
@@ -97,37 +96,6 @@ is_empty(Server) -> gen_server:call(Server, is_empty, infinity).
fold(Server, Fun, Acc) -> gen_server:call(Server, {fold, Fun, Acc}, infinity).
-%% Return the ID number of a Segment/Buffer/Filename...
-%% Files can be named:
-%% - buffer.N
-%% - segment.N
-%% - segment.N.data
-%% - segment.M-N
-%% - segment.M-N.data
-get_id_number(Segment) when element(1, Segment) == segment ->
- Filename = mi_segment:filename(Segment),
- get_id_number(Filename);
-get_id_number(Buffer) when element(1, Buffer) == buffer ->
- Filename = mi_buffer:filename(Buffer),
- get_id_number(Filename);
-get_id_number(Filename) ->
- case string:chr(Filename, $-) == 0 of
- true ->
- %% Handle buffer.N, segment.N, segment.N.data
- case string:tokens(Filename, ".") of
- [_, N] -> ok;
- [_, N, _] -> ok
- end,
- list_to_integer(N);
- false ->
- %% Handle segment.M-N, segment.M-N.data
- case string:tokens(Filename, ".-") of
- [_, M, N] -> ok;
- [_, M, N, _] -> ok
- end,
- [list_to_integer(M), list_to_integer(N)]
- end.
-
lookup(Server, Index, Field, Term, Filter) ->
Ref = make_ref(),
ok = gen_server:call(Server,
@@ -143,9 +111,6 @@ range(Server, Index, Field, StartTerm, EndTerm, Size, Filter) ->
infinity),
{ok, Ref}.
-register_buffer_converter(Server, ConverterPid) ->
- gen_server:cast(Server, {register_buffer_converter, ConverterPid}).
-
set_deleteme_flag(Filename) ->
file:write_file(Filename ++ ?DELETEME_FLAG, "").
@@ -173,9 +138,6 @@ init([Root]) ->
%% don't pull down this merge_index if they fail
process_flag(trap_exit, true),
- %% Use a dedicated worker sub-process to do the
- {ok, ConverterSup} = mi_buffer_converter:start_link(self(), Root),
-
%% Create the state...
State = #state {
root = Root,
@@ -184,20 +146,20 @@ init([Root]) ->
segments = Segments,
next_id = NextID,
is_compacting = false,
- buffer_converter = {ConverterSup, undefined},
lookup_range_pids = [],
- buffer_rollover_size=fuzzed_rollover_size()
+ buffer_rollover_size=fuzzed_rollover_size(),
+ to_convert = queue:new()
},
lager:info("finished loading merge_index '~s' with rollover size ~p",
[Root, State#state.buffer_rollover_size]),
{ok, State}.
-
handle_call({index, Postings}, _From, State) ->
%% Write to the buffer...
#state { buffers=[CurrentBuffer0|Buffers],
- buffer_converter={_,ConverterWorker},
+ converter=Converter,
+ to_convert=ToConvert,
root=Root} = State,
%% By multiplying the timestamp by -1 and swapping order of TS and
@@ -224,9 +186,18 @@ handle_call({index, Postings}, _From, State) ->
%% Close the buffer filehandle. Needs to be done in the owner process.
mi_buffer:close_filehandle(CurrentBuffer),
-
- mi_buffer_converter:convert(
- ConverterWorker, Root, CurrentBuffer),
+
+ ToConvert2 = queue:in(CurrentBuffer, ToConvert),
+ {Converter2, ToConvert4} =
+ case Converter of
+ undefined ->
+ {Next, ToConvert3} = next_buffer_to_convert(ToConvert2),
+ {ok, Pid} = mi_buffer_converter:convert(self(),
+ Root,
+ Next),
+ {Pid, ToConvert3};
+ _ -> {Converter, ToConvert2}
+ end,
%% Create a new empty buffer...
BName = join(NewState, "buffer." ++ integer_to_list(NextID)),
@@ -235,7 +206,9 @@ handle_call({index, Postings}, _From, State) ->
NewState1 = NewState#state {
buffers=[NewBuffer|NewState#state.buffers],
next_id=NextID + 1,
- buffer_rollover_size = fuzzed_rollover_size()
+ buffer_rollover_size = fuzzed_rollover_size(),
+ converter = Converter2,
+ to_convert = ToConvert4
},
{reply, ok, NewState1};
false ->
@@ -408,14 +381,16 @@ handle_call(drop, _From, State) ->
Buffer = mi_buffer:new(BufferFile),
NewState = State#state { locks = mi_locks:new(),
buffers = [Buffer],
- segments = [] },
+ segments = [],
+ converter = undefined,
+ to_convert = queue:new()},
{reply, ok, NewState};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
-handle_call(Request, _From, State) ->
- ?PRINT({unhandled_call, Request}),
+handle_call(Msg, _From, State) ->
+ lager:error("Unexpected call ~p", [Msg]),
{reply, ok, State}.
handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) ->
@@ -447,7 +422,9 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) -
{noreply, NewState};
handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) ->
- #state { locks=Locks, buffers=Buffers, segments=Segments, is_compacting=IsCompacting } = State,
+ #state { root=Root, locks=Locks, buffers=Buffers, segments=Segments,
+ to_convert=ToConvert,
+ is_compacting=IsCompacting } = State,
%% Clean up by clearing delete flag on the segment, adding delete
%% flag to the buffer, and telling the system to delete the buffer
@@ -465,12 +442,25 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) ->
%% Open the segment as read only...
SegmentRO = mi_segment:open_read(mi_segment:filename(SegmentWO)),
+ {{value, _}, ToConvert2} = queue:out(ToConvert),
+ {Converter, ToConvert3} =
+ case next_buffer_to_convert(ToConvert2) of
+ {none, ToConvert2} -> {undefined, ToConvert2};
+ {Next, ToConvert2} ->
+ {ok, Pid} = mi_buffer_converter:convert(self(),
+ Root,
+ Next),
+ {Pid, ToConvert2}
+ end,
+
%% Update state...
NewSegments = [SegmentRO|Segments],
NewState = State#state {
locks=NewLocks,
buffers=Buffers -- [Buffer],
- segments=NewSegments
+ segments=NewSegments,
+ converter=Converter,
+ to_convert=ToConvert3
},
%% Give us the opportunity to do a merge...
@@ -489,21 +479,8 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) ->
{noreply, State}
end;
-handle_cast({register_buffer_converter, ConverterWorker},
- #state{buffer_converter={ConverterSup,_},
- buffers=Buffers,
- root=Root}=State) ->
- %% a new buffer converter started - queue all buffers but the
- %% current one for conversion to segments
-
- %% current buffer is hd(Buffers), so just convert tl(Buffers)
- [ mi_buffer_converter:convert(ConverterWorker, Root, B)
- || B <- tl(Buffers) ],
-
- {noreply, State#state{buffer_converter={ConverterSup, ConverterWorker}}};
-
handle_cast(Msg, State) ->
- ?PRINT({unhandled_cast, Msg}),
+ lager:error("Unexpected cast ~p", [Msg]),
{noreply, State}.
handle_info({'EXIT', CompactingPid, Reason},
@@ -524,10 +501,6 @@ handle_info({'EXIT', CompactingPid, Reason},
%% clear out compaction flags, so we try again when necessary
{noreply, State#state{is_compacting=false}};
-handle_info({'EXIT', ConverterSup, Reason},
- #state{buffer_converter={ConverterSup, _}}=State) ->
- %% if our converter's supervisor died, there's a problem: exit
- {stop, {buffer_converter_death, Reason}, State};
handle_info({'EXIT', Pid, Reason},
#state{lookup_range_pids=SRPids}=State) ->
@@ -566,8 +539,8 @@ handle_info({'EXIT', Pid, Reason},
{noreply, State}
end;
-handle_info(Info, State) ->
- ?PRINT({unhandled_info, Info}),
+handle_info(Msg, State) ->
+ lager:error("Unexpected info ~p", [Msg]),
{noreply, State}.
terminate(_Reason, _State) ->
@@ -601,8 +574,7 @@ read_buf_and_seg(Root) ->
%% Get buffer files, calculate the next_id, load the buffers, turn
%% any extraneous buffers into segments...
BufferFiles = filelib:wildcard(join(Root, "buffer.*")),
- BufferFiles1 = lists:sort([{get_id_number(filename:basename(X)), X}
- || X <- BufferFiles]),
+ BufferFiles1 = lists:sort([{mi_buffer:id(X), X} || X <- BufferFiles]),
NextID = lists:max([X || {X, _} <- BufferFiles1] ++ [0]) + 1,
{NextID1, Buffer, Segments1} = read_buffers(Root, BufferFiles1, NextID, Segments),
@@ -748,3 +720,16 @@ join(Root, Name) ->
fuzzed_rollover_size() ->
ActualRolloverSize = element(2,application:get_env(merge_index, buffer_rollover_size)),
mi_utils:fuzz(ActualRolloverSize, 0.25).
+
+next_buffer_to_convert(Buffers) ->
+ case queue:peek(Buffers) of
+ {value, Next} ->
+ %% Buffer may have since been dropped via merge_index:drop
+ case mi_buffer:exists(Next) of
+ true -> {Next, Buffers};
+ false ->
+ {_, Buffers2} = queue:out(Buffers),
+ next_buffer_to_convert(Buffers2)
+ end;
+ empty -> {none, Buffers}
+ end.
View
5 src/mi_sup.erl
@@ -42,4 +42,7 @@ start_link() ->
%% ===================================================================
init([]) ->
- {ok, {{one_for_one, 5, 10}, [?CHILD(mi_scheduler, worker)]}}.
+ Scheduler = ?CHILD(mi_scheduler, worker),
+ Converter = ?CHILD(mi_buffer_converter_sup, supervisor),
+
+ {ok, {{one_for_one, 5, 10}, [Converter, Scheduler]}}.
View
4 test/merge_index_tests.erl
@@ -42,10 +42,12 @@ prop_api_test_() ->
prop_api() ->
application:load(merge_index),
application:start(sasl),
+ application:start(lager),
+
%% Comment out following lines to see error reports...otherwise
%% it's too much noise
error_logger:delete_report_handler(sasl_report_tty_h),
- error_logger:delete_report_handler(error_logger_tty_h),
+ lager:set_loglevel(lager_console_backend, critical),
?FORALL(Cmds, commands(?MODULE),
?TRAPEXIT(
Please sign in to comment.
Something went wrong with that request. Please try again.