Skip to content

Commit

Permalink
fix: avoid dropping attachment chunks on quorum writes
Browse files Browse the repository at this point in the history
This only applies to databases that have an n > [cluster] n.

Our `middleman()` function that proxies attachment streams from
the incoming HTTP socket on the coordinating node to the target
shard-bearing nodes used the server config to determine whether
it should start dropping chunks from the stream.

If a database was created with a larger `n`, the `middleman()`
function could have started to drop attachment chunks before
all attached nodes had a chance to receive it.

This fix uses a database’s concrete `n` value rather than the
server config default value.

Co-Authored-By: James Coglan <jcoglan@gmail.com>
Co-Authored-By: Robert Newson <rnewson@apache.org>
  • Loading branch information
3 people committed Aug 26, 2021
1 parent 4262714 commit 9b30ada
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/chttpd/src/chttpd_db.erl
Expand Up @@ -1574,7 +1574,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
undefined -> <<"application/octet-stream">>;
CType -> list_to_binary(CType)
end,
Data = fabric:att_receiver(Req, chttpd:body_length(Req)),
Data = fabric:att_receiver(Req, couch_db:name(Db), chttpd:body_length(Req)),
ContentLen = case couch_httpd:header_value(Req,"Content-Length") of
undefined -> undefined;
Length -> list_to_integer(Length)
Expand Down
8 changes: 4 additions & 4 deletions src/fabric/src/fabric.erl
Expand Up @@ -28,7 +28,7 @@
% Documents
-export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
purge_docs/3, att_receiver/2]).
purge_docs/3, att_receiver/3]).

% Views
-export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
Expand Down Expand Up @@ -324,11 +324,11 @@ purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
%% returns a fabric attachment receiver context tuple
%% with the spawned middleman process, an empty binary,
%% or exits with an error tuple {Error, Arg}
-spec att_receiver(#httpd{}, Length :: undefined | chunked | pos_integer() |
-spec att_receiver(#httpd{}, dbname(), Length :: undefined | chunked | pos_integer() |
{unknown_transfer_encoding, any()}) ->
{fabric_attachment_receiver, pid(), chunked | pos_integer()} | binary().
att_receiver(Req, Length) ->
fabric_doc_atts:receiver(Req, Length).
att_receiver(Req, DbName, Length) ->
fabric_doc_atts:receiver(Req, DbName, Length).

%% @equiv all_docs(DbName, [], Callback, Acc0, QueryArgs)
all_docs(DbName, Callback, Acc, QueryArgs) ->
Expand Down
26 changes: 13 additions & 13 deletions src/fabric/src/fabric_doc_atts.erl
Expand Up @@ -18,25 +18,25 @@
-include_lib("couch/include/couch_db.hrl").

-export([
receiver/2,
receiver/3,
receiver_callback/2
]).


receiver(_Req, undefined) ->
receiver(_Req, _DbName, undefined) ->
<<"">>;
receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
receiver(_Req, _DbName, {unknown_transfer_encoding, Unknown}) ->
exit({unknown_transfer_encoding, Unknown});
receiver(Req, chunked) ->
MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
receiver(Req, DbName, chunked) ->
MiddleMan = spawn(fun() -> middleman(Req, DbName, chunked) end),
{fabric_attachment_receiver, MiddleMan, chunked};
receiver(_Req, 0) ->
receiver(_Req, _DbName, 0) ->
<<"">>;
receiver(Req, Length) when is_integer(Length) ->
receiver(Req, DbName, Length) when is_integer(Length) ->
maybe_send_continue(Req),
Middleman = spawn(fun() -> middleman(Req, Length) end),
Middleman = spawn(fun() -> middleman(Req, DbName, Length) end),
{fabric_attachment_receiver, Middleman, Length};
receiver(_Req, Length) ->
receiver(_Req, _DbName, Length) ->
exit({length_not_integer, Length}).


Expand Down Expand Up @@ -108,21 +108,21 @@ receive_unchunked_attachment(Req, Length) ->
end,
receive_unchunked_attachment(Req, Length - size(Data)).

middleman(Req, chunked) ->
middleman(Req, DbName, chunked) ->
% spawn a process to actually receive the uploaded data
RcvFun = fun(ChunkRecord, ok) ->
receive {From, go} -> From ! {self(), ChunkRecord} end, ok
end,
Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),

% take requests from the DB writers and get data from the receiver
N = config:get_integer("cluster", "n", 3),
N = mem3:n(DbName),
Timeout = fabric_util:attachments_timeout(),
middleman_loop(Receiver, N, [], [], Timeout);

middleman(Req, Length) ->
middleman(Req, DbName, Length) ->
Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
N = config:get_integer("cluster", "n", 3),
N = mem3:n(DbName),
Timeout = fabric_util:attachments_timeout(),
middleman_loop(Receiver, N, [], [], Timeout).

Expand Down

0 comments on commit 9b30ada

Please sign in to comment.