ClientOnlyStream.receiveAndClose(): Cancellation Exception occurring #100

Closed
Jeremy-Brown14 opened this issue Sep 15, 2023 · 6 comments

@Jeremy-Brown14

Jeremy-Brown14 commented Sep 15, 2023

I'm getting an intermittent crash when calling this function; it happens roughly ⅓ of the time. It seems that if resultChannel.receive() doesn't return in a timely manner, the function hits the finally block and cancels the resultChannel, throwing the exception below. Also, shouldn't this function close the messageStream after receiving the result? I currently have to call close() before calling receiveAndClose(); otherwise my client never receives anything from the result channel and my backend service does not process the stream I sent to it. Thanks for looking into this!

FATAL EXCEPTION: OkHttp Dispatcher
    PID: 20742
    java.util.concurrent.CancellationException: Channel was cancelled
        at kotlinx.coroutines.channels.BufferedChannel.cancelImpl$kotlinx_coroutines_core(BufferedChannel.kt:1765)
        at kotlinx.coroutines.channels.BufferedChannel.cancel(BufferedChannel.kt:1762)
        at kotlinx.coroutines.channels.ReceiveChannel$DefaultImpls.cancel$default(Channel.kt:297)
        at build.buf.connect.impl.ClientOnlyStream.receiveAndClose(ClientOnlyStream.kt:36)
        at build.buf.connect.impl.ClientOnlyStream$receiveAndClose$1.invokeSuspend(Unknown Source:14)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
        at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:100)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
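
One possible reading of this trace, offered as a sketch rather than a diagnosis: once the receiving side cancels a kotlinx.coroutines `Channel`, a producer that arrives later can no longer deliver its value. The snippet below reproduces only that channel behavior. `lateDeliveryIsRejected` is a hypothetical name, and none of this is connect-kotlin code.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: reports whether a value offered after cancel() is rejected.
fun lateDeliveryIsRejected(): Boolean {
    val results = Channel<String>(Channel.UNLIMITED)
    // The consumer gives up first, as a finally block calling cancel() would.
    results.cancel()
    // A producer arriving afterwards cannot deliver. trySend reports the closed
    // channel as a result value, whereas a suspending send() would throw the
    // CancellationException instead.
    return results.trySend("late result").isClosed
}

fun main() {
    println("late delivery rejected: ${lateDeliveryIsRejected()}")
}
```

If the producer runs on a thread with no handler for that exception (for example a network callback thread), an uncaught throw from `send()` would surface as a crash on that thread.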

Here is my client code:

private suspend fun <PayloadType, RequestType, ResponseType> performNetworkCall(
    request: RequestType,
    performRequest: suspend (RequestType) -> ResponseMessage<ResponseType>,
    mapResponseOnSuccess: (ResponseMessage.Success<ResponseType>) -> PayloadType,
    enableLogs: Boolean = false
): Result<PayloadType> {
    if (enableLogs) {
        Logger.v("ConnectNetworkRequest: ${request.toString().redactPrivateFieldsConnectLibrary()}")
    }

    val isTokenValidResult = refreshTokenWithRetries()
    if (isTokenValidResult.isFailure) {
        return Result.failure(isTokenValidResult.exceptionOrNull() ?: Exception("Error token is no longer valid"))
    }

    return when (val response = performRequest(request)) {
        is ResponseMessage.Success -> {
            if (enableLogs) {
                Logger.v("ConnectNetworkResponse: ${response.message.toString().redactPrivateFieldsConnectLibrary()}")
            }
            Result.success(mapResponseOnSuccess(response))
        }
        is ResponseMessage.Failure -> {
            Logger.e("ConnectNetworkResponse (Error): ${response.error}", response.error)
            Result.failure(response.error)
        }
    }
}

override suspend fun savePhoto(
    file: File,
    imageType: String
): Result<String> {
    val imageInfo = ImageInfo
        .newBuilder()
        .setImageType(imageType)
        .build()

    return performNetworkCall(
        request = SavePhotoRequest
            .newBuilder()
            .setInfo(imageInfo)
            .build(),
        performRequest = { request ->
            val savePhoto = orderService.savePhoto()
            val response = savePhoto.send(request)
            val chunkData: MutableList<ByteString> = mutableListOf()
            file.forEachBlock(blockSize = AppConfig.getPhotoUploadChunkSize()) { buffer, bytesRead ->
                Logger.v("ConnectNetworkRequest: savePhoto, bufferSize: ${buffer.size}, bytesRead: $bytesRead")
                chunkData.add(ByteString.copyFrom(buffer, 0, bytesRead))
            }
            val response2: MutableList<Result<Unit>> = mutableListOf()
            chunkData.forEach { byteString ->
                response2.add(
                    savePhoto.send(SavePhotoRequest
                        .newBuilder()
                        .setChunkData(byteString)
                        .build()
                    )
                )
            }
            savePhoto.close()
            val result = savePhoto.receiveAndClose()
            if (response.isSuccess && response2.all { it.isSuccess }) {
                ResponseMessage.Success(
                    message = result,
                    code = Code.OK,
                    headers = emptyMap(),
                    trailers = emptyMap()
                )
            } else {
                ResponseMessage.Failure(
                    error = ConnectError(code = Code.INTERNAL_ERROR),
                    code = Code.INTERNAL_ERROR,
                    headers = emptyMap(),
                    trailers = emptyMap()
                )
            }
        },
        mapResponseOnSuccess = { "SUCCESS" }
    )
}
@pkwarren pkwarren self-assigned this Sep 20, 2023
@pkwarren
Contributor

Sorry for the delay - I'll be taking a look at this today. What type of server are you interfacing with when you see this behavior (e.g. connect-go, grpc-go, grpc-java)?

@Jeremy-Brown14
Author

connect-go

@Jeremy-Brown14
Author

Jeremy-Brown14 commented Sep 20, 2023

I believe the result isn't received in a timely enough manner, because the backend awaits a push notification for the photo URL. The function then hits the finally block and cancels the receive channel while it is still open and awaiting a result, which causes this crash. The upside is that the backend always does process the upload; it's just that my application sometimes crashes. The result is not used immediately; it is used later on with a history request. When I first implemented this, I was calling only close() rather than close() followed by receiveAndClose(), and I could not get my server to provide the URL in its history.

@Jeremy-Brown14
Author

Jeremy-Brown14 commented Sep 20, 2023

Awesome, fix looks great! That should absolutely take care of my issues. Thank you!

pkwarren added a commit that referenced this issue Sep 22, 2023
Update conformance tests to add a new test exercising client side
streaming, which exposed several issues in streaming call
implementations.

The first issue only affected client streaming (it stopped attempting to
read a response from the server once the send side closed - it should
have stopped only if the receive side closed).

The second issue resulted from not calling close on the channel after
the completion message was received, which led to hangs when consuming
from `resultChannel()` (it would never complete). After this fix, both
examples (for java and javalite) were updated to exit correctly when
finished.
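
The hang described above follows from how Kotlin channels are consumed: a `for` loop over a channel ends only once the channel is closed. A minimal sketch with plain kotlinx.coroutines follows. The function name and the string values are illustrative and not taken from connect-kotlin.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Illustrative only: collects everything a producer sends until it closes the channel.
fun collectUntilClosed(): List<String> = runBlocking {
    val results = Channel<String>()
    launch {
        results.send("headers")
        results.send("message")
        results.send("complete")
        // Without this close(), the loop below would stay suspended
        // waiting for a fourth element that never arrives.
        results.close()
    }
    val seen = mutableListOf<String>()
    for (item in results) {
        seen += item
    }
    seen
}

fun main() {
    println(collectUntilClosed())
}
```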

Additionally, several cleanups were made to the API (since the current
API for client streaming was non-functional - it would only return the
initial Headers result and not the message or completion result).

This should help resolve reported streaming issues like #100.

# API Updates

## `com.connectrpc.BidirectionalStreamInterface`

### Removed
* `close()`
  * Use `sendClose()` instead. The old name may have led callers to
    believe that close() would close both the send and receive sides of
    the connection, when it only closed the send side.

## `com.connectrpc.ClientOnlyStreamInterface`

### Added
* `sendClose()`
  * This shouldn't typically need to be called as receiveAndClose()
    already closes the send side of the stream.
* `isSendClosed()`

### Changed
* `receiveAndClose()`
  * Changed to return a ResponseMessage instead of a StreamResult. This
    allows callers to easily get access to the response as if they were
    calling a unary method. Previously, the StreamResult would only return
    the first result retrieved by the call, which typically was a Headers
    result (leaving callers unable to access the Message or Completion
    contents).
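
To illustrate the shape of this change, here is a sketch built on local stand-in types rather than the real connect-kotlin classes. `Response`, `UploadStream`, `FakeUploadStream`, and `upload` are all hypothetical. Only the idea comes from the notes above: send the messages, then call `receiveAndClose()` once and branch on success or failure as with a unary call.

```kotlin
import kotlinx.coroutines.runBlocking

// Stand-in for a unary-style response; NOT the library's ResponseMessage.
sealed class Response<out T> {
    data class Success<T>(val message: T) : Response<T>()
    data class Failure(val error: Throwable) : Response<Nothing>()
}

// Stand-in for a client-streaming call; NOT the library's interface.
interface UploadStream<I, O> {
    suspend fun send(input: I): Result<Unit>
    suspend fun receiveAndClose(): Response<O>
}

// Fake that counts the chunks it was sent, so the sketch runs without a server.
class FakeUploadStream : UploadStream<ByteArray, Int> {
    private var chunks = 0
    private var sendClosed = false

    override suspend fun send(input: ByteArray): Result<Unit> =
        if (sendClosed) {
            Result.failure(IllegalStateException("send side is closed"))
        } else {
            chunks++
            Result.success(Unit)
        }

    override suspend fun receiveAndClose(): Response<Int> {
        sendClosed = true // the send side is closed here, not by the caller
        return Response.Success(chunks)
    }
}

// Sends every chunk, then reads the single response.
suspend fun upload(stream: UploadStream<ByteArray, Int>, data: List<ByteArray>): Result<Int> {
    for (chunk in data) {
        stream.send(chunk).onFailure { return Result.failure(it) }
    }
    return when (val response = stream.receiveAndClose()) {
        is Response.Success -> Result.success(response.message)
        is Response.Failure -> Result.failure(response.error)
    }
}

fun main() = runBlocking {
    val result = upload(FakeUploadStream(), listOf(ByteArray(4), ByteArray(4)))
    println(result)
}
```

In this sketch the caller never closes the send side itself, which matches the note that `sendClose()` shouldn't typically be needed.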

### Removed
* `close()`
  * Replaced with `sendClose()`.

## `com.connectrpc.ServerOnlyStreamInterface`

### Added
* `receiveClose()`
* `isReceiveClosed()`

### Removed
* `close()`
  * This closed both the send and receive side of the stream (unlike in
    other interfaces which just closed the send side). If needed, callers
    should invoke `receiveClose()` instead (although this isn't necessary in
    normal use).
* `send()`
  * Callers should invoke `sendAndClose()` instead. Otherwise, reading
    results from `resultChannel()` will hang since the send side of the
    stream should be closed before reading responses.

## `com.connectrpc.StreamResult`

### Removed
* Removed the `error` field from the base `StreamResult` class. It was
never used by the `Headers` or `Message` subclasses and only used on the
`Complete` type. This should make it easier for callers to use `Headers`
and `Message` types since they don't need to worry about handling
`error`.
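
A sketch of what this means for callers, using a local sealed class that mirrors the three result kinds named above. `StreamEvent` and its field names are assumptions made for illustration, not the library's definitions.

```kotlin
// Local stand-in mirroring the three result kinds; field names are assumptions.
sealed class StreamEvent<out T> {
    data class Headers(val headers: Map<String, List<String>>) : StreamEvent<Nothing>()
    data class Message<T>(val message: T) : StreamEvent<T>()
    // Only the completion carries an optional error.
    data class Complete(val error: Throwable? = null) : StreamEvent<Nothing>()
}

// Describes an event; only the Complete branch has to consider an error.
fun describe(event: StreamEvent<String>): String = when (event) {
    is StreamEvent.Headers -> "headers: ${event.headers.keys}"
    is StreamEvent.Message -> "message: ${event.message}"
    is StreamEvent.Complete -> event.error?.let { "failed: ${it.message}" } ?: "completed"
}

fun main() {
    val events: List<StreamEvent<String>> = listOf(
        StreamEvent.Headers(mapOf("content-type" to listOf("application/connect+proto"))),
        StreamEvent.Message("photo saved"),
        StreamEvent.Complete(),
    )
    events.forEach { println(describe(it)) }
}
```
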
@pkwarren
Contributor

connect-kotlin v0.2.0 is now available with several fixes to streaming (including client streaming). Can you give the new release a try and let me know if it resolves the issues on your side?

@pkwarren
Contributor

Confirmed this is now working. Closing issue.
