Skip to content

Commit

Permalink
Update offsets after message send
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Mar 29, 2019
1 parent a93b8b7 commit d381b4f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.Decider
import akka.stream.{Attributes, FlowShape, Supervision}
import akka.stream.stage._
import org.apache.kafka.clients.producer.{Callback, Producer, RecordMetadata}
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -83,6 +83,8 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
override val onMessageAckCb: AsyncCallback[Envelope[K, V, P]] = getAsyncCallback[Envelope[K, V, P]] { _ =>
}

def postSend(msg: Envelope[K, V, P]) = ()

setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
})
Expand Down Expand Up @@ -115,6 +117,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
onMessageAckCb.invoke(msg)
r.success(Result(metadata, msg))
}))
postSend(msg)
val future = r.future.asInstanceOf[Future[OUT]]
push(stage.out, future)

Expand All @@ -127,6 +130,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
producer.send(msg, sendCallback(r, onSuccess = metadata => r.success(MultiResultPart(metadata, msg))))
r.future
}
postSend(multiMsg)
implicit val ec: ExecutionContext = this.materializer.executionContext
val res = Future.sequence(promises).map { parts =>
onMessageAckCb.invoke(multiMsg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ private final class TransactionalProducerStageLogic[K, V, P](stage: Transactiona
}

private def suspendDemand(): Unit =
setHandler(stage.out, new OutHandler {
// suspend demand while a commit is in process so we can drain any outstanding message acknowledgements
override def onPull(): Unit = ()
})
setHandler(
stage.out,
new OutHandler {
// suspend demand while a commit is in process so we can drain any outstanding message acknowledgements
override def onPull(): Unit = ()
}
)

override protected def onTimer(timerKey: Any): Unit =
if (timerKey == commitSchedulerKey) {
Expand All @@ -141,11 +144,10 @@ private final class TransactionalProducerStageLogic[K, V, P](stage: Transactiona
}
}

override val onMessageAckCb: AsyncCallback[Envelope[K, V, P]] =
getAsyncCallback[Envelope[K, V, P]](_.passThrough match {
case o: ConsumerMessage.PartitionOffset => batchOffsets = batchOffsets.updated(o)
case _ =>
})
override def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
case o: ConsumerMessage.PartitionOffset => batchOffsets = batchOffsets.updated(o)
case _ =>
}

override def onCompletionSuccess(): Unit = {
log.debug("Committing final transaction before shutdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
(0 until sourcePartitions).map(
part => produce(sourceTopic, ((part * partitionSize) + 1) to (partitionSize * (part + 1)), part)
)
Await.result(Future.sequence(producers), remainingOrDefault)
Await.result(Future.sequence(producers), 30.seconds)

val consumerSettings = consumerDefaults.withGroupId(group)

Expand Down

0 comments on commit d381b4f

Please sign in to comment.