Skip to content

Commit

Permalink
[HOTFIX] Fixed LRU cache bug to invalidate the cacheable object to clean up the resources

Browse files Browse the repository at this point in the history

This PR contains

Fix for LRU cache bug to invalidate the Cacheable object while removing it from LRU cache. This will help in clearing the unsafe memory for cacheable objects like BlockDataMaps
Fix for setting the driver flag for sparkCarbonFileFormat, which will be used for initializing the driver-side memory for the LRU cache and unsafe memory
Modified the logic for properties validation for unsafe working and sort memory. Sort memory will no longer consider the value of the parameter sort.inmemory.size.inmb, as it was deprecated long back. The memory configured for this parameter used to be divided in an 80:20 ratio between sort and working unsafe memory; that split is now removed. Only the value of the parameter carbon.sort.storage.inmemory.size.inmb will be considered.

This closes #2698
  • Loading branch information
manishgupta88 authored and ravipesala committed Sep 10, 2018
1 parent 0528a79 commit 54dcd8d
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 78 deletions.
Expand Up @@ -45,4 +45,11 @@ public interface Cacheable {
* @return
*/
long getMemorySize();

/**
 * Method to be used for invalidating the cacheable object. API to be invoked at the time of
 * removing the cacheable object from memory, e.g. at the time of removing the cacheable object
 * from the LRU cache, so that any backing resources it holds can be released.
 */
void invalidate();
}
Expand Up @@ -155,10 +155,10 @@ public void remove(String key) {
/**
 * Removes the entry mapped to {@code key} from the LRU cache and updates the
 * current cache size accounting.
 *
 * @param key cache key to evict
 */
private void removeKey(String key) {
  Cacheable cacheable = lruCacheMap.get(key);
  if (null != cacheable) {
    // Capture the size BEFORE invalidating: invalidate() releases the object's backing
    // resources, after which getMemorySize() may no longer be reliable.
    long memorySize = cacheable.getMemorySize();
    // Let the cacheable object free its resources (e.g. unsafe memory) before eviction.
    cacheable.invalidate();
    lruCacheMap.remove(key);
    currentSize = currentSize - memorySize;
    LOGGER.info("Removed entry from InMemory lru cache :: " + key);
  }
}
Expand Down
Expand Up @@ -296,5 +296,9 @@ protected byte[] getDictionaryBytesFromSurrogate(int surrogateKey) {
byte[] keyData = value.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
return getSurrogateKey(keyData);
}

@Override public void invalidate() {
  // Intentionally a no-op: nothing visible here holds resources that need explicit
  // release on LRU eviction — TODO(review): confirm no unsafe memory is held.
}
}

Expand Up @@ -100,6 +100,10 @@ public DataRefNode getDataRefNode() {
return this.memorySize;
}

@Override public void invalidate() {
  // Intentionally a no-op: this Cacheable does not appear to own resources requiring
  // explicit cleanup on eviction — TODO(review): confirm.
}

/**
* The method is used to set the access count
*/
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;

/**
Expand Down Expand Up @@ -57,6 +58,13 @@ public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps)
return wrapperSize;
}

/**
 * Invoked when this wrapper is evicted from the LRU cache: clears every wrapped
 * datamap (releasing its memory) and then drops the reference to the list.
 */
@Override public void invalidate() {
  dataMaps.forEach(DataMap::clear);
  dataMaps = null;
}

/**
 * @return the block datamaps held by this wrapper (null once {@code invalidate()} has run)
 */
