From 3f98c51237e72987bf5faef4ba3798a57f027f5d Mon Sep 17 00:00:00 2001
From: namanrastogi
Date: Wed, 27 Feb 2019 19:45:18 +0530
Subject: [PATCH] [CARBONDATA-3305] Added DDL to drop cache for a table

Added CarbonDropCacheCommand to drop all cache entries for a particular
table.

Usage: DROP METACACHE ON TABLE tableName

Dropping cache for a child table is not supported; the table has to be a
parent table. Running the above command clears all cache entries belonging
to the table: its index entries, its datamap entries, and its forward and
reverse dictionaries.
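For illustration, a typical session could look as follows (the table name
here is hypothetical):

    SHOW METACACHE ON TABLE employee_tbl   -- inspect the table's cached sizes
    DROP METACACHE ON TABLE employee_tbl   -- evict its index, datamap and dictionary entries
    SHOW METACACHE ON TABLE employee_tbl   -- the table's entries are now gone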
This closes #3138
---
 .../carbondata/core/cache/CarbonLRUCache.java |  23 +-
 docs/ddl-of-carbondata.md                     |  13 +-
 .../commands/TestCarbonDropCacheCommand.scala | 200 ++++++++++++++++++
 .../commands/TestCarbonShowCacheCommand.scala |   2 +-
 .../carbondata/events/DropCacheEvents.scala   |  28 +++
 .../org/apache/carbondata/events/Events.scala |   7 +
 .../org/apache/spark/sql/CarbonEnv.scala      |   2 +
 .../cache/CarbonDropCacheCommand.scala        | 108 +++++++++
 .../cache/CarbonShowCacheCommand.scala        |  56 ++---
 .../cache/DropCachePreAggEventListener.scala  |  70 ++++++
 .../sql/parser/CarbonSpark2SqlParser.scala    |  10 +-
 11 files changed, 478 insertions(+), 41 deletions(-)
 create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala
 create mode 100644 integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala
 create mode 100644 integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
 create mode 100644 integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala

diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 74ff8a0a9e9..0c751730e59 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -62,10 +62,13 @@ public final class CarbonLRUCache {
    */
   public CarbonLRUCache(String propertyName, String defaultPropertyName) {
     try {
-      lruCacheMemorySize = Integer
-          .parseInt(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
+      lruCacheMemorySize = Long
+          .parseLong(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
     } catch (NumberFormatException e) {
-      lruCacheMemorySize = Integer.parseInt(defaultPropertyName);
+      LOGGER.error(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE +
+          " is not in a valid format. Falling back to default value: " +
+          CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+      lruCacheMemorySize = Long.parseLong(defaultPropertyName);
     }
     initCache();
     if (lruCacheMemorySize > 0) {
@@ -148,6 +151,17 @@ public void remove(String key) {
     }
   }
 
+  /**
+   * @param keys cache keys to be removed from the LRU cache
+   */
+  public void removeAll(List<String> keys) {
+    synchronized (lruCacheMap) {
+      for (String key : keys) {
+        removeKey(key);
+      }
+    }
+  }
+
   /**
    * This method will remove the key from lru cache
    *
@@ -302,6 +316,9 @@ public Cacheable get(String key) {
    */
   public void clear() {
     synchronized (lruCacheMap) {
+      for (Cacheable cacheableObj : lruCacheMap.values()) {
+        cacheableObj.invalidate();
+      }
       lruCacheMap.clear();
     }
   }
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 34764758450..e6f209e7902 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1095,7 +1095,7 @@ Users can specify which columns to include and exclude for local dictionary gene
   about current cache used status in memory through the following command:
 
   ```sql
-  SHOW METADATA
+  SHOW METACACHE
   ```
 
   This shows the overall memory consumed in the cache by categories - index files, dictionary and
@@ -1103,10 +1103,19 @@ Users can specify which columns to include and exclude for local dictionary gene
   database.
 
   ```sql
-  SHOW METADATA ON TABLE tableName
+  SHOW METACACHE ON TABLE tableName
   ```
 
   This shows detailed information on cache usage by the table `tableName` and its carbonindex files,
   its dictionary files, its datamaps and children tables.
 
   This command is not allowed on child tables.
+
+  ```sql
+  DROP METACACHE ON TABLE tableName
+  ```
+
+  This clears all cache entries belonging to the table `tableName`: its carbonindex files,
+  its dictionary files, its datamaps and its children tables.
+
+  This command is not allowed on child tables.
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala
new file mode 100644
index 00000000000..982ec762f81
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonDropCacheCommand.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sql.commands
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class TestCarbonDropCacheCommand extends QueryTest with BeforeAndAfterAll {
+
+  val dbName = "cache_db"
+
+  override protected def beforeAll(): Unit = {
+    sql(s"DROP DATABASE IF EXISTS $dbName CASCADE")
+    sql(s"CREATE DATABASE $dbName")
+    sql(s"USE $dbName")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql(s"USE default")
+    sql(s"DROP DATABASE $dbName CASCADE")
+  }
+
+
+  test("Test dictionary") {
+    val tableName = "t1"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata' " +
+        s"TBLPROPERTIES('DICTIONARY_INCLUDE'='designation, workgroupcategoryname')")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+
+    val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+
+    sql(s"DROP METACACHE ON TABLE $tableName")
+
+    val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
+    droppedCacheKeys.removeAll(cacheAfterDrop)
+
+    val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
+    val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
+      .map(_.getColumnId).toArray
+
+    // Check if table index entries are dropped
+    assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))
+
+    // Check if cache does not have any more table index entries
+    assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))
+
+    // Check if table dictionary entries are dropped
+    for (dictId <- dictIds) {
+      assert(droppedCacheKeys.asScala.exists(key => key.contains(dictId)))
+    }
+
+    // Check if cache does not have any more table dictionary entries
+    for (dictId <- dictIds) {
+      assert(!cacheAfterDrop.asScala.exists(key => key.contains(dictId)))
+    }
+  }
+
+
+  test("Test preaggregate datamap") {
+    val tableName = "t2"
+
+    sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
+        s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
+        s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
+        s"attendance int, utilization int, salary int) stored by 'carbondata'")
+    sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
+        s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
+    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
+    sql(s"SELECT * FROM $tableName").collect()
+    sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
s"GROUP BY workgroupcategoryname").collect() + val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet()) + + sql(s"DROP METACACHE ON TABLE $tableName") + + val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet()) + droppedCacheKeys.removeAll(cacheAfterDrop) + + val tableIdentifier = new TableIdentifier(tableName, Some(dbName)) + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession) + val dbPath = CarbonEnv + .getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession) + val tablePath = carbonTable.getTablePath + val preaggPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + carbonTable.getTableName + + "_" + carbonTable.getTableInfo.getDataMapSchemaList.get(0).getDataMapName + + CarbonCommonConstants.FILE_SEPARATOR + + // Check if table index entries are dropped + assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath))) + + // check if cache does not have any more table index entries + assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath))) + + // Check if preaggregate index entries are dropped + assert(droppedCacheKeys.asScala.exists(key => key.startsWith(preaggPath))) + + // check if cache does not have any more preaggregate index entries + assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(preaggPath))) + } + + + test("Test bloom filter") { + val tableName = "t3" + + sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " + + s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + + s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," + + s"attendance int, utilization int, salary int) stored by 'carbondata'") + sql(s"CREATE DATAMAP dblom ON TABLE $tableName USING 'bloomfilter' " + + "DMPROPERTIES('INDEX_COLUMNS'='deptno')") + sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName") + sql(s"SELECT * FROM $tableName").collect() + sql(s"SELECT * FROM $tableName WHERE deptno=10").collect() + + val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet()) + + sql(s"DROP METACACHE ON TABLE $tableName") + + val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet()) + droppedCacheKeys.removeAll(cacheAfterDrop) + + val tableIdentifier = new TableIdentifier(tableName, Some(dbName)) + val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession) + val tablePath = carbonTable.getTablePath + val bloomPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + "dblom" + + CarbonCommonConstants.FILE_SEPARATOR + + // Check if table index entries are dropped + assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath))) + + // check if cache does not have any more table index entries + assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath))) + + // Check if bloom entries are dropped + assert(droppedCacheKeys.asScala.exists(key => key.contains(bloomPath))) + + // check if cache does not have any more bloom entries + assert(!cacheAfterDrop.asScala.exists(key => key.contains(bloomPath))) + } + + + test("Test preaggregate datamap fail") { + val tableName = "t4" + + sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " + + s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " + + s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," + + 
s"attendance int, utilization int, salary int) stored by 'carbondata'") + sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " + + s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname") + sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName") + sql(s"SELECT * FROM $tableName").collect() + sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " + + s"GROUP BY workgroupcategoryname").collect() + + val fail_message = intercept[UnsupportedOperationException] { + sql(s"DROP METACACHE ON TABLE ${tableName}_dpagg") + }.getMessage + assert(fail_message.contains("Operation not allowed on child table.")) + } + + + def clone(oldSet: util.Set[String]): util.HashSet[String] = { + val newSet = new util.HashSet[String] + newSet.addAll(oldSet) + newSet + } +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala index 0e1cd00eb98..e999fc7d6b3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala @@ -128,7 +128,7 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll { sql("use cache_empty_db").collect() val result1 = sql("show metacache").collect() assertResult(2)(result1.length) - assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1)) + assertResult(Row("cache_empty_db", "ALL", "0 bytes", "0 bytes", "0 bytes"))(result1(1)) sql("use cache_db").collect() val result2 = sql("show metacache").collect() diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala new file mode 100644 index 00000000000..2e8b78e129e --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropCacheEvents.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+case class DropCacheEvent(
+    carbonTable: CarbonTable,
+    sparkSession: SparkSession,
+    internalCall: Boolean)
+  extends Event with DropCacheEventInfo
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 1830a359072..c03d3c63232 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -62,6 +62,13 @@ trait DropTableEventInfo {
   val ifExistsSet: Boolean
 }
 
+/**
+ * event for drop cache
+ */
+trait DropCacheEventInfo {
+  val carbonTable: CarbonTable
+}
+
 /**
  * event for alter_table_drop_column
  */
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a7677d7000c..60d896a1229 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
+import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -185,6 +186,7 @@ object CarbonEnv {
       .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
       .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
+      .addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
 }
 
 /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
new file mode 100644
index 00000000000..e955ed90627
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.cache
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.cache.dictionary.AbstractColumnDictionaryInfo
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.datamap.bloom.BloomCacheKeyValue
+import org.apache.carbondata.events.{DropCacheEvent, OperationContext, OperationListenerBus}
+
+case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall: Boolean = false)
+  extends MetadataCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    clearCache(carbonTable, sparkSession)
+    Seq.empty
+  }
+
+  def clearCache(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
+    LOGGER.info("Drop cache request received for table " + carbonTable.getTableName)
+
+    val dropCacheEvent = DropCacheEvent(
+      carbonTable,
+      sparkSession,
+      internalCall
+    )
+    val operationContext = new OperationContext
+    OperationListenerBus.getInstance.fireEvent(dropCacheEvent, operationContext)
+
+    val cache = CacheProvider.getInstance().getCarbonCache
+    if (cache != null) {
+      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
+
+      // Dictionary IDs
+      val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
+        .map(_.getColumnId).toArray
+
+      // Remove elements from cache
+      val keysToRemove = ListBuffer[String]()
+      val cacheIterator = cache.getCacheMap.entrySet().iterator()
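+      // Each cache entry is classified by the type of its value:
+      // BlockletDataMapIndexWrapper   -> carbonindex entry of a table,
+      // BloomCacheKeyValue.CacheValue -> bloom filter datamap shard,
+      // AbstractColumnDictionaryInfo  -> forward/reverse dictionary of a column.
+      // Keys belonging to this table are collected and removed in a single call below.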
+      while (cacheIterator.hasNext) {
+        val entry = cacheIterator.next()
+        val cacheValue = entry.getValue
+
+        if (cacheValue.isInstanceOf[BlockletDataMapIndexWrapper]) {
+          // index
+          val indexPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+          if (indexPath.startsWith(tablePath)) {
+            keysToRemove += entry.getKey
+          }
+        } else if (cacheValue.isInstanceOf[BloomCacheKeyValue.CacheValue]) {
+          // bloom datamap
+          val shardPath = entry.getKey.replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+          if (shardPath.contains(tablePath)) {
+            keysToRemove += entry.getKey
+          }
+        } else if (cacheValue.isInstanceOf[AbstractColumnDictionaryInfo]) {
+          // dictionary
+          val dictId = dictIds.find(id => entry.getKey.startsWith(id))
+          if (dictId.isDefined) {
+            keysToRemove += entry.getKey
+          }
+        }
+      }
+      cache.removeAll(keysToRemove.asJava)
+    }
+
+    LOGGER.info("Drop cache request served for table " + carbonTable.getTableName)
+  }
+
+  override protected def opName: String = "DROP CACHE"
+
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index e937c32c5af..e5f89d8b9fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -66,29 +66,24 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
     val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
-      Seq(Row("ALL", "ALL", 0L, 0L, 0L),
-        Row(currentDatabase, "ALL", 0L, 0L, 0L))
+      Seq(
+        Row("ALL", "ALL", byteCountToDisplaySize(0L),
+          byteCountToDisplaySize(0L), byteCountToDisplaySize(0L)),
+        Row(currentDatabase, "ALL", byteCountToDisplaySize(0L),
+          byteCountToDisplaySize(0L), byteCountToDisplaySize(0L)))
     } else {
-      val tableIdents = sparkSession.sessionState.catalog.listTables(currentDatabase).toArray
-      val dbLocation = CarbonEnv.getDatabaseLocation(currentDatabase, sparkSession)
-      val tempLocation = dbLocation.replace(
-        CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-      val tablePaths = tableIdents.map { tableIdent =>
-        (tempLocation + CarbonCommonConstants.FILE_SEPARATOR +
-         tableIdent.table + CarbonCommonConstants.FILE_SEPARATOR,
-          CarbonEnv.getDatabaseName(tableIdent.database)(sparkSession) + "." + tableIdent.table)
+      val carbonTables = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+        .listAllTables(sparkSession)
+        .filter { table =>
+          table.getDatabaseName.equalsIgnoreCase(currentDatabase)
+        }
+      val tablePaths = carbonTables
+        .map { table =>
+          (table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR,
+            table.getDatabaseName + "." + table.getTableName)
       }
 
-      val dictIds = tableIdents
-        .map { tableIdent =>
-          var table: CarbonTable = null
-          try {
-            table = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)
-          } catch {
-            case _ =>
-          }
-          table
-        }
+      val dictIds = carbonTables
         .filter(_ != null)
         .flatMap { table =>
           table
@@ -159,7 +154,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
       Seq(
         Row("ALL", "ALL", byteCountToDisplaySize(allIndexSize),
           byteCountToDisplaySize(allDatamapSize), byteCountToDisplaySize(allDictSize)),
-        Row(currentDatabase, "ALL", "0", "0", "0"))
+        Row(currentDatabase, "ALL", byteCountToDisplaySize(0),
+          byteCountToDisplaySize(0), byteCountToDisplaySize(0)))
     } else {
       val tableList = tableMapIndexSize
         .map(_._1)
@@ -187,17 +183,11 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
   }
 
   def showTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
-    val tableName = carbonTable.getTableName
-    val databaseName = carbonTable.getDatabaseName
     val cache = CacheProvider.getInstance().getCarbonCache()
     if (cache == null) {
       Seq.empty
     } else {
-      val dbLocation = CarbonEnv
-        .getDatabaseLocation(databaseName, sparkSession)
-        .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
-      val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR +
-        tableName + CarbonCommonConstants.FILE_SEPARATOR
+      val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
       var numIndexFilesCached = 0
 
       // Path -> Name, Type
@@ -209,8 +199,10 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
       datamapSize.put(tablePath, 0)
       // children tables
      for( schema <- carbonTable.getTableInfo.getDataMapSchemaList.asScala ) {
-        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + "_" +
-          schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
+        val childTableName = carbonTable.getTableName + "_" + schema.getDataMapName
+        val childTable = CarbonEnv
+          .getCarbonTable(Some(carbonTable.getDatabaseName), childTableName)(sparkSession)
+        val path = childTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
         val name = schema.getDataMapName
         val dmType = schema.getProviderName
         datamapName.put(path, (name, dmType))
@@ -219,9 +211,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier])
       // index schemas
       for (schema <- DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
         .asScala) {
-        val path = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName +
-          CarbonCommonConstants.FILE_SEPARATOR + schema.getDataMapName +
-          CarbonCommonConstants.FILE_SEPARATOR
+        val path = tablePath + schema.getDataMapName + CarbonCommonConstants.FILE_SEPARATOR
         val name = schema.getDataMapName
         val dmType = schema.getProviderName
         datamapName.put(path, (name, dmType))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
new file mode 100644
index 00000000000..3d03c605bbf
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/DropCachePreAggEventListener.scala
@@ -0,0 +1,70 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ + +package org.apache.spark.sql.execution.command.cache + +import scala.collection.JavaConverters._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.catalyst.TableIdentifier + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext, + OperationEventListener} + +object DropCachePreAggEventListener extends OperationEventListener { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext): Unit = { + + event match { + case dropCacheEvent: DropCacheEvent => + val carbonTable = dropCacheEvent.carbonTable + val sparkSession = dropCacheEvent.sparkSession + val internalCall = dropCacheEvent.internalCall + if (carbonTable.isChildDataMap && !internalCall) { + throw new UnsupportedOperationException("Operation not allowed on child table.") + } + + if (carbonTable.hasDataMapSchema) { + val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala + .filter(_.getRelationIdentifier != null) + for (childSchema <- childrenSchemas) { + val childTable = + CarbonEnv.getCarbonTable( + TableIdentifier(childSchema.getRelationIdentifier.getTableName, + Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession) + val dropCacheCommandForChildTable = + CarbonDropCacheCommand( + TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)), + internalCall = true) + dropCacheCommandForChildTable.processMetadata(sparkSession) + } + } + } + + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index a2923b8ff08..5f5cc12e6e1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.types.StructField import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.execution.command.cache.CarbonShowCacheCommand +import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand} import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand} import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils @@ -95,7 +95,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { createStream | dropStream | showStreams protected lazy val cacheManagement: Parser[LogicalPlan] = - showCache + showCache | dropCache protected lazy val alterAddPartition: Parser[LogicalPlan] = ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~> @@ -503,6 +503,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonShowCacheCommand(table) } + protected lazy val dropCache: Parser[LogicalPlan] = + DROP ~> METACACHE ~> ontable <~ opt(";") ^^ { + case table => + CarbonDropCacheCommand(table) + } + protected lazy val cli: Parser[LogicalPlan] = (CARBONCLI ~> FOR ~> TABLE) ~> (ident <~ ".").? 
~ ident ~ (OPTIONS ~> "(" ~> commandOptions <~ ")").? <~