
Add the ability to create IndexPartition based on the desired number of documents per split #812

Closed
jimczi wants to merge 3 commits into master from sliced_scroll

Conversation

@jimczi (Contributor) commented Jul 21, 2016

This pretty big PR (sorry for the size) changes how the splits are created from an Elasticsearch query.
For clusters running versions prior to 5.x:

  • We create one partition for each shard of each requested index.
  • If an alias is requested, the search routing and the alias filter are respected.
  • The partition is no longer attached to a node or an IP. Only the shard id and index name are defined, so that any replica in the cluster can be used when the partition is consumed. This makes retries possible if a node disappears during a job.
  • The ability to consume a partition on the node that hosts the index/shard id has been removed temporarily and should be re-added in a follow-up.

For clusters running version 5.x:

  • We first split by index, then by shard, and finally by the maximum number of documents allowed per partition (configurable through the new option named es.input.maxdocsperpartition). For instance, for an index with 5 shards, 1M documents, and a maximum of 100,000 documents per partition, a match-all query would be split into 50 partitions, 10 partitions per shard (a rough sketch of the arithmetic follows below).
  • If an alias is requested, the search routing and the alias filter are respected.

Fixes #778
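
For illustration only, here is a minimal sketch of the per-shard split arithmetic described above. The class and method names are hypothetical, not the PR's actual implementation, and the real partition count depends on how many documents each shard holds:

```java
// Hypothetical sketch: derive the number of slices for one shard from the value of
// the es.input.maxdocsperpartition option. Not taken from the PR's code.
public final class PartitionMath {

    static int slicesPerShard(long docsInShard, long maxDocsPerPartition) {
        if (maxDocsPerPartition <= 0) {
            return 1; // option unset or invalid: fall back to one partition per shard
        }
        // ceiling division so no partition exceeds the configured maximum
        return (int) ((docsInShard + maxDocsPerPartition - 1) / maxDocsPerPartition);
    }

    public static void main(String[] args) {
        // a shard holding 1,000,000 documents with a 100,000-document cap is read
        // through 10 slices (sliced scrolls in 5.x)
        System.out.println(slicesPerShard(1_000_000L, 100_000L)); // prints 10
    }
}
```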

@jimczi (Contributor, Author) commented Jul 21, 2016

@costin @jbaiera, this PR introduces the SlicePartitionDefinition. I had to refactor (clean up) some parts to make it work. This is still a work in progress and I need to add more tests. In the meantime it would be very helpful if you could check the progress. Considering the size of the PR, I am available at your convenience for a deep dive.

@@ -410,7 +408,7 @@ public void refresh(Resource resource) {
     sb.setLength(sb.length() - 1);
     sb.append("\n]}");

-    if (!isES20) {
+    if (esVersion().equals(EsVersion.V_2_X)) {
Review comment from a Member:

Should this be !esVersion().equals(EsVersion.V_2_X)?

Review comment from a Member:

Actually, this should just check whether the version is equal to V_1_X. This line is balancing the curly braces.
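
To make the conclusion of this exchange concrete, a tiny self-contained sketch (the EsVersion constant names come from the diff above; everything else is illustrative, not the PR's code):

```java
// With only 1.x and 2.x in play, the old "!isES20" guard is equivalent to
// "the version is 1.x", which is what the review settles on.
class VersionGuard {
    enum EsVersion { V_1_X, V_2_X, V_5_X }

    static boolean useLegacyOnePointXPath(EsVersion version) {
        return version.equals(EsVersion.V_1_X);
    }
}
```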

@jbaiera (Member) commented Jul 22, 2016

This is a very large PR. I'll be leaving comments as I find items.


return version.startsWith("2.");
}

/**
 * Whether the settings indicate a ES 5.0.x (which introduces breaking changes) or otherwise.
Review comment from a Member:

Are you going to remove this settings check as well in favor of the new route? I'm assuming this is an item that is still in progress...

@costin (Member) commented Jul 25, 2016

Jim, thanks again for starting to work on this.
As you mentioned, this is quite a big PR, which makes merging and reviewing tricky. Breaking it down into several smaller pieces would definitely help.

A couple of things off the top of my head:

A. Regarding the supported ES versions - currently ES-Hadoop supports 1.x and 2.x, and master adds support for 5.x. Going forward, it remains to be decided whether support for 1.x will be kept - either way I would not bother with 0.x.

B. Also, things like the mapping are resolved eagerly to avoid having each client/task do that - basically for N tasks there is currently only one mapping fetch (which also does some validation); if it's lazy, it translates to N mapping calls.

FWIW, I favor keeping the new slice feature in a separate package and trying to keep the old code around, to make the transition and code porting between the 2.x and 5.x branches easy.

@jimczi force-pushed the sliced_scroll branch 2 times, most recently from dbd09ce to 2000457 on July 25, 2016 12:19
@jimczi (Contributor, Author) commented Jul 25, 2016

Thanks @costin and @jbaiera for the first round of review.

> As you mentioned, this is quite a big PR, which makes merging and reviewing tricky. Breaking it down into several smaller pieces would definitely help.

I split the big commit into smaller pieces. I've intentionally kept the errors spotted by your review in the original commits; the commits that address your comments come last. I hope this helps the review process.

> Also, things like the mapping are resolved eagerly to avoid having each client/task do that - basically for N tasks there is currently only one mapping fetch (which also does some validation); if it's lazy, it translates to N mapping calls.

I don’t see it that way. IMO, keeping the mapping (and the config) in each PartitionDefinition is worse than retrieving it lazily when the task is initialized. The mapping (and the config) can be big, and it's even worse with the base64 conversion. Furthermore, if the mapping is resolved when the partitions are created, there is a high probability that a mapping update happens between the initialization of the job and the execution of the task.

> FWIW, I favor keeping the new slice feature in a separate package and trying to keep the old code around, to make the transition and code porting between the 2.x and 5.x branches easy.

There should be no code porting between 2.x and 5.x. There is no breaking change for the available clients in the repo. The changes are internal; the only thing clients need to change is the new configuration option that sets the expected number of documents per task.
The changes in this PR are not all about the slice feature. For clusters running versions prior to 5, the removal of the ShardSorter (which is possible now that we check for filter and routing aliases) and the fact that each partition is assigned to an index shard (and not to a node and a shard id) is already a big win. Not to mention that removing the mapping and the config from each partition definition should have a positive impact on the memory required to run a big job.

Map<String, String> version = get("", "version");
if (version == null || !StringUtils.hasText(version.get("number"))) {
    return "Unknown";
public synchronized EsVersion esVersion() {
Review comment from @jbaiera (Member):

I think this should be reverted back to being retrieved from the configuration at construction time. This method is called many times in the client, and its synchronized nature is not great for performance. There really shouldn't be a scenario where the value of the version differs between tasks, so it's reasonable for the version information to be stored once for the entire job.
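
A minimal, hypothetical sketch of that suggestion, resolving the version once at construction instead of on every call (the settings key and class names are stand-ins, not the project's actual API):

```java
import java.util.Properties;

// Sketch only: cache the Elasticsearch version when the client is built so that
// esVersion() is a plain field read with no synchronization.
class VersionCachingClient {
    enum EsVersion { V_1_X, V_2_X, V_5_X }

    private final EsVersion esVersion;

    VersionCachingClient(Properties settings) {
        // assume the discovered major version ("1", "2" or "5") was stored up front
        String major = settings.getProperty("es.internal.es.version", "5");
        switch (major) {
            case "1": esVersion = EsVersion.V_1_X; break;
            case "2": esVersion = EsVersion.V_2_X; break;
            default:  esVersion = EsVersion.V_5_X; break;
        }
    }

    EsVersion esVersion() {
        return esVersion; // fixed for the life of the job
    }
}
```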

@jimczi (Contributor, Author) commented Jul 25, 2016

> This is removing the ability for Hadoop to attempt to schedule data-local tasks. I'm not sure if this is a big deal though, since most people run Elasticsearch on separate hardware from their running tasks.

Right, we discussed this with @costin and he said approximately the same, so I removed it.
If it's important, we could check the list of nodes that could answer the query and pick the local one if it exists. But yeah, I'm not sure it's really helpful, and it could be counterproductive.
I think the nice part of this is that a task can now run on any node, which makes retries of the same task possible even if some nodes in the cluster are not available.

@jimczi (Contributor, Author) commented Jul 25, 2016

@jbaiera, regarding the Elasticsearch version discovery in the RestClient, I pushed f101370.

This change retrieves an internal version from the settings and uses it to build the requests.
Since the settings are no longer part of the PartitionDefinition, the Elasticsearch version is retrieved and added to the settings each time a Partition{Reader/Writer} is built.
I think it's a good compromise: we don't pay the cost of the duplicated serialized settings in each partition, but we have to make one light request per task/split to get the version.
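
A rough, self-contained sketch of that flow. The shape is assumed: the settings key is hypothetical, and a real implementation would go through the project's REST transport and a JSON parser rather than this regex:

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Properties;
import java.util.Scanner;

// Sketch: when a PartitionReader/Writer is built, make one light request to the
// cluster root endpoint and stash the version in the task's settings.
class VersionBootstrap {
    static void addVersionToSettings(String esRootUrl, Properties settings) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(esRootUrl).openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in).useDelimiter("\\A")) {
            String body = scanner.hasNext() ? scanner.next() : "";
            // GET / returns {"version":{"number":"5.0.0-alpha5",...}, ...}
            String number = body.replaceAll("(?s).*\"number\"\\s*:\\s*\"([^\"]+)\".*", "$1");
            settings.setProperty("es.internal.es.version", number);
        } finally {
            conn.disconnect();
        }
    }
}
```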

@jbaiera (Member) commented Jul 25, 2016

Re: Data Locality - If we're all +1 on removing support for the locality features, then the docs should probably be updated to remove the mentions of data locality as well.

This commit changes how we split the query into multiple partitions.
For clusters running versions prior to 5.x:
* We create one partition for each shard of each requested index.
* If an alias is requested, the search routing and the alias filter are respected.
* The partition is no longer attached to a node or an IP. Only the shard id and index name are defined, so that any replica in the cluster can be used when the partition is consumed. This makes retries possible if a node disappears during a job.
* The ability to consume a partition on the node that hosts the index/shard id has been removed temporarily and should be re-added in a follow-up.

For clusters running version 5.x:
* We first split by index, then by shard, and finally by the maximum number of documents allowed per partition (configurable through the new option named es.input.maxdocsperpartition). For instance, for an index with 5 shards, 1M documents, and a maximum of 100,000 documents per partition, a match-all query would be split into 50 partitions, 10 partitions per shard.
* If an alias is requested, the search routing and the alias filter are respected.

Fixes elastic#778
@jimczi (Contributor, Author) commented Jul 26, 2016

@jbaiera @costin I've updated the description of the PR with the latest status.
It seems that the integration tests are all green, but we'll need to do some backward compatibility tests, at least with Elasticsearch 2.x.
I did not restore data locality yet. It's trickier than I thought, so I wonder if it would be OK to merge this PR without this functionality for alpha5 and make it a blocker for 5.0?

public RawQueryBuilder(Map<String, Object> map) throws IOException {
    Object query = map;
    if (map.containsKey("query")) {
        query = map.remove("query");
Review comment from @jbaiera (Member):

This line right here breaks backwards compatibility with ES v1.7.3 for SparkSQL. We have code that hooks into Spark that allows us to make sense of some of the SQL operations being executed and translate those operations into pushdown filters and queries.

In the case of running against ES 1.7.3, the framework will detect the version and create a query context with the pushdown query underneath it to allow compatibility with the filter context. Because all the filters are passed through the parsing code listed above before building the final query, the query context marker is removed from the JSON and the query no longer compiles within the filter context.

Basically, the filter fragment goes from being this:

{ "query": { "match" : { "..." : "..." } } }

to this:

{ "match": { "..." : "..." } }

which is then inserted into a filter context and subsequently comes back from the server with the following error:

org.elasticsearch.search.SearchParseException: [defaultstrictspark-test][2]: from[-1],size[-1]: Parse Failure [Failed to parse source [{"query":{"filtered":{"query":{"match_all":{}},"filter":{"match":{"airport":"OTP"}}}}}]]
    at org.elasticsearch.search.SearchService.parseSource(SearchService.java:747)
    <...>
Caused by: org.elasticsearch.index.query.QueryParsingException: [defaultstrictspark-test] No filter registered for [match]

This probably isn't a huge blocker for the alpha release, but it is a big deal if we want to continue the support for 1.x.

@jbaiera (Member) commented Jul 27, 2016

Local integration tests are looking good for 5.x and 2.x Elasticsearch versions. Ran into some snags with 1.x surrounding SparkSQL that I have commented on above.

For Elasticsearch versions prior to 5.x, the raw query builder should be able to parse a query that wraps a filter.
Since we use the RawQueryBuilder to parse root queries as well, we need to differentiate between parsing a query and parsing a filter.
For the former we need to remove the root "query" object if it exists; for the latter we need to keep it, because it only means that the filter is wrapped in a query.
This change also adds javadoc for the query builders.
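
A hypothetical sketch of the distinction this commit describes; the flag name and class shape are assumptions, not the exact code in the PR:

```java
import java.util.Map;

// Sketch: only strip the root "query" wrapper for a top-level query; keep it for a
// filter fragment, where "query" just means the filter wraps a query (1.x filter context).
class RawQueryFragment {
    private final Object body;

    RawQueryFragment(Map<String, Object> map, boolean isRootQuery) {
        Object query = map;
        if (isRootQuery && map.containsKey("query")) {
            // root query: unwrap so the fragment can be embedded into the final request
            query = map.remove("query");
        }
        // filter fragment: {"query": {...}} is left intact so it still compiles on 1.x
        this.body = query;
    }

    Object body() {
        return body;
    }
}
```

Under that assumption, the SparkSQL pushdown fragment shown in the review comment above keeps its query wrapper and stays valid in the 1.x filter context.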
@jimczi (Contributor, Author) commented Jul 28, 2016

Good catch @jbaiera. I pushed a fix to handle 1.x query DSL.

Regarding data locality, I don't think it works as expected in the current version (prior to this PR). The preference API is used to restrict the query to the node where the targeted shard is present, but the REST client uses a random node to send the query, which is ultimately redirected to the targeted node. IMO this extra round trip makes the data locality useless: the task can run on the same machine as the node which hosts the shard, but since we use a random node for coordination there is no advantage in doing this.
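
For readers unfamiliar with the mechanism being discussed, a small illustration (not code from this PR; _only_node is the pre-5.x search-preference syntax, and the node id is hypothetical):

```java
// Illustration: the preference parameter can pin a shard-level search to one node,
// but the HTTP request itself still lands on whichever node the REST client talks to,
// which then routes it; that is the extra hop described above.
class PreferenceUrl {
    static String searchUrl(String index, String nodeId) {
        return "/" + index + "/_search?preference=_only_node:" + nodeId;
    }

    public static void main(String[] args) {
        System.out.println(searchUrl("myindex", "abc123")); // /myindex/_search?preference=_only_node:abc123
    }
}
```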

@jimczi (Contributor, Author) commented Jul 28, 2016

Just discovered that SettingsUtils.pinNode is supposed to restrict the client to a node. This means that my last comment about data locality is wrong ;).
I'll continue to dig into how we could bring data locality back without breaking the fact that a partition can now run on any replica of the shard.

Handle the new format for NodeInfo with Elasticsearch 5.x:
* http_address has been removed
* the list of roles is declared in a dedicated section named 'roles' (instead of parameters)

This change fixes the discovery of the nodes when Elasticsearch 5.x is used. It also fixes the client/data-only mode where only the allowed nodes are requested.
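
To illustrate the parsing difference this commit refers to, a hedged sketch (the response shapes are approximations of the node-info formats, and the helper is hypothetical, not the PR's code):

```java
import java.util.List;
import java.util.Map;

// Sketch: pre-5.x node info exposed roles through attributes such as
// {"attributes":{"data":"false"}}, while 5.x lists them explicitly,
// e.g. {"roles":["master","data","ingest"]}.
class NodeRoles {
    @SuppressWarnings("unchecked")
    static boolean isDataNode(Map<String, Object> nodeInfo, boolean es5x) {
        if (es5x) {
            List<String> roles = (List<String>) nodeInfo.get("roles");
            return roles != null && roles.contains("data");
        }
        Map<String, String> attributes = (Map<String, String>) nodeInfo.get("attributes");
        // pre-5.x: a node is a data node unless it was explicitly disabled
        return attributes == null || !"false".equals(attributes.get("data"));
    }
}
```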
@jimczi (Contributor, Author) commented Jul 28, 2016

I pushed a fix for the NodeInfo parsing bug:
e6f046b

This fixes the discovery and the selection of nodes based on certain criteria (isClientOnly, isDataOnly, ...). Please note that the bug is not caused by this PR; the reason is that the format of the node info changed in 5.x.
@costin, can you take a look at this patch?

@jbaiera (Member) commented Jul 28, 2016

re: Node Info - LGTM
Thanks for taking a look at the 1.x compatibility issues as well.

@jbaiera (Member) commented Jul 28, 2016

I have performed the manual merge steps to get this PR into master. The commits on master can be found at 77f2564 (main PR) and 2e53b63 (node info fix). Thanks so much @jimferenczi!
