Skip to content
Browse files

Make converter worker transient so it restarts itself

Get rid of my whacky link/trap exit/restart code and let the supervisor do that
since that's what it's meant for.
  • Loading branch information...
1 parent a4ec187 commit adfe3d7ffff36a9eda4bd1ac468d02a72b3184cf @rzezeski rzezeski committed Sep 1, 2011
Showing with 44 additions and 34 deletions.
  1. +3 −0 src/mi_buffer.erl
  2. +14 −5 src/mi_buffer_converter.erl
  3. +2 −2 src/mi_buffer_converter_sup.erl
  4. +25 −27 src/mi_server.erl
View
3 src/mi_buffer.erl
@@ -27,6 +27,7 @@
new/1,
filename/1,
id/1,
+ exists/1,
close_filehandle/1,
delete/1,
filesize/1,
@@ -79,6 +80,8 @@ filename(Buffer) -> Buffer#buffer.filename.
id(#buffer{filename=Filename}) -> id(Filename);
id(Filename) -> list_to_integer(tl(filename:extension(Filename))).
+exists(#buffer{table=Table}) -> ets:info(Table) /= undefined.
+
delete(Buffer=#buffer{table=Table, filename=Filename}) ->
ets:delete(Table),
close_filehandle(Buffer),
View
19 src/mi_buffer_converter.erl
@@ -61,9 +61,18 @@ convert(Parent, Server, Root, Buffer) ->
mi_server:buffer_to_segment(Server, Buffer, SegmentWO),
exit(normal)
catch
- error:badarg ->
- lager:warning("`convert` attempted to work with a"
- " nonexistent buffer, probably because"
- " drop was called ~p", [erlang:get_stacktrace()]),
- exit(buffer_dropped)
+ error:Reason ->
+ case mi_buffer:exists(Buffer) of
+ false ->
+ lager:warning("conversion for buffer ~p failed, probably"
+ " because the buffer has been dropped ~p",
+ [mi_buffer:filename(Buffer),
+ erlang:get_stacktrace()]),
+ exit(normal);
+ true ->
+ lager:error("conversion for buffer ~p failed with trace ~p",
+ [mi_buffer:filename(Buffer),
+ erlang:get_stacktrace()]),
+ exit({error, Reason})
+ end
end.
View
4 src/mi_buffer_converter_sup.erl
@@ -31,7 +31,7 @@ start_child(Server, Root, Buffer) ->
supervisor:start_child(mi_buffer_converter_sup, [Server, Root, Buffer]).
init([]) ->
- Spec = {undefined,
+ Spec = {ignored,
{mi_buffer_converter, start_link, []},
- temporary, 1000, worker, [mi_buffer_converter]},
+ transient, 1000, worker, [mi_buffer_converter]},
{ok, {{simple_one_for_one, 10, 1}, [Spec]}}.
View
52 src/mi_server.erl
@@ -188,15 +188,15 @@ handle_call({index, Postings}, _From, State) ->
mi_buffer:close_filehandle(CurrentBuffer),
ToConvert2 = queue:in(CurrentBuffer, ToConvert),
- Converter2 =
+ {Converter2, ToConvert4} =
case Converter of
undefined ->
- {value, Buff} = queue:peek(ToConvert2),
+ {Next, ToConvert3} = next_buffer_to_convert(ToConvert2),
{ok, Pid} = mi_buffer_converter:convert(self(),
Root,
- Buff),
- Pid;
- _ -> Converter
+ Next),
+ {Pid, ToConvert3};
+ _ -> {Converter, ToConvert2}
end,
%% Create a new empty buffer...
@@ -208,7 +208,7 @@ handle_call({index, Postings}, _From, State) ->
next_id=NextID + 1,
buffer_rollover_size = fuzzed_rollover_size(),
converter = Converter2,
- to_convert = ToConvert2
+ to_convert = ToConvert4
},
{reply, ok, NewState1};
false ->
@@ -443,13 +443,13 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) ->
SegmentRO = mi_segment:open_read(mi_segment:filename(SegmentWO)),
{{value, _}, ToConvert2} = queue:out(ToConvert),
- {Converter, ToConverter3} =
- case queue:peek(ToConvert2) of
- empty -> {undefined, ToConvert2};
- {value, Buff} ->
+ {Converter, ToConvert3} =
+ case next_buffer_to_convert(ToConvert2) of
+ {none, ToConvert2} -> {undefined, ToConvert2};
+ {Next, ToConvert2} ->
{ok, Pid} = mi_buffer_converter:convert(self(),
Root,
- Buff),
+ Next),
{Pid, ToConvert2}
end,
@@ -460,7 +460,7 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) ->
buffers=Buffers -- [Buffer],
segments=NewSegments,
converter=Converter,
- to_convert=ToConverter3
+ to_convert=ToConvert3
},
%% Give us the opportunity to do a merge...
@@ -502,21 +502,6 @@ handle_info({'EXIT', CompactingPid, Reason},
%% clear out compaction flags, so we try again when necessary
{noreply, State#state{is_compacting=false}};
-handle_info({'EXIT', Converter, Reason},
- #state{root=Root,
- converter=Converter, to_convert=ToConvert}=State) ->
- case Reason of
- normal -> {noreply, State};
- buffer_dropped -> {noreply, State};
- _ ->
- {value, Buff} = queue:peek(ToConvert),
- lager:error("Converter ~p crashed while converting buffer ~p",
- [Converter, Buff]),
- lager:error("Restarting conversion of buffer ~p", [Buff]),
- {ok, Pid} = mi_buffer_converter:convert(self(), Root, Buff),
- {noreply, State#state{converter=Pid}}
- end;
-
handle_info({'EXIT', Pid, Reason},
#state{lookup_range_pids=SRPids}=State) ->
@@ -735,3 +720,16 @@ join(Root, Name) ->
fuzzed_rollover_size() ->
ActualRolloverSize = element(2,application:get_env(merge_index, buffer_rollover_size)),
mi_utils:fuzz(ActualRolloverSize, 0.25).
+
+next_buffer_to_convert(Buffers) ->
+ case queue:peek(Buffers) of
+ {value, Next} ->
+ %% Buffer may have since been dropped via merge_index:drop
+ case mi_buffer:exists(Next) of
+ true -> {Next, Buffers};
+ false ->
+ {_, Buffers2} = queue:out(Buffers),
+ next_buffer_to_convert(Buffers2)
+ end;
+ empty -> {none, Buffers}
+ end.

0 comments on commit adfe3d7

Please sign in to comment.
Something went wrong with that request. Please try again.