
[SPARK-18510] Fix data corruption from inferred partition column dataTypes #15951

Closed
wants to merge 16 commits

Conversation

@brkyvz (Contributor) commented Nov 20, 2016

What changes were proposed in this pull request?

The Issue

If I specify my schema when doing

spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)

but partition inference infers it as IntegerType (or, I assume, LongType or DoubleType, basically fixed-size types), then once UnsafeRows are generated, your data will be corrupted.

Proposed solution

The partition handling code path is kind of a mess. In my fix I'm probably adding to the mess, but at least trying to standardize the code path.

The real issue is that a user that uses the spark.read code path can never clearly specify what the partition columns are. If you try to specify the fields in schema, we practically ignore what the user provides, and fall back to our inferred data types. What happens in the end is data corruption.

My solution tries to fix this by always trying to infer partition columns the first time you specify the table. Once we find what the partition columns are, we try to find them in the user specified schema and use the dataType provided there, or fall back to the smallest common data type.

We will ALWAYS append partition columns to the user's schema, even if they didn't ask for it. We will only use the data type they provided if they specified it. While this is confusing, this has been the behavior since Spark 1.6, and I didn't want to change this behavior in the QA period of Spark 2.1. We may revisit this decision later.
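
For illustration, a rough sketch of the resolution described above (identifiers such as inferredPartitionSchema, dataSchema and userSpecifiedSchema are assumed names for this sketch, not necessarily the ones in the patch):

    // Sketch only: resolve partition column types against the user-specified schema.
    // Assumes userSpecifiedSchema: Option[StructType], dataSchema: StructType and
    // inferredPartitionSchema: StructType are in scope, as inside DataSource.
    import org.apache.spark.sql.types.StructType

    val equality = sparkSession.sessionState.conf.resolver
    val partitionSchema = StructType(inferredPartitionSchema.map { inferred =>
      // Prefer the data type the user specified for this partition column,
      // otherwise keep the inferred (smallest common) type.
      userSpecifiedSchema
        .flatMap(_.find(f => equality(f.name, inferred.name)))
        .getOrElse(inferred)
    })
    // Partition columns are always appended to the user's schema, whether or not
    // they were included in spark.read.schema(...).
    val fullSchema = StructType(dataSchema ++ partitionSchema)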

A side effect of this PR is that we won't need #15942 if this PR goes in.

How was this patch tested?

Regression tests

@brkyvz (Contributor Author) commented Nov 20, 2016

cc @rxin @marmbrus Don't know who's the best person to look at this, but git blame says I mainly changed your code :)

@SparkQA commented Nov 20, 2016

Test build #68909 has finished for PR 15951 at commit 9080c4e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2016

Test build #68910 has finished for PR 15951 at commit f518405.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2016

Test build #68913 has finished for PR 15951 at commit 46ab68a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.partitionBy("part", "id")
.mode("overwrite")
.parquet(src.toString)
// make sure to specify the schema in the wrong order. Partition column in the middle, etc.
Member

Without the fix, the order of columns does not matter, right? As long as the types are right, it should work?

Contributor Author

with the fix, it still does not matter. The comment is outdated, I thought something else was the problem. In terms of schema though, the output from Spark is always consistent, i.e. partition columns go last.

dataSchema = dataSchema,
bucketSpec = None,
format,
caseInsensitiveOptions)(sparkSession)

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val (schema, inferredPartitionColumns) = inferFileFormatSchema(format)
Member

inferFileFormatSchema is expensive, right?

Contributor Author

not if you already specified the schema. I'm going to rename it to getOrInfer so that people don't think it's expensive all the time

Member

Sounds good

@gatorsmile (Member)

When users specify schemas, we do not want to infer the schemas due to its potentially expensive cost. Based on my understanding, data corruption issues are common when user-specified schemas do not provide correct types.

@gatorsmile (Member) commented Nov 21, 2016

The real issue is that a user that uses the spark.read code path can never clearly specify what the partition columns are. If you try to specify the fields in schema, we practically ignore what the user provides, and fall back to our inferred data types. What happens in the end is data corruption.

For data source tables, the partition columns are part of the data schema. Users do not need to know which columns are used for partitioning. If they provide the right types, they should be able to see the expected data.

In the test cases, we can get the correct result with the following changes:

      spark.range(4).select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part).coalesce(1)
      val schema = new StructType()
        .add("part", LongType)
        .add("ex", ArrayType(StringType))
        .add("id", LongType)
      spark.read
        .schema(schema)
        .format("parquet")
        .load(src.toString).show()

The order of columns in the specified schema will not affect the result correctness. It only affects the column order of the final result set.

@brkyvz (Contributor Author) commented Nov 21, 2016

True. But there's no reason "part" and "id" can't be strings right?


@gatorsmile (Member)

Your concern is right. I just did another try, using string-type columns as the partition columns. See the following code:

val rowRdd: RDD[Row] = sparkContext.parallelize(1 to 10).map(i => Row(i, i.toString))
val inputSchema = StructType(Seq(
  StructField("intCol", IntegerType),
  StructField("stringCol", StringType)
))
spark.createDataFrame(rowRdd, inputSchema)
  .write.partitionBy("stringCol").mode("overwrite").parquet(src.toString)
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("stringCol", IntegerType)
spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString).show()

Users have to use IntegerType for these partition columns even if the original data type is StringType. This looks weird. Otherwise, they will hit the following error:

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getArrayLength(OnHeapColumnVector.java:375)

}

HadoopFsRelation(
fileCatalog,
partitionSchema = fileCatalog.partitionSchema,
Member

This line causes the problem, right? We always ignore the user-specified column types and use the inferred partition columns.

Contributor Author

exactly

@SparkQA commented Nov 21, 2016

Test build #68947 has finished for PR 15951 at commit f0a2754.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 21, 2016

Test build #68946 has finished for PR 15951 at commit de49ba5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (justPartitioning) {
partitionSchema -> partitionSchema.map(_.name)
}
val tableSchema = userSpecifiedSchema.map { schema =>
Member

Are you missing an else here, if you want an early exit?

Contributor Author

good catch. missed the return

}.getOrElse {
throw new AnalysisException("Unable to infer schema. It must be specified manually.")
val exampleFiles = tempFileCatalog.allFiles().take(2).mkString(",")
Member

This might generate some unwanted files. For example, in JSON, inferSchema filters out the unwanted files.

Contributor

Why will it generate the files? At most it may print a few file names that the JSON format actually ignores while inferring the schema.

Contributor

Nonetheless, to avoid this, could you not just print the allPaths?

Contributor Author

I tried to keep it consistent with before. I can remove it.

// backwards compatibility before SPARK-18510. Return the schema of catalog tables as is
return userSpecifiedSchema.get -> partitionSchema.map(_.name)
}
val tableSchema = userSpecifiedSchema.map { schema =>
Member

tableSchema -> dataSchema, to be consistent with the other code?

val dataSchema = if (isStreaming) {
schema
} else {
StructType(schema.dropRight(inferredPartitionColumns.length))
Member

This is a little hacky. How about changing the return of getOrInferFileFormatSchema to (dataSchema, partitionSchema)?

Contributor

It looks elegant, but it opens up the possibility of a new kind of bug where the types of the partitioning columns in the returned partitionSchema and dataSchema are different.

Contributor

nvm, with the justPartitioning param it's more confusing as it is right now, so I like this suggestion

Contributor Author

good idea

@gatorsmile (Member) commented Nov 21, 2016

Also cc @ericl @cloud-fan, who recently changed the related code.

@tdas (Contributor) left a comment

This is round 1 of reviewing. I am still looking to understand the code.

private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
userSpecifiedSchema.map(_ -> partitionColumns).orElse {
val allPaths = caseInsensitiveOptions.get("path")
private def getOrInferFileFormatSchema(
Contributor

can you add param docs to define what justPartitioning means?

Contributor

This ain't right. You can't return something incorrect. Rather, return null for the first schema.
Also, the docs are confusing in this way. Please add @param and @return after the params to clarify what gets returned in both cases.

private def getOrInferFileFormatSchema(
format: FileFormat,
justPartitioning: Boolean = false): (StructType, Seq[String]) = {
lazy val tempFileCatalog = {
Contributor

Please add docs on why this is lazy. It took me half-a-minute to trace down why this should be lazy.

@@ -126,7 +126,6 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
}
checkDuplication(normalizedPartitionCols, "partition")

Contributor

undo this change.

@@ -274,7 +274,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
pathToPartitionedTable,
userSpecifiedSchema = Option("num int, str string"),
userSpecifiedPartitionCols = partitionCols,
expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
Contributor

so in this PR, for some cases, the order of fields in the schema created after resolveRelation is changing?

Contributor

I believe the original test case was incorrect. Although the schema check passes, if you really read rows out of the Dataset, you'll hit an exception, as shown in the following Spark shell session:

import org.apache.spark.sql.types._

val df0 = spark.range(10).select(
  ('id % 4) cast StringType as "part",
  'id cast StringType as "data"
)

val path = "/tmp/part.parquet"
df0.write.mode("overwrite").partitionBy("part").parquet(path)

val df1 = spark.read.schema(
  new StructType()
    .add("part", StringType, nullable = true)
    .add("data", StringType, nullable = true)
).parquet(path)

df1.printSchema()
// root
//  |-- part: string (nullable = true)
//  |-- data: string (nullable = true)

df1.show()
// 16/11/22 22:52:21 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 34)
// java.lang.NullPointerException
//         at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getArrayLength(OnHeapColumnVector.java:375)
//         at org.apache.spark.sql.execution.vectorized.ColumnVector.getArray(ColumnVector.java:554)
//         at org.apache.spark.sql.execution.vectorized.ColumnVector.getByteArray(ColumnVector.java:576)
//         [...]

@@ -532,4 +532,50 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
assert(e.getMessage.contains("does not support recovering"))
assert(e.getMessage.contains("checkpoint location"))
}

test("SPARK-18510: Data corruption from user specified partition column schemas") {
Contributor

I would say rename the test to something that explains what this test actually tests. For example, "use user-specified schema for partitioning columns in file sources"

@@ -573,4 +573,40 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}
}

test("SPARK-18510: Data corruption from user specified partition column schemas") {
Contributor

same comment as above.

@SparkQA commented Nov 21, 2016

Test build #68956 has finished for PR 15951 at commit fde4f64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) left a comment

Round 2


return partitionSchema -> partitionSchema.map(_.name)
}
if (catalogTable.isDefined && userSpecifiedSchema.isDefined) {
// backwards compatibility before SPARK-18510. Return the schema of catalog tables as is
Contributor

It's returning the user-specified schema, not the catalog table schema.


StructType(schema.dropRight(inferredPartitionColumns.length))
}

val partitionSchema = if (inferredPartitionColumns.isEmpty) {
Contributor

This is the one I don't understand. Why should the behavior be different depending on whether this non-streaming source (HadoopFsRelation) is created by the user directly or by the file streaming source?

@SparkQA commented Nov 22, 2016

Test build #68964 has finished for PR 15951 at commit ff0a2c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl (Contributor) commented Nov 22, 2016

This might change the behavior, but how about just raising an error if the partition types differ from those provided by the user, or the user failed to provide a partitioning schema? It seems confusing to partially infer a schema when the user does not provide it.
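
A rough sketch of what that stricter check could look like (illustrative only, not what this PR implements; names such as userSpecifiedSchema and partitionSchema are assumed to be in scope):

    // Illustrative only: fail instead of silently falling back when the user-specified
    // type of a partition column disagrees with the inferred one, or is missing.
    import org.apache.spark.sql.AnalysisException

    val resolver = sparkSession.sessionState.conf.resolver
    partitionSchema.foreach { inferred =>
      userSpecifiedSchema.flatMap(_.find(f => resolver(f.name, inferred.name))) match {
        case Some(user) if user.dataType != inferred.dataType =>
          throw new AnalysisException(s"Partition column '${inferred.name}' was specified " +
            s"as ${user.dataType} but inferred as ${inferred.dataType}")
        case None =>
          throw new AnalysisException(
            s"Partition column '${inferred.name}' is missing from the specified schema")
        case _ => // user-specified and inferred types agree
      }
    }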

@brkyvz (Contributor Author) commented Nov 22, 2016

@ericl I feel that would probably break 90% of production Spark jobs out there, so I'm a bit scared of something that radical. I agree, it's confusing and annoying.

@SparkQA commented Nov 22, 2016

Test build #68965 has finished for PR 15951 at commit 330e148.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor Author) commented Nov 22, 2016

Thanks @tejasapatil for the review. Addressed your comments

@SparkQA commented Nov 22, 2016

Test build #68975 has finished for PR 15951 at commit 97003e2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)

I don't think this is a valid use case: DataFrameReader can't specify partition columns, so we will always infer partitions.

I think the real problem is HadoopFsRelation.schema:

val schema: StructType = {
  val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
  StructType(dataSchema ++ partitionSchema.filterNot { column =>
    dataSchemaColumnNames.contains(column.name.toLowerCase)
  })
}

It silently drops the partition schema if the partition column names are duplicated in the data schema.

I think the best solution is to add partitionBy to DataFrameReader so that we can really skip inferring partitions (a hypothetical usage is sketched below). But this may be too late for 2.1, so we should define better semantics for the current "broken" API.

Once we find what the partition columns are, we try to find them in the user specified schema and use the dataType provided there, or fall back to the smallest common data type.

This LGTM
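
A purely hypothetical sketch of the reader-side partitionBy mentioned above (DataFrameReader has no such method as of this PR; the names are illustrative):

    // Hypothetical API, only proposed in the comment above -- it does not exist yet.
    spark.read
      .schema(someSchemaWherePartitionColumnsAreStrings)
      .partitionBy("part")  // would tell the reader which columns are partition columns
      .parquet(path)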

@SparkQA commented Nov 22, 2016

Test build #68976 has finished for PR 15951 at commit 6f741b6.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 22, 2016

Test build #68979 has finished for PR 15951 at commit 08566e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

Shall we update the documentation of DataSource.partitionColumns? "When this list is empty, the relation is unpartitioned" is wrong now; the relation can be partitioned even if partitionColumns is empty.

@SparkQA commented Nov 22, 2016

Test build #69010 has finished for PR 15951 at commit f3b42ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) left a comment

LGTM. Just a few questions that need clarification.

val equality = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
}.orElse {
format.inferSchema(
Contributor

just to confirm, inferSchema returns the schema without the partition columns?

Contributor

Yes.

@@ -144,8 +224,8 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
val (schema, partCols) = inferFileFormatSchema(format)
SourceInfo(s"FileSource[$path]", schema, partCols)
val (schema, partCols) = getOrInferFileFormatSchema(format)
Contributor

rename to dataSchema and partitionSchema

.getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
})
}
val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
Contributor

nit: this may not be inferred, right? So just partitionSchema would be a better name.

val globbedPaths = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val fs = hdfsPath.getFileSystem(hadoopConf)
Contributor

any need for this change? hadoopConf is not reused anywhere else

Member

any need for this change? hadoopConf is not reused anywhere else

this is to avoid creating a new hadoopConf for each path.

@@ -281,33 +361,25 @@ case class DataSource(
// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap { path =>
Contributor

Just to confirm, for paths with globs, this would expand here ONCE, and then expand them AGAIN in getOrInferFileFormatSchema, right?

If so, we don't have to fix it in this PR, but we should document this in a JIRA or something for fixing later.

@tdas (Contributor) commented Nov 22, 2016

Would be good if @rxin can take a look.

@liancheng (Contributor) left a comment

Mostly LGTM except for a few minor issues.

format: FileFormat,
justPartitioning: Boolean = false): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to
lazy val tempFileCatalog = {
Contributor

Nit: tempFileIndex


inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
Contributor

Also need to use the resolver to handle case sensitivity here.
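
For example, the lookup could go through the session resolver instead of plain name equality (a sketch, mirroring the equality pattern used earlier in this method):

    // Sketch: resolve partitionColumn against the user schema with the configured resolver,
    // so the lookup honors spark.sql.caseSensitive.
    val resolver = sparkSession.sessionState.conf.resolver
    userSpecifiedSchema.flatMap(_.find(f => resolver(f.name, partitionColumn)))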

|Falling back to inferred dataType if it exists.
""".stripMargin)
}
inferredPartitions.find(_.name == partitionColumn)
Contributor

Duplicated code?

private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
userSpecifiedSchema.map(_ -> partitionColumns).orElse {
val allPaths = caseInsensitiveOptions.get("path")
private def getOrInferFileFormatSchema(
Contributor

I think it would be clearer if we can split this method into two: one for partition schema and the other for data schema. In this way, we can also remove the justPartitioning argument by calling the method you need at the right place.

Contributor

Well, just realized that it might be hard to split because of the temporary InMemoryFileIndex.


@tdas (Contributor) commented Nov 23, 2016

Thanks @liancheng for your comments. Since these are mostly nits, I am going to merge this PR (since it fixes a critical bug for 2.1) and address the final comments in a separate PR.

asfgit pushed a commit that referenced this pull request Nov 23, 2016
Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15951 from brkyvz/partition-corruption.

(cherry picked from commit 0d1bf2b)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 0d1bf2b Nov 23, 2016
ghost pushed a commit to dbtsai/spark that referenced this pull request Nov 24, 2016
## What changes were proposed in this pull request?

This PR addressed the rest comments in apache#15951.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#15997 from zsxwing/SPARK-18510-follow-up.
asfgit pushed a commit that referenced this pull request Nov 24, 2016
Closes #15997 from zsxwing/SPARK-18510-follow-up.

(cherry picked from commit 223fa21)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
Closes apache#15951 from brkyvz/partition-corruption.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
Closes apache#15997 from zsxwing/SPARK-18510-follow-up.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
Closes apache#15951 from brkyvz/partition-corruption.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
Closes apache#15997 from zsxwing/SPARK-18510-follow-up.
@brkyvz brkyvz deleted the partition-corruption branch February 3, 2019 20:58