
Elasticsearch createWithPassThrough sometimes doesn't return WriteResult [1.0-M3] #1566

Closed
danelkotev opened this issue Mar 7, 2019 · 11 comments

Comments

@danelkotev
Contributor

Hi,

I upgraded to the 1.0-M3 version of Alpakka Elasticsearch and it seems there is a bug when using ElasticsearchFlow.createWithPassThrough: I'm not receiving a WriteResult for every document in a bulk.
Everything worked fine when I used the 1.0-M1 version of Alpakka.
I know some changes were made to the Elasticsearch connector, and I think they are related.

@ennru
Member

ennru commented Mar 7, 2019

Thank you for reporting, that is definitely possible.

@ennru
Member

ennru commented Mar 11, 2019

Do you have retrying enabled? With retrying, at the very least the ordering isn't preserved.

@danelkotev
Contributor Author

Hi, I do use retrying, and the problem is not ordering: it looks like some messages are missing entirely. I have a unit test that reproduces the issue.
When I don't use the retry mechanism, everything works as expected. On 1.0-M1, using the retry mechanism yielded the expected result.

@ennru
Member

ennru commented Mar 12, 2019

Ok, so it seems related to retrying. Can you see (e.g. in the log) whether it actually retries in the cases where you are missing WriteResults?

@ennru
Member

ennru commented Mar 15, 2019

Please try creating a RetryLogic similar to RetryAtFixedRate and log the exceptions it receives in shouldRetry. It would be interesting to get this back to the earlier behaviour.
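
For illustration, such a logging RetryLogic could look roughly like the sketch below. The shouldRetry/nextRetry signatures are assumed from this discussion and from the RetryAtFixedRate usage, not copied from the connector sources, so verify them against the RetryLogic trait in 1.0-M3:

import scala.concurrent.duration.FiniteDuration
import akka.stream.alpakka.elasticsearch.RetryLogic

// Sketch only: behaves like RetryAtFixedRate but logs every batch of errors it is
// asked to decide on. Assumes RetryLogic can be extended with these signatures.
final case class LoggingRetryAtFixedRate(maxRetries: Int, retryInterval: FiniteDuration) extends RetryLogic {

  override def shouldRetry(retries: Int, errors: Seq[String]): Boolean = {
    println(s"shouldRetry(retries=$retries): ${errors.mkString("; ")}")
    retries < maxRetries
  }

  override def nextRetry(retries: Int): FiniteDuration = retryInterval
}

It could then be plugged into the test via ElasticsearchWriteSettings().withRetryLogic(LoggingRetryAtFixedRate(maxRetries = 5, retryInterval = 1.millis)).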

@danelkotev
Contributor Author

I created a unit test with RetryAtFixedRate. The test hits its timeout, and I can't see in the logs where messages get lost. Please review my test below and share your opinion.

"index bulk of document with passThrough" in assertAllStagesStopped {
      val indexName = "sink10"
      val sink = ElasticsearchFlow.createWithPassThrough[Book, Book](
        indexName = indexName,
        typeName = "doc",
        ElasticsearchWriteSettings() //.withBufferSize(5)
          .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.millis))
      )

      val bookMessages = List(
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 4"),
        Book("Book 5"),
        Book("Book 6"),
        Book("Book 3"),
        Book("Book 8"),
        Book("Book 9"),
        Book("Book 10"),
        Book("Book 3"),
        Book("Book 11"),
        Book("Book 12"),
        Book("Book 13"),
        Book("Book 14"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1"),
        Book("Book 2"),
        Book("Book 3"),
        Book("Book 1")
      )

      val bookToEs = Source(bookMessages)
        .map { book: Book =>
          val id = book.title
          println("title: " + book.title)
          // Transform message so that we can write to elastic
          WriteMessage.createCreateMessage(id, book).withPassThrough(book)
        }
        .via(sink) // write to elastic
        .map { result =>
          if (result.success) println("success")
          else println("failure")
        }
        .runFold(0)((count, msg) => count + 1)

      bookToEs.futureValue( Timeout(PatienceConfig(5 minute).timeout)) shouldBe 26
    }

@danelkotev
Contributor Author

danelkotev commented Mar 23, 2019

In addition, I took a look at how RetryLogic is handled, and as far as I understand, failedMessages is not aggregated; it is simply overwritten whenever new failures arrive.
When I changed retryPartialFailedMessages to update failedMessages with a union:
failedMessages = failedMessages union failedMsgs.map(_.message), the code no longer hit the timeout (although this is not the proper fix).
Maybe I don't understand how RetryLogic works, but I believe failedMessages should hold the remaining retries (leftRetries) for every message, and in general failedMessages should be updated accordingly rather than overwritten.
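
To make the overwrite-versus-accumulate difference concrete, here is a simplified illustration (not the actual ElasticsearchFlowStage code; the object and handler names are made up, and Book mirrors the document type from the test above):

import akka.stream.alpakka.elasticsearch.{WriteMessage, WriteResult}

object RetryBufferIllustration {
  final case class Book(title: String)

  // Stands in for the stage's internal buffer of messages waiting to be retried.
  var failedMessages: Seq[WriteMessage[Book, Book]] = Seq.empty

  def onPartialFailure(failedMsgs: Seq[WriteResult[Book, Book]]): Unit = {
    // 1.0-M3 behaviour: the buffer is overwritten, so failures recorded earlier
    // but not yet retried are silently dropped.
    failedMessages = failedMsgs.map(_.message)

    // Accumulating instead keeps every message that is still waiting for a retry:
    // failedMessages = failedMessages ++ failedMsgs.map(_.message)
  }
}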

@ennru
Member

ennru commented Apr 26, 2019

Sorry for being so quiet; I've been traveling and on vacation.
Your reasoning about failedMessages makes sense to me.
I'll try to get back to this...

@nivox
Contributor

nivox commented Dec 23, 2019

I've stumbled on the same issue.

In my case there are 2 document updates:

  1. analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary
  2. analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings

Doc 1 is part of a bulk update along with other documents, while doc 2 is sent on its own.
Doc 2 fails and gets scheduled for retry. Right after, doc 1 also fails and gets scheduled for retry.

After some time the retry is carried out, but only for doc 1.

Here are the actual logs:

13:50:25.382 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: ...
{"update":{"_id":"analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{"doc":{...},"doc_as_upsert":true}
13:50:25.384 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: {"update":{"_id":"analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{"doc":{...},"doc_as_upsert":true}

...

13:50:25.436 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": true,
  "items": [{
    "update": {
      "_id": "analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings",
      "_index": "thinkin-jobs-prod-static",
      "_type": "_doc",
      "error": {
        "index": "thinkin-jobs-prod-static",
        "index_uuid": "2DinhZHvR7mwuHGuubRWlw",
        "reason": "[_doc][analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings]: version conflict, current version [4] is different than the one provided [3]",
        "shard": "2",
        "type": "version_conflict_engine_exception"
      },
      "status": 409
    }
  }],
  "took": 48
}
13:50:25.437 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retryPartialFailedMessages inflight=63 Vector(WriteResult(message=WriteMessage(operation=update,id=Some(analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()),error=Some({"index":"thinkin-jobs-prod-static","index_uuid":"2DinhZHvR7mwuHGuubRWlw","reason":"[_doc][analyticJob~72679b24-49ca-3d7c-b37d-b114b31c6577~tileDwellings]: version conflict, current version [4] is different than the one provided [3]","shard":"2","type":"version_conflict_engine_exception"})))

...

13:50:26.025 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": true,
  "items": [...
    , {
    "update": {
      "_id": "analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary",
      "_index": "thinkin-jobs-prod-static",
      "_type": "_doc",
      "error": {
        "index": "thinkin-jobs-prod-static",
        "index_uuid": "2DinhZHvR7mwuHGuubRWlw",
        "reason": "[_doc][analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary]: version conflict, current version [4] is different than the one provided [3]",
        "shard": "2",
        "type": "version_conflict_engine_exception"
      },
      "status": 409
    }
  }],
  "took": 639
}
13:50:26.025 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retryPartialFailedMessages inflight=62 Vector(WriteResult(message=WriteMessage(operation=update,id=Some(analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()),error=Some({"index":"thinkin-jobs-prod-static","index_uuid":"2DinhZHvR7mwuHGuubRWlw","reason":"[_doc][analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary]: version conflict, current version [4] is different than the one provided [3]","shard":"2","type":"version_conflict_engine_exception"})))

...

13:50:27.043 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - retrying inflight=2 Vector(WriteMessage(operation=update,id=Some(analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary),source=Some({...}),passThrough=...,version=None,indexName=Some(thinkin-jobs-prod-static),customMetadata=Map()))
13:50:27.043 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - Posting data to Elasticsearch: {"update":{"_id":"analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary","_index":"thinkin-jobs-prod-static","_type":"_doc"}}
{...},"doc_as_upsert":true}
13:50:27.148 DEBUG a.s.a.e.i.ElasticsearchFlowStage$StageLogic - response {
  "errors": false,
  "items": [{
    "update": {
      "_id": "analyticJob~ef40a099-a7b3-3c8e-8dfa-c4c3913c8579~tileDwellingSummary",
      "_index": "thinkin-jobs-prod-static",
      "_primary_term": 3,
      "_seq_no": 6362956,
      "_shards": {
        "failed": 0,
        "successful": 2,
        "total": 2
      },
      "_type": "_doc",
      "_version": 5,
      "result": "updated",
      "status": 200
    }
  }],
  "took": 101
}

Looking at the code for retryPartialFailedMessages, it seems to me that the line

failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry

causes previously failed messages to be lost if a retry doesn't happen before the next failure.

I think that the desired behaviour should be:

failedMessages = failedMessages ++ failedMsgs.map(_.message)

The same reasoning should also be applied to the handleFailure function by turning

failedMessages = messages

into

failedMessages = failedMessages ++ messages

Furthermore, to correctly handle the retry count for messages that failed at different times, the count should somehow be tied to each message instead of being a single global retryCount.
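
For illustration, tying the retry count to each message could look something like this sketch (names such as PendingRetry are hypothetical; this is not the actual patch):

import akka.stream.alpakka.elasticsearch.WriteMessage

object PerMessageRetryIllustration {
  // Hypothetical wrapper: every message waiting for a retry carries its own count.
  final case class PendingRetry[T, PT](message: WriteMessage[T, PT], retryCount: Int)

  // New failures are appended next to those already pending instead of replacing
  // them, so an overwrite of the buffer can no longer drop scheduled retries.
  def addFailures[T, PT](pending: Vector[PendingRetry[T, PT]],
                         newlyFailed: Seq[PendingRetry[T, PT]]): Vector[PendingRetry[T, PT]] =
    pending ++ newlyFailed

  // Each message is retried only while it stays within its own budget, instead of
  // all messages sharing one global retryCount.
  def withinBudget(pending: PendingRetry[_, _], maxRetries: Int): Boolean =
    pending.retryCount < maxRetries
}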

I'm going to prepare a patch and test it on my code over the next few days.

I see that #2031 is targeting the retry logic; will that be a rewrite of it, thus solving this issue at the root (and making any patches moot)?

nivox added a commit to nivox/alpakka that referenced this issue Dec 27, 2019
- failed messages are treated independently to avoid losing track of previously failed ones
- retry counts are kept per message to guarantee the correct number of retries and intervals
- added a test case proving the correctness of the fix
@Teudimundo

I experienced a similar issue. The PR by @nivox seems to fix it.

@seglo
Member

seglo commented Jan 3, 2020

@nivox Thanks for the analysis and the PR!

I see that #2031 is targeting the retry logic; will that be a rewrite of it, thus solving this issue at the root (and making any patches moot)?

Using an upstream RetryFlow in the ES connector is a goal, but its design is inspired by retry use cases we've found in the ES and other Alpakka connectors. I think we can evaluate the PR at face value. I'll take a look next week when I'm back, but @ennru has the best context here.

ennru pushed a commit to nivox/alpakka that referenced this issue Jan 13, 2020
- failed messages are treated independently to avoid losing track of previously failed ones
- retry counts are kept per message to guarantee the correct number of retries and intervals
- added a test case proving the correctness of the fix
ennru closed this as completed in 993cbae on Jan 13, 2020
ennru added this to the 2.0.0 milestone on Jan 13, 2020