[CARBONDATA-3305] Added DDL to drop cache for a table
Added CarbonDropCacheCommand to drop all cache entries for a particular table.

Usage: DROP METACACHE ON TABLE tableName
Dropping the cache for a child table is not supported; the command must be run on the parent table.
Running the above command clears all cache entries belonging to the table:
its index entries, its datamap entries, and its forward and reverse dictionaries.
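
An illustrative session (the table name `sales` is hypothetical):

```sql
SHOW METACACHE ON TABLE sales;  -- inspect what the table currently holds in cache
DROP METACACHE ON TABLE sales;  -- evict its index, datamap and dictionary entries
```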

This closes #3138
NamanRastogi authored and kunal642 committed Mar 7, 2019
1 parent 718be37 commit 3f98c51
Showing 11 changed files with 473 additions and 41 deletions.
@@ -62,10 +62,13 @@ public final class CarbonLRUCache {
*/
public CarbonLRUCache(String propertyName, String defaultPropertyName) {
try {
-    lruCacheMemorySize = Integer
-        .parseInt(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
+    lruCacheMemorySize = Long
+        .parseLong(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
   } catch (NumberFormatException e) {
-    lruCacheMemorySize = Integer.parseInt(defaultPropertyName);
+    LOGGER.error(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE
+        + " is not in a valid format. Falling back to default value: "
+        + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+    lruCacheMemorySize = Long.parseLong(defaultPropertyName);
   }
initCache();
if (lruCacheMemorySize > 0) {
@@ -148,6 +151,17 @@ public void remove(String key) {
}
}

/**
 * Removes all the given keys from the lru cache.
 *
 * @param keys cache keys to be removed
 */
public void removeAll(List<String> keys) {
synchronized (lruCacheMap) {
for (String key : keys) {
removeKey(key);
}
}
}

/**
* This method will remove the key from lru cache
*
@@ -302,6 +316,9 @@ public Cacheable get(String key) {
*/
public void clear() {
synchronized (lruCacheMap) {
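// invalidate each cached object so its resources are released before the map is cleared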
for (Cacheable cacheableObj : lruCacheMap.values()) {
  cacheableObj.invalidate();
}
lruCacheMap.clear();
}
}
13 changes: 11 additions & 2 deletions docs/ddl-of-carbondata.md
@@ -1095,18 +1095,27 @@ Users can specify which columns to include and exclude for local dictionary generation
about current cache used status in memory through the following command:

```sql
-SHOW METADATA
+SHOW METACACHE
```

This shows the overall memory consumed in the cache by category - index files, dictionary and
datamaps. It also shows the cache usage by all the tables and child tables in the current
database.
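
For reference, the updated tests expect a database with nothing cached to be reported as a row
of the form below (see `TestCarbonShowCacheCommand`; exact column names come from the command's
output schema):

```
cache_empty_db | ALL | 0 bytes | 0 bytes | 0 bytes
```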

```sql
-SHOW METADATA ON TABLE tableName
+SHOW METACACHE ON TABLE tableName
```

This shows detailed information on cache usage by the table `tableName`: its carbonindex files,
its dictionary files, its datamaps and its child tables.

This command is not allowed on child tables.

```sql
DROP METACACHE ON TABLE tableName
```

This clears all cache entries belonging to the table `tableName`: its carbonindex files,
its dictionary files, its datamaps and its child tables.

This command is not allowed on child tables.
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.sql.commands

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants

class TestCarbonDropCacheCommand extends QueryTest with BeforeAndAfterAll {

val dbName = "cache_db"

override protected def beforeAll(): Unit = {
sql(s"DROP DATABASE IF EXISTS $dbName CASCADE")
sql(s"CREATE DATABASE $dbName")
sql(s"USE $dbName")
}

override protected def afterAll(): Unit = {
sql(s"use default")
sql(s"DROP DATABASE $dbName CASCADE")
}


test("Test dictionary") {
val tableName = "t1"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int,utilization int, salary int) stored by 'carbondata' " +
s"TBLPROPERTIES('DICTIONARY_INCLUDE'='designation, workgroupcategoryname')")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()

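// snapshot the cache keys before dropping, so the evicted keys can be computed afterwards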
val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
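// keep only the keys that DROP METACACHE actually evicted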
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
.map(_.getColumnId).toArray

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// check if table dictionary entries are dropped
for (dictId <- dictIds) {
assert(droppedCacheKeys.asScala.exists(key => key.contains(dictId)))
}

// check if cache does not have any more table dictionary entries
for (dictId <- dictIds) {
assert(!cacheAfterDrop.asScala.exists(key => key.contains(dictId)))
}
}


test("Test preaggregate datamap") {
val tableName = "t2"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
s"GROUP BY workgroupcategoryname").collect()
val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val dbPath = CarbonEnv
.getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath
val preaggPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + carbonTable.getTableName +
"_" + carbonTable.getTableInfo.getDataMapSchemaList.get(0).getDataMapName +
CarbonCommonConstants.FILE_SEPARATOR

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// Check if preaggregate index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(preaggPath)))

// check if cache does not have any more preaggregate index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(preaggPath)))
}


test("Test bloom filter") {
val tableName = "t3"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dblom ON TABLE $tableName USING 'bloomfilter' " +
"DMPROPERTIES('INDEX_COLUMNS'='deptno')")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT * FROM $tableName WHERE deptno=10").collect()

val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath
val bloomPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + "dblom" +
CarbonCommonConstants.FILE_SEPARATOR

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// Check if bloom entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.contains(bloomPath)))

// check if cache does not have any more bloom entries
assert(!cacheAfterDrop.asScala.exists(key => key.contains(bloomPath)))
}


test("Test preaggregate datamap fail") {
val tableName = "t4"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
s"GROUP BY workgroupcategoryname").collect()

val failMessage = intercept[UnsupportedOperationException] {
sql(s"DROP METACACHE ON TABLE ${tableName}_dpagg")
}.getMessage
assert(failMessage.contains("Operation not allowed on child table."))
}


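// keySet() is a live view of the cache map, so take a defensive copy before mutating the cache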
def clone(oldSet: util.Set[String]): util.HashSet[String] = {
val newSet = new util.HashSet[String]
newSet.addAll(oldSet)
newSet
}
}
@@ -128,7 +128,7 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
sql("use cache_empty_db").collect()
val result1 = sql("show metacache").collect()
assertResult(2)(result1.length)
assertResult(Row("cache_empty_db", "ALL", "0", "0", "0"))(result1(1))
assertResult(Row("cache_empty_db", "ALL", "0 bytes", "0 bytes", "0 bytes"))(result1(1))

sql("use cache_db").collect()
val result2 = sql("show metacache").collect()
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.events

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

case class DropCacheEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
internalCall: Boolean)
extends Event with DropCacheEventInfo
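
For context, these events are dispatched through CarbonData's operation-listener mechanism (see
the `CarbonEnv` change below). The following is a hypothetical listener sketch, assuming the same
`OperationEventListener` contract used by the listeners already registered in `CarbonEnv`;
`LoggingDropCacheEventListener` is illustrative and not part of this commit:

```scala
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext, OperationEventListener}

// Illustrative listener that logs every DROP METACACHE invocation.
object LoggingDropCacheEventListener extends OperationEventListener {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  override protected def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case dropCacheEvent: DropCacheEvent =>
        val table = dropCacheEvent.carbonTable
        // internalCall distinguishes drops cascaded to child tables
        // from user-issued DROP METACACHE statements
        LOGGER.info(s"Dropping cache for ${table.getDatabaseName}.${table.getTableName}, " +
          s"internalCall=${dropCacheEvent.internalCall}")
      case _ => // ignore other event types
    }
  }
}
```

Such a listener would be registered the same way the commit registers
`DropCachePreAggEventListener`: `.addListener(classOf[DropCacheEvent], LoggingDropCacheEventListener)`.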
@@ -62,6 +62,13 @@ trait DropTableEventInfo {
val ifExistsSet: Boolean
}

/**
* event for drop cache
*/
trait DropCacheEventInfo {
val carbonTable: CarbonTable
}

/**
* event for alter_table_drop_column
*/
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -185,6 +186,7 @@ object CarbonEnv {
.addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
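// fired by DROP METACACHE; the listener propagates the drop to pre-aggregate
// child tables and rejects direct drops on them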
}

/**
