
Commit fe2eb02

Merge f61d3bb into 9854f20
NamanRastogi committed Mar 25, 2019
2 parents 9854f20 + f61d3bb commit fe2eb02
Showing 1 changed file with 24 additions and 7 deletions.
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
import org.apache.spark.sql.types.StringType

import org.apache.carbondata.core.cache.{CacheProvider, CacheType}
@@ -64,7 +65,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],

def getAllTablesCache(sparkSession: SparkSession): Seq[Row] = {
val currentDatabase = sparkSession.sessionState.catalog.getCurrentDatabase
val cache = CacheProvider.getInstance().getCarbonCache()
val cache = CacheProvider.getInstance().getCarbonCache
if (cache == null) {
Seq(
Row("ALL", "ALL", 0L, 0L, 0L),
@@ -74,6 +75,7 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
.listAllTables(sparkSession).filter {
carbonTable =>
carbonTable.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
isValidTable(carbonTable, sparkSession) &&
!carbonTable.isChildDataMap
}
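
The filter above now also calls isValidTable (defined at the end of this diff), so entries for tables that are still held in the driver-side cache but have been dropped from the metastore no longer appear in the all-tables listing. A minimal driver sketch for exercising that path follows; the CarbonSession bootstrap and the store path are assumptions taken from the CarbonData 1.x quick start, not part of this commit.

// Illustrative sketch, not part of this commit. Assumes the CarbonData jars are on the
// classpath; the CarbonSession builder call and the store location are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

object ShowMetaCacheAllTables {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("show-metacache-all-tables")
      .getOrCreateCarbonSession("/tmp/carbon-store")   // hypothetical store location

    // Prints one summary row ("ALL", "ALL", ...) plus one row per Carbon table in the
    // current database; with this change, tables dropped outside CarbonData are skipped.
    spark.sql("SHOW METACACHE").show(truncate = false)
  }
}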

@@ -131,15 +133,26 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],

def getTableCache(sparkSession: SparkSession, carbonTable: CarbonTable): Seq[Row] = {
val cache = CacheProvider.getInstance().getCarbonCache
val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
if (cache == null) {
var comments = 0 + "/" + allIndexFiles.size + " index files cached"
if (!carbonTable.isTransactionalTable) {
comments += " (external table)"
}
return Seq(
Row("Index", 0L, comments),
Row("Dictionary", 0L, "")
)
}

val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
val operationContext = new OperationContext
// datamapName -> (datamapProviderName, indexSize, datamapSize)
val currentTableSizeMap = scala.collection.mutable.Map[String, (String, String, Long, Long)]()
operationContext.setProperty(carbonTable.getTableUniqueName, currentTableSizeMap)
OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)

// Get all Index files for the specified table.
val allIndexFiles: List[String] = CacheUtil.getAllIndexFiles(carbonTable)
// Get all Index files for the specified table in cache
val indexFilesInCache: List[String] = allIndexFiles.filter {
indexFile =>
cache.get(indexFile) != null
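
When the driver cache has not been created yet, the cache == null branch earlier in this hunk no longer drops the table from the output; it reports that zero of the table's index files are cached and flags external (non-transactional) tables. A standalone sketch of how that comment string is assembled, with allIndexFiles standing in for the list returned by CacheUtil.getAllIndexFiles and the sample values made up for illustration:

// Standalone sketch mirroring the comment built in the cache == null branch above.
// The file names and the isTransactionalTable flag are hypothetical.
object IndexCacheCommentSketch {
  def main(args: Array[String]): Unit = {
    val allIndexFiles: List[String] = List("part-0.carbonindex", "part-1.carbonindex")
    val isTransactionalTable = false   // non-transactional tables are external tables

    var comments = 0 + "/" + allIndexFiles.size + " index files cached"
    if (!isTransactionalTable) {
      comments += " (external table)"
    }
    // Prints: 0/2 index files cached (external table)
    println(comments)
  }
}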
@@ -190,9 +203,8 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
* Assemble result for table
*/
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier.get)(sparkSession)
if (CacheProvider.getInstance().getCarbonCache == null) {
return Seq.empty
}
Checker
.validateTableExists(tableIdentifier.get.database, tableIdentifier.get.table, sparkSession)
val rawResult = getTableCache(sparkSession, carbonTable)
val result = rawResult.slice(0, 2) ++
rawResult.drop(2).map {
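
In this hunk, processMetadata now validates the table up front via Checker.validateTableExists instead of silently returning an empty result when the cache is absent; a non-existent table is therefore expected to surface as NoSuchTableException, which matches the new import at the top of the file. A sketch of the per-table call follows; the session bootstrap, the table name, and the exact exception raised are assumptions rather than facts established by this diff.

// Illustrative sketch, not part of this commit. Assumes a CarbonData-enabled session as in
// the earlier sketch; the database and table names are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

object ShowMetaCacheOnTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("show-metacache-on-table")
      .getOrCreateCarbonSession("/tmp/carbon-store")   // hypothetical store location

    try {
      // Per-table cache report: index files cached, datamap sizes, dictionary size.
      spark.sql("SHOW METACACHE ON TABLE default.no_such_table").show(truncate = false)
    } catch {
      // After this change a missing table is reported explicitly instead of returning nothing.
      case e: NoSuchTableException => println(s"Table not found: ${e.getMessage}")
    }
  }
}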
@@ -205,4 +217,9 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
}
}
}

def isValidTable(carbonTable: CarbonTable, sparkSession: SparkSession): Boolean = {
CarbonEnv.getInstance(sparkSession).carbonMetaStore.tableExists(carbonTable.getTableName,
Some(carbonTable.getDatabaseName))(sparkSession)
}
}
