
Commit 86ae53b

rahulsmahadev authored and vkorukanti committed
Nullable columns should work when using generated columns
(Cherry-pick of 38945d0)

- There was a bug in the generated columns code `addDefaultExprsOrReturnConstraints` that would not allow null columns in the insert DataFrame to be written, even if the column was nullable.
- Added a unit test.

GitOrigin-RevId: effdb5732e7aeaf0da7fa5e18bc2eda7436ecfbc
1 parent 2ff7cc7 commit 86ae53b
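
To make the fixed behavior concrete, here is a minimal sketch of the scenario this commit addresses. It mirrors the new unit test: the table name tbl, the columns c1/c2/c3, and the use of the DeltaTable builder API are illustrative, and a Spark session with Delta Lake configured is assumed.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import scala.collection.JavaConverters._

// Table with a nullable column c2 and a generated column c3 = c1 + 1
// (illustrative names, mirroring the new unit test below).
DeltaTable.create(spark)
  .tableName("tbl")
  .addColumn("c1", "INT")
  .addColumn("c2", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("c3").dataType("INT").generatedAlwaysAs("c1 + 1").build())
  .execute()

// A DataFrame that omits the nullable column c2 entirely.
val df = spark.createDataFrame(
  List(Row(3)).asJava,
  StructType(Seq(StructField("c1", IntegerType, nullable = true))))

// Before this commit the append failed while resolving the absent column c2.
// With the fix (and the new flag left at its default of true) it succeeds,
// and the stored row is (3, NULL, 4).
df.write.format("delta").mode("append").saveAsTable("tbl")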

File tree: 3 files changed (+87, -7 lines)


core/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala

Lines changed: 13 additions & 5 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}

 import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -98,20 +98,28 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
     lazy val metadataOutputNames = CaseInsensitiveMap(schema.map(f => f.name -> f).toMap)
     val constraints = mutable.ArrayBuffer[Constraint]()
     val track = mutable.Set[String]()
-    var selectExprs = schema.map { f =>
+    var selectExprs = schema.flatMap { f =>
       GeneratedColumn.getGenerationExpression(f) match {
         case Some(expr) =>
           if (topLevelOutputNames.contains(f.name)) {
             val column = SchemaUtils.fieldToColumn(f)
             // Add a constraint to make sure the value provided by the user is the same as the value
             // calculated by the generation expression.
             constraints += Constraints.Check(s"Generated Column", EqualNullSafe(column.expr, expr))
-            column.alias(f.name)
+            Some(column.alias(f.name))
           } else {
-            new Column(expr).alias(f.name)
+            Some(new Column(expr).alias(f.name))
           }
         case None =>
-          SchemaUtils.fieldToColumn(f).alias(f.name)
+          if (topLevelOutputNames.contains(f.name) ||
+              !data.sparkSession.conf.get(DeltaSQLConf.GENERATED_COLUMN_ALLOW_NULLABLE)) {
+            Some(SchemaUtils.fieldToColumn(f).alias(f.name))
+          } else {
+            // We only want to consider columns that are in the data's schema or are generated,
+            // so that a DataFrame with missing nullable columns can still be written.
+            // The actual nullability check on the data is done in DeltaInvariantCheckerExec.
+            None
+          }
       }
     }
     val cdcSelectExprs = CDCReader.CDC_COLUMNS_IN_DATA.flatMap { cdcColumnName =>
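
The essence of the change above is switching selectExprs from map to flatMap: a non-generated column that is absent from the incoming DataFrame (and whose nullable ingest is allowed by the new flag) is simply dropped from the projection instead of becoming an unresolvable column reference. A toy sketch of that pattern, detached from the Delta internals (the Field class, the projection helper, and the example names are purely illustrative):

// Hypothetical stand-in for the real logic: keep a projection entry only for
// fields that are present in the incoming data or that are generated.
case class Field(name: String, generated: Boolean)

def projection(schema: Seq[Field], available: Set[String]): Seq[String] =
  schema.flatMap { f =>
    if (available.contains(f.name)) Some(f.name)         // present in the data
    else if (f.generated) Some(s"<expr for ${f.name}>")  // computed from other columns
    else None                                             // missing nullable column: drop it
  }

// projection(Seq(Field("c1", false), Field("c2", false), Field("c3", true)), Set("c1"))
// returns Seq("c1", "<expr for c3>"): c2 is omitted rather than failing to resolve.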

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 8 additions & 0 deletions
@@ -670,6 +670,14 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)

+  val GENERATED_COLUMN_ALLOW_NULLABLE =
+    buildConf("generatedColumn.allowNullableIngest.enabled")
+      .internal()
+      .doc("When enabled this will allow tables with generated columns enabled to be able " +
+        "to write data without providing values for a nullable column via DataFrame.write")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_OPTIMIZE_MIN_FILE_SIZE =
     buildConf("optimize.minFileSize")
       .internal()
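
The new conf defaults to true, so the relaxed behavior is on out of the box. If the stricter pre-fix behavior is needed, the flag can be turned off per session. A sketch, assuming the usual spark.databricks.delta. prefix that DeltaSQLConf adds to its keys:

// Revert to the pre-fix behavior: require every non-generated column to be present.
spark.conf.set("spark.databricks.delta.generatedColumn.allowNullableIngest.enabled", "false")

// Within Delta's own code and tests the key is referenced as
// DeltaSQLConf.GENERATED_COLUMN_ALLOW_NULLABLE.key, as the new suite below does.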

core/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala

Lines changed: 66 additions & 2 deletions
@@ -19,9 +19,12 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import java.io.PrintWriter

+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
-import org.apache.spark.sql.delta.schema.{InvariantViolationException, SchemaUtils}
+import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils}
 import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import io.delta.tables.DeltaTableBuilder

@@ -35,7 +38,7 @@ import org.apache.spark.sql.functions.{current_timestamp, lit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{ArrayType, DateType, IntegerType, MetadataBuilder, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DateType, IntegerType, MetadataBuilder, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

 trait GeneratedColumnTest extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
@@ -1672,6 +1675,67 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest {
       )
     }
   }
+
+  test("not null should be enforced with generated columns") {
+    withTableName("tbl") { tbl =>
+      createTable(tbl,
+        None, "c1 INT, c2 STRING, c3 INT", Map("c3" -> "c1 + 1"), Seq.empty, Set("c1", "c2", "c3"))
+
+      // Try to write data without c2 in the DataFrame.
+      val schemaWithoutColumnC2 = StructType(
+        Seq(StructField("c1", IntegerType, true)))
+      val data1 = List(Row(3))
+      val df1 = spark.createDataFrame(data1.asJava, schemaWithoutColumnC2)
+
+      val e1 = intercept[DeltaInvariantViolationException] {
+        df1.write.format("delta").mode("append").saveAsTable("tbl")
+      }
+      assert(e1.getMessage.contains("Column c2, which has a NOT NULL constraint," +
+        " is missing from the data being written into the table."))
+    }
+  }
+
+  Seq(true, false).foreach { allowNullInsert =>
+    test("nullable column should work with generated columns - " +
+      "allowNullInsert enabled=" + allowNullInsert) {
+      withTableName("tbl") { tbl =>
+        withSQLConf(DeltaSQLConf.GENERATED_COLUMN_ALLOW_NULLABLE.key -> allowNullInsert.toString) {
+          createTable(
+            tbl, None, "c1 INT, c2 STRING, c3 INT", Map("c3" -> "c1 + 1"), Seq.empty)
+
+          // Create a DataFrame that matches the table's schema.
+          val data1 = List(Row(1, "a1"), Row(2, "a2"))
+          val schema = StructType(
+            Seq(StructField("c1", IntegerType, true), StructField("c2", StringType, true)))
+          val df1 = spark.createDataFrame(data1.asJava, schema)
+          df1.write.format("delta").mode("append").saveAsTable("tbl")
+
+          // Create a DataFrame that does not have c2.
+          val schemaWithoutOptionalColumnC2 = StructType(
+            Seq(StructField("c1", IntegerType, true)))
+
+          val data2 = List(Row(3))
+          val df2 = spark.createDataFrame(data2.asJava, schemaWithoutOptionalColumnC2)
+
+          if (allowNullInsert) {
+            df2.write.format("delta").mode("append").saveAsTable("tbl")
+            // Check correctness: c2 is null and c3 is still generated from c1.
+            val expectedDF = df1
+              .union(df2.withColumn("c2", lit(null).cast(StringType)))
+              .withColumn("c3", 'c1 + 1)
+            checkAnswer(spark.read.table(tbl), expectedDF)
+          } else {
+            // When allow-null insert is not enabled, the write cannot resolve c2.
+            val e = intercept[AnalysisException] {
+              df2.write.format("delta").mode("append").saveAsTable("tbl")
+            }
+            assert(e.getMessage.contains(
+              "A column or function parameter with name `c2` cannot be resolved"))
+          }
+        }
+      }
+    }
+  }
 }

 class GeneratedColumnSuite extends GeneratedColumnSuiteBase
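
For reference, after the allowNullInsert = true writes in the new test, the table holds three rows: (1, "a1", 2), (2, "a2", 3), and (3, NULL, 4). The row written without c2 stores NULL there while c3 is still generated as c1 + 1. A quick way to see this interactively (a sketch, assuming the same tbl table):

// Expected contents of tbl after the allowNullInsert = true branch of the test:
// +---+----+---+
// | c1|  c2| c3|
// +---+----+---+
// |  1|  a1|  2|
// |  2|  a2|  3|
// |  3|null|  4|
// +---+----+---+
spark.read.table("tbl").orderBy("c1").show()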
