Skip to content

Commit

Permalink
fix binary trailer decoding (#304)
Browse files Browse the repository at this point in the history
* fix binary trailer decoding

* scalafmt
  • Loading branch information
jtjeferreira committed Apr 23, 2024
1 parent 83bdd04 commit 12eda29
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import pekko.annotation.InternalApi
import pekko.event.LoggingAdapter
import pekko.grpc.GrpcProtocol.GrpcProtocolReader
import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse, ProtobufSerializer }
import pekko.grpc.scaladsl.StringEntry
import pekko.http.scaladsl.model.HttpEntity.{ Chunk, Chunked, LastChunk, Strict }
import pekko.http.scaladsl.{ ClientTransport, ConnectionContext, Http }
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.RawHeader
import pekko.http.scaladsl.settings.ClientConnectionSettings
import pekko.stream.{ Materializer, OverflowStrategy }
import pekko.stream.scaladsl.{ Keep, Sink, Source }
Expand Down Expand Up @@ -228,9 +228,7 @@ object PekkoHttpClientUtils {
case Strict(_, data) =>
val rawTrailers =
response.attribute(AttributeKeys.trailer).map(_.headers).getOrElse(immutable.Seq.empty)
val trailers = rawTrailers.map(h => HttpHeader.parse(h._1, h._2)).collect {
case HttpHeader.ParsingResult.Ok(header, _) => header
}
val trailers = rawTrailers.map(h => RawHeader(h._1, h._2))
trailerPromise.success(trailers)
Source.single[ByteString](data)
case _ =>
Expand Down Expand Up @@ -285,8 +283,7 @@ object PekkoHttpClientUtils {

private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = {
val allHeaders = response.headers ++ trailers
val metadata: io.grpc.Metadata =
new MetadataImpl(allHeaders.map(h => (h.name, StringEntry(h.value))).toList).toGoogleGrpcMetadata()
val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata()
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case None =>
new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,32 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
val source = PekkoHttpClientUtils.responseToSource(response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.UNIMPLEMENTED)
}

"map a strict 200 response with non-0 gRPC error code to a failed stream" in {
val responseHeaders = List(RawHeader("grpc-status", "9"), RawHeader("custom-key", "custom-value-in-header"))
val responseHeaders = RawHeader("grpc-status", "9") ::
RawHeader("custom-key", "custom-value-in-header") ::
RawHeader("custom-key-bin", ByteString("custom-trailer-value").encodeBase64.utf8String) ::
Nil
val response =
Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header")
}

"map a strict 200 response with non-0 gRPC error code with a trailer to a failed stream with trailer metadata" in {
val responseHeaders = List(RawHeader("grpc-status", "9"))
val responseTrailers = Trailer(RawHeader("custom-key", "custom-trailer-value") :: Nil)
val responseTrailers = Trailer(
RawHeader("custom-key", "custom-trailer-value") ::
RawHeader("custom-key-bin", ByteString("custom-trailer-value").encodeBase64.utf8String) ::
Nil)
val response = Future.successful(
new HttpResponse(
OK,
Expand All @@ -72,8 +80,10 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
val failure = source.run().failed.futureValue
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-trailer-value")
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(keyBin) should be(ByteString("custom-trailer-value"))
}

lazy val key = Metadata.Key.of("custom-key", Metadata.ASCII_STRING_MARSHALLER)
lazy val keyBin = Metadata.Key.of("custom-key-bin", Metadata.BINARY_BYTE_MARSHALLER)
}
}

0 comments on commit 12eda29

Please sign in to comment.