[SPARK-23372][SQL] Writing empty struct in parquet fails during execution. It should fail earlier in the processing. #20579

Closed · wants to merge 10 commits
1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
@@ -1807,6 +1807,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used, whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change: for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframes.
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema, as illustrated in the sketch below.

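To illustrate the last item above, here is a minimal sketch of a spark-shell session against Spark 2.4 (the output path is a placeholder; the exact exception message comes from `DataSource.validateSchema` in the diff below):

```scala
import org.apache.spark.sql.AnalysisException

// spark.emptyDataFrame has a schema with zero columns.
val empty = spark.emptyDataFrame

// Since Spark 2.4 this fails at planning time rather than during execution.
try {
  empty.write.parquet("/tmp/empty-out")  // hypothetical output path
} catch {
  case e: AnalysisException =>
    println(e.getMessage)  // mentions "empty or nested empty schemas"
}
```
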
## Upgrading From Spark SQL 2.2 to 2.3

@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

@@ -546,6 +546,7 @@ case class DataSource(
      case dataSource: CreatableRelationProvider =>
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
      case format: FileFormat =>
        DataSource.validateSchema(data.schema)
        planForWritingFileFormat(format, mode, data)
      case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -719,4 +720,27 @@ object DataSource extends Logging {
    }
    globPath
  }

  /**
   * Called before writing into a FileFormat based data source to make sure the
   * supplied schema is not empty.
   * @param schema the schema of the data to be written
   */
  private def validateSchema(schema: StructType): Unit = {
    def hasEmptySchema(schema: StructType): Boolean = {
      // A schema is empty if it has no fields, or if any field is itself a
      // (possibly nested) struct with an empty schema.
      schema.isEmpty || schema.exists {
        case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
        case _ => false
      }
    }

    if (hasEmptySchema(schema)) {
      throw new AnalysisException(
        """
          |Datasource does not support writing empty or nested empty schemas.
          |Please make sure the data schema has at least one column.
        """.stripMargin)
    }
  }
}
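For intuition about what the recursive check above treats as empty, here is a small self-contained sketch that mirrors the `hasEmptySchema` helper outside of `DataSource` (illustrative only; the name `looksEmpty` is hypothetical and not part of the patch):

```scala
import org.apache.spark.sql.types._

// Mirrors the helper in DataSource.validateSchema: a schema counts as empty if it
// has no fields, or if any struct field at any nesting depth has no fields.
def looksEmpty(schema: StructType): Boolean = {
  schema.isEmpty || schema.exists {
    case StructField(_, nested: StructType, _, _) => looksEmpty(nested)
    case _ => false
  }
}

looksEmpty(new StructType())                          // true: no columns at all
looksEmpty(new StructType().add("a", IntegerType))    // false: one usable column
looksEmpty(new StructType()
  .add("a", IntegerType)
  .add("b", new StructType()))                        // true: "b" is a nested empty struct
```
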
@@ -26,6 +26,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._


class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
@@ -107,6 +108,33 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}

  allFileBasedDataSources.foreach { format =>
    test(s"SPARK-23372 error while writing empty schema files using $format") {
      withTempPath { outputPath =>
        val errMsg = intercept[AnalysisException] {
          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
        }
        assert(errMsg.getMessage.contains(
          "Datasource does not support writing empty or nested empty schemas"))
      }

      // Nested empty schema
      withTempPath { outputPath =>
        val schema = StructType(Seq(
          StructField("a", IntegerType),
          StructField("b", StructType(Nil)),
          StructField("c", IntegerType)
        ))
        val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
        val errMsg = intercept[AnalysisException] {
          df.write.format(format).save(outputPath.toString)
        }
        assert(errMsg.getMessage.contains(
          "Datasource does not support writing empty or nested empty schemas"))
      }
    }
  }

  allFileBasedDataSources.foreach { format =>
    test(s"SPARK-22146 read files containing special characters using $format") {
      withTempDir { dir =>