
Commit

Merge 78e81fb into 718be37
NamanRastogi committed Mar 6, 2019
2 parents 718be37 + 78e81fb commit d034bc7
Showing 9 changed files with 447 additions and 7 deletions.
@@ -62,10 +62,13 @@ public final class CarbonLRUCache {
*/
public CarbonLRUCache(String propertyName, String defaultPropertyName) {
try {
-      lruCacheMemorySize = Integer
-          .parseInt(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
+      lruCacheMemorySize = Long
+          .parseLong(CarbonProperties.getInstance().getProperty(propertyName, defaultPropertyName));
} catch (NumberFormatException e) {
-      lruCacheMemorySize = Integer.parseInt(defaultPropertyName);
+      LOGGER.error(propertyName
+          + " is not in a valid format. Falling back to default value: "
+          + defaultPropertyName);
+      lruCacheMemorySize = Long.parseLong(defaultPropertyName);
}
initCache();
if (lruCacheMemorySize > 0) {
@@ -148,6 +151,17 @@ public void remove(String key) {
}
}

  /**
   * This method will remove the given list of keys from lru cache
   *
   * @param keys cache keys to be removed
   */
public void removeAll(List<String> keys) {
synchronized (lruCacheMap) {
for (String key : keys) {
removeKey(key);
}
}
}

/**
* This method will remove the key from lru cache
*
@@ -302,6 +316,9 @@ public Cacheable get(String key) {
*/
public void clear() {
synchronized (lruCacheMap) {
      // invalidate each cached object so it releases its resources before the map is cleared
      for (Cacheable cacheableObj : lruCacheMap.values()) {
        cacheableObj.invalidate();
      }
lruCacheMap.clear();
}
}
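Taken together, these cache changes let a caller evict a whole group of entries in one synchronized pass and make sure evicted objects release their resources. A minimal sketch of how a drop-cache operation might drive the new `removeAll`, assuming the caller selects keys by table-path prefix (the selection logic shown here is illustrative, not the committed command):

```scala
// Illustrative sketch only: evict every cache entry under a table path.
// CarbonLRUCache.removeAll and CacheProvider.getCarbonCache are from this
// commit/codebase; filtering keys by path prefix is an assumption.
import scala.collection.JavaConverters._

import org.apache.carbondata.core.cache.CacheProvider

def dropTableCache(tablePath: String): Unit = {
  val cache = CacheProvider.getInstance().getCarbonCache
  // Snapshot the matching keys first, since keySet() is a live view,
  // then evict them in a single synchronized pass over the LRU map.
  val keysToRemove = cache.getCacheMap.keySet().asScala
    .filter(_.startsWith(tablePath))
    .toList
  cache.removeAll(keysToRemove.asJava)
}
```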
13 changes: 11 additions & 2 deletions docs/ddl-of-carbondata.md
@@ -1095,18 +1095,27 @@ Users can specify which columns to include and exclude for local dictionary generation
about the current cache usage status in memory through the following command:

```sql
-  SHOW METADATA
+  SHOW METACACHE
```

This shows the overall memory consumed in the cache, broken down by category: index files,
dictionaries and datamaps. It also shows the cache usage of all tables and their child tables
in the current database.

```sql
-  SHOW METADATA ON TABLE tableName
+  SHOW METACACHE ON TABLE tableName
```

This shows detailed information on the cache used by the table `tableName`: its carbonindex files,
its dictionary files, its datamaps and its child tables.

This command is not allowed on child tables.

```sql
DROP METACACHE ON TABLE tableName
```

This clears every cache entry belonging to the table `tableName`: its carbonindex files,
its dictionary files, its datamaps and its child tables.

This command is not allowed on child tables.
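
For illustration, a possible end-to-end session is sketched below; the table name `sales` is a placeholder, and the entries actually cached depend on which queries have run:

```scala
// Illustrative only: warm the cache, inspect it, then evict it.
spark.sql("SELECT * FROM sales").collect()          // loads index (and dictionary) entries into the cache
spark.sql("SHOW METACACHE ON TABLE sales").show()   // detailed cache usage for the table
spark.sql("DROP METACACHE ON TABLE sales")          // evict the table's cache entries
spark.sql("SHOW METACACHE ON TABLE sales").show()   // the table's entries are now gone
```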
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.sql.commands

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants

class TestCarbonDropCacheCommand extends QueryTest with BeforeAndAfterAll {

val dbName = "cache_db"

override protected def beforeAll(): Unit = {
sql(s"DROP DATABASE IF EXISTS $dbName CASCADE")
sql(s"CREATE DATABASE $dbName")
sql(s"USE $dbName")
}

override protected def afterAll(): Unit = {
sql(s"use default")
sql(s"DROP DATABASE $dbName CASCADE")
}


test("Test dictionary") {
val tableName = "t1"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int,utilization int, salary int) stored by 'carbondata' " +
s"TBLPROPERTIES('DICTIONARY_INCLUDE'='designation, workgroupcategoryname')")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()

val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR
val dictIds = carbonTable.getAllDimensions.asScala.filter(_.isGlobalDictionaryEncoding)
.map(_.getColumnId).toArray

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// check if table dictionary entries are dropped
for (dictId <- dictIds) {
assert(droppedCacheKeys.asScala.exists(key => key.contains(dictId)))
}

// check if cache does not have any more table dictionary entries
for (dictId <- dictIds) {
assert(!cacheAfterDrop.asScala.exists(key => key.contains(dictId)))
}
}


test("Test preaggregate datamap") {
val tableName = "t2"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
s"GROUP BY workgroupcategoryname").collect()
val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val dbPath = CarbonEnv
.getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath
val preaggPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + carbonTable.getTableName +
"_" + carbonTable.getTableInfo.getDataMapSchemaList.get(0).getDataMapName +
CarbonCommonConstants.FILE_SEPARATOR

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// Check if preaggregate index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(preaggPath)))

// check if cache does not have any more preaggregate index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(preaggPath)))
}


test("Test bloom filter") {
val tableName = "t3"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dblom ON TABLE $tableName USING 'bloomfilter' " +
"DMPROPERTIES('INDEX_COLUMNS'='deptno')")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT * FROM $tableName WHERE deptno=10").collect()

val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql(s"DROP METACACHE ON TABLE $tableName")

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath
val bloomPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + "dblom" +
CarbonCommonConstants.FILE_SEPARATOR

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// Check if bloom entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.contains(bloomPath)))

// check if cache does not have any more bloom entries
assert(!cacheAfterDrop.asScala.exists(key => key.contains(bloomPath)))
}


test("Test preaggregate datamap fail") {
val tableName = "t4"

sql(s"CREATE TABLE $tableName(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"CREATE DATAMAP dpagg ON TABLE $tableName USING 'preaggregate' AS " +
s"SELECT AVG(salary), workgroupcategoryname from $tableName GROUP BY workgroupcategoryname")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE $tableName")
sql(s"SELECT * FROM $tableName").collect()
sql(s"SELECT AVG(salary), workgroupcategoryname from $tableName " +
s"GROUP BY workgroupcategoryname").collect()

    val failMessage = intercept[UnsupportedOperationException] {
      sql(s"DROP METACACHE ON TABLE ${tableName}_dpagg")
    }.getMessage
    assert(failMessage.contains("Operation not allowed on child table."))
}


  // keySet() is a live view backed by the cache map, so copy it into a new
  // set before the cache is mutated
  def clone(oldSet: util.Set[String]): util.HashSet[String] = {
val newSet = new util.HashSet[String]
newSet.addAll(oldSet)
newSet
}
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.events

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

case class DropCacheEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
internalCall: Boolean)
extends Event with DropCacheEventInfo
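
As a sketch of how this event is intended to travel, the DROP METACACHE command would construct it and fire it on the listener bus. `OperationListenerBus` and `OperationContext` are existing CarbonData event classes; `carbonTable` and `sparkSession` are assumed to be in scope inside the command:

```scala
// Hedged sketch: firing DropCacheEvent so registered listeners can veto or
// extend the operation. The surrounding variables are assumptions about the
// command implementation, which is not shown on this page.
import org.apache.carbondata.events.{DropCacheEvent, OperationContext, OperationListenerBus}

val dropCacheEvent = DropCacheEvent(carbonTable, sparkSession, internalCall = false)
val operationContext = new OperationContext
OperationListenerBus.getInstance().fireEvent(dropCacheEvent, operationContext)
```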
@@ -62,6 +62,13 @@ trait DropTableEventInfo {
val ifExistsSet: Boolean
}

/**
* event for drop cache
*/
trait DropCacheEventInfo {
val carbonTable: CarbonTable
}

/**
* event for alter_table_drop_column
*/
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -185,6 +186,7 @@ object CarbonEnv {
.addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
}
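
The body of `DropCachePreAggEventListener` is not shown on this page. A minimal sketch of what such a listener could look like, assuming it simply rejects a DROP METACACHE issued directly on a preaggregate child table (the error text matches the test above; the committed source may differ):

```scala
// Hedged sketch of DropCachePreAggEventListener, not the committed source.
// Assumes CarbonTable.isChildDataMap identifies preaggregate child tables.
import org.apache.carbondata.events.{DropCacheEvent, Event, OperationContext, OperationEventListener}

object DropCachePreAggEventListenerSketch extends OperationEventListener {
  override protected def onEvent(event: Event,
      operationContext: OperationContext): Unit = {
    event match {
      case dropCacheEvent: DropCacheEvent =>
        val carbonTable = dropCacheEvent.carbonTable
        // internalCall is true when the parent table drops its children itself.
        if (carbonTable.isChildDataMap && !dropCacheEvent.internalCall) {
          throw new UnsupportedOperationException(
            "Operation not allowed on child table.")
        }
      case _ => // ignore unrelated events
    }
  }
}
```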

