
heavy load from spark to ES errors #706

Closed
vgkowski opened this issue Feb 24, 2016 · 3 comments

@vgkowski commented Feb 24, 2016

Hi,
I have a small setup with 4 Mesos slaves running Spark and ES, and I am trying to write a number of documents that is large relative to the size of the cluster (500 000 000 docs) with saveToEsWithMeta, because I want to deduplicate documents written by Spark task retries.
The problem is that I hit network errors at around the 80 000 000 docs mark within a single index. There is no problem if I write one index per day (50 000 000 docs and 12 GB in size).
My configuration:

  • Spark in fine-grained mode with 24 partitions. Each worker has 8 GB of RAM (4 workers).
  • ES 2.1.1 on 4 nodes with default settings, except the bulk pool raised to 100
  • Spark ES connector: es.batch.size.bytes=10MB, es.batch.size.entries=20000 (see the sketch after this list)
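
For reference, a minimal sketch of this kind of job using the settings above; the host names, input path, field names and index name are placeholders, not the real ones:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._                 // adds saveToEs / saveToEsWithMeta to RDDs

    val conf = new SparkConf()
      .setAppName("es-bulk-load")
      .set("es.nodes", "node1,node2,node3,node4")    // placeholder ES hosts
      .set("es.batch.size.bytes", "10mb")            // settings listed above
      .set("es.batch.size.entries", "20000")
    val sc = new SparkContext(conf)

    // (id, document) pairs: saveToEsWithMeta uses the key as the document _id,
    // so a retried task overwrites the same documents instead of duplicating them.
    val docs = sc.textFile("hdfs:///path/to/input")  // placeholder input
      .repartition(24)
      .map { line =>
        val fields = line.split(',')
        (fields(0), Map("field1" -> fields(1), "field2" -> fields(2)))
      }

    docs.saveToEsWithMeta("events-2016.02.24/doc")   // placeholder index/type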

The errors

org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xxxxx, xxxxxxx, xxxxxx:9200, xxxxxxxx:9200]] 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings) - all nodes failed; tried [[xxxxxxxxx:9200, xxxxxxxxxx:9200, xxxxxxxxxxxxx:9200, xxxxxxxxxxxxxxxx:9200]]
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:423)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:415)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:145)
    at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:225)
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:248)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:187)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:163)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:49)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEsWithMeta$1.apply(EsSpark.scala:86)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEsWithMeta$1.apply(EsSpark.scala:86)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

@vgkowski (Author) commented Feb 24, 2016

I have done the same test with the saveToEs method and I am currently loading 360 000 000 docs without any issue. What is the problem with saveToEsWithMeta? I assume ES needs to look up the doc id when upserting and that is costly.
Is there any other solution to get idempotency with ES bulk writes?
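
For comparison, a rough sketch of the two write paths, assuming the documents carry their own unique field (the "uuid" field name, index name and values are placeholders); with es.mapping.id, plain saveToEs takes the _id from a document field, so retries overwrite instead of duplicating:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._

    val sc = new SparkContext(new SparkConf()
      .setAppName("es-idempotent-write")
      .set("es.nodes", "node1,node2,node3,node4"))   // placeholder ES hosts

    // Documents carrying their own unique key ("uuid" is a placeholder field name).
    val docs = sc.parallelize(Seq(
      Map("uuid" -> "a-1", "value" -> 1),
      Map("uuid" -> "a-2", "value" -> 2)))

    // Plain indexing: ES auto-generates the _id, so a retried task re-indexes
    // its documents under fresh ids and duplicates can appear.
    docs.saveToEs("events/doc")

    // Alternative to saveToEsWithMeta: the connector reads the _id from a field
    // of the document itself; with a fixed id, a retry overwrites rather than duplicates.
    docs.saveToEs("events/doc", Map("es.mapping.id" -> "uuid"))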

@costin (Member) commented Mar 22, 2016

If I understand correctly, you have 24 partitions across 4 workers which means around 6 tasks per worker.
ES has 4 nodes, unclear whether they are on separate machines or not.
You are using the connector with a bulk size of 10MB/20K docs.

The issue is that in the case of updates (which are basically two operations, delete and index) performance suffers after 80M documents, while pure indexing works fine (360M and still going).

Since I don't know what hardware you have allocated to ES, I can only infer the following:

  • there are around 6 tasks per ES node. Since each task is writing 10MB/20K docs at once, that means 60MB/120K docs at once per node. In the case of updates, as this goes across the whole index, it means 240MB/480K at once.

Basically you are overloading the cluster, which eventually gives up. Again, an update is more expensive than a simple index and encourages segment merging, which likely starts around the 50-60M docs mark and begins to seriously affect performance at the 80M mark.

You have various options here (see the performance page for more info).

  1. Balance things out:
    a. allocate more hardware to ES (and use SSDs).
    b. minimize the number of concurrent tasks in Spark
  2. Use smaller bulks
    Always look at the maximum amount of data going to ES at once. Ideally, a bulk request should be processed in at most 1-2s. Revert the bulk size to the default, go back to your setup with the number of workers that you want, and only then start increasing the bulk size while keeping an eye on how bulk processing performance degrades (see the sketch after this list).
  3. Monitor, monitor, monitor
    Keep an eye on your cluster to see how it is behaving. If you are doing one big bulk ingest, consider turning off replication and refresh and re-enabling them after the ingest is done.
    And of course, splitting the index into smaller pieces (shards, per day/per hour) helps.
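
As a rough sketch of points 2 and 3 (the default values, the retry setting and the index name shown here are assumptions; check the connector docs for your version):

    import org.apache.spark.SparkConf

    // Start from the connector defaults and only then scale the bulk size up,
    // watching how long each bulk request takes (ideally 1-2s, as noted above).
    val tunedConf = new SparkConf()
      .set("es.nodes", "node1,node2,node3,node4")    // placeholder ES hosts
      .set("es.batch.size.bytes", "1mb")             // assumed connector default; raise gradually
      .set("es.batch.size.entries", "1000")          // assumed connector default; raise gradually
      .set("es.batch.write.retry.count", "3")        // retries when ES pushes back on bulk requests

    // For a one-off bulk load, refresh and replication can be relaxed on the index
    // itself (done directly against ES, outside of Spark) and restored afterwards:
    //   PUT events/_settings
    //   { "index": { "refresh_interval": "-1", "number_of_replicas": 0 } }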

Note that the above is typical ES / distributed-systems advice.

Is there any other solution to get idempotency with ES bulk writes?

Not sure why you are referring to idempotence. The issue in your case is not the outcome but rather performance.

Last but not least, this is not a bug - ES is pushing back and eventually the job fails since it takes too long to process requests. If you need more help, please use the forums.

Thanks

@vgkowski (Author) commented Mar 22, 2016

I will use the forum now... thanks for the answer, it's clearer now.
