Skip to content
Permalink
Browse files
[CARBONDATA-4303] Columns mismatch when insert into table with static…
… partition

Why is this PR needed?
When inserting into a table with a static partition, the source projection
should not contain the static partition columns, whereas the target table
contains all columns. The column-count comparison between source and target
must therefore be: source column count = target column count - static
partition column count.

What changes were proposed in this PR?
Before doing the column-count comparison, remove the static partition columns
from the target table's expected output.

This Closes #4233
  • Loading branch information
jack86596 authored and Indhumathi27 committed Oct 26, 2021
1 parent 8953cde commit 9dbd2a59ccc92beba8b8a9d8b7834f0e5b2bba8c
Showing 7 changed files with 51 additions and 66 deletions.
@@ -271,7 +271,10 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
val carbonTable = carbonDSRelation.carbonRelation.carbonTable
val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
val expectedOutput = carbonDSRelation.carbonRelation.output
val staticParCols = CarbonToSparkAdapter.getPartitionsFromInsert(p)
.filter(_._2.isDefined).keySet.map(_.toLowerCase())
val expectedOutput = carbonDSRelation.carbonRelation.output.filterNot(
a => staticParCols.contains(a.name.toLowerCase()))
if (expectedOutput.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
CarbonException.analysisException(
@@ -291,7 +294,15 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
}
// In spark, PreprocessTableInsertion rule has below cast logic.
// It was missed in carbon when implemented insert into rules.
val actualOutput = newLogicalPlan.output
if (newLogicalPlan.output.size != expectedOutput.size) {
CarbonException.analysisException(
s"${carbonTable.getTableName} requires that the data to be inserted " +
s"have the same number of columns as the target table: " +
s"target table has ${p.table.output.size} column(s) but the " +
s"inserted data has ${p.query.output.length + staticParCols.size} column(s), " +
s"including ${staticParCols.size} partition column(s) having constant value(s)."
)
}
var newChildOutput = newLogicalPlan.output.zip(expectedOutput)
.map {
case (actual, expected) =>
@@ -306,30 +317,24 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
Alias(Cast(actual, expected.dataType), expected.name)(
explicitMetadata = Option(expected.metadata))
}
} ++ actualOutput.takeRight(actualOutput.size - expectedOutput.size)
if (newChildOutput.size >= expectedOutput.size ||
carbonDSRelation.carbonTable.isHivePartitionTable) {
newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
columnWithIndex._1 match {
case attr: Attribute =>
Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
case attr => attr
}
}
val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
} else {
Project(newChildOutput, newLogicalPlan)
newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
columnWithIndex._1 match {
case attr: Attribute =>
Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
case attr => attr
}

val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)

InsertIntoCarbonTable(carbonDSRelation, CarbonToSparkAdapter.getPartitionsFromInsert(p),
newChild, overwrite, ifNotExists = true, containsMultipleInserts = containsMultipleInserts)
}
val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
} else {
CarbonException.analysisException(
"Cannot insert into target table because number of columns mismatch")
Project(newChildOutput, newLogicalPlan)
}

val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)

InsertIntoCarbonTable(carbonDSRelation, CarbonToSparkAdapter.getPartitionsFromInsert(p),
newChild, overwrite, ifNotExists = true, containsMultipleInserts = containsMultipleInserts)
}
}

@@ -472,7 +472,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
val e = intercept[Exception] {
sql("insert into table1 select * from table2")
}
assert(e.getMessage.contains("number of columns are different"))
assert(e.getMessage.contains(
"requires that the data to be inserted have the same number of columns as the target table"))
}

test("test insert into partitioned table with int type to double type") {
@@ -486,6 +487,21 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"DROP TABLE IF EXISTS table1")
}

// Checks an insert whose partition spec mixes a static value (a='100') with a
// dynamic partition column (b). The SELECT supplies only the non-partition
// column i and the dynamic partition column b; the static column a is not
// part of the projection.
test("test insert into partitioned table with static partition") {
// Start from a clean state in case earlier tests left these tables behind.
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS select_from")
sql("CREATE TABLE select_from (i int, b string) stored as carbondata")
sql("CREATE TABLE table1 (i int) partitioned by (a int, b string) stored as carbondata")
sql("insert into table select_from select 1, 'a'")
// Two projected columns go into a three-column target: a is fixed by the partition spec.
sql("insert into table table1 partition(a='100',b) select 1, b from select_from")
// Expected row is (i = 1, a = 100, b = 'a').
checkAnswer(
sql("select * from table1"),
sql("select 1, 100, 'a'")
)
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS select_from")
}