public List<BlockDataMap> getDataMaps() {
  return this.dataMaps;
}
Expand Down
Expand Up @@ -42,45 +42,49 @@ public class UnsafeMemoryManager {
private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
static {
long size = 0L;
String defaultWorkingMemorySize = null;
try {
// check if driver unsafe memory is configured and JVM process is in driver. In that case
// initialize unsafe memory configured for driver
boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
boolean initializedWithUnsafeDriverMemory = false;
if (isDriver) {
String driverUnsafeMemorySize = CarbonProperties.getInstance()
defaultWorkingMemorySize = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB);
if (null != driverUnsafeMemorySize) {
size = Long.parseLong(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB,
CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
if (null != defaultWorkingMemorySize) {
size = Long.parseLong(defaultWorkingMemorySize);
initializedWithUnsafeDriverMemory = true;
}
}
if (!initializedWithUnsafeDriverMemory) {
size = Long.parseLong(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT));
defaultWorkingMemorySize = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
if (null != defaultWorkingMemorySize) {
size = Long.parseLong(defaultWorkingMemorySize);
}
}
} catch (Exception e) {
size = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
LOGGER.info("Wrong memory size given, "
+ "so setting default value to " + size);
}
if (size < 512) {
size = 512;
LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, "
+ "so setting default value to " + size);
LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize);
}
long takenSize = size * 1024 * 1024;
long takenSize = size;
MemoryAllocator allocator;
if (offHeap) {
allocator = MemoryAllocator.UNSAFE;
long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
if (takenSize < defaultSize) {
takenSize = defaultSize;
}
takenSize = takenSize * 1024 * 1024;
} else {
long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
if (takenSize > maxMemory) {
if (takenSize == 0L) {
takenSize = maxMemory;
} else {
takenSize = takenSize * 1024 * 1024;
if (takenSize > maxMemory) {
takenSize = maxMemory;
}
}
allocator = MemoryAllocator.HEAP;
}
Expand Down
Expand Up @@ -1294,73 +1294,27 @@ public void addPropertyToPropertySet(Set<String> externalPropertySet) {
}

private void validateSortMemorySizeInMB() {
int sortMemorySizeInMBDefault =
Integer.parseInt(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
int sortMemorySizeInMB = 0;
try {
sortMemorySizeInMB = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB));
} catch (NumberFormatException e) {
LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is Invalid." + " Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
sortMemorySizeInMB = sortMemorySizeInMBDefault;
}
if (sortMemorySizeInMB < sortMemorySizeInMBDefault) {
LOGGER.warn(
"The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB
+ "is less than default value." + ". Taking the default value."
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
sortMemorySizeInMB = sortMemorySizeInMBDefault;
}
String unsafeWorkingMemoryString =
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB);
String unsafeSortStorageMemoryString =
carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB);
int workingMemory = 512;
int sortStorageMemory;
if (null == unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) {
workingMemory = workingMemory > ((sortMemorySizeInMB * 20) / 100) ?
workingMemory :
((sortMemorySizeInMB * 20) / 100);
sortStorageMemory = sortMemorySizeInMB - workingMemory;
carbonProperties
.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, workingMemory + "");
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
sortStorageMemory + "");
} else if (null != unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) {
int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB));
carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB,
sortMemorySizeInMB + "");
} else if (null == unsafeWorkingMemoryString && null != unsafeSortStorageMemoryString) {
carbonProperties
.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, sortMemorySizeInMB + "");
unsafeSortStorageMemoryString + "");
} catch (NumberFormatException ne) {
LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid.");
}
}

private void validateWorkingMemory() {
int unsafeWorkingMemoryDefault =
Integer.parseInt(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
int unsafeWorkingMemory = 0;
try {
unsafeWorkingMemory = Integer.parseInt(
int unsafeWorkingMemory = Integer.parseInt(
carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB));
carbonProperties
.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + "");
} catch (NumberFormatException e) {
LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."
+ " Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
unsafeWorkingMemory = unsafeWorkingMemoryDefault;
}
if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) {
LOGGER.warn("The specified value for property "
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT
+ "is less than the default value." + ". Taking the default value."
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT);
unsafeWorkingMemory = unsafeWorkingMemoryDefault;
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid.");
}
carbonProperties
.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + "");
}

private void validateSortStorageMemory() {
Expand Down
Expand Up @@ -100,6 +100,10 @@ public long getMemorySize() {
return size;
}

/**
 * Invoked on LRU eviction: drops the reference to the cached bloom filters so
 * that they become eligible for garbage collection.
 */
@Override public void invalidate() {
  this.bloomFilters = null;
}

/**
 * @return the cached bloom filters (null once {@code invalidate()} has run)
 */
public List<CarbonBloomFilter> getBloomFilters() {
  return this.bloomFilters;
}
Expand Down
Expand Up @@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{AtomicType, StructType}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
Expand Down Expand Up @@ -79,6 +81,10 @@ class CarbonFileIndex(

private def prune(dataFilters: Seq[Expression],
directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
// set the driver flag to true which will be used for unsafe memory initialization and carbon
// LRU cache instance initialization as per the driver memory
CarbonProperties.getInstance
.addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
val tablePath = parameters.get("path")
if (tablePath.nonEmpty && dataFilters.nonEmpty) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
Expand Down

0 comments on commit 54dcd8d

Please sign in to comment.