From 993cbae96dc22e177567f52b01b888d2b46a7928 Mon Sep 17 00:00:00 2001
From: Andrea Zito
Date: Mon, 13 Jan 2020 15:49:05 +0100
Subject: [PATCH] Elasticsearch: retry logic improvement (#2062)

* Fixes #1566 ElasticsearchFlowStage retry logic

- failed messages are treated independently to avoid losing track of
  previously failed ones
- retry counts are kept per message so as to guarantee the correct
  number of retries and intervals
- add a test case proving the correctness of the fix
---
 .../impl/ElasticsearchFlowStage.scala     | 105 ++++++++++--------
 .../docs/scaladsl/ElasticsearchSpec.scala |  46 ++++++++
 2 files changed, 102 insertions(+), 49 deletions(-)
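Editor's note (placed after the diffstat, where `git am` ignores free-form text): the core of this patch is that failed messages are grouped by their individual retry count and one retry timer is scheduled per group, so every message keeps its own retry budget instead of sharing a single stage-wide `retryCount`. Below is a minimal, self-contained sketch of that idea; `Msg`, `shouldRetry`, `nextRetry` and `schedule` are illustrative stand-ins for the stage's internals, not the Alpakka API.

    object RetryGroupingSketch extends App {
      import scala.concurrent.duration._

      final case class Msg(payload: String, retryCount: Int)

      // Stand-ins for settings.retryLogic (assumed names, not the real API):
      def shouldRetry(retryCount: Int): Boolean = retryCount < 5
      def nextRetry(retryCount: Int): FiniteDuration = (10 * (retryCount + 1)).millis

      // Stand-in for TimerGraphStageLogic.scheduleOnce:
      def schedule(msgs: Seq[Msg], delay: FiniteDuration): Unit =
        println(s"retrying ${msgs.map(_.payload)} in $delay")

      def handleFailed(failed: Seq[Msg]): Unit =
        // One timer per retry-count group: a message that has already failed twice
        // keeps its own backoff and budget (the bug in #1566 was a shared counter).
        failed.groupBy(_.retryCount).foreach {
          case (count, group) if shouldRetry(count) =>
            schedule(group.map(_.copy(retryCount = count + 1)), nextRetry(count))
          case (count, group) =>
            println(s"giving up on ${group.map(_.payload)} after $count tries")
        }

      handleFailed(Seq(Msg("a", 0), Msg("b", 2), Msg("c", 2)))
    }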
diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala
index e96885ee60..3409ac4055 100644
--- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala
+++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala
@@ -49,76 +49,82 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
     private var upstreamFinished = false
     private var inflight = 0
 
-    private val failureHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, C]], Throwable)](handleFailure)
-    private val responseHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, C]], Response)](handleResponse)
-    private var failedMessages: immutable.Seq[WriteMessage[T, C]] = Nil
-    private var retryCount: Int = 0
+    private val failureHandler =
+      getAsyncCallback[(immutable.Seq[WriteMessageWithRetry[T, C]], Throwable)](handleFailure)
+    private val responseHandler =
+      getAsyncCallback[(immutable.Seq[WriteMessageWithRetry[T, C]], Response)](handleResponse)
 
     private def tryPull(): Unit =
       if (!isClosed(in) && !hasBeenPulled(in)) {
         pull(in)
       }
 
-    override def onTimer(timerKey: Any): Unit = {
-      if (log.isDebugEnabled) log.debug("retrying inflight={} {}", inflight, failedMessages)
-      sendBulkUpdateRequest(failedMessages)
-      failedMessages = Nil
+    override def onTimer(timerKey: Any): Unit = timerKey match {
+      case RetrySend(failedMessages: immutable.Seq[WriteMessageWithRetry[T, C]] @unchecked) =>
+        if (log.isDebugEnabled) log.debug("retrying inflight={} {}", inflight, failedMessages)
+        sendBulkUpdateRequest(failedMessages)
+
+      case _ =>
     }
 
-    private def handleFailure(args: (immutable.Seq[WriteMessage[T, C]], Throwable)): Unit = {
+    private def handleFailure(args: (immutable.Seq[WriteMessageWithRetry[T, C]], Throwable)): Unit = {
       val (messages, exception) = args
-      if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) {
-        log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}",
-                  retryCount,
-                  settings.retryLogic,
-                  exception)
-        failStage(exception)
-      } else {
-        log.warning("Received error from elastic. Try number {}. {}, Error: {}",
-                    retryCount,
-                    settings.retryLogic,
-                    exception)
-        retryCount = retryCount + 1
-        failedMessages = messages
-        scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount))
+
+      messages.groupBy(_.retryCount).foreach {
+        case (retryCount, messagesByRetryCount) =>
+          if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) {
+            log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}",
+                      retryCount,
+                      settings.retryLogic,
+                      exception)
+            failStage(exception)
+          } else {
+            log.warning("Received error from elastic. Try number {}. {}, Error: {}",
+                        retryCount,
+                        settings.retryLogic,
+                        exception)
+            scheduleOnce(RetrySend(messagesByRetryCount), settings.retryLogic.nextRetry(retryCount))
+          }
       }
     }
 
-    private def handleResponse(args: (immutable.Seq[WriteMessage[T, C]], Response)): Unit = {
+    private def handleResponse(args: (immutable.Seq[WriteMessageWithRetry[T, C]], Response)): Unit = {
       val (messages, response) = args
       val jsonString = EntityUtils.toString(response.getEntity)
       if (log.isDebugEnabled) {
         import spray.json._
         log.debug("response {}", jsonString.parseJson.prettyPrint)
       }
-      val messageResults = restApi.toWriteResults(messages, jsonString)
-
-      val failedMsgs = messageResults.filterNot(_.error.isEmpty)
-
-      if (failedMsgs.nonEmpty && settings.retryLogic.shouldRetry(retryCount, failedMsgs.map(_.error.get).toList)) {
-        retryPartialFailedMessages(messageResults, failedMsgs)
+      val messageResults = restApi.toWriteResults(messages.map(_.writeMessage), jsonString)
+      val partialFailure = messageResults.exists(_.error.nonEmpty)
+      if (partialFailure) {
+        val messageResultsWithRetry = messageResults.zip(messages.map(_.retryCount))
+        retryPartialFailedMessages(messageResultsWithRetry)
       } else {
-        retryCount = 0
         emitResults(messageResults)
       }
     }
 
-    private def retryPartialFailedMessages(
-        messageResults: immutable.Seq[WriteResult[T, C]],
-        failedMsgs: immutable.Seq[WriteResult[T, C]]
-    ): Unit = {
+    private def retryPartialFailedMessages(messageResultsWithRetry: immutable.Seq[(WriteResult[T, C], Int)]): Unit = {
+      val (failedMsgsIt, successMsgsIt) = messageResultsWithRetry.iterator.partition(_._1.error.nonEmpty)
+      val failedMsgs = failedMsgsIt.toList
       if (log.isDebugEnabled) log.debug("retryPartialFailedMessages inflight={} {}", inflight, failedMsgs)
       // Retry partial failed messages
       // NOTE: When we partially return message like this, message will arrive out of order downstream
       // and it can break commit-logic when using Kafka
-      retryCount = retryCount + 1
-      failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry
-      scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount))
-
-      val successMsgs = messageResults.filter(_.error.isEmpty)
-      if (successMsgs.nonEmpty) {
-        // push the messages that DID succeed
-        emitResults(successMsgs)
+
+      emitResults(successMsgsIt.map(_._1).toList)
+      failedMsgs.groupBy(_._2).foreach {
+        case (retryCount, failedMsgsByRetry) =>
+          if (settings.retryLogic.shouldRetry(retryCount, Nil)) {
+            val updatedFailedMsgsByRetry = failedMsgsByRetry.map {
+              case (wr, _) =>
+                WriteMessageWithRetry(wr.message, retryCount + 1)
+            }
+            scheduleOnce(RetrySend(updatedFailedMsgsByRetry), settings.retryLogic.nextRetry(retryCount))
+          } else {
+            emitResults(failedMsgsByRetry.map(_._1))
+          }
       }
     }
 
@@ -129,8 +135,8 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
       if (upstreamFinished && inflight == 0) completeStage()
     }
 
-    private def sendBulkUpdateRequest(messages: immutable.Seq[WriteMessage[T, C]]): Unit = {
-      val json: String = restApi.toJson(messages)
+    private def sendBulkUpdateRequest(messages: immutable.Seq[WriteMessageWithRetry[T, C]]): Unit = {
+      val json: String = restApi.toJson(messages.map(_.writeMessage))
 
       log.debug("Posting data to Elasticsearch: {}", json)
 
@@ -153,8 +159,9 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
 
     override def onPush(): Unit = {
       val messages = grab(in)
-      inflight += messages.size
-      sendBulkUpdateRequest(messages)
+      val messagesWithRetry = messages.map(WriteMessageWithRetry(_, 0))
+      inflight += messagesWithRetry.size
+      sendBulkUpdateRequest(messagesWithRetry)
     }
 
     override def onUpstreamFinish(): Unit =
@@ -168,6 +175,6 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
  */
 @InternalApi
 private[elasticsearch] object ElasticsearchFlowStage {
-
-  private object RetrySend
+  private case class WriteMessageWithRetry[T, C](writeMessage: WriteMessage[T, C], retryCount: Int)
+  private case class RetrySend[T, C](messages: Seq[WriteMessageWithRetry[T, C]])
 }
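Editor's note on the `@unchecked` in `onTimer` above: the timer key arrives typed as `Any`, and the element type of the `Seq` cannot be verified at runtime because of JVM type erasure, so `@unchecked` merely silences the compiler's unchecked-type-pattern warning. That is safe here because only this stage ever schedules a `RetrySend` key. The general shape of the issue, as a tiny standalone illustration:

    def total(key: Any): Int = key match {
      // Erasure: the JVM can check "is a Seq", but not "is a Seq[Int]".
      // Without @unchecked the compiler warns that Int is eliminated by erasure.
      case xs: Seq[Int] @unchecked => xs.sum
      case _ => 0
    }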
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala
index c4846e2c54..8cf3bd856f 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala
@@ -437,6 +437,52 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll wi
       )
     }
 
+    "retry ALL failed documents and pass retried documents downstream" in assertAllStagesStopped {
+      val indexName = "sink5_1"
+
+      val bookNr = 100
+      val writeMsgs = Iterator
+        .from(0)
+        .take(bookNr)
+        .grouped(5)
+        .zipWithIndex
+        .flatMap {
+          case (numBlock, index) =>
+            val writeMsgBlock = numBlock.map { n =>
+              WriteMessage
+                .createCreateMessage(n.toString, Map("title" -> s"Book ${n}"))
+                .withPassThrough(n)
+            }
+
+            val writeMsgFailed = WriteMessage
+              .createCreateMessage("0", Map("title" -> "Failed"))
+              .withPassThrough(bookNr + index)
+
+            (writeMsgBlock ++ Iterator(writeMsgFailed)).toList
+        }
+        .toList
+
+      val createBooks = Source(writeMsgs)
+        .via(
+          ElasticsearchFlow.createWithPassThrough(
+            indexName,
+            "_doc",
+            ElasticsearchWriteSettings()
+              .withRetryLogic(RetryAtFixedRate(5, 1.millis))
+          )
+        )
+        .runWith(Sink.seq)
+
+      val writeResults = createBooks.futureValue
+
+      writeResults should have size writeMsgs.size
+
+      flush(indexName)
+
+      val expectedBookTitles = Iterator.from(0).map(n => s"Book ${n}").take(bookNr).toSet
+      readTitlesFrom(indexName).futureValue should contain theSameElementsAs expectedBookTitles
+    }
+
     "kafka-example - store documents and pass Responses with passThrough" in assertAllStagesStopped {
       //#kafka-example
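Editor's note: from the application side, the retry behaviour is driven entirely by `ElasticsearchWriteSettings.withRetryLogic`, as the test above shows. A sketch of typical usage under stated assumptions (Elasticsearch on localhost:9200, Akka 2.6's system materializer, and an index name invented for illustration):

    import akka.actor.ActorSystem
    import akka.stream.alpakka.elasticsearch.{ElasticsearchWriteSettings, RetryAtFixedRate, WriteMessage}
    import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
    import akka.stream.scaladsl.{Sink, Source}
    import org.apache.http.HttpHost
    import org.elasticsearch.client.RestClient
    import spray.json.DefaultJsonProtocol._

    import scala.concurrent.duration._

    object RetriedBulkWrite extends App {
      implicit val system: ActorSystem = ActorSystem()
      implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9200)).build()

      // Up to 5 attempts per message, 10 ms apart; after this patch each message
      // carries its own retry count instead of sharing a global one.
      val settings = ElasticsearchWriteSettings().withRetryLogic(RetryAtFixedRate(5, 10.millis))

      Source(1 to 3)
        .map(n => WriteMessage.createCreateMessage(n.toString, Map("title" -> s"Book $n")))
        .via(ElasticsearchFlow.create[Map[String, String]]("books", "_doc", settings))
        .runWith(Sink.seq)
    }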