Client streaming RPC implementation: Cannot call context.responsePromise.fail() outside of the EventLoopFuture #520

Closed
vyshane opened this issue Jul 22, 2019 · 9 comments

Comments

@vyshane
commented Jul 22, 2019

Issue Description

I'm using grpc-swift with NIO (gRPC Swift 1.0.0-alpha.1). When implementing a client streaming RPC, I get a fatal error unless I call context.responsePromise.fail(status) inside the closure wrapped by the returned EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>. I have distilled the issue into the following.

The Protobuf

service ClientStreamingScenarios {
  rpc ClientStreamFailedPrecondition (stream EchoRequest) returns (Empty) {}
}

message EchoRequest {
  string message = 1;
}

message Empty {}

The RPC Implementation

class ClientStreamingTestsService: ClientStreamingScenariosProvider {

  func clientStreamFailedPrecondition(context: UnaryResponseCallContext<Empty>)
    -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
  {
    let status = GRPCStatus(code: .failedPrecondition, message: "Failed precondition message")
    context.responsePromise.fail(status)
    return context.eventLoop.makeSucceededFuture({ streamEvent in
      // Noop
    })
  }
}

The Error

When I exercise the RPC, I get:

Fatal error: tried to decode as type RawGRPCServerResponsePart but found GRPCServerResponsePart<Empty> with contents other(GRPC.GRPCServerResponsePart<Empty>.status(GRPC.GRPCStatus)): file [...]/.build/checkouts/swift-nio/Sources/NIO/NIOAny.swift, line 200

What Works

If I change the RPC implementation to the following it works:

  func clientStreamFailedPrecondition(context: UnaryResponseCallContext<Empty>)
    -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
  {
    return context.eventLoop.makeSucceededFuture({ streamEvent in
      let status = GRPCStatus(code: .failedPrecondition, message: "Failed precondition message")
      context.responsePromise.fail(status)
    })
  }

In my use case I really want to be able to call context.responsePromise.fail() outside of the EventLoopFuture. I'd appreciate any guidance regarding this issue.

Stack Trace

Thread 5 Queue : nio.nioTransportServices.connectionchannel (serial)
#0	0x00007fff6eb26d80 in _swift_runtime_on_report ()
#1	0x00007fff6eb9c373 in _swift_stdlib_reportFatalErrorInFile ()
#2	0x00007fff6eab3aee in specialized closure #1 in closure #1 in closure #1 in _assertionFailure(_:_:file:line:flags:) ()
#3	0x00007fff6eab3c67 in specialized closure #1 in closure #1 in _assertionFailure(_:_:file:line:flags:) ()
#4	0x00007fff6e883b2c in specialized String.withUTF8(_:) ()
#5	0x00007fff6ea86080 in specialized _assertionFailure(_:_:file:line:flags:) ()
#6	0x00007fff6e8830e9 in _assertionFailure(_:_:file:line:flags:) ()
#7	0x000000010655bd1d in NIOAny.forceAsOther(type:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/NIOAny.swift:200
#8	0x000000010655c0ab in NIOAny.forceAs(type:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/NIOAny.swift:219
#9	0x00000001065c48e1 in ChannelOutboundHandler.unwrapOutboundIn(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/TypeAssistedChannelHandler.swift:83
#10	0x0000000104d37313 in HTTP1ToRawGRPCServerCodec.write(context:data:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/HTTP1ToRawGRPCServerCodec.swift:178
#11	0x0000000104d3a039 in protocol witness for _ChannelOutboundHandler.write(context:data:promise:) in conformance HTTP1ToRawGRPCServerCodec ()
#12	0x00000001064d68d6 in ChannelHandlerContext.invokeWriteAndFlush(_:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1433
#13	0x00000001064d69a5 in ChannelHandlerContext.invokeWriteAndFlush(_:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1436
#14	0x00000001064d440f in ChannelPipeline.writeAndFlush0(_:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:760
#15	0x00000001064d4248 in ChannelPipeline.writeAndFlush(_:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:672
#16	0x00000001064c131a in Channel.writeAndFlush(_:promise:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/Channel.swift:200
#17	0x000000010605b367 in protocol witness for ChannelOutboundInvoker.writeAndFlush(_:promise:) in conformance HTTP2StreamChannel ()
#18	0x0000000104d4d204 in closure #4 in UnaryResponseCallContextImpl.init(channel:request:errorDelegate:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift:71
#19	0x0000000104d4a26f in thunk for @escaping @callee_guaranteed (@guaranteed GRPCStatus) -> () ()
#20	0x000000010652a027 in closure #1 in EventLoopFuture.whenSuccess(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/EventLoopFuture.swift:658
#21	0x00000001065229ff in CallbackList._run() at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/EventLoopFuture.swift:83
#22	0x0000000106523d68 in EventLoopPromise._resolve(value:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/EventLoopFuture.swift:198
#23	0x0000000106524024 in EventLoopPromise.fail(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/EventLoopFuture.swift:184
#24	0x0000000104c4b5c1 in ClientStreamingTestsService.clientStreamFailedPrecondition(context:) at /Users/shane/Projects/grpc-swift-combine/Tests/CombineGRPCTests/Server Implementations/ClientStreamingTestsService.swift:50
#25	0x0000000104c4b9ee in protocol witness for ClientStreamingScenariosProvider.clientStreamFailedPrecondition(context:) in conformance ClientStreamingTestsService ()
#26	0x0000000104c43ae5 in closure #2 in ClientStreamingScenariosProvider.handleMethod(_:request:serverHandler:channel:errorDelegate:) at /Users/shane/Projects/grpc-swift-combine/Tests/CombineGRPCTests/Generated/test_scenarios.grpc.swift:349
#27	0x0000000104cf6342 in ClientStreamingCallHandler.init(channel:request:errorDelegate:eventObserverFactory:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift:24
#28	0x0000000104cf5ff0 in ClientStreamingCallHandler.__allocating_init(channel:request:errorDelegate:eventObserverFactory:) ()
#29	0x0000000104c43731 in ClientStreamingScenariosProvider.handleMethod(_:request:serverHandler:channel:errorDelegate:) at /Users/shane/Projects/grpc-swift-combine/Tests/CombineGRPCTests/Generated/test_scenarios.grpc.swift:348
#30	0x0000000104c4ba70 in protocol witness for CallHandlerProvider.handleMethod(_:request:serverHandler:channel:errorDelegate:) in conformance ClientStreamingTestsService ()
#31	0x0000000104d1c9c1 in GRPCChannelHandler.getCallHandler(channel:requestHead:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/GRPCChannelHandler.swift:95
#32	0x0000000104d1be2a in GRPCChannelHandler.channelRead(context:data:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/GRPCChannelHandler.swift:56
#33	0x0000000104d1d0e9 in protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance GRPCChannelHandler ()
#34	0x00000001064d8974 in ChannelHandlerContext.invokeChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1328
#35	0x00000001064dc5b3 in ChannelHandlerContext.fireChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1141
#36	0x0000000104d354be in HTTP1ToRawGRPCServerCodec.processHead(context:requestHead:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/HTTP1ToRawGRPCServerCodec.swift:126
#37	0x0000000104d3498e in HTTP1ToRawGRPCServerCodec.channelRead(context:data:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/grpc-swift/Sources/GRPC/HTTP1ToRawGRPCServerCodec.swift:96
#38	0x0000000104d37049 in protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance HTTP1ToRawGRPCServerCodec ()
#39	0x00000001064d8974 in ChannelHandlerContext.invokeChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1328
#40	0x00000001064dc5b3 in ChannelHandlerContext.fireChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1141
#41	0x000000010606557d in HTTP2ToHTTP1ServerCodec.channelRead(context:data:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2ToHTTP1Codec.swift:154
#42	0x0000000106067a89 in protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance HTTP2ToHTTP1ServerCodec ()
#43	0x00000001064d8974 in ChannelHandlerContext.invokeChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1328
#44	0x00000001064d2bdd in ChannelPipeline.fireChannelRead0(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:824
#45	0x00000001064d2a52 in ChannelPipeline.fireChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:582
#46	0x000000010605b618 in HTTP2StreamChannel.receiveInboundFrame(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2StreamChannel.swift:543
#47	0x000000010605dfee in HTTP2StreamMultiplexer.channelRead(context:data:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift:82
#48	0x0000000106060099 in protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance HTTP2StreamMultiplexer ()
#49	0x00000001064d8974 in ChannelHandlerContext.invokeChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1328
#50	0x00000001064dc5b3 in ChannelHandlerContext.fireChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1141
#51	0x000000010600cc97 in NIOHTTP2Handler.processFrame(_:flowControlledLength:context:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2ChannelHandler.swift:294
#52	0x0000000106007ccc in NIOHTTP2Handler.frameDecodeLoop(context:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2ChannelHandler.swift:190
#53	0x0000000106007938 in NIOHTTP2Handler.channelRead(context:data:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-http2/Sources/NIOHTTP2/HTTP2ChannelHandler.swift:139
#54	0x000000010600a609 in protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance NIOHTTP2Handler ()
#55	0x00000001064d8974 in ChannelHandlerContext.invokeChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:1328
#56	0x00000001064d2bdd in ChannelPipeline.fireChannelRead0(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:824
#57	0x00000001064d2a52 in ChannelPipeline.fireChannelRead(_:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio/Sources/NIO/ChannelPipeline.swift:582
#58	0x0000000106398e84 in NIOTSConnectionChannel.dataReceivedHandler(content:context:isComplete:error:) at /Users/shane/Projects/grpc-swift-combine/.build/checkouts/swift-nio-transport-services/Sources/NIOTransportServices/NIOTSConnectionChannel.swift:697
#59	0x00007fff6f058051 in partial apply for closure #1 in NWConnection.receiveMessage(completion:) ()
#60	0x00007fff6f033aac in thunk for @escaping @callee_guaranteed (@guaranteed OS_dispatch_data?, @guaranteed OS_nw_content_context?, @unowned Bool, @guaranteed OS_nw_error?) -> () ()
#61	0x00007fff6f384243 in _dispatch_block_async_invoke2 ()
#62	0x00007fff6f3785de in _dispatch_client_callout ()
#63	0x00007fff6f37db9e in _dispatch_lane_serial_drain ()
#64	0x00007fff6f37e555 in _dispatch_lane_invoke ()
#65	0x00007fff6f37da50 in _dispatch_lane_serial_drain ()
#66	0x00007fff6f37e555 in _dispatch_lane_invoke ()
#67	0x00007fff6f387b91 in _dispatch_workloop_worker_thread ()
#68	0x00007fff6f5d76d3 in _pthread_wqthread.cold.1 ()
#69	0x00007fff6f5d1856 in _pthread_wqthread ()
#70	0x00007fff6f5d1717 in start_wqthread ()

Environment

| Key | Value |
| --- | --- |
| OS Version | macOS Catalina Beta 4 |
| Swift Version | 5.1 |
| Xcode Version | 11 Beta 4 |
| gRPC-Swift Version | 1.0.0-alpha.1 |
| protoc Version | 3.7.1 |
| protoc-gen-swift Version | 1.6.0 |
| protoc-gen-swiftgrpc Version | 1.0.0-alpha.1 |

@vyshane changed the title from "Fatal error: tried to decode as type RawGRPCServerResponsePart but found GRPCServerResponsePart<Empty> with contents other(GRPC.GRPCServerResponsePart<Message>.status(GRPC.GRPCStatus))" to "Client streaming RPC implementation: Cannot call context.responsePromise.fail() outside of the EventLoopFuture" on Jul 22, 2019

@vyshane

Author

commented Jul 23, 2019

Swift code was generated with vanilla settings:

protoc test_scenarios.proto --swift_out=Generated/
protoc test_scenarios.proto --swiftgrpc_out=Generated/

@MrMage

Collaborator

commented Jul 23, 2019

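I think that we need to fix this crash, but did you consider returning a failed future instead? That should take care of failing the response promise for you. E.g. (a sketch):

  func clientStreamFailedPrecondition(context: UnaryResponseCallContext<Empty>)
    -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
  {
    let status = GRPCStatus(code: .failedPrecondition, message: "Failed precondition message")
    // Create a promise for the stream-event observer and fail it immediately.
    let promise = context.eventLoop.makePromise(of: ((StreamEvent<EchoRequest>) -> Void).self)
    promise.fail(status)
    return promise.futureResult
  }

@vyshane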

Author

commented Jul 23, 2019

I have. However, doing it this way means that I need to already know that the call should fail before I have a chance to look at the request stream coming in from the client.

I'm writing an RPC handler function that takes a client request stream and returns either the response as a stream of one, or fails the output stream with a GRPCStatus.

Stream of requests -> handler -> Failable stream of one response

Using Combine types, this is what the function signature looks like:

func handle<Req, Resp>(_ context: UnaryResponseCallContext<Resp>,
                       handler: (AnyPublisher<Req, Never>) -> AnyPublisher<Resp, GRPCStatus>)
                      -> EventLoopFuture<(StreamEvent<Req>) -> Void>

Sample RPC implementation using the above:

  func clientStreamFailedPrecondition(context: UnaryResponseCallContext<Empty>)
    -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
  {
    return handle(context) { requests in
      // We'd normally do stuff with the request stream, but here we are exercising a failing RPC
      let status = GRPCStatus(code: .failedPrecondition, message: "Failed precondition message")
      return Fail<Empty, GRPCStatus>(error: status).eraseToAnyPublisher()
    }
  }

In order to set up the stream of requests (AnyPublisher<Req, Never>) I need to have already returned a successful future. Therefore it looks to me that I need to rely on context.responsePromise.
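The ordering constraint described here can be sketched without any dependencies. This is only an illustration under stated assumptions: `ResponsePromise`, `Status`, and this `handle` are hypothetical stand-ins, not the gRPC Swift, NIO, or Combine APIs, and the handler receives a collected array of requests instead of a publisher to keep the example small.

```swift
// Hypothetical, dependency-free stand-ins; not the gRPC Swift, NIO, or Combine APIs.
enum StreamEvent<Message> {
  case message(Message)
  case end
}

struct Status: Error, Equatable {
  let code: Int
  let message: String
}

// A minimal single-assignment promise standing in for the response promise.
final class ResponsePromise<Response> {
  private(set) var result: Result<Response, Status>?

  func succeed(_ value: Response) { complete(.success(value)) }
  func fail(_ status: Status) { complete(.failure(status)) }

  private func complete(_ newResult: Result<Response, Status>) {
    // Ignore completions after the first one.
    if result == nil { result = newResult }
  }
}

// Builds the stream-event observer. The handler only runs after the observer
// exists and has seen the requests, so its outcome has to be reported through
// the promise rather than through this function's return value.
func handle<Request, Response>(
  _ promise: ResponsePromise<Response>,
  handler: @escaping ([Request]) -> Result<Response, Status>
) -> (StreamEvent<Request>) -> Void {
  var received: [Request] = []
  return { event in
    switch event {
    case .message(let request):
      received.append(request)
    case .end:
      switch handler(received) {
      case .success(let response): promise.succeed(response)
      case .failure(let status): promise.fail(status)
      }
    }
  }
}

struct EchoRequest { let message: String }
struct Empty {}

let promise = ResponsePromise<Empty>()
let observer: (StreamEvent<EchoRequest>) -> Void = handle(promise) { _ in
  // 9 is the gRPC FAILED_PRECONDITION status code.
  .failure(Status(code: 9, message: "Failed precondition message"))
}

// Nothing is resolved until the client stream has been consumed.
let resolvedBeforeRequests = promise.result != nil
observer(.message(EchoRequest(message: "hello")))
observer(.end)
```

In this sketch the observer is returned successfully before any request arrives, and the failure only surfaces later through the promise, which mirrors why a failed future is not an option for this use case.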

@glbrntt

Collaborator

commented Jul 23, 2019

Thanks for the bug report @vyshane. Your use case looks totally reasonable (I'm not too familiar with Combine) and this does appear to be a bug. The fact we're unwrapping the wrong type in NIOAny (actual: GRPCServerResponsePart<Empty>, expected: RawGRPCServerResponsePart) suggests we're writing into the pipeline before it's been configured with the appropriate handlers. I'll look into this!
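As a rough illustration of that diagnosis (not the actual NIO or gRPC Swift code), the sketch below uses hypothetical stand-in types: `AnyBox` plays the role of a type-erased container, `TypedResponsePart` and `RawResponsePart` play the roles of the two response-part types, and `encode` plays the role of the codec stage that would normally sit between them. It returns `nil` on a mismatch where a force cast would trap.

```swift
// Hypothetical stand-ins for illustration only; not the real NIO or gRPC Swift types.
struct AnyBox {
  private let storage: Any
  init<T>(_ value: T) { self.storage = value }

  // Returns nil on a type mismatch. A force-casting variant would trap instead,
  // similar in spirit to the fatal error in the report.
  func tryAs<T>(_ type: T.Type = T.self) -> T? {
    return storage as? T
  }
}

enum TypedResponsePart<Message> {
  case message(Message)
  case status(code: Int, message: String)
}

enum RawResponsePart {
  case message([UInt8])
  case status(code: Int, message: String)
}

struct Empty {}

// A codec-like stage that converts typed parts into raw parts.
func encode(_ part: TypedResponsePart<Empty>) -> RawResponsePart {
  switch part {
  case .message:
    return .message([])  // Empty serializes to zero bytes in this sketch
  case .status(let code, let message):
    return .status(code: code, message: message)
  }
}

// The downstream stage only understands raw parts.
func downstreamAccepts(_ box: AnyBox) -> Bool {
  return box.tryAs(RawResponsePart.self) != nil
}

// 9 is the gRPC FAILED_PRECONDITION status code.
let typed = TypedResponsePart<Empty>.status(code: 9, message: "Failed precondition message")

// Pipeline not yet configured: the typed part reaches the raw stage directly.
let tooEarly = downstreamAccepts(AnyBox(typed))

// Pipeline configured: the codec stage converts the part first.
let afterSetup = downstreamAccepts(AnyBox(encode(typed)))

print(tooEarly, afterSetup)  // prints "false true"
```

The point of the sketch is only the ordering: the same status write is rejected when it bypasses the converting stage and accepted once that stage is in place.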

@vyshane

Author

commented Jul 23, 2019

Thank you @glbrntt, much appreciated!

@vyshane

Author

commented Jul 25, 2019

Thanks for merging the fix! Can you cut a new release so I can test it out?

@glbrntt

Collaborator

commented Jul 25, 2019

Good timing, we planned on cutting one soon. Hopefully @MrMage will do so within the next hour or so!

@MrMage

Collaborator

commented Jul 25, 2019

@MrMage closed this on Jul 25, 2019

@vyshane

Author

commented Jul 27, 2019

I can confirm that the issue is fixed for my use cases.
