Skip to content

Commit

Permalink
Emit fewer flushes when handling RPCs on the server (#1110)
Browse files Browse the repository at this point in the history
Motivation:

The changes in #1101 included a change in when flushes were emitted.
For unary RPCs this led to an additional flush, which in some
circumstances led to a drop in QPS of approximately 5%.

Modifications:

- Introduce an option as to whether a flush should be emitted when
  writing initial response metadata
- Fix a bug where the request to flush or not for messages was ignored

Result:

A 5% increase in QPS for the unary-single benchmark (single connection,
one rpc at a time).
  • Loading branch information
glbrntt committed Jan 21, 2021
1 parent 7dc3216 commit 6f92056
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public final class BidirectionalStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public final class ClientStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ internal protocol GRPCServerResponseWriter {
/// Send the initial response metadata.
/// - Parameters:
/// - metadata: The user-provided metadata to send to the client.
/// - flush: Whether a flush should be emitted after writing the metadata.
/// - promise: A promise to complete once the metadata has been handled.
func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?)
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?)

/// Send the serialized bytes of a response message.
/// - Parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public final class ServerStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ public final class UnaryServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
// We can delay this flush until the end of the RPC.
self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise)

case let .message(message, metadata):
do {
Expand Down
46 changes: 22 additions & 24 deletions Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer

switch responsePart {
case let .metadata(headers):
self.sendMetadata(headers, promise: promise)
// We're in 'write' so we're using the old type of RPC handler which emits its own flushes,
// no need to emit an extra one.
self.sendMetadata(headers, flush: false, promise: promise)

case let .message(buffer, metadata):
self.sendMessage(buffer, metadata: metadata, promise: promise)
Expand All @@ -181,13 +183,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
}

internal func flush(context: ChannelHandlerContext) {
if self.isReading {
// We're already reading; record the flush and emit it when the read completes.
self.flushPending = true
} else {
// Not reading: flush now.
context.flush()
}
self.markFlushPoint()
}

/// Called when the pipeline has finished configuring.
Expand Down Expand Up @@ -280,17 +276,15 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer

internal func sendMetadata(
_ headers: HPACKHeaders,
flush: Bool,
promise: EventLoopPromise<Void>?
) {
switch self.state.send(headers: headers) {
case let .success(headers):
let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
if flush {
self.markFlushPoint()
}

case let .failure(error):
Expand All @@ -313,11 +307,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
case let .success(buffer):
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
if metadata.flush {
self.markFlushPoint()
}

case let .failure(error):
Expand All @@ -335,15 +326,22 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
// Always end stream for status and trailers.
let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
}
// We'll always flush on end.
self.markFlushPoint()

case let .failure(error):
promise?.fail(error)
}
}

/// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
/// or emit a flush now if we are not.
private func markFlushPoint() {
if self.isReading {
self.flushPending = true
} else {
self.flushPending = false
self.context.flush()
}
}
}
2 changes: 1 addition & 1 deletion Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ extension ServerMessageEncoding {
}

class NoOpResponseWriter: GRPCServerResponseWriter {
func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
promise?.succeed(())
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/GRPCTests/UnaryServerHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class ResponseRecorder: GRPCServerResponseWriter {
var status: GRPCStatus?
var trailers: HPACKHeaders?

func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
XCTAssertNil(self.metadata)
self.metadata = metadata
promise?.succeed(())
Expand Down

0 comments on commit 6f92056

Please sign in to comment.