test("test loading data into partitioned table with segment's updateDeltaEndTimestamp not change") {
val tableName = "test_partitioned_table"
sql(s"drop table if exists $tableName")
@@ -81,22 +81,6 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
)
}

// Inserts rows with one and with two values into the single-column table
// boolean_one_column while bad-record handling for insert is enabled, then
// checks the stored rows.
test("Inserting and selecting table: create one column boolean table and insert two columns") {
// Send to the old flow, because two values are inserted for a one-column table.
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
sql("insert into boolean_one_column values(true,false)")
sql("insert into boolean_one_column values(True)")
sql("insert into boolean_one_column values(false,true)")
// The expected rows match the first value of each insert statement.
checkAnswer(
sql("select * from boolean_one_column"),
Seq(Row(true), Row(true), Row(false))
)
// Restore the property to its default value.
// NOTE(review): this reset is skipped if an insert or checkAnswer throws.
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
}

test("Inserting and selecting table: two columns boolean and many rows, should support") {
sql("CREATE TABLE if not exists boolean_table2(" +
"col1 BOOLEAN, col2 BOOLEAN) STORED AS carbondata")
@@ -441,7 +425,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
sql("insert into boolean_table2 select * from boolean_table")
}
assert(exception_insert.getMessage.contains(
"Cannot insert into target table because number of columns mismatch"))
"requires that the data to be inserted have the same number of columns as the target table"))
}

test("Inserting into Hive table from carbon table: support boolean data type and other format") {
@@ -643,26 +627,6 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
)
}

// Runs INSERT OVERWRITE with one and with two values against the single-column
// table boolean_one_column while bad-record handling for insert is enabled,
// checking the table contents after the overwrites.
test("Inserting overwrite: create one column boolean table and insert two columns") {
// Send to the old flow, because two values are inserted for a one-column table.
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
sql("insert overwrite table boolean_one_column values(true,false)")
// The expected row matches the first of the two supplied values.
checkAnswer(
sql("select * from boolean_one_column"),
Seq(Row(true))
)
sql("insert overwrite table boolean_one_column values(True)")
sql("insert overwrite table boolean_one_column values(false,true)")
// After the last overwrite, a single row matching its first value is expected.
checkAnswer(
sql("select * from boolean_one_column"),
Seq(Row(false))
)
// Restore the property to its default value.
// NOTE(review): this reset is skipped if an insert or checkAnswer throws.
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
}

test("Inserting overwrite: two columns boolean and many rows, should support") {
sql("CREATE TABLE if not exists boolean_table2(" +
"col1 BOOLEAN, col2 BOOLEAN) STORED AS carbondata")
@@ -383,7 +383,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""insert overwrite table deleteinpartition
| partition (dtm=20200908)
| select * from deleteinpartition
| select id, sales from deleteinpartition
| where dtm = 20200907""".stripMargin)
checkAnswer(
sql("""select count(1), dtm from deleteinpartition group by dtm"""),
@@ -395,7 +395,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""insert overwrite table deleteinpartition
| partition (dtm=20200909)
| select * from deleteinpartition
| select id, sales from deleteinpartition
| where dtm = 20200907""".stripMargin)
checkAnswer(
sql("""select count(1), dtm from deleteinpartition group by dtm"""),
@@ -127,7 +127,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""insert overwrite table iud.updateinpartition
| partition (dtm=20200908)
| select * from iud.updateinpartition where dtm = 20200907""".stripMargin)
| select id, sales from iud.updateinpartition where dtm = 20200907""".stripMargin)
checkAnswer(
sql(
"""select sales from iud.updateinpartition
@@ -772,7 +772,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"""insert overwrite table target
| partition (value=3)
| select * from target where value = 100""".stripMargin)
| select key from target where value = 100""".stripMargin)
checkAnswer(sql("select * from target order by key"),
Seq(Row("c", "200"), Row("e", "100"), Row("e", "3")))
sql("""alter table target drop partition (value=3)""")
@@ -172,13 +172,13 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
sql(
"""insert overwrite table droppartition
| partition (dtm=20200908)
| select * from droppartition
| select id, sales from droppartition
| where dtm = 20200907""".stripMargin)
// insert overwrite a non-existing partition
sql(
"""insert overwrite table droppartition
| partition (dtm=20200909)
| select * from droppartition
| select id, sales from droppartition
| where dtm = 20200907""".stripMargin)

// make sure dropping one partition won't affect other partitions

0 comments on commit 9dbd2a5

Please sign in to comment.