implement the Streams API BatchAppend rpc #53

Closed
Tracked by #45
the-mikedavis opened this issue Aug 19, 2021 · 3 comments · Fixed by #57
Comments

@the-mikedavis
Collaborator

the-mikedavis commented Aug 19, 2021

The BatchAppend rpc is a new call in the Streams API which is supposed to optimize append throughput.

from #45:

here's the diff of the streams.proto definitions which adds the new batch-append rpc:

diff --git a/src/Protos/Grpc/streams.proto b/src/Protos/Grpc/streams.proto
index d5fc1e50b..dd71e65f5 100644
--- a/src/Protos/Grpc/streams.proto
+++ b/src/Protos/Grpc/streams.proto
@@ -3,12 +3,16 @@ package event_store.client.streams;
 option java_package = "com.eventstore.dbclient.proto.streams";
 
 import "shared.proto";
+import "status.proto";
+import "google/protobuf/empty.proto";
+import "google/protobuf/timestamp.proto";
 
 service Streams {
 	rpc Read (ReadReq) returns (stream ReadResp);
 	rpc Append (stream AppendReq) returns (AppendResp);
 	rpc Delete (DeleteReq) returns (DeleteResp);
 	rpc Tombstone (TombstoneReq) returns (TombstoneResp);
+	rpc BatchAppend (stream BatchAppendReq) returns (stream BatchAppendResp);
 }
 
 message ReadReq {
@@ -157,48 +161,101 @@ message AppendResp {
+message BatchAppendReq {
+	event_store.client.UUID correlation_id = 1;
+	Options options = 2;
+	repeated ProposedMessage proposed_messages = 3;
+	bool is_final = 4;
+
+	message Options {
+		event_store.client.StreamIdentifier stream_identifier = 1;
+		oneof expected_stream_position {
+			uint64 stream_position = 2;
+			google.protobuf.Empty no_stream = 3;
+			google.protobuf.Empty any = 4;
+			google.protobuf.Empty stream_exists = 5;
+		}
+		google.protobuf.Timestamp deadline = 6;
+	}
+
+	message ProposedMessage {
+		event_store.client.UUID id = 1;
+		map<string, string> metadata = 2;
+		bytes custom_metadata = 3;
+		bytes data = 4;
+	}
+}
+
+message BatchAppendResp {
+	event_store.client.UUID correlation_id = 1;
+	oneof result {
+		google.rpc.Status error = 2;
+		Success success = 3;
+	}
+
+	event_store.client.StreamIdentifier stream_identifier = 4;
+
+	oneof expected_stream_position {
+		uint64 stream_position = 5;
+		google.protobuf.Empty no_stream = 6;
+		google.protobuf.Empty any = 7;
+		google.protobuf.Empty stream_exists = 8;
+	}
+
+	message Success {
+		oneof current_revision_option {
+			uint64 current_revision = 1;
+			google.protobuf.Empty no_stream = 2;
+		}
+		oneof position_option {
+			event_store.client.AllStreamPosition position = 3;
+			google.protobuf.Empty no_position = 4;
+		}
+	}
+}
+

(note that changes to event_store.client.{shared. => }* have been omitted)

Glancing at the new proto, it looks like you stream individual BatchAppendReq messages, where each BatchAppendReq adds a chunk of new messages. I would imagine this rpc was created for the migrator efforts (the tool that migrates data from one EventStoreDB to another), where the total append bytes would typically exceed the maximum. It also looks like there are some cool new controls in there, like setting a deadline timestamp, which could prove useful.
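As a rough sketch of that chunking idea (the map shapes and `max_batch_size` here are illustrative assumptions, not the actual proto structs or any Spear API):

```elixir
# Hypothetical sketch: splitting an enumerable of proposed messages into
# BatchAppendReq-shaped maps, marking the last chunk with is_final: true.
defmodule BatchSketch do
  @max_batch_size 20

  def to_requests(events, correlation_id, options) do
    chunks = Enum.chunk_every(events, @max_batch_size)
    last = length(chunks) - 1

    chunks
    |> Enum.with_index()
    |> Enum.map(fn {chunk, idx} ->
      %{
        correlation_id: correlation_id,
        options: options,
        proposed_messages: chunk,
        is_final: idx == last
      }
    end)
  end
end
```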

@the-mikedavis the-mikedavis changed the title a new batch-append rpc implement the Streams API BatchAppend rpc Aug 19, 2021
@the-mikedavis
Collaborator Author

Ah, actually the (stream BatchAppendResp) return value means that this works a lot like a persistent subscription, but in reverse: the client emits chunks of events to append and the server acknowledges the chunks as they get committed.

And the other interesting thing about the new proto definition is that each BatchAppendReq has a stream_identifier in its options, which means that you can append to multiple different streams in a single invocation of this rpc, removing some overhead.


This feature is pretty different from any of the other functionality exposed by the gRPC interface, and the function(s) in spear will probably have to diverge from the normal request/5 template. I could see implementing this in a similar way to Spear.append/4, so that it takes an enumerable and does the chunking on stream_name itself, but then the feature misses the async multi-chunk ability (you can't have an effective in-flight buffer of messages to append).

I imagine this will need to be implemented with multiple new functions so that a consumer of spear has fine-grained control over how the feature gets used (it'll probably need something like a GenStage pipeline to be used most efficiently anyway).
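A GenStage consumer around this might look roughly like the following sketch. The `Spear.append_batch/5` call here follows the signature proposed in the next comment and is not an implemented API; the stream name is made up:

```elixir
# Rough sketch of a GenStage consumer that forwards demand-driven batches
# to a hypothetical Spear.append_batch/5.
defmodule AppendConsumer do
  use GenStage

  def start_link(conn), do: GenStage.start_link(__MODULE__, conn)

  @impl GenStage
  def init(conn), do: {:consumer, %{conn: conn}}

  @impl GenStage
  def handle_events(events, _from, %{conn: conn} = state) do
    # each batch of demanded events becomes one chunk in the batch append
    :ok = Spear.append_batch(events, conn, self(), "my_stream", done?: false)
    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info(ack, state) do
    # BatchAppendResp acknowledgements would arrive here as plain messages
    IO.inspect(ack, label: "batch ack")
    {:noreply, [], state}
  end
end
```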

@the-mikedavis
Collaborator Author

I think a function interface in spear like this makes sense:

@spec append_batch(
        events :: Enumerable.t(),
        conn,
        send_ack_to :: pid() | GenServer.name(),
        stream_name :: String.t(),
        opts :: Keyword.t()
      ) :: :ok

And this would be asynchronous: the acknowledgement BatchAppendResp would be forwarded to that send_ack_to pid.

opts would have fields:

  • :deadline - the deadline the EventStoreDB must commit the events by
    with some sane default
  • :expect - an expectation as one might pass to append/4
  • :done? - (default: false) used to mark the chunk of events as the final one in the batch request
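For illustration, a call under this proposed signature might look like the following (a sketch only: `append_batch/5` is not yet implemented, and `conn` is assumed to be an existing Spear connection):

```elixir
# Hypothetical usage of the proposed append_batch/5 (not an implemented API).
events = Enum.map(1..100, &Spear.Event.new("counted", %{number: &1}))

:ok =
  Spear.append_batch(events, conn, self(), "my_stream",
    expect: :any,
    done?: true
  )

# the BatchAppendResp acknowledgement would then arrive as a message
# to the send_ack_to pid:
receive do
  ack -> ack
end
```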

@the-mikedavis
Collaborator Author

☝️ that might be problematic because there'd be no way to do multiple separate batch appends at the same time on the same connection, which one may wish to do for better performance (maybe there's a way to maximize HTTP/2 performance here by splitting batch-append requests across HTTP/2 streams?)

instead it might make sense to shuffle around the arguments like so:

@spec append_batch(
        events :: Enumerable.t(),
        conn,
        batch_id :: reference() | :new,
        stream_name :: String.t(),
        opts :: Keyword.t()
      ) :: {:ok, batch_id :: reference()}

And then opts would have the :send_ack_to option defaulting to self().

Then acknowledgements would be received like so:

iex> flush()
{:batch, :ok, batch_id :: reference()}
# or in cases of error
{:batch, :error, reason :: term(), batch_id :: reference()}
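Under that shuffled signature, a multi-chunk append might look like this sketch (hypothetical API; `conn`, `first_chunk`, and `next_chunk` are assumed):

```elixir
# Hypothetical usage of the revised append_batch/5 (not an implemented API).
# Start a new batch with :new and continue it with the returned reference.
{:ok, batch_id} =
  Spear.append_batch(first_chunk, conn, :new, "my_stream", send_ack_to: self())

{:ok, ^batch_id} =
  Spear.append_batch(next_chunk, conn, batch_id, "my_stream", done?: true)

# acknowledgements arrive as messages to send_ack_to (self() by default)
receive do
  {:batch, :ok, ^batch_id} -> :appended
  {:batch, :error, reason, ^batch_id} -> {:error, reason}
end
```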

or it may make sense to write a new struct for this:

iex> flush()
%Spear.BatchAppendResult{
  batch_id: reference(),
  result: :ok | {:error, reason :: term()} | Spear.Records.BatchAppendResp.t(),
  correlation_id: String.t() # uuid4 correlation_id
}
