Skip to content

Commit

Permalink
[CARBONDATA-1656][Streaming] Reject alter table command for streaming…
Browse files Browse the repository at this point in the history
… table

This closes apache#1448
  • Loading branch information
jackylk authored and anubhav100 committed Jun 22, 2018
1 parent 6e9329c commit 67ffdc3
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}

private[sql] case class AlterTableAddColumnCommand(
private[sql] case class CarbonAlterTableAddColumnCommand(
alterTableAddColumnsModel: AlterTableAddColumnsModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}

private[sql] case class AlterTableDataTypeChangeCommand(
private[sql] case class CarbonAlterTableDataTypeChangeCommand(
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD

private[sql] case class AlterTableDropColumnCommand(
private[sql] case class CarbonAlterTableDropColumnCommand(
alterTableDropColumnModel: AlterTableDropColumnModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

private[sql] case class AlterTableRenameTableCommand(
private[sql] case class CarbonAlterTableRenameCommand(
alterTableRenameModel: AlterTableRenameModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}

import org.apache.carbondata.core.util.CarbonUtil
Expand Down Expand Up @@ -56,7 +56,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
} else {
ExecutedCommandExec(alter) :: Nil
}
Expand Down Expand Up @@ -98,7 +98,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException(
"Operation not allowed : " + altertablemodel.alterSql)
}
case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
alterTableChangeDataTypeModel.databaseName))(sparkSession)
Expand All @@ -107,7 +107,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
Expand All @@ -116,7 +116,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.hive.CarbonRelation

import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
Expand All @@ -34,12 +35,36 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp

override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case update@ProjectForUpdateCommand(_, tableIdentifier) =>
rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data update")
ExecutedCommandExec(update) :: Nil
case delete@ProjectForDeleteCommand(_, tableIdentifier, _) =>
rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Date delete")
ExecutedCommandExec(delete) :: Nil
case ProjectForUpdateCommand(_, tableIdentifier) =>
rejectIfStreamingTable(
DeleteExecution.getTableIdentifier(tableIdentifier),
"Data update")
Nil
case ProjectForDeleteCommand(_, tableIdentifier, _) =>
rejectIfStreamingTable(
DeleteExecution.getTableIdentifier(tableIdentifier),
"Date delete")
Nil
case CarbonAlterTableAddColumnCommand(model) =>
rejectIfStreamingTable(
new TableIdentifier(model.tableName, model.databaseName),
"Alter table add column")
Nil
case CarbonAlterTableDropColumnCommand(model) =>
rejectIfStreamingTable(
new TableIdentifier(model.tableName, model.databaseName),
"Alter table drop column")
Nil
case CarbonAlterTableDataTypeChangeCommand(model) =>
rejectIfStreamingTable(
new TableIdentifier(model.tableName, model.databaseName),
"Alter table change datatype")
Nil
case AlterTableRenameCommand(oldTableIdentifier, _, _) =>
rejectIfStreamingTable(
oldTableIdentifier,
"Alter rename table")
Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.core.constants.CarbonCommonConstants
Expand Down Expand Up @@ -326,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
table.toLowerCase,
columnName.toLowerCase,
columnNameCopy.toLowerCase)
AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
}

protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
Expand Down Expand Up @@ -395,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
tableModel.dimCols,
tableModel.msrCols,
tableModel.highcardinalitydims.getOrElse(Seq.empty))
AlterTableAddColumnCommand(alterTableAddColumnsModel)
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
}

private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
Expand All @@ -419,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
table.toLowerCase,
values.map(_.toLowerCase))
AlterTableDropColumnCommand(alterTableDropColumnModel)
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
}

def getFields(schema: Seq[StructField]): Seq[Field] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
}

test("test blocking alter table operation on streaming table") {
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source RENAME to t""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
}
}

override def afterAll {
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
}

test("test to revert table name on failure") {
intercept[RuntimeException] {
val exception = intercept[RuntimeException] {
new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
sql("alter table reverttest rename to reverttest_fail")
new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()
Expand Down

0 comments on commit 67ffdc3

Please sign in to comment.