Skip to content

Commit

Permalink
[CARBONDATA-2090] Fix the error message of alter streaming property
Browse files Browse the repository at this point in the history
Fix the error message of alter streaming property

This closes #1873
  • Loading branch information
QiangCai authored and chenliang613 committed Jan 31, 2018
1 parent 4d3f398 commit c9a501d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Expand Up @@ -231,9 +231,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
// TODO remove this limitation later
val property = properties.find(_._1.equalsIgnoreCase("streaming"))
if (property.isDefined) {
if (!property.get._2.trim.equalsIgnoreCase("true")) {
if (carbonTable.isStreamingTable) {
throw new MalformedCarbonCommandException(
"Streaming property can not be changed once it is 'true'")
} else {
if (!property.get._2.trim.equalsIgnoreCase("true")) {
throw new MalformedCarbonCommandException(
"Streaming property value is incorrect")
}
}
}
ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, isView)) :: Nil
Expand Down
Expand Up @@ -181,7 +181,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}

// normal table not support streaming ingest
test("normal table not support streaming ingest") {
test("normal table not support streaming ingest and alter normal table's streaming property") {
// alter normal table's streaming property
val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.batch_table set tblproperties('streaming'='false')"))
assertResult("Streaming property value is incorrect")(msg.getMessage)

val identifier = new TableIdentifier("batch_table", Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
Expand Down Expand Up @@ -518,6 +522,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
case _ =>
assert(false, "should support set table to streaming")
}

// alter streaming table's streaming property
val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.stream_table_handoff set tblproperties('streaming'='false')"))
assertResult("Streaming property can not be changed once it is 'true'")(msg.getMessage)

val segments = sql("show segments for table streaming.stream_table_handoff").collect()
assert(segments.length == 2 || segments.length == 3)
assertResult("Streaming")(segments(0).getString(1))
Expand Down

0 comments on commit c9a501d

Please sign in to comment.