Skip to content

Substream Source(EntitySource) cannot be materialized more than once using logRequestAndResponse #4148

@albertoadami

Description

@albertoadami

Hi to everyone,
I developed some simple utility function to logging the request JSON for post and put as follows:

import akka.event.LoggingAdapter
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.directives.{ DebuggingDirectives, LoggingMagnet }
import akka.stream.Materializer
import akka.util.{ ByteString, Timeout }
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

object LoggingDirectives extends LazyLogging {
  private def logRequestEntity(
      route: Route)(implicit m: Materializer, executionContext: ExecutionContext, timeout: Timeout): Route = {
    def requestEntityLoggingFunction(loggingAdapter: LoggingAdapter)(req: HttpRequest): Unit = {
      val durationTimeout = timeout.duration
      val bodyAsBytes: Future[ByteString] = req.entity.toStrict(durationTimeout).map(_.data)
      val bodyAsString: Future[String] = bodyAsBytes.map(_.utf8String)
      bodyAsString.onComplete {
        case Success(body) =>
          val logMsg =
            s"$req\nReceived ${req.method.value} ${req.uri.path.toString()} request with request body $body"
          loggingAdapter.info(logMsg)
        case Failure(t) =>
          val logMsg = s"Failed to get the body for: $req"
          loggingAdapter.error(t, logMsg)
      }
    }

    DebuggingDirectives.logRequest(LoggingMagnet(requestEntityLoggingFunction))(route)
  }

  def putAndLog(route: Route)(implicit m: Materializer, executionContext: ExecutionContext, timeout: Timeout): Route =
    put {
      logRequestEntity(route)
    }

  def postAndLog(route: Route)(implicit m: Materializer, executionContext: ExecutionContext, timeout: Timeout): Route =
    post {
      logRequestEntity(route)
    }

} 


I'm using them in the code in this way:

pathEndOrSingleSlash {
        postAndLog { 
          entity(as[PostRequestPayload]) { request =>
            //some business logic
          }
        } ~
        put {
          entity(as[PutRequestPayload]) { request =>
            //some business logic
          }
        }

      }

But when the endpoints that are using this functions I'm getting the following exception:

java.lang.IllegalStateException: Substream Source(EntitySource) cannot be materialized more than once
	at akka.stream.impl.fusing.SubSource$$anon$11.createMaterializedTwiceException(StreamOfStreams.scala:846)
	at akka.stream.impl.fusing.SubSource$$anon$11.<init>(StreamOfStreams.scala:816)
	at akka.stream.impl.fusing.SubSource.createLogic(StreamOfStreams.scala:812)
	at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:106)
	at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:50)
	at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:705)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:503)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:452)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:444)
	at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:746)
	at akka.stream.scaladsl.Source.runWith(Source.scala:118)
	at akka.http.scaladsl.model.HttpEntity.toStrict(HttpEntity.scala:94)
	at akka.http.scaladsl.model.HttpEntity.toStrict$(HttpEntity.scala:88)
	at akka.http.scaladsl.model.HttpEntity$Default.toStrict(HttpEntity.scala:384)
	at akka.http.scaladsl.model.HttpEntity.toStrict(HttpEntity.scala:80)
	at akka.http.scaladsl.model.HttpEntity.toStrict$(HttpEntity.scala:77)
	at akka.http.scaladsl.model.HttpEntity$Default.toStrict(HttpEntity.scala:384)
	at it.warda.seecommerce.routes.SeeCommerceRoutes.requestEntityLoggingFunction$1(SeeCommerceRoutes.scala:69)
	at LoggingDirectives.$anonfun$logRequestEntity$5
	at LoggingDirectives.$anonfun$logRequestEntity$5$adapted
	at akka.http.scaladsl.server.directives.DebuggingDirectives.$anonfun$logRequest$1(DebuggingDirectives.scala:27)
	at akka.http.scaladsl.server.Directive$SingleValueTransformers$.$anonfun$flatMap$1(Directive.scala:188)
	at akka.http.scaladsl.server.Directive.$anonfun$tflatMap$2(Directive.scala:91)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
	at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$mapRouteResult$2(BasicDirectives.scala:68)
	at akka.http.scaladsl.server.directives.FutureDirectives.$anonfun$onComplete$3(FutureDirectives.scala:37)
	at akka.http.scaladsl.util.FastFuture$.$anonfun$transformWith$1(FastFuture.scala:37)
	at akka.http.scaladsl.util.FastFuture$.strictTransform$1(FastFuture.scala:41)
	at akka.http.scaladsl.util.FastFuture$.$anonfun$transformWith$3(FastFuture.scala:51)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.localPollAndExec(ForkJoinPool.java:977)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1605)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Am I wrong with something? I'm consuming the Json using the .toStrict method that should avoid the stream to consume the request right?

Thanks for the help.

Best regards.

Alberto

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions