Skip to content

Commit

Permalink
Added parser for drop datamap and handled events
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Nov 12, 2017
1 parent cac54d9 commit aebcdc0
Show file tree
Hide file tree
Showing 18 changed files with 378 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,39 @@ private TableDataMap getTableDataMap(String dataMapName,
* Clear the datamap/datamaps of a table from memory
* @param identifier Table identifier
*/
public void clearDataMap(AbsoluteTableIdentifier identifier) {
public void clearDataMaps(AbsoluteTableIdentifier identifier) {
List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
segmentRefreshMap.remove(identifier.uniqueName());
if (tableDataMaps != null) {
int i = 0;
for (TableDataMap tableDataMap: tableDataMaps) {
if (tableDataMap != null) {
tableDataMap.clear();
break;
}
i++;
}
allDataMaps.remove(identifier.uniqueName());
}
}

/**
* Clear the datamap/datamaps of a table from memory
* @param identifier Table identifier
*/
public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
if (tableDataMaps != null) {
int i = 0;
for (TableDataMap tableDataMap: tableDataMaps) {
if (tableDataMap != null && dataMapName.equalsIgnoreCase(tableDataMap.getDataMapName())) {
tableDataMap.clear();
tableDataMaps.remove(i);
break;
}
i++;
}
}
}

/**
* Get the blocklet datamap factory to get the detail information of blocklets
* @param identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.service.impl.PathFactory;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.OperationEventListener;
import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

Expand Down Expand Up @@ -137,7 +135,7 @@ public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
}

private void removeTableFromCache(SchemaTableName table) {
DataMapStoreManager.getInstance().clearDataMap(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
DataMapStoreManager.getInstance().clearDataMaps(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
cc.remove(table);
tableList.remove(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
sql("create datamap preagg1 on table PreAggMain using 'preaggregate' as select a,sum(b) from PreAggMain group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_a")
checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_b_sum")
sql("drop table preagg1")
sql("drop datamap preagg1 on table PreAggMain")
}

test("test pre agg create table Two") {
sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) from PreAggMain group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_a")
checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_b_sum")
sql("drop table preagg2")
sql("drop datamap preagg2 on table PreAggMain")
}

test("test pre agg create table Three") {
sql("create datamap preagg3 on table PreAggMain using 'preaggregate' as select a,sum(b) as sum from PreAggMain group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_a")
checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_b_sum")
sql("drop table preagg3")
sql("drop datamap preagg3 on table PreAggMain")
}

test("test pre agg create table four") {
sql("create datamap preagg4 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_a")
checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_b_sum")
sql("drop table preagg4")
sql("drop datamap preagg4 on table PreAggMain")
}


Expand All @@ -49,31 +49,31 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_a")
checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_b_sum")
checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "DICTIONARY")
sql("drop table preagg11")
sql("drop datamap preagg11 on table PreAggMain1")
}

test("test pre agg create table six") {
sql("create datamap preagg12 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) from PreAggMain1 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_a")
checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_b_sum")
checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "DICTIONARY")
sql("drop table preagg12")
sql("drop datamap preagg12 on table PreAggMain1")
}

test("test pre agg create table seven") {
sql("create datamap preagg13 on table PreAggMain1 using 'preaggregate' as select a,sum(b) as sum from PreAggMain1 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_a")
checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_b_sum")
checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "DICTIONARY")
sql("drop table preagg13")
sql("drop datamap preagg13 on table PreAggMain1")
}

test("test pre agg create table eight") {
sql("create datamap preagg14 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain1 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_a")
checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_b_sum")
checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "DICTIONARY")
sql("drop table preagg14")
sql("drop datamap preagg14 on table PreAggMain1")
}


Expand All @@ -82,28 +82,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_a")
checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_sum")
checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_count")
sql("drop table preagg15")
sql("drop datamap preagg15 on table PreAggMain2")
}

test("test pre agg create table ten") {
sql("create datamap preagg16 on table PreAggMain2 using 'preaggregate' as select a as a1,max(b) from PreAggMain2 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_a")
checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_b_max")
sql("drop table preagg16")
sql("drop datamap preagg16 on table PreAggMain2")
}

test("test pre agg create table eleven") {
sql("create datamap preagg17 on table PreAggMain2 using 'preaggregate' as select a,min(b) from PreAggMain2 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_a")
checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_b_min")
sql("drop table preagg17")
sql("drop datamap preagg17 on table PreAggMain2")
}

test("test pre agg create table twelve") {
sql("create datamap preagg18 on table PreAggMain2 using 'preaggregate' as select a as a1,count(b) from PreAggMain2 group by a")
checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_a")
checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_b_count")
sql("drop table preagg18")
sql("drop datamap preagg18 on table PreAggMain2")
}

test("test pre agg create table thirteen") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {

override def beforeAll {
sql("drop table if exists maintable")
sql("drop table if exists preagg1")
sql("drop table if exists preagg2")
sql("drop datamap if exists preagg1 on table maintable")
sql("drop datamap if exists preagg2 on table maintable")
sql("create table maintable (a string, b string, c string) stored by 'carbondata'")
}

test("create and drop preaggregate table") {
sql(
"create datamap preagg1 on table maintable using 'preaggregate' as select" +
" a,sum(b) from maintable group by a")
sql("drop table if exists preagg1")
sql("drop datamap if exists preagg1 on table maintable")
checkExistence(sql("show tables"), false, "preagg1")
}

Expand All @@ -42,14 +42,14 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
"create datamap preagg1 on table maintable using 'preaggregate' as select" +
" a,sum(b) from maintable group by a")
sql(
"create datamap preagg2 on table maintable using 'preaggregate' as select" +
"create datamap preagg2 on table maintable using 'preaggregate' as select" +
" a,sum(c) from maintable group by a")
sql("drop table if exists preagg2")
sql("drop datamap if exists preagg2 on table maintable")
val showTables = sql("show tables")
checkExistence(showTables, false, "preagg2")
checkExistence(showTables, true, "preagg1")
}

test("drop main table and check if preaggreagte is deleted") {
sql(
"create datamap preagg2 on table maintable using 'preaggregate' as select" +
Expand All @@ -60,8 +60,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {

override def afterAll() {
sql("drop table if exists maintable")
sql("drop table if exists preagg1")
sql("drop table if exists preagg2")
sql("drop datamap if exists preagg1 on table maintable")
sql("drop datamap if exists preagg2 on table maintable")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,19 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {

private def createAllAggregateTables(parentTableName: String): Unit = {
sql(
s"""create table ${ parentTableName }_preagg_sum stored BY 'carbondata' tblproperties
|('parent'='$parentTableName') as select id,sum(age) from $parentTableName group by id"""
s"""create datamap ${ parentTableName }_preagg_sum on table $parentTableName using 'preaggregate' as select id,sum(age) from $parentTableName group by id"""
.stripMargin)
sql(
s"""create table ${ parentTableName }_preagg_avg stored BY 'carbondata' tblproperties
|('parent'='$parentTableName') as select id,avg(age) from $parentTableName group by id"""
s"""create datamap ${ parentTableName }_preagg_avg on table $parentTableName using 'preaggregate' as select id,avg(age) from $parentTableName group by id"""
.stripMargin)
sql(
s"""create table ${ parentTableName }_preagg_count stored BY 'carbondata' tblproperties
|('parent'='$parentTableName') as select id,count(age) from $parentTableName group by id"""
s"""create datamap ${ parentTableName }_preagg_count on table $parentTableName using 'preaggregate' as select id,count(age) from $parentTableName group by id"""
.stripMargin)
sql(
s"""create table ${ parentTableName }_preagg_min stored BY 'carbondata' tblproperties
|('parent'='$parentTableName') as select id,min(age) from $parentTableName group by id"""
s"""create datamap ${ parentTableName }_preagg_min on table $parentTableName using 'preaggregate' as select id,min(age) from $parentTableName group by id"""
.stripMargin)
sql(
s"""create table ${ parentTableName }_preagg_max stored BY 'carbondata' tblproperties
|('parent'='$parentTableName') as select id,max(age) from $parentTableName group by id"""
s"""create datamap ${ parentTableName }_preagg_max on table $parentTableName using 'preaggregate' as select id,max(age) from $parentTableName group by id"""
.stripMargin)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}

test("test datamap create with preagg") {
sql("drop table if exists datamap3")
sql("drop datamap if exists datamap3 on table datamaptest")
sql(
"create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
Expand All @@ -60,7 +60,6 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {

test("test datamap create with preagg with duplicate name") {
intercept[Exception] {
sql("drop table if exists datamap2")
sql(
"create datamap datamap2 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")

Expand All @@ -71,6 +70,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
assert(dataMapSchemaList.size() == 3)
}

test("test datamap drop with preagg") {
intercept[Exception] {
sql("drop table datamap3")

}
val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
assert(table != null)
val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
assert(dataMapSchemaList.size() == 3)
}


override def afterAll {
sql("drop table if exists datamaptest")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.events

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}


/**
*
* @param ifExistsSet
* @param sparkSession
*/
case class DropDataMapPreEvent(
dataMapSchema: Option[DataMapSchema],
ifExistsSet: Boolean,
sparkSession: SparkSession) extends Event


/**
*
* @param ifExistsSet
* @param sparkSession
*/
case class DropDataMapPostEvent(
dataMapSchema: Option[DataMapSchema],
ifExistsSet: Boolean,
sparkSession: SparkSession) extends Event


/**
*
* @param ifExistsSet
* @param sparkSession
*/
case class DropDataMapAbortEvent(
dataMapSchema: Option[DataMapSchema],
ifExistsSet: Boolean,
sparkSession: SparkSession) extends Event
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{DropTablePostEvent, DropTablePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events._

case class CarbonDropTableCommand(
ifExistsSet: Boolean,
databaseNameOp: Option[String],
tableName: String)
tableName: String,
dropChildTable: Boolean = false)
extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
Expand Down Expand Up @@ -76,6 +77,19 @@ case class CarbonDropTableCommand(
None
}
}
if (carbonTable.isDefined) {
val relationIdentifiers = carbonTable.get.getTableInfo.getParentRelationIdentifiers
if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
if (!dropChildTable) {
if (!ifExistsSet) {
throw new Exception("Child table which is associated with datamap cannot " +
"be dropped, use DROP DATAMAP command to drop")
} else {
return Seq.empty
}
}
}
}
val operationContext = new OperationContext
val dropTablePreEvent: DropTablePreEvent =
DropTablePreEvent(
Expand Down
Loading

0 comments on commit aebcdc0

Please sign in to comment.