Commit 2823774

Merge c613cc7 into 0f1d98f

qiuchenjian committed Jan 23, 2019
2 parents 0f1d98f + c613cc7
Showing 2 changed files with 109 additions and 1 deletion.
TestUpdateForPartitionTable.scala (new file)
@@ -0,0 +1,101 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.partition

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

class TestUpdateForPartitionTable extends QueryTest with BeforeAndAfterAll {

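  // Create one table per partition strategy: range, standard (hive), hash and list.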
  override def beforeAll(): Unit = {
    dropTable()

    sql("create table test_range_partition_table (id int) partitioned by (name string) " +
      "stored by 'carbondata' TBLPROPERTIES('PARTITION_TYPE' = 'RANGE','RANGE_INFO' = 'a,e,f')")
    sql("create table test_hive_partition_table (id int) partitioned by (name string) " +
      "stored by 'carbondata'")
    sql("create table test_hash_partition_table (id int) partitioned by (name string) " +
      "stored by 'carbondata' TBLPROPERTIES('PARTITION_TYPE' = 'HASH','NUM_PARTITIONS' = '2')")
    sql("create table test_list_partition_table (id int) partitioned by (name string) " +
      "stored by 'carbondata' TBLPROPERTIES('PARTITION_TYPE' = 'LIST','LIST_INFO' = 'a,e,f')")
  }

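  // Drop all tables created by this suite, if they exist.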
  def dropTable(): Unit = {
    sql("drop table if exists test_hash_partition_table")
    sql("drop table if exists test_list_partition_table")
    sql("drop table if exists test_range_partition_table")
    sql("drop table if exists test_hive_partition_table")
  }

ignore("test update for partition table") {
sql("drop table if exists test_partition_table")
sql("create table test_partition_table (id int) partitioned by (name string) " +
"stored by 'carbondata' TBLPROPERTIES('PARTITION_TYPE' = 'RANGE','RANGE_INFO' = 'a,e,f')")
sql("insert into test_partition_table select 1,'b' ")
sql("insert into test_partition_table select 2,'z' ")

sql("update test_partition_table set (name) = ('c') where id = 1").collect()
sql("update test_partition_table set (name) = ('d') where id = 1").collect()
sql("update test_partition_table set (name) = ('e') where id = 1").collect()
sql("update test_partition_table set (name) = ('f') where id = 1").collect()

// the result is wrong
sql("select * from test_partition_table").collect().foreach(println)
sql("drop table if exists test_partition_table")
}

test ("test update for range partition table") {
sql("insert into test_range_partition_table select 1,'b' ")

val ex = intercept[UnsupportedOperationException] {
sql("update test_range_partition_table set (name) = ('c') where id = 1").collect()
}

assertResult("Unsupported operation on range ,hash or list partition table")(ex.getMessage)
}

test ("test update for list partition table") {
sql("insert into test_list_partition_table select 1,'b' ")
val ex = intercept[UnsupportedOperationException] {
sql("update test_list_partition_table set (name) = ('c') where id = 1").collect()
}
assertResult("Unsupported operation on range ,hash or list partition table")(ex.getMessage)
}

test ("test update for hash partition table") {

sql("insert into test_hash_partition_table select 1,'b' ")
val ex = intercept[UnsupportedOperationException] {
sql("update test_hash_partition_table set (name) = ('c') where id = 1").collect()
}
assertResult("Unsupported operation on range ,hash or list partition table")(ex.getMessage)
}

test ("test update for hive(standard) partition table") {

sql("insert into test_hive_partition_table select 1,'b' ")

sql("update test_hive_partition_table set (name) = ('c') where id = 1").collect()

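    // Update should succeed on a standard (hive) partition table.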
    assertResult(1)(
      sql("select * from test_hive_partition_table where name = 'c'").collect().length)
  }

  override def afterAll(): Unit = {
    dropTable()
  }
}
CarbonProjectForUpdateCommand.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
@@ -60,6 +60,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
      return Seq.empty
    }
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
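    // Update is only supported on standard (hive) partition tables;
    // reject range, hash and list partition tables up front.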
    if (carbonTable.getPartitionInfo != null &&
        (carbonTable.getPartitionInfo.getPartitionType == PartitionType.RANGE ||
         carbonTable.getPartitionInfo.getPartitionType == PartitionType.HASH ||
         carbonTable.getPartitionInfo.getPartitionType == PartitionType.LIST)) {
      throw new UnsupportedOperationException(
        "Unsupported operation on range, hash or list partition table")
    }
    setAuditTable(carbonTable)
    setAuditInfo(Map("plan" -> plan.simpleString))
    columns.foreach { col =>
