Skip to content

Commit

Permalink
[CARBONDATA-3398]Fix drop metacache on table having mv datamap
Browse files Browse the repository at this point in the history
Fixed drop metacache on table having mv datamap

This closes #3274
  • Loading branch information
Indhumathi27 authored and kunal642 committed Jun 24, 2019
1 parent 0d32c6b commit bdb50ed
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 26 deletions.
Expand Up @@ -17,11 +17,16 @@

package org.apache.carbondata.mv.rewrite

import org.apache.spark.sql.Row
import scala.collection.JavaConverters._
import java.util

import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.spark.exception.ProcessMetaDataException
Expand Down Expand Up @@ -535,5 +540,46 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
}.getMessage.contains("Operation not allowed on child table.")
}

test("drop meta cache on mv datamap table") {
sql("drop table IF EXISTS maintable")
sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
sql("insert into table maintable select 'abc',21,2000")
sql("drop datamap if exists dm ")
sql("create datamap dm using 'mv' as select name, sum(price) from maintable group by name")
sql("select name, sum(price) from maintable group by name").collect()
val droppedCacheKeys = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())

sql("drop metacache on table maintable").show(false)

val cacheAfterDrop = clone(CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet())
droppedCacheKeys.removeAll(cacheAfterDrop)

val tableIdentifier = new TableIdentifier("maintable", Some("default"))
val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
val dbPath = CarbonEnv
.getDatabaseLocation(tableIdentifier.database.get, sqlContext.sparkSession)
val tablePath = carbonTable.getTablePath
val mvPath = dbPath + CarbonCommonConstants.FILE_SEPARATOR + "dm_table" +
CarbonCommonConstants.FILE_SEPARATOR

// Check if table index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(tablePath)))

// check if cache does not have any more table index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(tablePath)))

// Check if mv index entries are dropped
assert(droppedCacheKeys.asScala.exists(key => key.startsWith(mvPath)))

// check if cache does not have any more mv index entries
assert(!cacheAfterDrop.asScala.exists(key => key.startsWith(mvPath)))
}

def clone(oldSet: util.Set[String]): util.HashSet[String] = {
val newSet = new util.HashSet[String]
newSet.addAll(oldSet)
newSet
}

}

Expand Up @@ -191,7 +191,7 @@ object CarbonEnv {
.addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener )
.addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
.addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
.addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheDataMapEventListener)
.addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
.addListener(classOf[ShowTableCacheEvent], ShowCacheDataMapEventListener)
Expand Down
Expand Up @@ -20,20 +20,19 @@ package org.apache.spark.sql.execution.command.cache
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.cache.DropCachePreAggEventListener.LOGGER
import org.apache.spark.util.DataMapUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext,
OperationEventListener}
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.events.{DropTableCacheEvent, Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

object DropCachePreAggEventListener extends OperationEventListener {
object DropCacheDataMapEventListener extends OperationEventListener {

val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

Expand All @@ -56,25 +55,37 @@ object DropCachePreAggEventListener extends OperationEventListener {
if (carbonTable.hasDataMapSchema) {
val childrenSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
.filter(_.getRelationIdentifier != null)
for (childSchema <- childrenSchemas) {
val childTable =
CarbonEnv.getCarbonTable(
TableIdentifier(childSchema.getRelationIdentifier.getTableName,
Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
try {
val dropCacheCommandForChildTable =
CarbonDropCacheCommand(
TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
internalCall = true)
dropCacheCommandForChildTable.processMetadata(sparkSession)
}
catch {
case e: Exception =>
LOGGER.warn(
s"Clean cache for PreAgg table ${ childTable.getTableName } failed.", e)
}
}
dropCacheForChildTables(sparkSession, childrenSchemas)
}
if (DataMapUtil.hasMVDataMap(carbonTable)) {
val childrenSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
!dataMapSchema.isIndexDataMap)
dropCacheForChildTables(sparkSession, childrenSchemas)
}
}
}

private def dropCacheForChildTables(sparkSession: SparkSession,
childrenSchemas: mutable.Buffer[DataMapSchema]): Unit = {
for (childSchema <- childrenSchemas) {
val childTable =
CarbonEnv.getCarbonTable(
TableIdentifier(childSchema.getRelationIdentifier.getTableName,
Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
try {
val dropCacheCommandForChildTable =
CarbonDropCacheCommand(
TableIdentifier(childTable.getTableName, Some(childTable.getDatabaseName)),
internalCall = true)
dropCacheCommandForChildTable.processMetadata(sparkSession)
}
catch {
case e: Exception =>
LOGGER.warn(
s"Clean cache for child table ${ childTable.getTableName } failed.", e)
}
}
}
}
Expand Down

0 comments on commit bdb50ed

Please sign in to comment.