Cannot catch Exception from Server Stub #141

Open
ryanhall07 opened this issue Jul 10, 2020 · 6 comments

Comments

@ryanhall07

I am attempting to install a default exception handler for our gRPC server stubs. I have tried both a Java gRPC interceptor and coroutine exception handlers, but neither works. The code snippet below reproduces the issue.

val handler = CoroutineExceptionHandler { _, ex ->
  println("this never happens!!")
}

internal class TestGreeterGrpcService : GreeterGrpcKt.GreeterCoroutineImplBase(handler) {
  override suspend fun sayHello(request: HelloRequest): HelloReply = throw RuntimeException("boom")
}

Using the debugger, I traced the issue to this line:

https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L229

It seems like the CoroutineContext provided to the stub is not used in the Flow that dispatches requests.

I'm fairly new to coroutines so maybe I'm doing something wrong.

@lowasser
Collaborator

Have you read the documentation of CoroutineExceptionHandler? It only ever applies under very specific circumstances.
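For reference, here is a minimal sketch of those circumstances, assuming plain kotlinx.coroutines and no gRPC at all. The function name `handlerDemo` and the messages are hypothetical. It illustrates the documented behavior that a CoroutineExceptionHandler is consulted only for uncaught exceptions in a root coroutine started with `launch`, and not for `async`, whose failure is delivered through `await()`. By the same logic, if a framework catches the exception itself before the coroutine fails (which later comments in this thread suggest grpc-kotlin does), the handler has nothing to handle.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns the observed events in order.
fun handlerDemo(): List<String> = runBlocking {
    val events = CopyOnWriteArrayList<String>()
    val handler = CoroutineExceptionHandler { _, ex ->
        events += "handler saw: ${ex.message}"
    }
    // SupervisorJob keeps the first failure from cancelling the scope
    // before the second case runs.
    val scope = CoroutineScope(SupervisorJob())

    // Case 1: root coroutine started with launch.
    // The uncaught exception is passed to the handler.
    scope.launch(handler) { throw RuntimeException("from launch") }.join()

    // Case 2: root coroutine started with async.
    // The exception is stored in the Deferred and rethrown by await();
    // the handler is not invoked.
    val deferred = scope.async(handler) { throw RuntimeException("from async") }
    try {
        deferred.await()
    } catch (ex: RuntimeException) {
        events += "await rethrew: ${ex.message}"
    }

    events.toList()
}

fun main() {
    handlerDemo().forEach(::println)
}
```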

@ryanhall07
Author

Yes, I read the docs, and that was the original reason I was attempting to install the exception handler inside a gRPC interceptor. Maybe I should take a step back and explain what I am trying to do.

I don't want to wrap every gRPC service impl in a try-catch to log exceptions. I'd rather install an exception handler in an interceptor so that it applies to every gRPC service. I originally tried doing it inside a ServerInterceptor, but the exception was never caught there. After switching from grpc-kotlin to grpc-java, the ServerInterceptor worked just fine.

I'm basically using a version of this grpc/grpc-java#1552 (comment)

@skomp

skomp commented Jul 24, 2020

I think you could use a SimpleForwardingServerCall in the interceptor and override the close method, checking the status there. The fields to look at are status.code (which will most likely be UNKNOWN if there was an exception in your server) and/or status.cause, which would give you the actual exception.

@quckly

quckly commented Aug 14, 2020

Implementing a ServerInterceptor with a custom ServerCall.Listener doesn't work: onHalfClose() never throws the exception.
The exception is only caught here https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/ServerCalls.kt#L233, but how can it be propagated to a global exception handler?

@jonpeterson

I got really stuck on this as well and was about to switch to grpc-java, but then I saw how TransmitStatusRuntimeExceptionInterceptor implements a custom ServerCall. I'm posting my solution here to help anyone who comes across this in the future. It uses some custom exceptions and extension functions from my project (e.g. getLogPrefix for request and caller ID tracing), but I'll leave them in to give an idea of what I was getting at.

private val logger = KotlinLogging.logger {}

class LoggingAndExceptionTranslationServerInterceptor : ServerInterceptor {

    private class ExceptionTranslatingServerCall<ReqT, RespT>(
        delegate: ServerCall<ReqT, RespT>
    ) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {

        override fun close(status: Status, trailers: Metadata) {
            val newStatus = if (!status.isOk) {
                val cause = status.cause
                logger.error(logger.nullIfNotDebug(cause)) { "${getLogPrefix()}closing due to error" }

                if (status.code == Status.Code.UNKNOWN) {
                    // Map well-known exception types to more specific gRPC status codes.
                    val translatedStatus = when (cause) {
                        is IllegalArgumentException -> Status.INVALID_ARGUMENT
                        is IllegalStateException -> Status.FAILED_PRECONDITION
                        is NotFoundException -> Status.NOT_FOUND
                        is ConflictException -> Status.ALREADY_EXISTS
                        is UnauthenticationException -> Status.UNAUTHENTICATED
                        is UnauthorizationException -> Status.PERMISSION_DENIED
                        else -> Status.UNKNOWN
                    }
                    translatedStatus.withDescription(cause?.message).withCause(cause)
                } else
                    status
            } else {
                logger.trace { "${getLogPrefix()}closing" }
                status
            }

            super.close(newStatus, trailers)
        }
    }

    private class LoggingServerCallListener<ReqT>(
        delegate: ServerCall.Listener<ReqT>
    ) : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {

        override fun onMessage(message: ReqT) {
            logger.trace { "${getLogPrefix()}message: $message" }
            try {
                super.onMessage(message)
            } catch (t: Throwable) {
                if (logger.isDebugEnabled)
                    logger.debug(t) { "${getLogPrefix()}error on message: $message" }
                else
                    logger.error { "${getLogPrefix()}error handling message" }
                throw t
            }
        }

        override fun onHalfClose() {
            logger.trace { "${getLogPrefix()}half-close" }
            try {
                super.onHalfClose()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling half-close" }
                throw t
            }
        }

        override fun onCancel() {
            logger.trace { "${getLogPrefix()}cancel" }
            try {
                super.onCancel()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling cancel" }
                throw t
            }
        }

        override fun onComplete() {
            logger.trace { "${getLogPrefix()}complete" }
            try {
                super.onComplete()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling complete" }
                throw t
            }
        }

        override fun onReady() {
            logger.trace { "${getLogPrefix()}ready" }
            try {
                super.onReady()
            } catch (t: Throwable) {
                logger.error(logger.nullIfNotDebug(t)) { "${getLogPrefix()}error handling ready" }
                throw t
            }
        }
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        metadata: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> =
        LoggingServerCallListener(next.startCall(ExceptionTranslatingServerCall(call), metadata))
}

@mfickett

@jonpeterson Thanks for sharing that! I was trying to use onHalfClose etc. and finding that they did nothing.

In the implementation you shared, is the LoggingServerCallListener superfluous? It's what you'd want in grpc-java, right, but exceptions never show up in it in grpc-kotlin? So I think this would suffice:

package com.yourproject

import io.grpc.ForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import mu.KotlinLogging
import javax.inject.Singleton

private val logger = KotlinLogging.logger {}

/**
 * Log all exceptions thrown from gRPC endpoints, and adjust Status for known exceptions.
 */
@Singleton
class ExceptionInterceptor : ServerInterceptor {

    /**
     * When closing a gRPC call, extract any error status information to top-level fields. Also
     * log the cause of errors.
     */
    private class ExceptionTranslatingServerCall<ReqT, RespT>(
        delegate: ServerCall<ReqT, RespT>
    ) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {

        override fun close(status: Status, trailers: Metadata) {
            if (status.isOk) {
                return super.close(status, trailers)
            }
            val cause = status.cause
            var newStatus = status

            logger.error(cause) { "Error handling gRPC endpoint." }

            if (status.code == Status.Code.UNKNOWN) {
                val translatedStatus = when (cause) {
                    is IllegalArgumentException -> Status.INVALID_ARGUMENT
                    is IllegalStateException -> Status.FAILED_PRECONDITION
                    else -> Status.UNKNOWN
                }
                newStatus = translatedStatus.withDescription(cause?.message).withCause(cause)
            }

            super.close(newStatus, trailers)
        }
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        return next.startCall(ExceptionTranslatingServerCall(call), headers)
    }
}
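An interceptor like the one above only takes effect once it is registered with the server. A minimal sketch of that wiring, assuming the standard grpc-java ServerBuilder API; the helper name `buildServer` and its parameters are hypothetical and not part of this thread:

```kotlin
import io.grpc.BindableService
import io.grpc.Server
import io.grpc.ServerBuilder
import io.grpc.ServerInterceptor

// Hypothetical helper: builds (but does not start) a server in which
// the given interceptor wraps every call to every registered service.
fun buildServer(
    port: Int,
    interceptor: ServerInterceptor,
    vararg services: BindableService
): Server {
    val builder = ServerBuilder.forPort(port)
    // Server-wide registration: applies to all services added below.
    builder.intercept(interceptor)
    services.forEach { builder.addService(it) }
    return builder.build()
}
```

With the classes from this thread, that would be called as something like `buildServer(8080, ExceptionInterceptor(), TestGreeterGrpcService()).start()`, where the port is an arbitrary choice. To apply the interceptor to a single service only, `ServerInterceptors.intercept(service, interceptor)` can be passed to `addService` instead.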


6 participants