Skip to content

Commit

Permalink
Fix NPE in LRU cache when entry from the same table is gettign evicte…
Browse files Browse the repository at this point in the history
…d to load another entry
  • Loading branch information
manishgupta88 committed Sep 26, 2018
1 parent 1d4d240 commit 89b3a94
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps,
for (DataMap dataMap : dataMaps) {
dataMap.clear();
}
dataMaps = null;
}

public List<BlockDataMap> getDataMaps() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public void freeMemory() {
if (!isMemoryFreed) {
if (null != dataMapRows) {
dataMapRows.clear();
dataMapRows = null;
}
isMemoryFreed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.memory.MemoryAllocator;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
Expand Down Expand Up @@ -49,7 +50,8 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {

public UnsafeMemoryDMStore() throws MemoryException {
this.allocatedSize = capacity;
this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
this.memoryBlock =
UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize);
this.pointers = new int[1000];
}

Expand All @@ -71,11 +73,11 @@ private void ensureSize(int rowSize) throws MemoryException {
}

private void increaseMemory(int requiredMemory) throws MemoryException {
MemoryBlock newMemoryBlock =
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + requiredMemory);
MemoryBlock newMemoryBlock = UnsafeMemoryManager
.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize + requiredMemory);
getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(),
newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength);
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock);
UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, this.memoryBlock);
allocatedSize = allocatedSize + requiredMemory;
this.memoryBlock = newMemoryBlock;
}
Expand Down Expand Up @@ -188,10 +190,10 @@ public DataMapRow getDataMapRow(CarbonRowSchema[] schema, int index) {
public void finishWriting() throws MemoryException {
if (runningLength < allocatedSize) {
MemoryBlock allocate =
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength);
UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, runningLength);
getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
memoryBlock = allocate;
}
// Compact pointers.
Expand All @@ -204,7 +206,7 @@ public void finishWriting() throws MemoryException {

public void freeMemory() {
if (!isMemoryFreed) {
UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock);
isMemoryFreed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,10 @@ private String[] getFileDetails() {
@Override public void clear() {
if (memoryDMStore != null) {
memoryDMStore.freeMemory();
memoryDMStore = null;
}
// clear task min/max unsafe memory
if (null != taskSummaryDMStore) {
taskSummaryDMStore.freeMemory();
taskSummaryDMStore = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

/**
Expand All @@ -39,8 +40,11 @@ public class HeapMemoryAllocator implements MemoryAllocator {

public HeapMemoryAllocator() {
poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes();
// if set 'poolingThresholdBytes' to -1, it should not go through the pooling mechanism.
if (poolingThresholdBytes == -1) {
boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
// if set 'poolingThresholdBytes' to -1 or the object creation call is in driver,
// it should not go through the pooling mechanism.
if (poolingThresholdBytes == -1 || isDriver) {
shouldPooling = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
.info("Working Memory manager is created with size " + totalMemory + " with " + allocator);
}

private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
private synchronized MemoryBlock allocateMemory(MemoryAllocator memoryAllocator, long taskId,
long memoryRequested) {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
MemoryBlock allocate = memoryAllocator.allocate(memoryRequested);
memoryUsed += allocate.size();
Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
if (null == listOfMemoryBlock) {
Expand All @@ -128,11 +129,16 @@ private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequeste
}

public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
freeMemory(allocator, taskId, memoryBlock);
}

public synchronized void freeMemory(MemoryAllocator memoryAllocator, long taskId,
MemoryBlock memoryBlock) {
if (taskIdToMemoryBlockMap.containsKey(taskId)) {
taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
}
if (!memoryBlock.isFreedStatus()) {
allocator.free(memoryBlock);
memoryAllocator.free(memoryBlock);
memoryUsed -= memoryBlock.size();
memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -182,10 +188,15 @@ public long getUsableMemory() {
*/
public static MemoryBlock allocateMemoryWithRetry(long taskId, long size)
throws MemoryException {
return allocateMemoryWithRetry(INSTANCE.allocator, taskId, size);
}

public static MemoryBlock allocateMemoryWithRetry(MemoryAllocator memoryAllocator, long taskId,
long size) throws MemoryException {
MemoryBlock baseBlock = null;
int tries = 0;
while (tries < 300) {
baseBlock = INSTANCE.allocateMemory(taskId, size);
baseBlock = INSTANCE.allocateMemory(memoryAllocator, taskId, size);
if (baseBlock == null) {
try {
LOGGER.info("Memory is not available, retry after 500 millis");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ protected BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> bl
blockletMinMaxFlag = blockletIndexList.get(i).getMinMaxIndex().getIsMinMaxSet();
for (int j = 0; j < maxValue.length; j++) {
// can be null for stores < 1.5.0 version
if (null != blockletMinMaxFlag && !blockletMinMaxFlag[i]) {
blockMinMaxFlag[i] = blockletMinMaxFlag[i];
if (null != blockletMinMaxFlag && !blockletMinMaxFlag[j]) {
blockMinMaxFlag[j] = blockletMinMaxFlag[j];
currentMaxValue[j] = new byte[0];
currentMinValue[j] = new byte[0];
continue;
Expand Down

0 comments on commit 89b3a94

Please sign in to comment.