[SPARK-33739] [SQL] Jobs committed through the S3A Magic committer don't track bytes #30714
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #132588 has finished for PR 30714 at commit
…n't track bytes

BasicWriteStatsTracker to probe for a custom Xattr if the size of the generated file is 0 bytes; if found and parseable, use that as the declared length of the output.

The matching Hadoop patch, HADOOP-17414:
* Returns all S3 object headers as XAttr attributes prefixed "header."
* Sets the custom header x-hadoop-s3a-magic-data-length to the length of the data in the marker file.

As a result, Spark job tracking will correctly report the amount of data uploaded and yet to materialize.

Testing: new tests in BasicWriteTaskStatsTrackerSuite which use a filter FS to implement getXAttr on top of LocalFS; this is used to explore the set of options:
* no XAttr API implementation (existing tests; what callers would see with most filesystems)
* no attribute found (HDFS, ABFS without the attribute)
* invalid data of different forms

All of these return Some(0) as file length.

External downstream testing has done the full hadoop+spark end-to-end operation, with manual review of logs to verify that the data was successfully collected from the attribute.

Change-Id: I1fd0b9ac2eba1c8c27cbd776739e693a57d38fc3
132a2e9 to c680138
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #134371 has finished for PR 30714 at commit
Thank you for updating. I'm looking forward to using Hadoop 3.3.1!
ok, Hadoop-side PR is in trunk; verifying backport compiles and new tests work, then it'll be in branch-3.3. This patch is ready for final review/merge.
Makes sense to me
cc @tgravescs @mridulm FYI
val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
if (attr != null && attr.nonEmpty) {
  val str = new String(attr, StandardCharsets.UTF_8)
  logInfo(s"File Length statistics for $path retrieved from XAttr: $str")
do we want this at info or just debug? Seems like I would only care if stats didn't come out, but maybe it's more useful...
Can move it to debug.
} catch {
  case e: NumberFormatException =>
    // warn but don't dump the whole stack
    logInfo(s"Failed to parse" +
do we want this as a warn instead, or perhaps just extend it to say the file stats may be wrong?
OK. I'd say at info as warn seems overkill for progress report issues.
overall looks fine to me. Do we need to document which Hadoop versions this works with, and that the stats will be wrong with others?
I'll update the docs. Also going to cut back on the warnings now that AWS S3 is consistent (and all the third-party ones were consistent out of the box too). This is a good thing for apps like Spark (much easier to do workflows across applications), but you still can't safely use the rename committer there, as dir rename is non-atomic.
Change-Id: Id02711b83f3159d6c68bbad2cb74b303b277bb65
Change-Id: Iddcc8fd9af8f89d4a0429e077c09c03ada2fed72
@@ -49,7 +49,6 @@ They cannot be used as a direct replacement for a cluster filesystem such as HDF
Key differences are:
* Changes to stored objects may not be immediately visible, both in directory listings and actual data access.
Shall we remove line 60 together?
- The output of work may not be immediately visible to a follow-on query.
I worry about OpenStack Swift. The original version was woefully inconsistent. I've not been near it recently enough to know what's changed. With Swift I could reliably replicate an inconsistency in a few lines:
- write a file of length, say, 1KB
- overwrite with a shorter dataset, e.g. 512 bytes
- open file
- read from 0-128: get the new data
- read from 768-1024: get the old data
No S3 implementation that I know of (i.e. the open source or commercial ones) is inconsistent, nor have they ever had the 404-caching issue. But people should still be aware of the risk.
Oh, and I haven't played with any of the Chinese cloud stores for which connectors now exist, so I can't make statements there. All I can say is that the "big three outside China" are consistent.
The line deletion is based on the fact that S3 is now strongly consistent, but that also means we only consider these three vendors. (Probably you're considering more S3-compatible implementations, but you've also mentioned you don't consider the Chinese cloud stores, so it can't be exhaustive.) Why not say this explicitly and tie the description to these vendors? We would never be able to consider all possible implementations, and for some minority the description may be wrong. Let's just make it clear.
Or, what about leaving this line as it is (so that the description enumerates all "possible" issues on object stores, considering beyond the big three), and elaborating in the consistency section on which key differences are no longer in effect with strong consistency?
(Probably you're considering more S3-compatible implementations, but you've also mentioned you don't consider the Chinese cloud stores, so it can't be exhaustive.)
I don't test them; I don't know their consistency
Reviewed the docs; huawei cos and tencent obs both seem consistent. Swift is now the outlier. Moving the issue into a paragraph in the "consistency" section.
@steveloughran Thanks for the update!
@dongjoon-hyun Could you please check whether the comments and new change make sense to you? Thanks in advance!
Change-Id: Ib2b379f6c456b4bec75db43d0f2290f1ea19b975
I've looked at the stores for which hadoop-trunk has connectors; it looks like only OpenStack Swift is inconsistent. Moved the details on consistency down, called out Swift, and said "consult the docs". After all, alternative implementations of the Swift API (I'm thinking of IBM's work) are probably consistent.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #135156 has finished for PR 30714 at commit
The doc change looks good to me, but I don't feel qualified to review the code. As @tgravescs already reviewed and approved, it sounds more natural if he makes the "sign-off". @tgravescs Kind reminder. Thanks!
docs/cloud-integration.md
```
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
```
We can remove this repetition (lines 194~196) because we already have this at the beginning of the section (line 149)
done
logDebug(s"XAttr not supported on path $path", e);
case e: Exception =>
  // Something else. Log at debug and continue.
  logDebug(s"Xattr processing failure on $path", e);
Xattr -> XAttr?
/**
 * Does a length specified in the XAttr header get picked up?
 */
test("Xattr sourced length") {
Xattr -> XAttr?
+1, LGTM, too.
Change-Id: Ied49268d0b68cd7603a3a52d174d640469338760
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135196 has finished for PR 30714 at commit
@tgravescs can you hit the merge button before some other change breaks this PR? thanks
merged to master, thanks @steveloughran @dongjoon-hyun @HeartSaVioR
thanks!
I was here too! 😃
BasicWriteStatsTracker to probe for a custom Xattr if the size of
the generated file is 0 bytes; if found and parseable use that as
the declared length of the output.
The matching Hadoop patch in HADOOP-17414:
* Returns all S3 object headers as XAttr attributes prefixed "header."
* Sets the custom header x-hadoop-s3a-magic-data-length to the length of the data in the marker file.
As a result, Spark job tracking will correctly report the amount of data uploaded
and yet to materialize.
Why are the changes needed?
Now that S3 is consistent, it's a lot easier to use the S3A "magic" committer
which redirects a file written to
dest/__magic/job_0011/task_1245/__base/year=2020/output.avro
to its final destination
dest/year=2020/output.avro
, adding a zero-byte marker file at the end and a JSON file
dest/__magic/job_0011/task_1245/__base/year=2020/output.avro.pending
containing all the information for the job committer to complete the upload.
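The path rewrite described above can be sketched as plain string manipulation. This is a simplified illustration only: the object and method names are hypothetical, and the handling of the `__magic`/`__base` path elements is an assumption, not the S3A committer's actual code.

```scala
object MagicPathSketch {
  // Rewrite a path under __magic/.../__base/ to its final destination.
  // Simplified: assumes at most one __magic element and an optional __base marker.
  def finalDestination(path: String): String = {
    val parts = path.split("/").toList
    val magicIdx = parts.indexOf("__magic")
    if (magicIdx < 0) return path          // not a magic path: unchanged
    val before = parts.take(magicIdx)      // e.g. List("dest")
    val after = parts.drop(magicIdx + 1)   // job/task attempt dirs, then __base
    val baseIdx = after.indexOf("__base")
    // Everything after __base is the destination-relative path; otherwise
    // just keep the file name itself.
    val tail = if (baseIdx >= 0) after.drop(baseIdx + 1) else List(after.last)
    (before ++ tail).mkString("/")
  }
}
```

For example, `dest/__magic/job_0011/task_1245/__base/year=2020/output.avro` maps to `dest/year=2020/output.avro`, matching the description above.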
But: the write-tracker statistics don't show progress, as they measure the length of the
created file, find the zero-byte marker file, and report 0 bytes.
By probing for a specific HTTP header in the marker file and parsing that if
retrieved, the real progress can be reported.
There's a matching change in Hadoop apache/hadoop#2530
which adds getXAttr API support to the S3A connector and returns the headers; the magic
committer adds the relevant attributes.
If the FS being probed doesn't support the XAttr API, the header is missing,
or the value is not a positive long, then a size of 0 is returned.
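That fallback behaviour can be captured as a small pure function. This is a hypothetical helper for illustration; the real logic lives in BasicWriteTaskStatsTracker, and the exact trimming and validation shown here are assumptions.

```scala
import java.nio.charset.StandardCharsets

object XAttrLengthSketch {
  // Attribute name matching the custom header described in the PR.
  val FileLengthXAttr = "header.x-hadoop-s3a-magic-data-length"

  // Parse the raw XAttr bytes; None for a missing, unparseable,
  // or non-positive value (the caller then falls back to length 0).
  def parseLength(attr: Array[Byte]): Option[Long] = {
    if (attr == null || attr.isEmpty) None
    else try {
      val len = new String(attr, StandardCharsets.UTF_8).trim.toLong
      if (len > 0) Some(len) else None
    } catch {
      case _: NumberFormatException => None
    }
  }
}
```

Anything other than a positive decimal long collapses to the same "no declared length" outcome, which is why invalid data of any form is safe.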
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests in BasicWriteTaskStatsTrackerSuite which use a filter FS to
implement getXAttr on top of LocalFS; this is used to explore the set of
options:
* no XAttr API implementation (existing tests; what callers would see with most filesystems)
* no attribute found (HDFS, ABFS without the attribute)
* invalid data of different forms
All of these return Some(0) as file length.
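The filter-FS test pattern can be illustrated without Hadoop on the classpath by stubbing the single method involved. The trait and class names below are hypothetical stand-ins for Hadoop's FilterFileSystem, shown only to convey the override pattern the suite uses.

```scala
// Minimal stand-in for the XAttr surface of the Hadoop FileSystem API.
trait XAttrSource {
  def getXAttr(path: String, name: String): Array[Byte]
}

// Base "LocalFS" behaviour: no XAttr support at all, like most filesystems.
class NoXAttrFs extends XAttrSource {
  def getXAttr(path: String, name: String): Array[Byte] =
    throw new UnsupportedOperationException("XAttrs not supported")
}

// Filter FS layered on top, injecting canned attribute bytes for chosen paths
// so tests can simulate valid, missing, and malformed values.
class XAttrFilterFs(inner: XAttrSource, canned: Map[String, Array[Byte]])
    extends XAttrSource {
  def getXAttr(path: String, name: String): Array[Byte] =
    canned.getOrElse(path, inner.getXAttr(path, name))
}
```

Layering the override over a real local filesystem is what lets the suite exercise every branch of the probe without needing S3 at all.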
The Hadoop PR verifies XAttr implementation in S3A and that
the commit protocol attaches the header to the files.
External downstream testing has done the full hadoop+spark end
to end operation, with manual review of logs to verify that the
data was successfully collected from the attribute.