Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

WIP: fix drop and iterator

  • Loading branch information...
commit f6d199e9e2c04a387ae64a4cac0b16c8d9698e3b 1 parent 5bcb7ca
@rzezeski rzezeski authored
View
1  src/mi_locks.erl
@@ -65,6 +65,7 @@ release(Key, Locks) ->
lists:keystore(Key, #lock.key, Locks, NewLock);
false ->
+ lager:error("couldn't release key ~p in ~p", [Key, Locks]),
throw({lock_does_not_exist, Key})
end.
View
8 src/mi_segment.erl
@@ -154,8 +154,12 @@ iterator(Segment) ->
false ->
%% Open a filehandle to the start of the segment.
{ok, ReadAheadSize} = application:get_env(merge_index, segment_compact_read_ahead_size),
- {ok, FH} = file:open(data_file(Segment), [read, raw, binary, {read_ahead, ReadAheadSize}]),
- fun() -> iterate_all_filehandle(FH, undefined, undefined) end
+
+ fun() ->
+ {ok, FH} = file:open(data_file(Segment), [read, raw, binary, {read_ahead, ReadAheadSize}]),
+
+ iterate_all_filehandle(FH, undefined, undefined)
+ end
end.
%% @private Create an iterator over a binary which represents the
View
27 src/mi_server.erl
@@ -71,7 +71,8 @@
caller,
ref,
buffers,
- segments
+ segments,
+ tag=fooey
}).
-define(RESULTVEC_SIZE, 1000).
@@ -363,14 +364,16 @@ handle_call({iterator, Filter, DestPid, DestRef}, _From, State) ->
SegmentItrs = [mi_segment:iterator(S) || S <- Segments],
Itr = build_iterator_tree(BufferItrs ++ SegmentItrs),
- Args = [Filter, DestPid, DestRef, Itr(), {[], 0}],
- ItrPid = spawn_link(?MODULE, iterate2, Args),
+ ItrPid = spawn_link(
+ fun() -> iterate2(Filter, DestPid, DestRef, Itr(), {[], 0}) end
+ ),
NewPids = [ #stream_range{pid=ItrPid,
caller=DestPid,
ref=DestRef,
buffers=Buffers,
- segments=Segments}
+ segments=Segments,
+ tag=iterator}
| State#state.lookup_range_pids ],
State2 = State#state{locks=NewLocks, lookup_range_pids=NewPids},
@@ -379,6 +382,7 @@ handle_call({iterator, Filter, DestPid, DestRef}, _From, State) ->
%% TODO what about resetting next_id?
handle_call(drop, _From, State) ->
#state { buffers=Buffers, segments=Segments } = State,
+ %% lager:info("A drop was called buffers: ~p segments: ~p", [Buffers, Segments]),
%% Delete files, reset state...
[mi_buffer:delete(X) || X <- Buffers],
@@ -389,6 +393,7 @@ handle_call(drop, _From, State) ->
buffers = [Buffer],
segments = [],
converter = undefined,
+ lookup_range_pids = [],
to_convert = queue:new()},
{reply, ok, NewState};
@@ -511,9 +516,15 @@ handle_info({'EXIT', CompactingPid, Reason},
handle_info({'EXIT', Pid, Reason},
#state{lookup_range_pids=SRPids}=State) ->
+ try
case lists:keytake(Pid, #stream_range.pid, SRPids) of
{value, SR, NewSRPids} ->
%% One of our lookup or range processes exited
+ %% if SR#stream_range.tag == iterator ->
+ %% lager:info("An iterator pid exited ~p with reason ~p", [SR, Reason]);
+ %% true ->
+ %% ok
+ %% end,
case Reason of
normal ->
@@ -537,13 +548,17 @@ handle_info({'EXIT', Pid, Reason},
end,
NewLocks1 = lists:foldl(F2, NewLocks,
SR#stream_range.segments),
-
{noreply, State#state { locks=NewLocks1,
lookup_range_pids=NewSRPids }};
false ->
%% some random other process exited: ignore
{noreply, State}
- end;
+ end
+ catch _:Reason ->
+ lager:error("caught reason: ~p~ntrace: ~p", [Reason, erlang:get_stacktrace()])
+ end;
+
+
handle_info(Msg, State) ->
lager:error("Unexpected info ~p", [Msg]),
View
4 test/merge_index_tests.erl
@@ -51,7 +51,7 @@ prop_api() ->
%% error_logger:delete_report_handler(sasl_report_tty_h),
%% lager:set_loglevel(lager_console_backend, critical),
- ?FORALL(Cmds, commands(?MODULE),
+ ?FORALL(Cmds, resize(40,commands(?MODULE)),
?TRAPEXIT(
begin
application:stop(merge_index),
@@ -100,7 +100,7 @@ command(S) ->
{call,?MODULE,range, [P, g_range_query(Postings), all]},
{call,?MODULE,range_sync, [P, g_range_query(Postings), all]},
{call,?MODULE,iterator, [P]},
- %% {call,?MODULE,drop, [P]},
+ {call,?MODULE,drop, [P]},
{call,?MODULE,compact, [P]}]).
next_state(S, Pid, {call,_,init,_}) ->
Please sign in to comment.
Something went wrong with that request. Please try again.