Fix spark sql queries with metadata field #930
Conversation
Force-pushed from 3e6db02 to f202946
@drudim Sorry for the very delayed reviews. I'm taking a look through this right now and it looks like it's written against 5.1. Could you rebase these changes on top of 5.x? There are a few changes at the moment upstream that have to do with source field filtering that you may want to get pulled into this PR.
I left some comments on the code. I think you're headed in a very good direction. I'd like to hear about what kind of testing problems you ran into as well to see if I can help out.
static final String DATA_SOURCE_KEEP_HANDLED_FILTERS = "es.internal.spark.sql.pushdown.keep.handled.filters";

// columns selected by Spark SQL query
static final String DATA_SOURCE_REQUIRED_COLUMNS = "es.internal.spark.sql.required.columns";
Could we change this to a different location? Like InternalConfigurationOptions?
Are you sure about this? It looks like all "es.internal.spark." options are located here. I feel like the set of columns selected by Spark SQL fits into the "es.internal.spark." group.
Yeah, this comment was my mistake originally. It's fine in this location.
}

val requiredFields = settings.getProperty(Utils.DATA_SOURCE_REQUIRED_COLUMNS)
val scrollFields = settings.getScrollFields()
This logic has changed a little bit since the PR was opened. I'd recommend rebasing the whole PR onto the most recent 5.x branch.
// By default, SearchRequestBuilder includes all fields, if INTERNAL_ES_TARGET_FIELDS is empty.
// To prevent us from querying useless data, we set _id field,
// which isn't a part of _source, but is allowed by SearchRequestBuilder
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS -> "_id")
I'm not sure I like relying on this hack. Perhaps we could find a way to signal that we don't want to read the `_source` field at all by sending `_source=false` with the request. That way we can avoid depending on obscure and potentially easy-to-break functionality in Elasticsearch.
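For illustration, a minimal sketch of what the suggested request could look like; the JSON body below is an assumption for this discussion, not the connector's actual generated request:

```scala
// Rather than forcing "_id" into the requested fields, the search body could
// disable _source retrieval outright. Illustrative only.
val searchBodyWithoutSource: String =
  """{
    |  "_source": false,
    |  "query": { "match_all": {} }
    |}""".stripMargin
```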
@drudim Are you still interested in moving forward with the PR?
@jbaiera sure! I had a vacation, just got back. I am going to take a look at it this weekend.
The PR solves `scala.MatchError` in Spark SQL queries with a `_metadata` field inside. More details on the issue: elastic#924. I was trying to update integration tests with that case, but I am getting failing tests even for the current code in branch `5.1`.
Force-pushed from f202946 to b244d9f
Force-pushed from ce61988 to cd96841
@jbaiera ready for review, the following things were changed:
- rebased the branch on `5.x` to use `determineSourceFields` instead of `getScrollFields`
- introduced the `es.internal.exclude.source` option to handle queries with an empty `_source` field

Let me know what you think about this logic around `excludeSource`. We avoided the hack with `_id`, but introduced new parts which I am not confident in (from the design perspective).
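For context, a minimal sketch (assumed names and structure, not the PR's exact code) of how the new option could replace the earlier `_id` workaround when Spark needs no columns from `_source` at all:

```scala
import scala.collection.mutable

// Hypothetical empty projection, e.g. a query that only counts documents
val requiredColumns: Seq[String] = Seq.empty
val params = mutable.Map[String, String]()

if (requiredColumns.isEmpty) {
  // internal option introduced by this PR: skip _source entirely
  params += ("es.internal.exclude.source" -> "true")
} else {
  // otherwise request only the columns Spark actually needs
  params += ("es.internal.spark.sql.required.columns" -> requiredColumns.mkString(","))
}
```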
Force-pushed from cd96841 to 2cd8aaf
I tried to run:
and got:
And nothing interesting inside of
When the build executes, the test failures are logged to files in the build directory. You'll need to grab the error logs from there.
Found the reason why
I actually just got bit by that same exact problem this morning. The problem here is that we import Hadoop 1.2.1 for tests, which is a problem for Spark 2.0 in some cases. We have a shim implemented for the ShutdownHookManager in the
That's cool, will try to extend tests with my case later today. Let me know if you are ok with the new option to exclude
@drudim Will do, I'll take a look at your changes and comments today.
@jbaiera added the test for
@jbaiera done:
Thanks for the updates. I left a few more comments about simplifying some code and a couple sanity assertions. This is looking very good.
}

public SearchRequestBuilder excludeSource(boolean value) {
    this.excludeSource = value;
Could we add an assertion here that fields cannot be set if excludeSource is set, and vice-versa?
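As a rough sketch of that mutual-exclusion guard (illustrative Scala with made-up names; the real SearchRequestBuilder is Java and its fields differ):

```scala
class RequestBuilderSketch {
  private var requestedFields: Option[String] = None
  private var sourceExcluded: Boolean = false

  def fields(value: String): this.type = {
    // guard: fields cannot be requested once _source is excluded
    require(!sourceExcluded, "cannot request fields when _source is excluded")
    requestedFields = Some(value)
    this
  }

  def excludeSource(value: Boolean): this.type = {
    // guard: _source cannot be excluded once fields are requested
    require(!value || requestedFields.isEmpty, "cannot exclude _source when fields are requested")
    sourceExcluded = value
    this
  }
}
```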
// In case when we read all fields without metadata
else if (StringUtils.hasText(sourceFields))
  rowInfo._1.setProperty(ROOT_LEVEL_NAME, sourceFields)
Curly braces please
// In case when we read all fields including metadata
else if (StringUtils.hasText(sourceFields) && settings.getReadMetadata)
  rowInfo._1.setProperty(ROOT_LEVEL_NAME, sourceFields + StringUtils.DEFAULT_DELIMITER + settings.getReadMetadataField)
This line is a little confusing to me: is there ever a point in time where any of the `else` statements will be executed? I believe that with the current code `requiredFields` will ALWAYS be set with text, and if it is not, then `sourceFields` will also be empty. I think this could be simplified to just be:

val requiredFields = // get required fields
if (StringUtils.hasText(requiredFields)) {
  // set the rowInfo._1 properties
}

In the current state it makes me wonder if, when we get to this code, there is a chance that something prior in the connector wasn't set up entirely, but I don't see where it wouldn't be.
Yes, you are right, one condition is enough. My original assumption was:

if (StringUtils.hasText(requiredFields)) {
  // for queries like dataFrame.select("specific", "fields").take(1)
} else if (StringUtils.hasText(sourceFields) && settings.getReadMetadata) {
  // for queries like dataFrame.take(1) when "es.read.metadata" is set to true
} else if (StringUtils.hasText(sourceFields)) {
  // for queries like dataFrame.take(1) when "es.read.metadata" is set to false
}

But it isn't the case, because of the way we set `requiredFields`.
// In case when we read all fields without metadata
else if (StringUtils.hasText(sourceFields))
  rowInfo._1.setProperty(ROOT_LEVEL_NAME, sourceFields)
Curly braces please
rowInfo._1.setProperty(ROOT_LEVEL_NAME, requiredFields)

// In case when we read all fields including metadata
else if (StringUtils.hasText(sourceFields) && settings.getReadMetadata)
Same confusion around simplifying the conditions here as in the sql-13 package.
@jbaiera, ready for review:
LGTM! Thanks so much @drudim for the contribution! I'll merge this and forward port it to master.
The PR solves `scala.MatchError` in Spark SQL queries with `_metadata` field inside. When Spark SQL is reading a metadata field, the data source removes the metadata field from the given fields and replaces it at the end. This is because the metadata field is an abstract field that is provided by the Scroll Reader instead of by explicitly asking for it. In removing the field, the order of fields in the projection returned from the Scroll reader does not match the order of fields within the Spark execution plan, thus a match error is thrown. This ordering of fields is now protected with this PR.
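As a toy illustration of that ordering problem (standalone Scala, not the connector's code), a pattern match written against the plan's field order fails when the reader hands back the metadata field at the end:

```scala
// Reader output: name first, _metadata appended last
val valuesFromReader: Seq[Any] = Seq("alice", Map("_id" -> "1"))

// Consumer expects (_metadata, name); the first value is a String, not a Map,
// so this throws scala.MatchError at runtime
val decoded = (valuesFromReader(0), valuesFromReader(1)) match {
  case (meta: Map[_, _], name: String) => (meta, name)
}
```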
The PR solves `scala.MatchError` in Spark SQL queries with `_metadata` field inside. More details on the issue: #924

I was trying to update integration tests with that case, but I am getting failed tests even for the current code in branch `5.1`.

- [x] I have signed the Contributor License Agreement (CLA)