[CARBONDATA-3318] Added PreAgg & Bloom Event-Listeners for ShowCacheCommand

Decoupling of Cache Commands
1. Added PreAgg Event-Listener for ShowCacheCommand
2. Added Bloom Event-Listener for ShowCacheCommand
3. Added PreAgg Event-Listener for DropCacheCommand
4. Added Bloom Event-Listener for DropCacheCommand
5. Updated the documentation
6. Added support for external tables
   6.1 Display external tables, annotated as such, alongside the table name
   6.2 Count the index files for external tables

This closes #3146
NamanRastogi authored and kunal642 committed Mar 21, 2019
1 parent 66982f3 commit 2980803
Showing 13 changed files with 581 additions and 346 deletions.
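Before the per-file diffs, here is a minimal sketch of the listener pattern the commit message describes. The SampleShowCacheListener name is hypothetical, and the OperationEventListener trait, its onEvent signature, and the event's package are assumptions based on CarbonData's existing event infrastructure rather than taken from this diff.

```scala
import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener, ShowTableCacheEvent}

// Hypothetical datamap-side listener; each datamap provider (PreAgg, Bloom) registers one of
// these instead of the show/drop cache commands referencing the datamap code directly.
object SampleShowCacheListener extends OperationEventListener {
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case showEvent: ShowTableCacheEvent =>
        // Compute this datamap's cache entries for the table here and hand them back,
        // e.g. through operationContext, for the command to display.
        val table = showEvent.carbonTable
        println(s"collecting cache info for ${table.getTableUniqueName}")
      case _ => // other events are ignored
    }
  }
}
```

Registration would mirror the CarbonEnv change further down, e.g. `OperationListenerBus.getInstance().addListener(classOf[ShowTableCacheEvent], SampleShowCacheListener)`.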
@@ -35,7 +35,7 @@ public static class CacheKey implements Serializable {
private String shardPath;
private String indexColumn;

CacheKey(String shardPath, String indexColumn) {
public CacheKey(String shardPath, String indexColumn) {
this.shardPath = shardPath;
this.indexColumn = indexColumn;
}
@@ -227,7 +227,8 @@ public DataMapBuilder createBuilder(Segment segment, String shardName,
* returns all shard directories of bloom index files for query
* if bloom index files are merged we should get only one shard path
*/
private Set<String> getAllShardPaths(String tablePath, String segmentId) {
public static Set<String> getAllShardPaths(String tablePath, String segmentId,
String dataMapName) {
String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
tablePath, segmentId, dataMapName);
CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
@@ -257,7 +258,8 @@ public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException
try {
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
shardPaths =
getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
@@ -299,7 +301,8 @@ public List<DataMapDistributable> toDistributable(Segment segment) {
List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
if (shardPaths == null) {
shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
shardPaths =
getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
segmentMap.put(segment.getSegmentNo(), shardPaths);
}
Set<String> filteredShards = segment.getFilteredIndexShardNames();
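getAllShardPaths is now public and static and takes the datamap name, so code outside the factory can resolve bloom shard directories for a segment. A minimal call sketch, with example values for the table path, segment id, and datamap name:

```scala
import scala.collection.JavaConverters._
import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory

// Example values only; a real caller passes the table's actual path, segment number and
// bloom datamap name, as CacheUtil.getBloomCacheKeys does later in this commit.
val shardPaths = BloomCoarseGrainDataMapFactory
  .getAllShardPaths("/tmp/store/default/t1", "0", "dm_bloom")
  .asScala
```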
13 changes: 13 additions & 0 deletions docs/ddl-of-carbondata.md
@@ -1119,3 +1119,16 @@ Users can specify which columns to include and exclude for local dictionary generation
its dictionary files, its datamaps and child tables.

This command is not allowed on child tables.

### Important points

1. Cache information is updated only after a select query has been executed.

2. In case of ALTER TABLE, the already loaded cache is invalidated when any subsequent select query
is fired.

3. Dictionary is loaded into the cache only when a dictionary column is queried directly; without
such a query, the dictionary cache is not populated.
For `SELECT * FROM t1` the dictionary is still loaded, but on the executors rather than on the
driver, and only the final result rows are returned to the driver. Such a query therefore leaves
no trace in the driver cache reported by `SHOW METACACHE` or `SHOW METACACHE ON TABLE t1`.
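To make point 3 concrete, here is a hypothetical sequence; the table t1 and its dictionary column name are examples, not part of the documentation change:

```scala
// Hypothetical illustration of point 3 above.
sql("SELECT * FROM t1").show()                        // dictionary is used on the executors only
sql("SHOW METACACHE ON TABLE t1").show()              // no dictionary entry in the driver cache
sql("SELECT name FROM t1 WHERE name = 'bob'").show()  // direct query on the dictionary column
sql("SHOW METACACHE ON TABLE t1").show()              // the dictionary entry now appears
```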
@@ -17,10 +17,14 @@

package org.apache.carbondata.sql.commands

import org.apache.spark.sql.Row
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory

class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
override protected def beforeAll(): Unit = {
// use new database
@@ -133,6 +137,28 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
}

test("test external table show cache") {
sql(s"CREATE TABLE employeeTable(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE employeeTable")
val table = CarbonEnv.getCarbonTable(Some("default"), "employeeTable")(sqlContext.sparkSession)
val location = FileFactory
.getUpdatedFilePath(
table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "/Fact/Part0/Segment_0")
sql(s"CREATE EXTERNAL TABLE extTable stored as carbondata LOCATION '${location}'")
sql("select * from extTable").show()
val rows = sql("SHOW METACACHE ON TABLE extTable").collect()
var isPresent = false
rows.foreach(row => {
if (row.getString(2).equalsIgnoreCase("1/1 index files cached (external table)")){
isPresent = true
}
})
Assert.assertTrue(isPresent)
}

override protected def afterAll(): Unit = {
sql("use default").collect()
dropTable
@@ -145,42 +171,63 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS default.cache_4")
sql("DROP TABLE IF EXISTS default.cache_5")
sql("DROP TABLE IF EXISTS empTable")
sql("DROP TABLE IF EXISTS employeeTable")
sql("DROP TABLE IF EXISTS extTable")
}

test("show cache") {

// Empty database
sql("use cache_empty_db").collect()
val result1 = sql("show metacache").collect()
assertResult(2)(result1.length)
assertResult(Row("cache_empty_db", "ALL", "0 B", "0 B", "0 B"))(result1(1))

// Database with 3 tables but only 2 are in cache
sql("use cache_db").collect()
val result2 = sql("show metacache").collect()
assertResult(4)(result2.length)

// Make sure PreAgg child tables are not in the SHOW METACACHE output
sql("use default").collect()
val result3 = sql("show metacache").collect()
val dataMapCacheInfo = result3
.map(row => row.getString(1))
.filter(table => table.equals("cache_4_cache_4_count"))
assertResult(1)(dataMapCacheInfo.length)
assertResult(0)(dataMapCacheInfo.length)
}

test("show metacache on table") {
sql("use cache_db").collect()

// Table with Index, Dictionary & Bloom filter
val result1 = sql("show metacache on table cache_1").collect()
assertResult(3)(result1.length)
assertResult("1/1 index files cached")(result1(0).getString(2))
assertResult("bloomfilter")(result1(2).getString(2))

// Table with Index and Dictionary
val result2 = sql("show metacache on table cache_db.cache_2").collect()
assertResult(2)(result2.length)
assertResult("2/2 index files cached")(result2(0).getString(2))
assertResult("0 B")(result2(1).getString(1))

// Table not in cache
checkAnswer(sql("show metacache on table cache_db.cache_3"),
Seq(Row("Index", "0 B", "0/1 index files cached"), Row("Dictionary", "0 B", "")))

// Table with Index, Dictionary & PreAgg child table
val result4 = sql("show metacache on table default.cache_4").collect()
assertResult(3)(result4.length)
assertResult("1/1 index files cached")(result4(0).getString(2))
assertResult("0 B")(result4(1).getString(1))
assertResult("preaggregate")(result4(2).getString(2))

sql("use default").collect()

// Table with 5 index files
val result5 = sql("show metacache on table cache_5").collect()
assertResult(2)(result5.length)
assertResult("5/5 index files cached")(result5(0).getString(2))
}
}
@@ -21,8 +21,15 @@ import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

case class DropCacheEvent(
case class DropTableCacheEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
internalCall: Boolean)
extends Event with DropCacheEventInfo
extends Event with DropTableCacheEventInfo


case class ShowTableCacheEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
internalCall: Boolean)
extends Event with ShowTableCacheEventInfo
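These case classes are what the reworked cache commands fire. Below is a hypothetical sketch of the firing side, assuming CarbonData's OperationListenerBus and OperationContext API; the helper itself is not part of this commit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, ShowTableCacheEvent}

// Hypothetical helper: raises the event so that every registered listener
// (PreAgg, Bloom, ...) can contribute its cache information for the table.
def fireShowTableCacheEvent(carbonTable: CarbonTable,
    sparkSession: SparkSession): OperationContext = {
  val operationContext = new OperationContext
  OperationListenerBus.getInstance().fireEvent(
    ShowTableCacheEvent(carbonTable, sparkSession, internalCall = false), operationContext)
  operationContext
}
```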
@@ -62,10 +62,17 @@ trait DropTableEventInfo {
val ifExistsSet: Boolean
}

/**
* event for show cache
*/
trait ShowTableCacheEventInfo {
val carbonTable: CarbonTable
}

/**
* event for drop cache
*/
trait DropCacheEventInfo {
trait DropTableCacheEventInfo {
val carbonTable: CarbonTable
}

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener
import org.apache.spark.sql.execution.command.cache._
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
@@ -186,7 +186,10 @@
.addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropCacheEvent], DropCachePreAggEventListener)
.addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
}

/**
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.cache

import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

import org.apache.carbondata.core.cache.CacheType
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
import org.apache.carbondata.datamap.bloom.{BloomCacheKeyValue, BloomCoarseGrainDataMapFactory}
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil


object CacheUtil {

/**
* Given a carbonTable, returns the list of all carbonindex files
*
* @param carbonTable table whose valid segments' index files are listed
* @return list of all carbonindex files that can be cached
*/
def getAllIndexFiles(carbonTable: CarbonTable): List[String] = {
if (carbonTable.isTransactionalTable) {
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
CarbonDataMergerUtil.getValidSegmentList(absoluteTableIdentifier).asScala.flatMap {
segment =>
segment.getCommittedIndexFile.keySet().asScala
}.toList
} else {
val tablePath = carbonTable.getTablePath
val readCommittedScope = new LatestFilesReadCommittedScope(tablePath,
FileFactory.getConfiguration)
readCommittedScope.getSegmentList.flatMap {
load =>
val seg = new Segment(load.getLoadName, null, readCommittedScope)
seg.getCommittedIndexFile.keySet().asScala
}.toList
}
}

/**
* Given a carbonTable, returns a list of all dictionary entries which can be in the cache
*
* @param carbonTable table whose dictionary columns are scanned
* @return list of all dictionary cache keys which can be in the cache
*/
def getAllDictCacheKeys(carbonTable: CarbonTable): List[String] = {
def getDictCacheKey(columnIdentifier: String,
cacheType: CacheType[_, _]): String = {
columnIdentifier + CarbonCommonConstants.UNDERSCORE + cacheType.getCacheName
}

carbonTable.getAllDimensions.asScala
.collect {
case dict if dict.isGlobalDictionaryEncoding =>
Seq(getDictCacheKey(dict.getColumnId, CacheType.FORWARD_DICTIONARY),
getDictCacheKey(dict.getColumnId, CacheType.REVERSE_DICTIONARY))
}.flatten.toList
}

def getBloomCacheKeys(carbonTable: CarbonTable, datamap: DataMapSchema): List[String] = {
val segments = CarbonDataMergerUtil
.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala

// Generate the shard paths for the datamap
val shardPaths = segments.flatMap {
segment =>
BloomCoarseGrainDataMapFactory.getAllShardPaths(carbonTable.getTablePath,
segment.getSegmentNo, datamap.getDataMapName).asScala
}

// get index columns
val indexColumns = carbonTable.getIndexedColumns(datamap).asScala.map {
entry =>
entry.getColName
}

// Generate cache keys from the shard paths and the index columns on which the bloom filter was created.
val datamapKeys = shardPaths.flatMap {
shardPath =>
indexColumns.map {
indexCol =>
new BloomCacheKeyValue.CacheKey(shardPath, indexCol).toString
}
}
datamapKeys.toList
}

}
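A hypothetical sketch of how these helpers could be combined by a show- or drop-cache listener; the allCacheKeys wrapper is illustrative and not part of this commit:

```scala
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}

// Gathers every cache key CacheUtil can derive for a table and one of its bloom datamaps.
def allCacheKeys(carbonTable: CarbonTable, bloomDataMap: DataMapSchema): List[String] = {
  val indexFileKeys = CacheUtil.getAllIndexFiles(carbonTable)      // carbonindex file keys
  val dictionaryKeys = CacheUtil.getAllDictCacheKeys(carbonTable)  // forward/reverse dictionary keys
  val bloomKeys = CacheUtil.getBloomCacheKeys(carbonTable, bloomDataMap)
  indexFileKeys ++ dictionaryKeys ++ bloomKeys
}
```

A drop-cache listener would invalidate such keys, while a show-cache listener would sum the sizes of the matching cached entries.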
