-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wrap MetricsControl access to consumer in future #531
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Great that the access to the ActorRef
was hidden behind a method. So changing it to Future[ActorRef]
is not going to change use API.
Needs a rebase. |
f4c293c
to
37bafa9
Compare
.ask(RequestMetrics)(Timeout(1.minute)) | ||
.mapTo[ConsumerMetrics] | ||
.map(_.metrics)(ExecutionContexts.sameThreadExecutionContext) | ||
}(executionContext) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ennru I'm calling metrics
after materializing a stream and I'm getting an exception like this:
java.lang.IllegalStateException: not yet initialized: only setHandler is allowed in GraphStageLogic constructor. To access materializer use Source/Flow/Sink.setup factory
at akka.stream.stage.GraphStageLogic.interpreter(GraphStage.scala:393)
at akka.stream.stage.GraphStageLogic.materializer(GraphStage.scala:403)
at akka.kafka.internal.BaseSingleSourceLogic.executionContext(BaseSingleSourceLogic.scala:34)
at akka.kafka.internal.MetricsControl.metrics(ControlImplementations.scala:88)
at akka.kafka.internal.MetricsControl.metrics$(ControlImplementations.scala:78)
at akka.kafka.internal.BaseSingleSourceLogic.metrics(BaseSingleSourceLogic.scala:26)
The flow takes messages from a KafkaConsumerActor
and sends them to another actor, the whole thing looks like this:
val sink: Sink[Output, NotUsed] =
ActorSink.actorRefWithAck[Output, SinkActorCommandType, Ack](...)
val control: Consumer.Control =
Consumer.plainExternalSource[String, Array[Byte]](kafkaConsumerActor, subscription)
.map { consumerRecord =>
processRecord(consumerRecord)
}
.to(sink)
.run()
control.metrics
Did the race condition just move to a different place now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, looks like somebody already raised the issue, #847
Fixes #528