[SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files #24483

Closed · wants to merge 5 commits
Changes from 4 commits
@@ -1744,6 +1744,17 @@ object SQLConf {
"and from_utc_timestamp() functions.")
.booleanConf
.createWithDefault(false)

private[sql]
val CONF_SOURCES_BINARY_FILE_MAX_LENGTH = "spark.sql.sources.binaryFile.maxLength"
private[sql]
val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf(CONF_SOURCES_BINARY_FILE_MAX_LENGTH)
Member:
Nit: I think we can follow the other confs in SQLConf here by putting the conf key into buildConf without assigning it to a variable first. Also, we can remove the private[sql].

val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")...

We can still refer to the key as SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH.key when setting the conf.

Contributor Author:
Done.

.doc("The max length of a file that can be read by the binary file data source. " +
"Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
"The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)
}
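
For reference, a minimal sketch of how this definition presumably reads after the "Done" above, with the key inlined into buildConf and the private[sql] modifiers dropped as the reviewer suggests; the doc text, internal flag, and default are taken unchanged from the diff, and the actual follow-up commit may differ slightly.

// Sketch of the post-review shape; not copied from the merged commit.
val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
  .doc("The max length of a file that can be read by the binary file data source. " +
    "Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
    "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
  .internal()
  .intConf
  .createWithDefault(Int.MaxValue)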

/**
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
val filterFuncs = filters.map(filter => createFilterFunction(filter))
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
Member:
Nit: we can define a method in SQLConf, like SQLConf.maxRecordsPerFile.

Contributor Author:
? The logic is not general enough to be applied outside the binary file data source.
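
For context, a sketch of the kind of accessor the reviewer has in mind, modeled on the existing SQLConf.maxRecordsPerFile pattern. The name binaryFileMaxLength is hypothetical, the method would live inside the SQLConf class, and it was not added in this PR.

// Hypothetical accessor, following the maxRecordsPerFile pattern; not part of this PR.
def binaryFileMaxLength: Int = getConf(SOURCES_BINARY_FILE_MAX_LENGTH)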


file: PartitionedFile => {
val path = new Path(file.filePath)
@@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
case (MODIFICATION_TIME, i) =>
writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
case (CONTENT, i) =>
if (status.getLen > maxLength) {
Member:
I think we can move this to line 113.

Contributor Author:
I don't get it. The conf is to prevent reading very large files that we are sure will fail. Users can still use the data source if they don't need the content.

Member:
I see. I am actually OK with either way.

throw new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
}
val stream = fs.open(status.getPath)
try {
writer.write(i, ByteStreams.toByteArray(stream))
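
To make the behavior discussed above concrete, a short usage sketch from the user's side. It assumes a running SparkSession named spark, a Spark build that includes this data source, and a hypothetical directory /data/blobs; setting the (internal) conf caps content reads, while metadata-only queries are unaffected.

// Cap the binary file source at 10 MB so oversized files fail fast (hypothetical path below).
spark.conf.set("spark.sql.sources.binaryFile.maxLength", (10 * 1024 * 1024).toString)

val df = spark.read.format("binaryFile").load("/data/blobs")

// Selecting only metadata columns never opens the files, so the cap is not checked here.
df.select("path", "length").show()

// Selecting content on a file longer than the cap throws a SparkException
// ("... exceeds the max length allowed ...") before any bytes are read.
df.select("content").show()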
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf.{CONF_SOURCES_BINARY_FILE_MAX_LENGTH, SOURCES_BINARY_FILE_MAX_LENGTH}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
assert(df.select("LENGTH").first().getLong(0) === content.length,
"column pruning should be case insensitive")
}

test("fail fast and do not attempt to read if a file is too big") {
assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
withTempPath { file =>
val path = file.getPath
val content = "123".getBytes
Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
def readContent(): DataFrame = {
spark.read.format(BINARY_FILE)
.load(path)
.select(CONTENT)
}
val expected = Seq(Row(content))
QueryTest.checkAnswer(readContent(), expected)
withSQLConf(CONF_SOURCES_BINARY_FILE_MAX_LENGTH -> content.length.toString) {
QueryTest.checkAnswer(readContent(), expected)
}
// Disable read. If the implementation attempts to read, the exception would be different.
file.setReadable(false)
Member:
Seems the test can still pass without this line. Maybe we can remove it?

Contributor Author:
If we still set the max to content.length, the test will fail, because the implementation would actually try to read the now-unreadable file. This is to ensure we don't even attempt to read the file if it is too big.

val caught = intercept[SparkException] {
withSQLConf(CONF_SOURCES_BINARY_FILE_MAX_LENGTH -> (content.length - 1).toString) {
QueryTest.checkAnswer(readContent(), expected)
}
}
assert(caught.getMessage.contains("exceeds the max length allowed"))
}
}
}
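
Finally, a standalone sketch of the testing trick discussed in the last thread: revoking read permission proves the reader rejects an oversized file without ever opening it. This uses plain JVM file APIs, independent of Spark; the temp-file prefix is arbitrary, and setReadable only takes effect on POSIX-style file systems.

import java.nio.file.{Files, StandardOpenOption}

val file = Files.createTempFile("binary-source-", ".bin").toFile
Files.write(file.toPath, "123".getBytes, StandardOpenOption.WRITE)
file.setReadable(false) // any attempt to open the file for reading now fails with an access error

// Metadata access does not need read permission, so a length-only check still succeeds...
assert(file.length() == 3L)
// ...while code that actually opens the file surfaces a permission error instead of the
// "exceeds the max length allowed" message. That difference is how the test above tells
// "rejected without reading" apart from "failed while reading".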