Elasticsearch: retry logic improvement (#2062)
* Fixes #1566: ElasticsearchFlowStage retry logic

- failed messages are now handled independently, so previously failed messages are no longer lost
- retry counts are kept per message, guaranteeing the correct number of retries and retry intervals for each
- adds a test case proving the correctness of the fix
nivox authored and ennru committed Jan 13, 2020
1 parent dc20885 commit 993cbae
Showing 2 changed files with 102 additions and 49 deletions.
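The heart of the change: instead of one stage-wide `retryCount` and a single shared `failedMessages` buffer, every message now carries its own retry count, and failures are grouped by that count before being rescheduled. A minimal standalone sketch of that bookkeeping (simplified types; `handleFailures`, `schedule` and `giveUp` are illustrative stand-ins for the stage's timer and emit logic, not the connector's API):

```scala
object PerMessageRetrySketch {
  final case class WriteMessageWithRetry[T](message: T, retryCount: Int)

  def handleFailures[T](failed: Seq[WriteMessageWithRetry[T]],
                        maxRetries: Int,
                        schedule: Seq[WriteMessageWithRetry[T]] => Unit,
                        giveUp: Seq[WriteMessageWithRetry[T]] => Unit): Unit =
    // Messages that have already been retried a different number of times are
    // rescheduled as separate batches, so each keeps its own retry budget.
    failed.groupBy(_.retryCount).foreach {
      case (count, batch) if count < maxRetries =>
        schedule(batch.map(m => m.copy(retryCount = m.retryCount + 1)))
      case (_, batch) =>
        giveUp(batch)
    }
}
```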
@@ -49,76 +49,82 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
   private var upstreamFinished = false
   private var inflight = 0
 
-  private val failureHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, C]], Throwable)](handleFailure)
-  private val responseHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, C]], Response)](handleResponse)
-  private var failedMessages: immutable.Seq[WriteMessage[T, C]] = Nil
-  private var retryCount: Int = 0
+  private val failureHandler =
+    getAsyncCallback[(immutable.Seq[WriteMessageWithRetry[T, C]], Throwable)](handleFailure)
+  private val responseHandler =
+    getAsyncCallback[(immutable.Seq[WriteMessageWithRetry[T, C]], Response)](handleResponse)
 
   private def tryPull(): Unit =
     if (!isClosed(in) && !hasBeenPulled(in)) {
       pull(in)
     }
 
-  override def onTimer(timerKey: Any): Unit = {
-    if (log.isDebugEnabled) log.debug("retrying inflight={} {}", inflight, failedMessages)
-    sendBulkUpdateRequest(failedMessages)
-    failedMessages = Nil
+  override def onTimer(timerKey: Any): Unit = timerKey match {
+    case RetrySend(failedMessages: immutable.Seq[WriteMessageWithRetry[T, C]] @unchecked) =>
+      if (log.isDebugEnabled) log.debug("retrying inflight={} {}", inflight, failedMessages)
+      sendBulkUpdateRequest(failedMessages)
+
+    case _ =>
   }
 
-  private def handleFailure(args: (immutable.Seq[WriteMessage[T, C]], Throwable)): Unit = {
+  private def handleFailure(args: (immutable.Seq[WriteMessageWithRetry[T, C]], Throwable)): Unit = {
     val (messages, exception) = args
-    if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) {
-      log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}",
-                retryCount,
-                settings.retryLogic,
-                exception)
-      failStage(exception)
-    } else {
-      log.warning("Received error from elastic. Try number {}. {}, Error: {}",
-                  retryCount,
-                  settings.retryLogic,
-                  exception)
-      retryCount = retryCount + 1
-      failedMessages = messages
-      scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount))
+
+    messages.groupBy(_.retryCount).foreach {
+      case (retryCount, messagesByRetryCount) =>
+        if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) {
+          log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}",
+                    retryCount,
+                    settings.retryLogic,
+                    exception)
+          failStage(exception)
+        } else {
+          log.warning("Received error from elastic. Try number {}. {}, Error: {}",
+                      retryCount,
+                      settings.retryLogic,
+                      exception)
+          scheduleOnce(RetrySend(messagesByRetryCount), settings.retryLogic.nextRetry(retryCount))
+        }
     }
   }
 
-  private def handleResponse(args: (immutable.Seq[WriteMessage[T, C]], Response)): Unit = {
+  private def handleResponse(args: (immutable.Seq[WriteMessageWithRetry[T, C]], Response)): Unit = {
     val (messages, response) = args
     val jsonString = EntityUtils.toString(response.getEntity)
     if (log.isDebugEnabled) {
       import spray.json._
       log.debug("response {}", jsonString.parseJson.prettyPrint)
     }
-    val messageResults = restApi.toWriteResults(messages, jsonString)
-
-    val failedMsgs = messageResults.filterNot(_.error.isEmpty)
-
-    if (failedMsgs.nonEmpty && settings.retryLogic.shouldRetry(retryCount, failedMsgs.map(_.error.get).toList)) {
-      retryPartialFailedMessages(messageResults, failedMsgs)
+    val messageResults = restApi.toWriteResults(messages.map(_.writeMessage), jsonString)
+    val partialFailure = messageResults.exists(_.error.nonEmpty)
+    if (partialFailure) {
+      val messageResultsWithRetry = messageResults.zip(messages.map(_.retryCount))
+      retryPartialFailedMessages(messageResultsWithRetry)
     } else {
-      retryCount = 0
       emitResults(messageResults)
     }
   }
 
-  private def retryPartialFailedMessages(
-      messageResults: immutable.Seq[WriteResult[T, C]],
-      failedMsgs: immutable.Seq[WriteResult[T, C]]
-  ): Unit = {
+  private def retryPartialFailedMessages(messageResultsWithRetry: immutable.Seq[(WriteResult[T, C], Int)]): Unit = {
+    val (failedMsgsIt, successMsgsIt) = messageResultsWithRetry.iterator.partition(_._1.error.nonEmpty)
+    val failedMsgs = failedMsgsIt.toList
     if (log.isDebugEnabled) log.debug("retryPartialFailedMessages inflight={} {}", inflight, failedMsgs)
     // Retry partial failed messages
     // NOTE: When we partially return message like this, message will arrive out of order downstream
     // and it can break commit-logic when using Kafka
-    retryCount = retryCount + 1
-    failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry
-    scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount))
-
-    val successMsgs = messageResults.filter(_.error.isEmpty)
-    if (successMsgs.nonEmpty) {
-      // push the messages that DID succeed
-      emitResults(successMsgs)
-
+    emitResults(successMsgsIt.map(_._1).toList)
+    failedMsgs.groupBy(_._2).foreach {
+      case (retryCount, failedMsgsByRetry) =>
+        if (settings.retryLogic.shouldRetry(retryCount, Nil)) {
+          val updatedFailedMsgsByRetry = failedMsgsByRetry.map {
+            case (wr, _) =>
+              WriteMessageWithRetry(wr.message, retryCount + 1)
+          }
+          scheduleOnce(RetrySend(updatedFailedMsgsByRetry), settings.retryLogic.nextRetry(retryCount))
+        } else {
+          emitResults(failedMsgsByRetry.map(_._1))
+        }
     }
   }

@@ -129,8 +135,8 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
     if (upstreamFinished && inflight == 0) completeStage()
   }
 
-  private def sendBulkUpdateRequest(messages: immutable.Seq[WriteMessage[T, C]]): Unit = {
-    val json: String = restApi.toJson(messages)
+  private def sendBulkUpdateRequest(messages: immutable.Seq[WriteMessageWithRetry[T, C]]): Unit = {
+    val json: String = restApi.toJson(messages.map(_.writeMessage))
 
     log.debug("Posting data to Elasticsearch: {}", json)
@@ -153,8 +159,9 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
 
     override def onPush(): Unit = {
       val messages = grab(in)
-      inflight += messages.size
-      sendBulkUpdateRequest(messages)
+      val messagesWithRetry = messages.map(WriteMessageWithRetry(_, 0))
+      inflight += messagesWithRetry.size
+      sendBulkUpdateRequest(messagesWithRetry)
     }
 
     override def onUpstreamFinish(): Unit =
@@ -168,6 +175,6 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C](
  */
 @InternalApi
 private[elasticsearch] object ElasticsearchFlowStage {
-
-  private object RetrySend
+  private case class WriteMessageWithRetry[T, C](writeMessage: WriteMessage[T, C], retryCount: Int)
+  private case class RetrySend[T, C](messages: Seq[WriteMessageWithRetry[T, C]])
 }
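A detail worth noting: `RetrySend` changes from a singleton object to a case class carrying the batch to retry. In Akka Streams, `TimerGraphStageLogic.scheduleOnce` cancels any pending timer registered under an equal key, so the old singleton key meant a newly scheduled retry could silently cancel an earlier one; a payload-carrying key keeps each scheduled batch independent. A minimal sketch of that pattern (a toy stage, not the connector's code; completion handling is omitted for brevity):

```scala
import scala.collection.immutable
import scala.concurrent.duration._
import akka.stream._
import akka.stream.stage._

final class RetryEchoStage[A] extends GraphStage[FlowShape[A, A]] {
  private val in = Inlet[A]("RetryEcho.in")
  private val out = Outlet[A]("RetryEcho.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  // The timer key carries its own payload, so two scheduled batches
  // use distinct keys and neither cancels the other.
  private case class RetrySend(batch: immutable.Seq[A])

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        // Pretend the element failed once and must be re-emitted later.
        scheduleOnce(RetrySend(immutable.Seq(grab(in))), 50.millis)
        pull(in)
      }

      override def onTimer(timerKey: Any): Unit = timerKey match {
        case RetrySend(batch) => emitMultiple(out, batch)
        case _ => ()
      }

      override def onPull(): Unit = if (!hasBeenPulled(in)) pull(in)

      setHandlers(in, out, this)
    }
}
```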
elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala (46 additions, 0 deletions)
@@ -437,6 +437,52 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll wi
       )
   }
 
+  "retry ALL failed document and pass retried documents to downstream" in assertAllStagesStopped {
+    val indexName = "sink5_1"
+
+    val bookNr = 100
+    val writeMsgs = Iterator
+      .from(0)
+      .take(bookNr)
+      .grouped(5)
+      .zipWithIndex
+      .flatMap {
+        case (numBlock, index) =>
+          val writeMsgBlock = numBlock.map { n =>
+            WriteMessage
+              .createCreateMessage(n.toString, Map("title" -> s"Book ${n}"))
+              .withPassThrough(n)
+          }
+
+          val writeMsgFailed = WriteMessage
+            .createCreateMessage("0", Map("title" -> s"Failed"))
+            .withPassThrough(bookNr + index)
+
+          (writeMsgBlock ++ Iterator(writeMsgFailed)).toList
+      }
+      .toList
+
+    val createBooks = Source(writeMsgs)
+      .via(
+        ElasticsearchFlow.createWithPassThrough(
+          indexName,
+          "_doc",
+          ElasticsearchWriteSettings()
+            .withRetryLogic(RetryAtFixedRate(5, 1.millis))
+        )
+      )
+      .runWith(Sink.seq)
+
+    val writeResults = createBooks.futureValue
+
+    writeResults should have size writeMsgs.size
+
+    flush(indexName)
+
+    val expectedBookTitles = Iterator.from(0).map(n => s"Book ${n}").take(bookNr).toSet
+    readTitlesFrom(indexName).futureValue should contain theSameElementsAs expectedBookTitles
+  }
+
   "kafka-example - store documents and pass Responses with passThrough" in assertAllStagesStopped {
 
     //#kafka-example
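How the new test provokes retries: after every block of five fresh documents it appends another create for the already-used ID "0". `createCreateMessage` issues the bulk API's `create` action, which Elasticsearch rejects with a version conflict once a document with that ID exists, so each bulk request fails partially. The assertions then cover both retry paths: every message, including the ones that exhaust their five retries, still reaches downstream (`writeResults should have size writeMsgs.size`), and the index ends up containing exactly the 100 book titles.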
