[SPARK-33739] [SQL] Jobs committed through the S3A Magic committer don't track bytes #30714

Closed
43 changes: 36 additions & 7 deletions docs/cloud-integration.md
@@ -49,7 +49,6 @@ They cannot be used as a direct replacement for a cluster filesystem such as HDF

Key differences are:

* Changes to stored objects may not be immediately visible, both in directory listings and actual data access.
Member

Shall we remove line 60 together?

  1. The output of work may not be immediately visible to a follow-on query.

Contributor Author

I worry about OpenStack Swift. The original version was woefully inconsistent. I've not been near it long enough to know what's changed. With Swift I could consistently replicate an inconsistency in a few lines:

  1. write a file of length, say, 1KB
  2. overwrite with a shorter dataset, e.g. 512 bytes
  3. open file
  4. read from 0-128: get the new data
  5. read from 768-1024: get the old data

No S3 implementation that I know of (i.e. the open-source or commercial ones) is inconsistent, nor have any of them ever had the 404 caching issue. But people should still be aware of the risk.

Oh, and I haven't played with any of the Chinese cloud stores for which connectors now exist, so I can't make statements there. All I can say is that the "big three outside China" are consistent.
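
A minimal Scala sketch of those five steps against the Hadoop FileSystem API (the `swift://` URI and container name are placeholders; on a consistent store the second ranged read fails with an EOF rather than returning stale bytes):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// placeholder container/service name
val path = new Path("swift://container.service/consistency-check")
val fs = path.getFileSystem(new Configuration())

// 1. write a file of length 1KB
val out1 = fs.create(path, true)
out1.write(Array.fill[Byte](1024)('a'.toByte))
out1.close()

// 2. overwrite with a shorter dataset, 512 bytes
val out2 = fs.create(path, true)
out2.write(Array.fill[Byte](512)('b'.toByte))
out2.close()

// 3-5. open the file and read both ranges
val in = fs.open(path)
val head = new Array[Byte](128)
in.readFully(0, head)    // new data on any store
val tail = new Array[Byte](256)
in.readFully(768, tail)  // inconsistent store: old data; consistent store: EOFException
in.close()
```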

Contributor

The line deletion is based on the fact that S3 is now strongly consistent, but that also means we only consider these three vendors. (You're probably also considering S3-compatible implementations, but you've mentioned you don't consider the Chinese cloud stores, so it can't be exhaustive.) Why not say so explicitly and tie the description to these vendors? We would never be able to consider all possible implementations, and for some minority the description may be wrong. Let's just make it clear.

Contributor

Or, what about leaving this line as it is (so that the description enumerates all "possible" issues on object stores beyond the big three), and elaborating in the consistency section which key differences are no longer in effect with strong consistency?

Contributor Author

> (You're probably also considering S3-compatible implementations, but you've mentioned you don't consider the Chinese cloud stores, so it can't be exhaustive.)

I don't test them; I don't know their consistency

Contributor Author

Reviewed the docs: Huawei OBS and Tencent COS both seem consistent. Swift is now the outlier. Moving the issue into a paragraph in the "consistency" section.

Contributor

@steveloughran Thanks for the update!
@dongjoon-hyun Could you please check whether the comments and new change make sense to you? Thanks in advance!

* The means by which directories are emulated may make working with them slow.
* Rename operations may be very slow and, on failure, leave the store in an unknown state.
* Seeking within a file may require new HTTP calls, hurting performance.
@@ -58,16 +57,35 @@ How does this affect Spark?

1. Reading and writing data can be significantly slower than working with a normal filesystem.
1. Some directory structures may be very inefficient to scan during query split calculation.
1. The output of work may not be immediately visible to a follow-on query.
1. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset
is potentially both slow and unreliable.

For these reasons, it is not always safe to use an object store as a direct destination of queries, or as
an intermediate store in a chain of queries. Consult the documentation of the object store and its
connector to determine which uses are considered safe.

In particular: *without some form of consistency layer, Amazon S3 cannot
be safely used as the direct destination of work with the normal rename-based committer.*
### Consistency

As of 2021, the object stores of Amazon (S3), Google Cloud (GCS) and Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) are all *consistent*.

This means that as soon as a file is written/updated it can be listed, viewed and opened by other processes,
and the latest version will be retrieved. Inconsistency was historically an issue with AWS S3, especially the
404 caching of HEAD requests made before an object was created.

Even so: none of the store connectors provide any guarantees as to how their clients cope with objects
which are overwritten while a stream is reading them. Do not assume that the old file can be safely
read, nor that there is any bounded time period for changes to become visible, or indeed that
the clients will not simply fail if a file being read is overwritten.

For this reason, avoid overwriting files where it is known or likely that other clients
will be actively reading them.

Other object stores are *inconsistent*.

This includes [OpenStack Swift](https://docs.openstack.org/swift/latest/).

Such stores are not always safe to use as a destination of work; consult
each store's specific documentation.

### Installation

@@ -163,10 +181,15 @@ different stores and connectors when renaming directories:
| Amazon S3 | s3a | Unsafe | O(data) |
| Azure Storage | wasb | Safe | O(files) |
| Azure Datalake Gen 2 | abfs | Safe | O(1) |
| Google Cloud Storage | gs | Safe | O(1) |
| Google Cloud Storage | gs | Mixed | O(files) |

As storing temporary files can run up charges; delete
1. As storing temporary files can run up charges, delete
directories called `"_temporary"` on a regular basis.
1. For AWS S3, set a limit on how long multipart uploads can remain outstanding.
This avoids incurring bills from incomplete uploads.
1. For Google Cloud, directory rename is file-by-file. Consider using the v2 committer
and only writing code which generates idempotent output, including filenames,
since it is *no more unsafe* than the v1 committer, and faster. A configuration
sketch for the last two points follows this list.
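
A minimal configuration sketch of the last two points, assuming the option names documented for the
Hadoop S3A connector and the MapReduce committer (`fs.s3a.multipart.purge`, `fs.s3a.multipart.purge.age`,
`mapreduce.fileoutputcommitter.algorithm.version`); an S3 bucket lifecycle rule is an alternative way to
expire incomplete multipart uploads:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cloud-output-example")
  // AWS S3: purge multipart uploads outstanding for more than 24h (86400s).
  // Note: purging affects all pending uploads in the bucket, including other jobs'.
  .config("spark.hadoop.fs.s3a.multipart.purge", "true")
  .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400")
  // Google Cloud Storage: use the v2 commit algorithm, given idempotent output.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```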

### Parquet I/O Settings

@@ -245,17 +268,23 @@ mydataframe.write.format("parquet").save("s3a://bucket/destination")

More details on these committers can be found in the latest Hadoop documentation.

Note: depending upon the committer used, in-progress statistics may be
under-reported with Hadoop versions before 3.3.1.
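
A minimal sketch of enabling the S3A magic committer from Spark, assuming the `spark-hadoop-cloud`
module is on the classpath; the option and class names below come from that module and the Hadoop
S3A committer documentation, and should be verified against the Spark and Hadoop versions in use:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: wires the S3A "magic" committer through the spark-hadoop-cloud bindings.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  // older Hadoop releases also require the magic committer to be enabled explicitly
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

spark.range(10).write.format("parquet").save("s3a://bucket/destination")
```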

## Further Reading

Here is the documentation on the standard connectors both from Apache and the cloud providers.

* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
[IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.

@@ -18,11 +18,12 @@
package org.apache.spark.sql.execution.datasources

import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
@@ -66,14 +67,66 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
  private def getFileSize(filePath: String): Option[Long] = {
    val path = new Path(filePath)
    val fs = path.getFileSystem(hadoopConf)
    getFileSize(fs, path)
  }

  /**
   * Get the size of the file expected to have been written by a worker.
   * This supports the XAttr in HADOOP-17414 which the "magic committer" adds
   * as a custom HTTP header on the zero-byte marker file.
   * If the length of the output file, as returned by getFileStatus(), is
   * greater than 0, then that length is returned. For zero-byte files, the
   * (optional) Hadoop FS API getXAttr() is invoked. If a parseable,
   * non-negative length can be retrieved there, it is returned instead of
   * the zero file length.
   * @return the file size or None if the file was not found.
   */
  private [datasources] def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
    // the normal file status probe.
    try {
      Some(fs.getFileStatus(path).getLen())
      val len = fs.getFileStatus(path).getLen
      if (len > 0) {
        return Some(len)
      }
    } catch {
      case e: FileNotFoundException =>
        // may arise against eventually consistent object stores
        // may arise against eventually consistent object stores.
        logDebug(s"File $path is not yet visible", e)
        None
        return None
    }

    // Output File Size is 0. Look to see if it has an attribute
    // declaring a future-file-length.
    // Failure of API call, parsing, invalid value all return the
    // 0 byte length.

    var len = 0L
    try {
      val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
      if (attr != null && attr.nonEmpty) {
        val str = new String(attr, StandardCharsets.UTF_8)
        logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
        // a non-empty header was found. parse to a long via the java class
        val l = java.lang.Long.parseLong(str)
        if (l > 0) {
          len = l
        } else {
          logDebug("Ignoring negative value in XAttr file length")
        }
      }
    } catch {
      case e: NumberFormatException =>
        // warn but don't dump the whole stack
        logInfo(s"Failed to parse" +
Contributor

Do we want this at warn instead, or perhaps just extend the message to say file stats may be wrong?

Contributor Author

OK. I'd say at info as warn seems overkill for progress report issues.

s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
s" bytes written may be under-reported");
case e: UnsupportedOperationException =>
// this is not unusual; ignore
logDebug(s"XAttr not supported on path $path", e);
case e: Exception =>
// Something else. Log at debug and continue.
logDebug(s"XAttr processing failure on $path", e);
}
Some(len)
}


@@ -170,6 +223,8 @@ object BasicWriteJobStatsTracker {
  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
  private val NUM_PARTS_KEY = "numParts"
  /** XAttr key of the data length header added in HADOOP-17414. */
  val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

  def metrics: Map[String, SQLMetric] = {
    val sparkContext = SparkContext.getActive.get
@@ -17,12 +17,13 @@

package org.apache.spark.sql.execution.datasources

import java.nio.charset.Charset
import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, Path}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker.FILE_LENGTH_XATTR
import org.apache.spark.util.Utils

/**
@@ -221,4 +222,127 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
    write(file, data2)
  }

  /**
   * Does a length specified in the XAttr header get picked up?
   */
  test("XAttr sourced length") {
    val file = new Path(tempDirPath, "file")
    touch(file)
    val xattrFS = new FsWithFakeXAttrs(localfs)
    val bigLong = 34359738368L
    xattrFS.set(FILE_LENGTH_XATTR, s"$bigLong")
    val tracker = new BasicWriteTaskStatsTracker(conf)
    assert(Some(bigLong) === tracker.getFileSize(xattrFS, file),
      "Size not collected from XAttr entry")
  }

  /**
   * If a file is non-empty then the XAttr size declaration
   * is not used.
   */
  test("XAttr sourced length only used for 0-byte-files") {
    val file = new Path(tempDirPath, "file")
    write2(file)
    val xattrFS = new FsWithFakeXAttrs(localfs)
    val bigLong = 34359738368L
    xattrFS.set(FILE_LENGTH_XATTR, s"$bigLong")
    val tracker = new BasicWriteTaskStatsTracker(conf)
    assert(Some(len2) === tracker.getFileSize(xattrFS, file),
      "Size not collected from XAttr entry")
  }

  /**
   * Any FS which supports XAttr must raise an FNFE if the
   * file is missing. This verifies resilience on a path
   * which the local FS would not normally take.
   */
  test("Missing File with XAttr") {
    val missing = new Path(tempDirPath, "missing")
    val xattrFS = new FsWithFakeXAttrs(localfs)
    val tracker = new BasicWriteTaskStatsTracker(conf)
    tracker.newFile(missing.toString)
    assert(None === tracker.getFileSize(xattrFS, missing))
  }

  /**
   * If there are any problems parsing/validating the
   * header attribute, fall back to the file length.
   */
  test("XAttr error recovery") {
    val file = new Path(tempDirPath, "file")
    touch(file)
    val xattrFS = new FsWithFakeXAttrs(localfs)

    val tracker = new BasicWriteTaskStatsTracker(conf)

    // without a header
    assert(Some(0) === tracker.getFileSize(xattrFS, file))

    // will fail to parse as a long
    xattrFS.set(FILE_LENGTH_XATTR, "Not-a-long")
    assert(Some(0) === tracker.getFileSize(xattrFS, file))

    // a negative value
    xattrFS.set(FILE_LENGTH_XATTR, "-1")
    assert(Some(0) === tracker.getFileSize(xattrFS, file))

    // empty string
    xattrFS.set(FILE_LENGTH_XATTR, "")
    assert(Some(0) === tracker.getFileSize(xattrFS, file))

    // then a zero byte array
    xattrFS.setXAttr(file, FILE_LENGTH_XATTR,
      new Array[Byte](0))
    assert(Some(0) === tracker.getFileSize(xattrFS, file))
  }

  /**
   * Extend any FS with a mock get/setXAttr.
   * A map of attributes is used; these are returned on a getXAttr(path, key)
   * call for any path; the other XAttr list/get calls are not implemented.
   */
  class FsWithFakeXAttrs(fs: FileSystem) extends FilterFileSystem(fs) {

    private val xattrs = scala.collection.mutable.Map[String, Array[Byte]]()

    /**
     * Mock implementation of setXAttr.
     *
     * @param path path (ignored)
     * @param name attribute name.
     * @param value byte array value
     */
    override def setXAttr(
        path: Path,
        name: String,
        value: Array[Byte]): Unit = {

      xattrs.put(name, value)
    }

    /**
     * Set an attribute to the UTF-8 byte value of a string.
     *
     * @param name attribute name.
     * @param value string value
     */
    def set(name: String, value: String): Unit = {
      setXAttr(null, name, value.getBytes(StandardCharsets.UTF_8))
    }

    /**
     * Get any attribute if it is found in the map, else null.
     * @param path path (ignored)
     * @param name attribute name.
     * @return the byte[] value or null.
     */
    override def getXAttr(
        path: Path,
        name: String): Array[Byte] = {
      // force a check for the file and raise an FNFE if not found
      getFileStatus(path)

      xattrs.getOrElse(name, null)
    }
  }
}