Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Update the file size when updated by external writer.

Make sure that whenever we are updated by an external writer and
asked to read a new header position, we also update to the file size.

Also, disable the writer to prevent couchdb from also updating the
database file, since 2 process should not write to the file.

Change-Id: I3862cb0306a4dd204f2fb1906e592943f1e1872d
Reviewed-on: http://review.couchbase.org/13604
Tested-by: Damien Katz <damien@couchbase.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
  • Loading branch information...
commit fc4f6a988c4c73374411e361952ec4afd249fd5e 1 parent 9a05b78
@Damienkatz Damienkatz authored
View
2  src/couchdb/couch_db_updater.erl
@@ -192,6 +192,8 @@ handle_call({update_header_pos, FileVersion, NewPos}, _From, Db) ->
if FileVersion == ExistingFileVersion ->
case couch_file:read_header(Db#db.fd, NewPos) of
{ok, NewHeader} ->
+ % disable any more writes, as we are being updated externally!
+ ok = couch_file:only_snapshot_reads(Db#db.fd),
if Db#db.update_seq > NewHeader#db_header.update_seq ->
{reply, update_behind_couchdb, Db};
true ->
View
20 src/couchdb/couch_file.erl
@@ -402,13 +402,21 @@ handle_call(find_header, From, #file{reader = Reader, eof = Eof} = File) ->
Reader ! {find_header, Eof, From},
{noreply, File};
-handle_call({read_header, Pos}, From, #file{reader = Reader} = File) ->
- Reader ! {read_header, Pos, From},
- {noreply, File};
+handle_call({read_header, Pos}, From, #file{reader = R} = File) ->
+ R ! {read_header, Pos, From},
+ % update the eof since file must have been updated externally
+ R ! {get_eof, self()},
+ receive
+ {eof, Eof, R} -> ok;
+ {'EXIT', R, Reason} ->
+ Eof = ok, % appease compiler
+ exit({read_loop_died, Reason})
+ end,
+ {noreply, File#file{eof=Eof}};
handle_call(snapshot_reads, _From, #file{reader = R, writer = W} = File) ->
R ! {set_close_after, infinity, self()},
- couch_util:shutdown_sync(W),
+ couch_util:shutdown_sync(W), % no-op if nil
receive
{ok, R} -> ok;
{'EXIT', R, Reason} ->
@@ -732,6 +740,10 @@ handle_reader_message(Msg, Fd, FilePath, CloseTimeout) ->
Result = (catch load_header(Fd, Pos div ?SIZE_BLOCK)),
gen_server:reply(From, Result),
reader_loop(Fd, FilePath, CloseTimeout);
+ {get_eof, From} ->
+ {ok, Pos} = file:position(Fd, eof),
+ From ! {eof, Pos, self()},
+ reader_loop(Fd, FilePath, CloseTimeout);
{set_close_after, NewCloseTimeout, From} ->
From ! {ok, self()},
reader_loop(Fd, FilePath, NewCloseTimeout);
View
39 src/couchdb/couch_file_write_guard.erl
@@ -26,7 +26,7 @@
% We don't use OTP supervisors as they scale poorly for lots of child
% starts/stops (linear scans for child specs)
--export([add/2, remove/1]).
+-export([add/2, remove/1, disable_for_testing/0]).
-export([init/1, handle_call/3, sup_start_link/0]).
-export([handle_cast/2, code_change/3, handle_info/2, terminate/2]).
@@ -38,6 +38,10 @@ remove(Pid) ->
gen_server:call(couch_file_write_guard, {remove, Pid}, infinity).
+disable_for_testing() ->
+ gen_server:call(couch_file_write_guard, disable_for_testing, infinity).
+
+
sup_start_link() ->
gen_server:start_link({local,couch_file_write_guard},couch_file_write_guard,[],[]).
@@ -45,7 +49,7 @@ sup_start_link() ->
init([]) ->
ets:new(couch_files_by_name, [set, private, named_table]),
ets:new(couch_files_by_pid, [set, private, named_table]),
- {ok, ok}.
+ {ok, true}.
terminate(_Reason, _Srv) ->
@@ -57,25 +61,33 @@ terminate(_Reason, _Srv) ->
ok.
-handle_call({add, Filepath, Pid}, _From, Server) ->
+handle_call({add, Filepath, Pid}, _From, true) ->
case ets:insert_new(couch_files_by_name, {Filepath, Pid}) of
true ->
Ref = erlang:monitor(process, Pid),
true = ets:insert_new(couch_files_by_pid, {Pid, Filepath, Ref}),
- {reply, ok, Server};
+ {reply, ok, true};
false ->
- {reply, already_added_to_file_write_guard, Server}
+ {reply, already_added_to_file_write_guard, true}
end;
-handle_call({remove, Pid}, _From, Server) ->
+handle_call({add, _Filepath, _Pid}, _From, false) ->
+ % no-op for testing
+ {reply, ok, false};
+handle_call({remove, Pid}, _From, true) ->
case ets:lookup(couch_files_by_pid, Pid) of
[{Pid, Filepath, Ref}] ->
true = demonitor(Ref, [flush]),
true = ets:delete(couch_files_by_name, Filepath),
true = ets:delete(couch_files_by_pid, Pid),
- {reply, ok, Server};
+ {reply, ok, true};
_ ->
- {reply, removing_unadded_file, Server}
- end.
+ {reply, removing_unadded_file, true}
+ end;
+handle_call({remove, _Pid}, _From, false) ->
+ % no-op for testing
+ {reply, ok, false};
+handle_call(disable_for_testing, _From, _State) ->
+ {reply, ok, false}.
handle_cast(Msg, _Server) ->
@@ -86,15 +98,18 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_info({'DOWN', MonRef, _Type, Pid, Reason}, Server) ->
+handle_info({'DOWN', MonRef, _Type, Pid, Reason}, true) ->
case ets:lookup(couch_files_by_pid, Pid) of
[{Pid, Filepath, MonRef}] ->
true = ets:delete(couch_files_by_name, Filepath),
true = ets:delete(couch_files_by_pid, Pid),
- {noreply, Server};
+ {noreply, true};
_ ->
?LOG_ERROR("Unexpected down message in couch_file_write_guard: ~p",
[Reason]),
exit(shutdown)
- end.
+ end;
+handle_info({'DOWN', _MonRef, _Type, _Pid, _Reason}, false) ->
+ % no-op for testing
+ {noreply, false}.
View
11 test/etap/071-couch-db-external-write.t
@@ -88,6 +88,7 @@ main(_) ->
test() ->
couch_server_sup:start_link(test_util:config_files()),
+ couch_file_write_guard:disable_for_testing(),
couch_server:delete(<<"etap-test-db">>, []),
{ok, Db} = couch_db:create(<<"etap-test-db">>, []),
ok = couch_db:update_doc(Db, #doc{id = <<"1">>,body = <<"{foo:1}">>},
@@ -110,12 +111,16 @@ test() ->
etap:is(couch_db:update_header_pos(Db2, 2, 0), update_file_ahead_of_couchdb,
"Should be ahead couchdb"),
- {ok, FileLen} = couch_file:bytes(Db2#db.fd),
+
+ DbRootDir = couch_config:get("couchdb", "database_dir", "."),
+ Filename = filename:join(DbRootDir, "etap-test-db.couch.1"),
+ {ok, Fd} = couch_file:open(Filename),
+ {ok, FileLen} = couch_file:bytes(Fd),
Header = Db2#db.header,
Header2 = Header#db_header{update_seq = 0},
- etap:is(couch_file:write_header(Db2#db.fd, Header2), ok,
+ etap:is(couch_file:write_header(Fd, Header2), ok,
"Should write new header outside of couchdb"),
% calculate where new header goes
@@ -129,7 +134,7 @@ test() ->
"Should be ahead couchdb"),
Header3 = Header#db_header{update_seq = Header#db_header.update_seq + 1},
- etap:is(couch_file:write_header(Db2#db.fd, Header3), ok,
+ etap:is(couch_file:write_header(Fd, Header3), ok,
"Should write new header outside of couchdb"),
NewHeaderPos2 = NewHeaderPos + ?SIZE_BLOCK,
Please sign in to comment.
Something went wrong with that request. Please try again.