[SPARK-34793][SQL] Prohibit saving of day-time and year-month intervals
### What changes were proposed in this pull request?
For all built-in datasources, prohibit saving of the year-month and day-time intervals that were introduced by SPARK-27793. We plan to support saving of such types in milestone 2, see SPARK-27790.

### Why are the changes needed?
To improve the user experience with Spark SQL and to print a nicer error message. The current error message might confuse users:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
21/03/18 22:44:35 ERROR FileFormatWriter: Aborting job 8de402d7-ab69-4dc0-aa8e-14ef06bd2d6b.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (192.168.1.66 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:418)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:298)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:211)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to convert value 1 (class of class java.lang.Integer}) with the type of YearMonthIntervalType to JSON.
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:179)
	at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23$adapted(JacksonGenerator.scala:176)
```

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above fails with a clear analysis error:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```
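If interval-like data still has to land in external storage before milestone 2, one option is to write a plain representation of it instead of the interval column itself. A hypothetical spark-shell sketch (the column name, path, and the months-as-LONG encoding are illustrative assumptions, not part of this change):
```
// Hypothetical workaround, not part of this commit: persist the number of
// months as a plain LONG column instead of a year-month interval column.
import java.time.Period

val periods = Seq(Period.ofMonths(1), Period.ofYears(2))
periods.map(_.toTotalMonths).toDF("months").write.mode("overwrite").json("/tmp/ym_as_months")
```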

### How was this patch tested?
1. Checked nested intervals:
```
scala> spark.range(1).selectExpr("""struct(timestamp'2021-01-02 00:01:02' - timestamp'2021-01-01 00:00:00')""").write.mode("overwrite").parquet("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
scala> Seq(Seq(java.time.Period.ofMonths(1))).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```
2. By running existing test suites (an illustrative test sketch follows this list):
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2DataFrameSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
```
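The same behavior could also be pinned down by a dedicated regression test. A minimal sketch, assuming Spark's usual test scaffolding (QueryTest, SharedSparkSession, withTempPath, testImplicits); the suite and test names are made up for illustration and are not part of this commit:
```
import java.time.Period

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.test.SharedSparkSession

// Illustrative suite name; not part of this commit.
class ProhibitIntervalSaveSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  test("saving a year-month interval column is rejected at analysis time") {
    withTempPath { dir =>
      val e = intercept[AnalysisException] {
        Seq(Period.ofMonths(1)).toDF("ym").write.json(dir.getCanonicalPath)
      }
      assert(e.getMessage.contains("Cannot save interval data type into external storage"))
    }
  }
}
```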

Closes #31884 from MaxGekk/ban-save-intervals.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk committed Mar 19, 2021
1 parent 6f89cdf commit 089c3b7
Showing 2 changed files with 20 additions and 20 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala

```
@@ -101,16 +101,16 @@ object TypeUtils {
   }
 
   def failWithIntervalType(dataType: DataType): Unit = {
-    dataType match {
-      case CalendarIntervalType =>
-        throw new AnalysisException("Cannot use interval type in the table schema.")
-      case ArrayType(et, _) => failWithIntervalType(et)
-      case MapType(kt, vt, _) =>
-        failWithIntervalType(kt)
-        failWithIntervalType(vt)
-      case s: StructType => s.foreach(f => failWithIntervalType(f.dataType))
-      case u: UserDefinedType[_] => failWithIntervalType(u.sqlType)
-      case _ =>
+    invokeOnceForInterval(dataType) {
+      throw new AnalysisException("Cannot use interval type in the table schema.")
     }
   }
+
+  def invokeOnceForInterval(dataType: DataType)(f: => Unit): Unit = {
+    def isInterval(dataType: DataType): Boolean = dataType match {
+      case CalendarIntervalType | DayTimeIntervalType | YearMonthIntervalType => true
+      case _ => false
+    }
+    if (dataType.existsRecursively(isInterval)) f
+  }
 }
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

```
@@ -32,7 +32,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.SparkPlan
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, Tex
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
 
@@ -510,10 +510,7 @@ case class DataSource(
       physicalPlan: SparkPlan,
       metrics: Map[String, SQLMetric]): BaseRelation = {
     val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
-    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
-    }
-
+    disallowWritingIntervals(outputColumns.map(_.dataType))
     providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(
@@ -547,10 +544,7 @@
    * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
    */
   def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
-    }
-
+    disallowWritingIntervals(data.schema.map(_.dataType))
     providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
@@ -579,6 +573,12 @@ case class DataSource(
     DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist, enableGlobbing = globPaths)
   }
+
+  private def disallowWritingIntervals(dataTypes: Seq[DataType]): Unit = {
+    dataTypes.foreach(TypeUtils.invokeOnceForInterval(_) {
+      throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
+    })
+  }
 }
 
 object DataSource extends Logging {
```
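The DataSource-side check that is removed above only looked at top-level column types, while the shared TypeUtils.invokeOnceForInterval also walks nested types, which is why the nested-interval examples in the testing section are now caught. Below is a minimal, self-contained sketch of that recursive-detection idea; the helper is an illustration written against the public DataType API and only mimics the Spark-internal existsRecursively used by the patch:
```
import org.apache.spark.sql.types._

object IntervalCheckSketch {
  // Return true if `dt`, or any type nested inside it, satisfies `p`,
  // mirroring what DataType.existsRecursively does inside Spark.
  def exists(dt: DataType)(p: DataType => Boolean): Boolean = dt match {
    case _ if p(dt) => true
    case ArrayType(et, _) => exists(et)(p)
    case MapType(kt, vt, _) => exists(kt)(p) || exists(vt)(p)
    case s: StructType => s.fields.exists(f => exists(f.dataType)(p))
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    // The interval is buried inside array<struct<i: interval>> but is still found.
    val nested = ArrayType(new StructType().add("i", CalendarIntervalType))
    println(exists(nested)(_ == CalendarIntervalType)) // true
    // A plain schema passes the check.
    println(exists(new StructType().add("id", LongType))(_ == CalendarIntervalType)) // false
  }
}
```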
