Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-1656][Streaming] Reject alter table command for streaming table #1448

Closed
wants to merge 2 commits into the base branch from the source branch
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}

private[sql] case class AlterTableAddColumnCommand(
private[sql] case class CarbonAlterTableAddColumnCommand(
alterTableAddColumnsModel: AlterTableAddColumnsModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}

private[sql] case class AlterTableDataTypeChangeCommand(
private[sql] case class CarbonAlterTableDataTypeChangeCommand(
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD

private[sql] case class AlterTableDropColumnCommand(
private[sql] case class CarbonAlterTableDropColumnCommand(
alterTableDropColumnModel: AlterTableDropColumnModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

private[sql] case class AlterTableRenameTableCommand(
private[sql] case class CarbonAlterTableRenameCommand(
alterTableRenameModel: AlterTableRenameModel)
extends RunnableCommand {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}

import org.apache.carbondata.core.util.CarbonUtil
Expand Down Expand Up @@ -56,7 +56,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
} else {
ExecutedCommandExec(alter) :: Nil
}
Expand Down Expand Up @@ -98,7 +98,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException(
"Operation not allowed : " + altertablemodel.alterSql)
}
case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
alterTableChangeDataTypeModel.databaseName))(sparkSession)
Expand All @@ -107,7 +107,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
Expand All @@ -116,7 +116,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.hive.CarbonRelation

import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
Expand All @@ -34,12 +35,36 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp

/**
 * Rejects DML/DDL operations that are not supported on a streaming table.
 * For each matched command, [[rejectIfStreamingTable]] throws a
 * MalformedCarbonCommandException when the target table is a streaming table;
 * otherwise the command falls through. This strategy never plans anything
 * itself — it always returns Nil so later strategies produce the actual plan.
 */
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
  plan match {
    case ProjectForUpdateCommand(_, tableIdentifier) =>
      rejectIfStreamingTable(
        DeleteExecution.getTableIdentifier(tableIdentifier),
        "Data update")
      Nil
    case ProjectForDeleteCommand(_, tableIdentifier, _) =>
      // Fixed typo in operation name: was "Date delete"
      rejectIfStreamingTable(
        DeleteExecution.getTableIdentifier(tableIdentifier),
        "Data delete")
      Nil
    case CarbonAlterTableAddColumnCommand(model) =>
      rejectIfStreamingTable(
        new TableIdentifier(model.tableName, model.databaseName),
        "Alter table add column")
      Nil
    case CarbonAlterTableDropColumnCommand(model) =>
      rejectIfStreamingTable(
        new TableIdentifier(model.tableName, model.databaseName),
        "Alter table drop column")
      Nil
    case CarbonAlterTableDataTypeChangeCommand(model) =>
      rejectIfStreamingTable(
        new TableIdentifier(model.tableName, model.databaseName),
        "Alter table change datatype")
      Nil
    case AlterTableRenameCommand(oldTableIdentifier, _, _) =>
      rejectIfStreamingTable(
        oldTableIdentifier,
        "Alter rename table")
      Nil
    // Anything else is none of our business; let other strategies plan it.
    case _ => Nil
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.core.constants.CarbonCommonConstants
Expand Down Expand Up @@ -326,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
table.toLowerCase,
columnName.toLowerCase,
columnNameCopy.toLowerCase)
AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
}

protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
Expand Down Expand Up @@ -395,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
tableModel.dimCols,
tableModel.msrCols,
tableModel.highcardinalitydims.getOrElse(Seq.empty))
AlterTableAddColumnCommand(alterTableAddColumnsModel)
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
}

private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
Expand All @@ -419,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
table.toLowerCase,
values.map(_.toLowerCase))
AlterTableDropColumnCommand(alterTableDropColumnModel)
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
}

def getFields(schema: Seq[StructField]): Seq[Field] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
}

// Verifies that every ALTER TABLE variant (add/drop column, rename, change
// datatype) is rejected with MalformedCarbonCommandException when the target
// ("source") is a streaming table. Matches the cases handled by
// StreamingTableStrategy.
test("test blocking alter table operation on streaming table") {
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source RENAME to t""").show()
}
intercept[MalformedCarbonCommandException] {
sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
}
}

override def afterAll {
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
}

test("test to revert table name on failure") {
intercept[RuntimeException] {
val exception = intercept[RuntimeException] {
new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
sql("alter table reverttest rename to reverttest_fail")
new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()
Expand Down