Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests for RPC error handling, and a fix for StatusRuntimeException #252

Merged
merged 1 commit into from
Apr 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion modules/internal/src/main/scala/server/monixCalls.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.grpc.stub.ServerCalls.{
UnaryMethod
}
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusException}
import io.grpc.{Status, StatusException, StatusRuntimeException}
import monix.execution.Scheduler
import monix.reactive.Observable

Expand Down Expand Up @@ -89,6 +89,8 @@ object monixCalls {
observer.onCompleted()
case Left(s: StatusException) =>
observer.onError(s)
case Left(s: StatusRuntimeException) =>
observer.onError(s)
case Left(e) =>
observer.onError(
Status.INTERNAL.withDescription(e.getMessage).withCause(e).asException()
Expand Down
19 changes: 18 additions & 1 deletion modules/server/src/test/scala/fs2/RPCTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,27 @@ class RPCTests extends RpcBaseTestSuite with BeforeAndAfterAll {

"be able to run server streaming services" in {

freesRPCServiceClient.serverStreaming(b1).compile.toList.unsafeRunSync shouldBe cList
freesRPCServiceClient.serverStreaming(b1).compile.toList.unsafeRunSync() shouldBe cList

}

"handle errors in server streaming services" in {

def clientProgram(errorCode: String): Stream[ConcurrentMonad, C] =
freesRPCServiceClient
.serverStreamingWithError(E(a1, errorCode))
.handleErrorWith(ex => Stream(C(ex.getMessage, a1)))

clientProgram("SE").compile.toList
.unsafeRunSync() shouldBe List(C("INVALID_ARGUMENT: SE", a1))
clientProgram("SRE").compile.toList
.unsafeRunSync() shouldBe List(C("INVALID_ARGUMENT: SRE", a1))
clientProgram("RTE").compile.toList
.unsafeRunSync() shouldBe List(C("UNKNOWN", a1)) //todo: consider preserving the exception as is done for unary
clientProgram("Thrown").compile.toList
.unsafeRunSync() shouldBe List(C("UNKNOWN", a1))
}

"be able to run client streaming services" in {

freesRPCServiceClient
Expand Down
16 changes: 16 additions & 0 deletions modules/server/src/test/scala/fs2/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import freestyle.rpc.common._
import freestyle.rpc.protocol._
import _root_.fs2._
import cats.effect.Effect
import io.grpc.Status

object Utils extends CommonUtils {

Expand All @@ -41,6 +42,10 @@ object Utils extends CommonUtils {
@stream[ResponseStreaming.type]
def serverStreaming(b: B): Stream[F, C]

@rpc(Protobuf)
@stream[ResponseStreaming.type]
def serverStreamingWithError(e: E): Stream[F, C]

@rpc(Protobuf, Gzip)
@stream[ResponseStreaming.type]
def serverStreamingCompressed(b: B): Stream[F, C]
Expand Down Expand Up @@ -100,6 +105,17 @@ object Utils extends CommonUtils {
Stream.fromIterator(cList.iterator)
}

def serverStreamingWithError(e: E): Stream[F, C] = e.foo match {
case "SE" =>
Stream.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asException)
case "SRE" =>
Stream.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asRuntimeException)
case "RTE" =>
Stream.raiseError(new IllegalArgumentException(e.foo))
case _ =>
sys.error(e.foo)
}

def serverStreamingCompressed(b: B): Stream[F, C] = serverStreaming(b)

def clientStreaming(oa: Stream[F, A]): F[D] =
Expand Down
40 changes: 35 additions & 5 deletions modules/server/src/test/scala/protocol/RPCTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package freestyle.rpc
package protocol

import cats.Monad
import cats.{Monad, MonadError}
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.google.protobuf.InvalidProtocolBufferException
import org.scalatest._
import freestyle.rpc.common._
import freestyle.rpc.protocol.Utils.handlers.client.{
FreesRPCServiceClientCompressedHandler,
FreesRPCServiceClientHandler
}
import freestyle.rpc.protocol.Utils.handlers.client._
import freestyle.rpc.server._

class RPCTests extends RpcBaseTestSuite with BeforeAndAfterAll {
Expand Down Expand Up @@ -84,6 +81,22 @@ class RPCTests extends RpcBaseTestSuite with BeforeAndAfterAll {

}

"handle errors in unary services" in {

def clientProgram[F[_]](
errorCode: String)(implicit APP: MyRPCClient[F], M: MonadError[F, Throwable]): F[C] =
M.handleError(APP.uwe(a1, errorCode))(ex => C(ex.getMessage, a1))

clientProgram[ConcurrentMonad]("SE")
.unsafeRunSync() shouldBe C("INVALID_ARGUMENT: SE", a1)
clientProgram[ConcurrentMonad]("SRE")
.unsafeRunSync() shouldBe C("INVALID_ARGUMENT: SRE", a1)
clientProgram[ConcurrentMonad]("RTE")
.unsafeRunSync() shouldBe C("INTERNAL: RTE", a1)
clientProgram[ConcurrentMonad]("Thrown")
.unsafeRunSync() shouldBe C("UNKNOWN", a1)
}

"be able to run unary services with avro schema" in {

def clientProgram[F[_]](implicit APP: MyRPCClient[F]): F[C] =
Expand All @@ -102,6 +115,23 @@ class RPCTests extends RpcBaseTestSuite with BeforeAndAfterAll {

}

"handle errors in server streaming services" in {

def clientProgram[F[_]](errorCode: String)(
implicit APP: MyRPCClient[F],
M: MonadError[F, Throwable]): F[List[C]] =
M.handleError(APP.sswe(a1, errorCode))(ex => List(C(ex.getMessage, a1)))

clientProgram[ConcurrentMonad]("SE")
.unsafeRunSync() shouldBe List(C("INVALID_ARGUMENT: SE", a1))
clientProgram[ConcurrentMonad]("SRE")
.unsafeRunSync() shouldBe List(C("INVALID_ARGUMENT: SRE", a1))
clientProgram[ConcurrentMonad]("RTE")
.unsafeRunSync() shouldBe List(C("UNKNOWN", a1)) //todo: consider preserving the exception as is done for unary
clientProgram[ConcurrentMonad]("Thrown")
.unsafeRunSync() shouldBe List(C("UNKNOWN", a1))
}

"be able to run client streaming services" in {

def clientProgram[F[_]](implicit APP: MyRPCClient[F]): F[D] =
Expand Down
74 changes: 73 additions & 1 deletion modules/server/src/test/scala/protocol/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import cats.syntax.applicative._
import freestyle.rpc.common._
import freestyle.rpc.server.implicits._
import freestyle.tagless.tagless
import io.grpc.Status
import monix.reactive.Observable

object Utils extends CommonUtils {
Expand All @@ -40,10 +41,14 @@ object Utils extends CommonUtils {

@rpc(Avro) def unary(a: A): F[C]

@rpc(Avro) def unaryWithError(e: E): F[C]

@rpc(AvroWithSchema) def unaryWithSchema(a: A): F[C]

@rpc(Avro, Gzip) def unaryCompressed(a: A): F[C]

@rpc(Avro, Gzip) def unaryCompressedWithError(e: E): F[C]

@rpc(AvroWithSchema, Gzip) def unaryCompressedWithSchema(a: A): F[C]

@rpc(Protobuf) def empty(empty: Empty.type): F[Empty.type]
Expand Down Expand Up @@ -93,10 +98,18 @@ object Utils extends CommonUtils {
@stream[ResponseStreaming.type]
def serverStreaming(b: B): Observable[C]

@rpc(Protobuf)
@stream[ResponseStreaming.type]
def serverStreamingWithError(e: E): Observable[C]

@rpc(Protobuf, Gzip)
@stream[ResponseStreaming.type]
def serverStreamingCompressed(b: B): Observable[C]

@rpc(Protobuf, Gzip)
@stream[ResponseStreaming.type]
def serverStreamingCompressedWithError(e: E): Observable[C]

@rpc(Protobuf)
@stream[RequestStreaming.type]
def clientStreaming(oa: Observable[A]): F[D]
Expand Down Expand Up @@ -152,7 +165,9 @@ object Utils extends CommonUtils {
def bigDecimalAvroWithSchemaParam(bd: BigDecimal): F[BigDecimal]
def u(x: Int, y: Int): F[C]
def uws(x: Int, y: Int): F[C]
def uwe(a: A, err: String): F[C]
def ss(a: Int, b: Int): F[List[C]]
def sswe(a: A, err: String): F[List[C]]
def cs(cList: List[C], bar: Int): F[D]
def bs(eList: List[E]): F[E]
def bsws(eList: List[E]): F[E]
Expand All @@ -168,7 +183,8 @@ object Utils extends CommonUtils {
import service._
import freestyle.rpc.protocol._

class ServerRPCService[F[_]: Async] extends RPCService[F] {
class ServerRPCService[F[_]: Async](implicit M: MonadError[F, Throwable])
extends RPCService[F] {

def notAllowed(b: Boolean): F[C] = c1.pure

Expand All @@ -195,13 +211,35 @@ object Utils extends CommonUtils {

def unary(a: A): F[C] = c1.pure

def unaryWithError(e: E): F[C] = e.foo match {
case "SE" =>
M.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asException)
case "SRE" =>
M.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asRuntimeException)
case "RTE" =>
M.raiseError(new IllegalArgumentException(e.foo))
case _ =>
sys.error(e.foo)
}

def unaryWithSchema(a: A): F[C] = unary(a)

def serverStreaming(b: B): Observable[C] = {
debug(s"[SERVER] b -> $b")
Observable.fromIterable(cList)
}

def serverStreamingWithError(e: E): Observable[C] = e.foo match {
case "SE" =>
Observable.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asException)
case "SRE" =>
Observable.raiseError(Status.INVALID_ARGUMENT.withDescription(e.foo).asRuntimeException)
case "RTE" =>
Observable.raiseError(new IllegalArgumentException(e.foo))
case _ =>
sys.error(e.foo)
}

def clientStreaming(oa: Observable[A]): F[D] =
oa.foldLeftL(D(0)) {
case (current, a) =>
Expand Down Expand Up @@ -252,8 +290,12 @@ object Utils extends CommonUtils {

def unaryCompressedWithSchema(a: A): F[C] = unaryCompressed(a)

def unaryCompressedWithError(e: E): F[C] = unaryWithError(e)

def serverStreamingCompressed(b: B): Observable[C] = serverStreaming(b)

def serverStreamingCompressedWithError(e: E): Observable[C] = serverStreamingWithError(e)

def clientStreamingCompressed(oa: Observable[A]): F[D] = clientStreaming(oa)

def biStreamingCompressed(oe: Observable[E]): Observable[E] = biStreaming(oe)
Expand Down Expand Up @@ -326,6 +368,9 @@ object Utils extends CommonUtils {
override def uws(x: Int, y: Int): F[C] =
client.unaryWithSchema(A(x, y))

override def uwe(a: A, err: String): F[C] =
client.unaryWithError(E(a, err))

override def ss(a: Int, b: Int): F[List[C]] =
client
.serverStreaming(B(A(a, a), A(b, b)))
Expand All @@ -338,6 +383,18 @@ object Utils extends CommonUtils {
.toListL
.to[F]

override def sswe(a: A, err: String): F[List[C]] =
client
.serverStreamingWithError(E(a, err))
.zipWithIndex
.map {
case (c, i) =>
debug(s"[CLIENT] Result #$i: $c")
c
}
.toListL
.to[F]

override def cs(cList: List[C], bar: Int): F[D] =
client.clientStreaming(Observable.fromIterable(cList.map(c => c.a)))

Expand Down Expand Up @@ -417,6 +474,9 @@ object Utils extends CommonUtils {
override def u(x: Int, y: Int): F[C] =
client.unaryCompressed(A(x, y))

override def uwe(a: A, err: String): F[C] =
client.unaryCompressedWithError(E(a, err))

override def uws(x: Int, y: Int): F[C] =
client.unaryCompressedWithSchema(A(x, y))

Expand All @@ -432,6 +492,18 @@ object Utils extends CommonUtils {
.toListL
.to[F]

override def sswe(a: A, err: String): F[List[C]] =
client
.serverStreamingCompressedWithError(E(a, err))
.zipWithIndex
.map {
case (c, i) =>
debug(s"[CLIENT] Result #$i: $c")
c
}
.toListL
.to[F]

override def cs(cList: List[C], bar: Int): F[D] =
client.clientStreamingCompressed(Observable.fromIterable(cList.map(c => c.a)))

Expand Down