[CARBONDATA-3392] Make LRU mandatory for index server
Background:
Currently the LRU cache is optional for the user to configure, but this raises concerns for the index server because invalid segments have to be constantly removed from the cache during update/delete/compaction scenarios.

Therefore, if the clear-segment job fails, the main job should not fail, but there has to be a mechanism to prevent those segments from staying in the cache forever.

To prevent the above-mentioned scenario, the executor LRU cache size is now a mandatory property for the index server application.
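For reference, a minimal sketch of how the now-mandatory setting could be supplied before launching the index server application. Only the constant CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE comes from this change; the object name and the 4096 MB value are illustrative assumptions, and in a real deployment the property would normally be set in carbon.properties rather than in code.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Illustrative sketch only: configure the executor LRU cache size (in MB)
// before the index server starts; with this change, startup fails with a
// RuntimeException if the property is missing.
object ConfigureIndexServerLruExample {
  def main(args: Array[String]): Unit = {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE, "4096")
  }
}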

This closes #3222
kunal642 authored and ravipesala committed May 28, 2019
1 parent 2a28dba commit df7339c
Showing 9 changed files with 86 additions and 62 deletions.
@@ -115,7 +115,15 @@ private static void executeClearDataMapJob(DataMapJob dataMapJob,
DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
dataMapToClear);
dataMapJob.execute(dataMapFormat);
try {
dataMapJob.execute(dataMapFormat);
} catch (Exception e) {
if (dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
LOGGER.warn("Failed to clear distributed cache.", e);
} else {
throw e;
}
}
}

public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
@@ -228,7 +228,7 @@ public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromM
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
String mergeFilePath =
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getMergeIndexFileName();
.getIndexFileName();
segmentIndexFileStore.readMergeFile(mergeFilePath);
List<String> indexFiles =
segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
@@ -557,22 +557,31 @@ segment needs refreshing. same thing need for select count(*) flow also.
}
if (isIUDTable || isUpdateFlow) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
table.getTableName())) {
List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
getDistributedSplit(table, null, partitions, filteredSegment,
allSegments.getInvalidSegments(), toBeCleanedSegments));
for (InputSplit extendedBlocklet : extendedBlocklets) {
CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
String filePath = blocklet.getFilePath();
String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
(long) blocklet.getDetailInfo().getRowCount());
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
try {
List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
getDistributedSplit(table, null, partitions, filteredSegment,
allSegments.getInvalidSegments(), toBeCleanedSegments));
for (InputSplit extendedBlocklet : extendedBlocklets) {
CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
(long) blocklet.getDetailInfo().getRowCount());
}
} catch (Exception e) {
// Check if fallback is disabled then directly throw exception otherwise try driver
// pruning.
if (CarbonProperties.getInstance().isFallBackDisabled()) {
throw e;
}
TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
blockletToRowCountMap
.putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
}
} else {
TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
blockletToRowCountMap.putAll(
defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
blockletToRowCountMap
.putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
}
// key is the (segmentId","+blockletPath) and value is the row count of that blocklet
for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
@@ -603,8 +612,8 @@ segment needs refreshing. same thing need for select count(*) flow also.
}
} else {
long totalRowCount = 0L;
if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
table.getTableName())) {
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
getDistributedSplit(table, null, partitions, filteredSegment,
allSegments.getInvalidSegments(), new ArrayList<String>()));
@@ -60,21 +60,3 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
}

}

class DistributedClearCacheJob extends AbstractDataMapJob {

val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
if (LOGGER.isDebugEnabled) {
val messageSize = SizeEstimator.estimate(dataMapFormat)
LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
}
val (response, time) = logTime {
IndexServer.getClient.invalidateCache(dataMapFormat)
new util.ArrayList[ExtendedBlocklet]()
}
LOGGER.info(s"Time taken to get response from server: $time ms")
response
}
}
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil

@@ -60,7 +61,11 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
}

override protected def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) {
split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
} else {
Seq()
}
}

override def internalCompute(split: Partition,
@@ -112,7 +117,10 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
override protected def internalGetPartitions: Array[Partition] = {
val job = Job.getInstance(FileFactory.getConfiguration)
val splits = dataMapFormat.getSplits(job).asScala
if (dataMapFormat.isFallbackJob || splits.isEmpty) {
val isDistributedPruningEnabled = CarbonProperties.getInstance()
.isDistributedPruningEnabled(dataMapFormat.getCarbonTable.getDatabaseName,
dataMapFormat.getCarbonTable.getTableName)
if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob || splits.isEmpty) {
splits.zipWithIndex.map {
f => new DataMapRDDPartition(id, f._2, f._1)
}.toArray
@@ -44,11 +44,6 @@ trait ServerInterface {
*/
def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]

/**
* Invalidate the cache for the provided table.
*/
def invalidateCache(request: DistributableDataMapFormat): Unit

/**
* Get the cache size for the specified table.
*/
@@ -85,6 +80,10 @@ object IndexServer extends ServerInterface {

private val numHandlers: Int = CarbonProperties.getInstance().getNumberOfHandlersForIndexServer

private val isExecutorLRUConfigured: Boolean =
CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null

/**
* Getting sparkSession from ActiveSession because in case of embedded mode the session would
* have already been created whereas in case of distributed mode the session would be
@@ -101,17 +100,12 @@
}

def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
val splits = new DistributedPruneRDD(sparkSession, request).collect()
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
splits.map(_._2)
}

override def invalidateCache(request: DistributableDataMapFormat): Unit = doAs {
val splits = new DistributedPruneRDD(sparkSession, request).collect()
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
if (request.isJobToClearDataMaps) {
DistributedRDDUtils.invalidateCache(request.getCarbonTable.getTableUniqueName)
}
splits.map(_._2)
}

override def invalidateSegmentCache(databaseName: String, tableName: String,
@@ -131,6 +125,9 @@ object IndexServer extends ServerInterface {
throw new RuntimeException(
s"Please set ${ CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER }" +
s" as true to use index server")
} else if (!isExecutorLRUConfigured) {
throw new RuntimeException(s"Executor LRU cache size is not set. Please set using " +
s"${ CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE }")
} else {
createCarbonSession()
LOGGER.info("Starting Index Cache Server")
@@ -257,8 +257,14 @@ object CarbonDataRDDFactory {
// Remove compacted segments from executor cache.
if (CarbonProperties.getInstance().isDistributedPruningEnabled(
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
try {
IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
} catch {
case ex: Exception =>
LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
.getDatabaseName}.${carbonLoadModel.getTableName}", ex)
}
}
// giving the user his error for telling in the beeline if his triggered table
// compaction is failed.
@@ -17,9 +17,11 @@

package org.apache.spark.sql.execution.command.cache

import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

import org.apache.log4j.Logger

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheType
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
@@ -35,6 +37,8 @@ import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

object CacheUtil {

val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

/**
* Given a carbonTable, returns the list of all carbonindex files
*
@@ -51,8 +55,13 @@
carbonTable.getTableName)) {
val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
.map(_.getSegmentNo).toArray
IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
.getTableName, invalidSegmentIds)
try {
IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
.getTableName, invalidSegmentIds)
} catch {
case e: Exception =>
LOGGER.warn("Failed to clear cache from executors. ", e)
}
}
validAndInvalidSegmentsInfo.getValidSegments.asScala.flatMap {
segment =>
@@ -218,16 +218,21 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
case None => ""
}
val (result, time) = CarbonScalaUtil.logTime {
IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
.groupBy(_.head).map { t =>
var sum = 0L
var length = 0
t._2.foreach {
arr =>
sum += arr(2).toLong
length += arr(1).toInt
try {
IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
.groupBy(_.head).map { t =>
var sum = 0L
var length = 0
t._2.foreach {
arr =>
sum += arr(2).toLong
length += arr(1).toInt
}
(t._1, length, sum)
}
(t._1, length, sum)
} catch {
case e: Exception =>
throw new RuntimeException("Failed to get Cache Information. ", e)
}
}
LOGGER.info(s"Time taken to get cache results from Index Server is $time ms")
