
Connector incorrectly determines the number of partitions #1196

Closed
avlahop opened this issue Oct 2, 2018 · 9 comments

Comments

@avlahop

commented Oct 2, 2018

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.

Issue description

It seems that there is an issue in how Elasticsearch Hadoop's RestService calculates the slice partitions for a specified index/shard. In my opinion there are two issues:

  • Number of produced tasks is not correct: There seems to be an issue with the division being executed here. We can see that the code calculating the number of partitions casts the result of the division to an integer. This produces one fewer partition than expected. E.g.:
    If we have 3 shards with 184 items each, and we want 30 documents per partition (setting es.input.max.docs.per.partition equal to 30), then the following number of partitions will be calculated for each shard. For shard 0:
(int) Math.max(1, 184 / 30) => (int) Math.max(1, 6.1) => (int) (6.1)

so 6. We should expect 7, because 6 * 30 = 180 and there are 184 documents in that shard. The same holds for shards 1 and 2. As a result, we should be expecting 21 partitions, but ES-Hadoop will give us 18 (see the sketch after this list).

  • Slices create empty tasks: From the same file we can see here that, for each shard, it creates sliced partitions with slice ids always running from zero to the total number of partitions per shard. It also uses the preference query parameter to target each query at the specified shard of the partition. In the above example this would lead to the following partitions (slices):

For shard 0:
(0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6)

For shard 1:
(0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6)

and for shard 2:
(0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6)

According to the scroll documentation here, sliced scrolls are distributed across the shards first, meaning that for the first shard, shard 0, only the queries (0, 6) and (3, 6) are going to fetch data. All the other slices will just create empty tasks. The same goes for all the other shards, thus creating 6 tasks with data and 12 empty tasks. Furthermore, instead of 30 documents per task, we are now actually fetching about 90 docs per partition.
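
A minimal, self-contained sketch of the two calculations described above (the class and variable names are hypothetical illustrations, not the actual RestService code):

// Hypothetical illustration of the partition math described in this issue.
public class PartitionMathSketch {
    public static void main(String[] args) {
        long docsPerShard = 184;
        long maxDocsPerPartition = 30; // es.input.max.docs.per.partition

        // Current behaviour: truncating division -> 6 partitions per shard.
        int truncated = (int) Math.max(1, docsPerShard / maxDocsPerPartition);

        // What the report expects: ceiling division -> 7 partitions per shard.
        int ceiled = (int) Math.max(1, (docsPerShard + maxDocsPerPartition - 1) / maxDocsPerPartition);

        System.out.println("truncated = " + truncated + ", ceiled = " + ceiled); // truncated = 6, ceiled = 7

        // One sliced partition (sliceId, maxSlices) per shard, each targeted with a
        // preference parameter pointing at that shard, as described above.
        int shards = 3;
        for (int shard = 0; shard < shards; shard++) {
            for (int slice = 0; slice < truncated; slice++) {
                System.out.println("shard " + shard + " -> slice (" + slice + ", " + truncated + ")");
            }
        }
    }
}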

Version Info

Elasticsearch cluster
OS: Ubuntu
JVM: 1.8.0
Hadoop/Spark: 2.2.1
ES-Hadoop: elasticsearch-spark-20_2.11-6.3.2
ES: 6.3.2

@jimczi

Member

commented Oct 2, 2018

Number of produced tasks is not correct:

We could round to the closest integer, but the idea is to get approximately es.input.max.docs.per.partition documents per slice, which is what you get with the current computation, so I am not sure what you mean by "is not correct".

Slices create empty tasks:

This is a known issue that we fixed in 6.4 with elastic/elasticsearch#29533. Slices are now computed based on the number of shards that participate in the request so your example would get the same number of documents per slice if you upgrade your cluster to the latest 6.4 version.

@jbaiera

Contributor

commented Oct 2, 2018

We could round to the closest integer, but the idea is to get approximately es.input.max.docs.per.partition documents per slice, which is what you get with the current computation, so I am not sure what you mean by "is not correct".

Agreed. The max docs setting has always been an approximation - not a guarantee. Its inclusion in ES-Hadoop was to allow for higher levels of parallelism than shard-level scrolling alone would allow. Additionally, there were cases where Spark users with particularly large shards in ES could not read using the connector - since Spark has a hard limit on the number of documents allowed per partition (it uses an integer per document in a split).

All that said, I'm not against making the setting more of a guarantee, though I think there are a decent number of things that we can and should be doing to improve the use of this property - starting with increasing the default value by quite a bit. 100k documents is fine in many cases, but in practice it can cause significant slowdowns when querying very large shards, leading to the default value almost always needing to be tuned higher or disabled altogether.
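
For reference, a hypothetical example of raising the limit from a Spark job; the property names are existing ES-Hadoop settings, but the index name and the chosen value are placeholders:

import org.apache.spark.SparkConf;

// Illustrative only: tuning the per-partition document cap when reading from
// Elasticsearch with ES-Hadoop. The index name and value below are placeholders.
public class EsReadTuningExample {
    public static SparkConf tunedConf() {
        return new SparkConf()
                .setAppName("es-read-tuning-example")
                .set("es.resource", "my-index/doc")                 // placeholder index/type
                .set("es.input.max.docs.per.partition", "500000");  // example value; tune per workload
    }
}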

@avlahop

Author

commented Oct 2, 2018

I am sorry, maybe I should have used the word misleading. From the documentation of es.input.max.docs.per.partition I read:

this parameter advises the connector on what the maximum number of documents per input partition should be.

So I thought there should be as many partitions as needed to not go over the set max limit. Thank you for making it clear to me :)

Either way, please proceed with closing, since the most important issue for me is solved in v6.4.

@jimczi

Member

commented Oct 2, 2018

starting with increasing the default value by quite a bit. 100k documents is fine in many cases, but in practice it can cause significant slowdowns when querying very large shards, leading to the default value almost always needing to be tuned higher or disabled altogether.

Maybe we should not set any default here and restore the one partition per shard by default? I see how the default value can be deceptive, creating a lot of partitions for little benefit on large shards. One partition per shard is more predictable and users can use the extra option if they want to increase parallelism.

@jimczi

Member

commented Oct 2, 2018

So I thought there should be as many partitions as needed to not go over the set max limit.

I agree that the wording in the documentation is ... misleading ;). We should mention that it's a best effort.

@jbaiera

Contributor

commented Oct 2, 2018

Maybe we should not set any default here and restore the one partition per shard by default? I see how the default value can be deceptive, creating a lot of partitions for little benefit on large shards. One partition per shard is more predictable and users can use the extra option if they want to increase parallelism.

My thinking is along the same lines, but in the case of using Spark it might make sense to set the default number to the hard limit of records per Spark partition. If the shard that we're examining comes back with a count that is less than this amount, we should probably just skip the slicing altogether and read from the shard alone (sketched below). I think I'll pull together a fix for that soon and put it in for 7.0. I feel a little wary about changing the default on a non-major release.

As for the documentation, I'll push a quick change and backport it.
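
A rough sketch of the check described above (a hypothetical helper, not the actual ES-Hadoop implementation): only slice a shard when its document count exceeds the configured maximum.

// Hypothetical helper: decide how many slices to use for a single shard.
static int slicesForShard(long docsInShard, long maxDocsPerPartition) {
    if (maxDocsPerPartition <= 0 || docsInShard <= maxDocsPerPartition) {
        return 1; // small shard (or setting unset/disabled): read with a single, unsliced scroll
    }
    // Ceiling division so that no slice is expected to exceed the configured maximum.
    return (int) ((docsInShard + maxDocsPerPartition - 1) / maxDocsPerPartition);
}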

@jbaiera

Contributor

commented Oct 2, 2018

but in the case of using Spark it might make sense to set the default number to the hard limit of records per Spark partition.

After posting this I realized that the Spark hard limit is 2GB of data, not based on the limitations of an integer, but rather on the maximum size of byte buffers. In this case, I think it makes more sense to simply have the user set the max docs per partition if they run into issues with Spark.

jbaiera added a commit that referenced this issue Oct 4, 2018
We added support for sliced scrolls back in 5.0, which allows subdividing scrolls into
smaller input splits. There are some cases where the added subdivision of the scroll
operations causes high amounts of overhead when reading very large shards. In most cases,
shards should be small enough that a regular read operation over them should complete
in reasonable time. In order to avoid performance degradation at higher levels, we are
removing the default value of 100k from this setting, and instead, checking if it is
set. Additionally, the 'es.input.use.sliced.partitions' setting has been removed as it
is now redundant.

relates #1196
jbaiera added a commit that referenced this issue Oct 4, 2018
We added support for sliced scrolls back in 5.0, which allows
subdividing scrolls into smaller input splits. There are some
cases where the added subdivision of the scroll operations
causes high amounts of overhead when reading very large
shards. In most cases, shards should be small enough that a
regular read operation over them should complete in
reasonable time. In order to avoid performance degradation at
higher levels, we are removing the default value of 100k from
this setting, and instead, checking if it is set.
Additionally, the 'es.input.use.sliced.partitions' setting
has been removed as it is now redundant.

relates #1196
jbaiera added a commit that referenced this issue Oct 4, 2018
Additionally, deprecate the usage of es.input.use.sliced.partitions since it will be removed in 7.0.0

relates #1196
jbaiera added a commit that referenced this issue Oct 4, 2018
@jbaiera jbaiera added v7.0.0 v6.5.0 doc and removed question labels Oct 4, 2018
@jbaiera

Contributor

commented Oct 4, 2018

I've pushed up some changes to the documentation around the max docs per partition setting. I've also removed the default from the master line (7.0.0) and deprecated the use of a switch property in 6.X (6.5.0).

Thanks for the input all!

@jbaiera jbaiera closed this Oct 4, 2018
@jbaiera jbaiera added the breaking label Oct 4, 2018
@jimczi

Member

commented Oct 4, 2018

Thanks @jbaiera!

@jbaiera jbaiera added the deprecation label Nov 5, 2018