[CARBONDATA-4274] Fix create partition table error with spark 3.1
Why is this PR needed?
With Spark 3.1, a partition table can be created by naming partition columns that are
already part of the schema, for example:

create table partitionTable(c1 int, c2 int, v1 string, v2 string)
stored as carbondata partitioned by (v2, c2)

When the table is created through a SparkSession with CarbonExtensions, the catalog
table is created with the specified partitions. But in a cluster / with CarbonSession,
the same syntax creates a normal table with no partitions.

What changes were proposed in this PR?
When partition columns are given only by name, partitionByStructFields is empty, so a
normal (non-partitioned) table was being created. This PR identifies the partition
column names and fetches the corresponding struct fields, with their datatype
information, from the table columns.
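A minimal sketch of that lookup against standalone Spark 3.1 APIs (the schema, the
Expressions.identity transforms, and the simple name match are illustrative, not the
exact CarbonData code):

import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

// "partitioned by (v2, c2)" reaches the builder as identity transforms over field
// references, carrying no datatypes and therefore no StructFields
val cols = Seq(StructField("c1", IntegerType), StructField("c2", IntegerType),
  StructField("v1", StringType), StructField("v2", StringType))
val partitionTransformList: Seq[Transform] =
  Seq(Expressions.identity("v2"), Expressions.identity("c2"))
// resolve each referenced name against the table columns to recover the struct fields
val partitionByStructFields = partitionTransformList
  .flatMap(_.references().flatMap(_.fieldNames()))
  .flatMap(name => cols.find(_.name == name))
// => v2 (StringType) and c2 (IntegerType), in partition order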

This closes #4208
ShreelekhyaG authored and kunal642 committed Aug 31, 2021
1 parent f52aa20 commit ca659b5
Showing 3 changed files with 50 additions and 14 deletions.
@@ -233,17 +233,6 @@ trait SparkVersionAdapter {
     val partitionerFields = partitionByStructFields.map { structField =>
       PartitionerField(structField.name, Some(structField.dataType.toString), null)
     }
-    // validate partition clause
-    if (partitionerFields.nonEmpty) {
-      // partition columns should not be part of the schema
-      val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet
-        .intersect(colNames.map(_.toLowerCase).toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")
-          , partitionColumns: PartitionFieldListContext)
-      }
-    }
     partitionerFields
   }

@@ -280,7 +269,10 @@
     val options = new CarbonOption(properties)
     // validate streaming property
     validateStreamingProperty(options)
-    var fields = parser.getFields(cols ++ partitionByStructFields)
+    // with Spark 3.1, partition columns can already be present in the schema.
+    // Check and remove them from the fields, then append the partition columns at the end
+    val updatedCols = cols.filterNot(x => partitionByStructFields.contains(x))
+    var fields = parser.getFields(updatedCols ++ partitionByStructFields)
     // validate for create table as select
     selectQuery match {
       case Some(q) =>
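A quick sketch of the reordering that filterNot produces (plain StructFields with
illustrative values; the dedup relies on StructField's case-class equality):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

val cols = Seq(StructField("c1", IntegerType), StructField("c2", IntegerType),
  StructField("v1", StringType), StructField("v2", StringType))
val partitionByStructFields =
  Seq(StructField("v2", StringType), StructField("c2", IntegerType))
// drop the duplicated partition columns from the schema, then append them last
val updatedCols = cols.filterNot(partitionByStructFields.contains)
// updatedCols ++ partitionByStructFields => fields ordered as c1, v1, v2, c2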
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import org.antlr.v4.runtime.tree.TerminalNode
 import org.apache.spark.sql.{CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
@@ -130,8 +131,30 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     val tableProperties = convertPropertiesToLowercase(properties)
 
     // validate partition clause
-    val partitionByStructFields = Option(partitionColumns).toSeq
-      .flatMap(x => visitPartitionFieldList(x)._2)
+    // There can be two scenarios when creating a partition table with Spark 3.1.
+    // Scenario 1: the partition columns are given with datatypes. Then we get the struct
+    // fields from visitPartitionFieldList and the transform list is empty.
+    // Example syntax: create table example(col1 int) partitioned by (col2 int)
+    // Scenario 2: the partition columns are given only by name from the schema. Then the struct
+    // fields are empty, as no datatype is given, and the transform list holds field references
+    // with the partition column names; search these names in the table columns to get the fields.
+    // Example syntax: create table example(col1 int, col2 int) partitioned by (col2)
+    var (partitionTransformList,
+      partitionByStructFields) = visitPartitionFieldList(partitionColumns)
+    if (partitionByStructFields.isEmpty && partitionTransformList.nonEmpty) {
+      val partitionNames = partitionTransformList
+        .flatMap(_.references().flatMap(_.fieldNames()))
+      partitionNames.foreach { partName =>
+        val structField = cols.find(x => x.name.equals(partName))
+        if (structField.isDefined) {
+          partitionByStructFields = partitionByStructFields :+ structField.get
+        } else {
+          operationNotAllowed("Partition columns not specified in the schema: " +
+            partitionNames.mkString("[", ",", "]")
+            , partitionColumns: PartitionFieldListContext)
+        }
+      }
+    }
     val partitionFields = CarbonToSparkAdapter.
       validatePartitionFields(partitionColumns, colNames, tableProperties,
         partitionByStructFields)
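For reference, a hedged sketch of the two parse shapes those comments describe, using
standalone Spark 3.1 transform APIs (the tuples are illustrative stand-ins for what
visitPartitionFieldList returns in each scenario):

import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.{IntegerType, StructField}

// scenario 1: "partitioned by (col2 int)" -- datatype known, no transforms
val scenario1: (Seq[Transform], Seq[StructField]) =
  (Seq.empty, Seq(StructField("col2", IntegerType)))
// scenario 2: "partitioned by (col2)" -- field reference only, no StructField yet
val scenario2: (Seq[Transform], Seq[StructField]) =
  (Seq(Expressions.identity("col2")), Seq.empty)
// the new branch in the hunk above fires only for the scenario 2 shape
val needsSchemaLookup = scenario2._2.isEmpty && scenario2._1.nonEmpty // true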
@@ -532,6 +532,27 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists onlyPart")
   }
 
+  if (CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
+      CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT).toBoolean) {
+    test("test create partition on existing table columns") {
+      sql("drop table if exists partitionTable")
+      sql("create table partitionTable(c1 int, c2 int, v1 string, v2 string) " +
+        "stored as carbondata partitioned by (v2,c2)")
+      val descTable = sql("describe formatted partitionTable").collect
+      descTable.find(_.get(0).toString.contains("Partition Columns")) match {
+        case Some(row) => assert(row.get(1).toString.contains("v2:STRING, c2:INT"))
+        case None => assert(false)
+      }
+      sql("insert into partitionTable select 1,'sd','sd',2")
+      sql("alter table partitionTable add partition (v2='ke', c2=3) location 'loc1'")
+      checkAnswer(sql("show partitions partitionTable"),
+        Seq(Row("v2=sd/c2=2"), Row("v2=ke/c2=3")))
+      checkAnswer(sql("select * from partitionTable"), Seq(Row(1, "sd", "sd", 2)))
+      sql("drop table if exists partitionTable")
+    }
+  }
+
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
     val scanRDD = plan collect {
