[CARBONDATA-4293] Make Table created without external keyword as Transactional table

Why is this PR needed?
Currently, when a table is created with a LOCATION (but without the EXTERNAL keyword) in cluster mode, the corresponding table is created as a transactional table. If the EXTERNAL keyword is present, it is created as a non-transactional table. This scenario is not handled in local mode.

What changes were proposed in this PR?
Made changes to check whether the EXTERNAL keyword is present in the CREATE TABLE statement. If it is absent (but a LOCATION is specified), the corresponding table is now created as a transactional table.
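
For illustration, a sketch of the two statement forms after this change (table names and paths are hypothetical):

// created as a transactional table: LOCATION present, no EXTERNAL keyword
sql("CREATE TABLE src(a INT) STORED AS carbondata LOCATION '/tmp/src'")
// still created as a non-transactional table: EXTERNAL keyword present
sql("CREATE EXTERNAL TABLE src2(a INT) STORED AS carbondata LOCATION '/tmp/src2'")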

This closes #4221
Indhumathi27 authored and kunal642 committed Oct 7, 2021
1 parent bca62cd commit 5a710f9d747ebd8b37bf4338b3e10d601a45dc91
Showing 6 changed files with 75 additions and 12 deletions.
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, RefreshTable}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, RefreshTable}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
@@ -430,6 +430,22 @@ trait SparkVersionAdapter {
  def evaluateWithPredicate(exp: Expression, schema: Seq[Attribute], row: InternalRow): Any = {
    InterpretedPredicate.create(exp, schema).expression.eval(row)
  }

+  def getUpdatedPlan(plan: LogicalPlan, sqlText: String): LogicalPlan = {
+    plan match {
+      case create@CreateTable(tableDesc, mode, query) =>
+        if (tableDesc.storage.locationUri.isDefined &&
+            !sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) {
+          // add a property to differentiate if create table statement has external keyword or not
+          val newProperties = tableDesc.properties + ("hasexternalkeyword" -> "false")
+          val updatedTableDesc = tableDesc.copy(properties = newProperties)
+          CreateTable(updatedTableDesc, mode, query)
+        } else {
+          create
+        }
+      case others => others
+    }
+  }
}

case class CarbonBuildSide(buildSide: BuildSide) {
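
As a rough usage sketch of the adapter hook above (the SparkSession `spark` and the SQL text are assumptions, not part of the commit), the injected tag can be read back off the rewritten plan:

// hedged illustration only: parse, rewrite, then inspect the injected property
val sqlText = "CREATE TABLE t1(a INT) STORED AS carbondata LOCATION '/tmp/t1'"
val plan = CarbonToSparkAdapter.getUpdatedPlan(
  spark.sessionState.sqlParser.parsePlan(sqlText), sqlText)
plan match {
  case CreateTable(tableDesc, _, _) =>
    // present only because the statement had LOCATION but no EXTERNAL keyword
    assert(tableDesc.properties.get("hasexternalkeyword").contains("false"))
  case _ =>
}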
@@ -17,7 +17,7 @@

package org.apache.spark.sql.parser

-import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlParser
@@ -57,7 +57,8 @@ class CarbonExtensionSqlParser(
        throw ce
      case at: Throwable =>
        try {
-          val parsedPlan = initialParser.parsePlan(sqlText)
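+          // post-process the Spark-parsed plan so that CREATE TABLE ... LOCATION
+          // statements without the EXTERNAL keyword are tagged for later handling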
+          val parsedPlan = CarbonToSparkAdapter.getUpdatedPlan(initialParser.parsePlan(sqlText),
+            sqlText)
          CarbonScalaUtil.cleanParserThreadLocals
          parsedPlan
        } catch {
@@ -194,7 +194,9 @@ object CarbonSparkSqlParserUtil {
      throw new MalformedCarbonCommandException(
        "Creating table without column(s) is not supported")
    }
-    if (isExternal && fields.isEmpty && tableProperties.nonEmpty) {
+    // filter out the internally added external keyword property
+    val newTableProperties = tableProperties.filterNot(_._1.equalsIgnoreCase("hasexternalkeyword"))
+    if (isExternal && fields.isEmpty && newTableProperties.nonEmpty) {
      // as fields are always zero for external table, cannot validate table properties.
      throw new MalformedCarbonCommandException(
        "Table properties are not supported for external table")
@@ -232,17 +234,23 @@ object CarbonSparkSqlParserUtil {
    } catch {
      case e: Throwable =>
        if (fields.nonEmpty) {
+          val partitionerFields = fields
+            .filter(field => partitionColumnNames.contains(field.column))
+            .map(field => PartitionerField(field.column, field.dataType, null))
          val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
            ifNotExists,
            Some(identifier.getDatabaseName),
            identifier.getTableName,
            fields,
-            Seq.empty,
+            partitionerFields,
            tblProperties,
            bucketFields,
            isAlterFlow = false,
            table.comment
          )
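+          // "hasexternalkeyword" is injected only when the EXTERNAL keyword was
+          // absent from the statement, so its presence forces a transactional table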
+          if (table.properties.contains("hasexternalkeyword")) {
+            isTransactionalTable = true
+          }
          TableNewProcessor(tableModel)
        } else {
          throw new MalformedCarbonCommandException(
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableStatement, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType}
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime, TimestampFormatter}
import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
@@ -471,6 +471,24 @@ trait SparkVersionAdapter {
      overwrite,
      ifPartitionNotExists)
  }

+  def getUpdatedPlan(plan: LogicalPlan, sqlText: String): LogicalPlan = {
+    plan match {
+      case create@CreateTableStatement(_, _, _, _, properties, _, _,
+          location, _, _, _, _) =>
+        if (location.isDefined &&
+            !sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) {
+          // add a property to differentiate if create table statement has external keyword or not
+          val newProperties = properties + ("hasexternalkeyword" -> "false")
+          CreateTableStatement(create.tableName, create.tableSchema, create.partitioning,
+            create.bucketSpec, newProperties, create.provider, create.options,
+            location, create.comment, create.serde, create.external, create.ifNotExists)
+        } else {
+          create
+        }
+      case others => others
+    }
+  }
}

case class CarbonBuildSide(buildSide: BuildSide) {
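
A side note on the 3.x variant above: CreateTableStatement is a case class, so the rebuild could arguably be written more compactly with copy, with identical behaviour (an editor's sketch, not what the commit does):

// equivalent to re-constructing CreateTableStatement field by field
val updated = create.copy(properties = newProperties)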
@@ -17,7 +17,7 @@

package org.apache.spark.sql.parser

-import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlParser
@@ -57,7 +57,8 @@ class CarbonExtensionSqlParser(
        throw ce
      case at: Throwable =>
        try {
-          val parsedPlan = initialParser.parsePlan(sqlText)
+          val parsedPlan = CarbonToSparkAdapter.getUpdatedPlan(initialParser.parsePlan(sqlText),
+            sqlText)
          CarbonScalaUtil.cleanParserThreadLocals
          parsedPlan
        } catch {
@@ -21,6 +21,7 @@ import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

@@ -201,10 +202,28 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
         |stored as carbondata
         |LOCATION '$newStoreLocation'
      """.stripMargin)
-    val exception = intercept[Exception] {
-      sql("select * from source").show(false)
-    }
-    assert(exception.getMessage.contains("No Index files are present in the table location"))
+    val tableIdentifier = new TableIdentifier("source", Some("default"))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    assert(carbonTable.isTransactionalTable && carbonTable.isHivePartitionTable)
+    sql("INSERT INTO source select 100,'spark','test1'")
+    checkAnswer(sql("select * from source"), Seq(Row(100, "spark", "test1")))
+    sql("drop table if exists source")
  }

test("test create table with location") {
// test non-partition table
val newStoreLocation = s"$storeLocation/origin1"
FileUtils.deleteDirectory(new File(newStoreLocation))
sql("drop table if exists source")
sql(
s"""
|CREATE TABLE source(a int, b string,c string)
|stored as carbondata
|LOCATION '$newStoreLocation'
""".stripMargin)
val tableIdentifier = new TableIdentifier("source", Some("default"))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
assert(carbonTable.isTransactionalTable)
sql("INSERT INTO source select 100,'spark','test1'")
checkAnswer(sql("select * from source"), Seq(Row(100, "spark", "test1")))
sql("drop table if exists source")
