diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java index d8be12bbf58..c58982d068c 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/Cacheable.java @@ -45,4 +45,11 @@ public interface Cacheable { * @return */ long getMemorySize(); + + /** + * Method to be used for invalidating the cacheable object. API to be invoked at the time of + * removing the cacheable object from memory. For example, when removing the cacheable object + * from LRU cache + */ + void invalidate(); } diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java index 03838a28dfd..4a0c36cb8b4 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java @@ -155,10 +155,10 @@ public void remove(String key) { private void removeKey(String key) { Cacheable cacheable = lruCacheMap.get(key); if (null != cacheable) { - currentSize = currentSize - cacheable.getMemorySize(); - } - Cacheable remove = lruCacheMap.remove(key); - if (null != remove) { + long memorySize = cacheable.getMemorySize(); + cacheable.invalidate(); + lruCacheMap.remove(key); + currentSize = currentSize - memorySize; LOGGER.info("Removed entry from InMemory lru cache :: " + key); } } diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java index c138cc88e09..f5971a5413e 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java @@ -296,5 +296,9 @@ protected 
byte[] getDictionaryBytesFromSurrogate(int surrogateKey) { byte[] keyData = value.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); return getSurrogateKey(keyData); } + + @Override public void invalidate() { + + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java index 7fbef8ac5a7..1972e976f2f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java @@ -100,6 +100,10 @@ public DataRefNode getDataRefNode() { return this.memorySize; } + @Override public void invalidate() { + + } + /** * The method is used to set the access count */ diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java index b0fb13ef158..2cf025941cc 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap; /** @@ -57,6 +58,13 @@ public BlockletDataMapIndexWrapper(String segmentId,List dataMaps) return wrapperSize; } + @Override public void invalidate() { + for (DataMap dataMap : dataMaps) { + dataMap.clear(); + } + dataMaps = null; + } + public List getDataMaps() { return dataMaps; } diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 9133f0f47af..8fcbb6cd11b 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -42,6 +42,7 @@ public class UnsafeMemoryManager { private static Map> taskIdToMemoryBlockMap; static { long size = 0L; + String defaultWorkingMemorySize = null; try { // check if driver unsafe memory is configured and JVM process is in driver. In that case // initialize unsafe memory configured for driver @@ -49,38 +50,41 @@ public class UnsafeMemoryManager { .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false")); boolean initializedWithUnsafeDriverMemory = false; if (isDriver) { - String driverUnsafeMemorySize = CarbonProperties.getInstance() + defaultWorkingMemorySize = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB); - if (null != driverUnsafeMemorySize) { - size = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.UNSAFE_DRIVER_WORKING_MEMORY_IN_MB, - CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)); + if (null != defaultWorkingMemorySize) { + size = Long.parseLong(defaultWorkingMemorySize); initializedWithUnsafeDriverMemory = true; } } if (!initializedWithUnsafeDriverMemory) { - size = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, - CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)); + defaultWorkingMemorySize = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB); + if (null != defaultWorkingMemorySize) { + size = Long.parseLong(defaultWorkingMemorySize); + } } } catch (Exception e) { - size = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); - LOGGER.info("Wrong memory size given, " - + "so setting default value to " + size); - } - if (size < 512) { - size = 512; - LOGGER.info("It is not recommended to keep unsafe memory size less than 512MB, " - + 
"so setting default value to " + size); + LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize); } - long takenSize = size * 1024 * 1024; + long takenSize = size; MemoryAllocator allocator; if (offHeap) { allocator = MemoryAllocator.UNSAFE; + long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); + if (takenSize < defaultSize) { + takenSize = defaultSize; + } + takenSize = takenSize * 1024 * 1024; } else { long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100; - if (takenSize > maxMemory) { + if (takenSize == 0L) { takenSize = maxMemory; + } else { + takenSize = takenSize * 1024 * 1024; + if (takenSize > maxMemory) { + takenSize = maxMemory; + } } allocator = MemoryAllocator.HEAP; } diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 6305283ba32..559320adb04 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1294,73 +1294,27 @@ public void addPropertyToPropertySet(Set externalPropertySet) { } private void validateSortMemorySizeInMB() { - int sortMemorySizeInMBDefault = - Integer.parseInt(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); - int sortMemorySizeInMB = 0; try { - sortMemorySizeInMB = Integer.parseInt( - carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB)); - } catch (NumberFormatException e) { - LOGGER.warn( - "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB - + "is Invalid." + " Taking the default value." 
- + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); - sortMemorySizeInMB = sortMemorySizeInMBDefault; - } - if (sortMemorySizeInMB < sortMemorySizeInMBDefault) { - LOGGER.warn( - "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB - + "is less than default value." + ". Taking the default value." - + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); - sortMemorySizeInMB = sortMemorySizeInMBDefault; - } - String unsafeWorkingMemoryString = - carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB); - String unsafeSortStorageMemoryString = - carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB); - int workingMemory = 512; - int sortStorageMemory; - if (null == unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) { - workingMemory = workingMemory > ((sortMemorySizeInMB * 20) / 100) ? - workingMemory : - ((sortMemorySizeInMB * 20) / 100); - sortStorageMemory = sortMemorySizeInMB - workingMemory; - carbonProperties - .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, workingMemory + ""); - carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB, - sortStorageMemory + ""); - } else if (null != unsafeWorkingMemoryString && null == unsafeSortStorageMemoryString) { + int unsafeSortStorageMemoryString = Integer.parseInt(carbonProperties + .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB)); carbonProperties.setProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB, - sortMemorySizeInMB + ""); - } else if (null == unsafeWorkingMemoryString && null != unsafeSortStorageMemoryString) { - carbonProperties - .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, sortMemorySizeInMB + ""); + unsafeSortStorageMemoryString + ""); + } catch (NumberFormatException ne) { + LOGGER.warn("The specified value for property " + + 
CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid."); } } private void validateWorkingMemory() { - int unsafeWorkingMemoryDefault = - Integer.parseInt(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); - int unsafeWorkingMemory = 0; try { - unsafeWorkingMemory = Integer.parseInt( + int unsafeWorkingMemory = Integer.parseInt( carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB)); + carbonProperties + .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + ""); } catch (NumberFormatException e) { LOGGER.warn("The specified value for property " - + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid." - + " Taking the default value." - + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); - unsafeWorkingMemory = unsafeWorkingMemoryDefault; - } - if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) { - LOGGER.warn("The specified value for property " - + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT - + "is less than the default value." + ". Taking the default value." 
- + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); - unsafeWorkingMemory = unsafeWorkingMemoryDefault; + + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid."); } - carbonProperties - .setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, unsafeWorkingMemory + ""); } private void validateSortStorageMemory() { diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java index 29e94d8bf6f..a66ee633686 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java @@ -100,6 +100,10 @@ public long getMemorySize() { return size; } + @Override public void invalidate() { + bloomFilters = null; + } + public List getBloomFilters() { return bloomFilters; } diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala index d970892bb23..af05613b0a2 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala @@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile} import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope import org.apache.carbondata.core.scan.expression.{Expression => 
CarbonExpression} import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat} @@ -79,6 +81,10 @@ class CarbonFileIndex( private def prune(dataFilters: Seq[Expression], directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = { + // set the driver flag to true which is used for unsafe memory initialization and carbon LRU + // cache instance initialization as per the driver memory + CarbonProperties.getInstance + .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") val tablePath = parameters.get("path") if (tablePath.nonEmpty && dataFilters.nonEmpty) { val hadoopConf = sparkSession.sessionState.newHadoopConf()