Skip to content

Commit

Permalink
[CARBONDATA-2991]NegativeArraySizeException during query execution
Browse files Browse the repository at this point in the history
Issue :- During Query Execution sometime NegativeArraySizeException Exception in Some Tasks . And sometime Executor is lost (JVM crash)

Root Cause :- It is because existing memoryblock is removed while it was in-use. This happened because duplicate taskid generated. Sometime freed same memory addresses are assigned to another task which will initialize memory block to0 and this cause NegativeSizeArrayException whereas sometime freed memory will not be used any task of executor process but running task will try to access it and as that address is not part of process so JVM crash will happen.

Solution :- Change taskID generation to UUID based instead of System.nanoTime()

This closes #2796
  • Loading branch information
BJangir authored and ravipesala committed Oct 4, 2018
1 parent 30adaa8 commit d392717
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 34 deletions.
Expand Up @@ -62,7 +62,7 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
*/
protected boolean isMemoryOccupied;

private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
private final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();

/**
* Constructor
Expand Down
Expand Up @@ -51,7 +51,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
// size of the allocated memory, in bytes
private int capacity;

private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
private final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();

private static final int byteBits = DataTypes.BYTE.getSizeBits();
private static final int shortBits = DataTypes.SHORT.getSizeBits();
Expand Down
Expand Up @@ -45,7 +45,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {

static final double FACTOR = 1.25;

final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();

// memory allocated by Unsafe
MemoryBlock memoryBlock;
Expand Down
Expand Up @@ -31,7 +31,7 @@ public abstract class AbstractMemoryDMStore implements Serializable {

protected boolean isMemoryFreed;

protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
protected final String taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();

public abstract void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow)
throws MemoryException;
Expand Down
Expand Up @@ -36,9 +36,9 @@ public class IntPointerBuffer {

private MemoryBlock pointerMemoryBlock;

private long taskId;
private String taskId;

public IntPointerBuffer(long taskId) {
public IntPointerBuffer(String taskId) {
// TODO can be configurable, it is initial size and it can grow automatically.
this.length = 100000;
pointerBlock = new int[length];
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class UnsafeMemoryManager {
private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
private static Map<Long,Set<MemoryBlock>> taskIdToMemoryBlockMap;
private static Map<String,Set<MemoryBlock>> taskIdToMemoryBlockMap;
static {
long size = 0L;
String defaultWorkingMemorySize = null;
Expand Down Expand Up @@ -107,7 +107,7 @@ private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) {
.info("Working Memory manager is created with size " + totalMemory + " with " + memoryType);
}

private synchronized MemoryBlock allocateMemory(MemoryType memoryType, long taskId,
private synchronized MemoryBlock allocateMemory(MemoryType memoryType, String taskId,
long memoryRequested) {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = getMemoryAllocator(memoryType).allocate(memoryRequested);
Expand All @@ -128,7 +128,7 @@ private synchronized MemoryBlock allocateMemory(MemoryType memoryType, long task
return null;
}

public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) {
if (taskIdToMemoryBlockMap.containsKey(taskId)) {
taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
}
Expand All @@ -144,7 +144,7 @@ public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
}
}

public synchronized void freeMemoryAll(long taskId) {
public synchronized void freeMemoryAll(String taskId) {
Set<MemoryBlock> memoryBlockSet = null;
memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
long occuppiedMemory = 0;
Expand Down Expand Up @@ -181,12 +181,12 @@ public long getUsableMemory() {
/**
* It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
*/
public static MemoryBlock allocateMemoryWithRetry(long taskId, long size)
public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
throws MemoryException {
return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size);
}

public static MemoryBlock allocateMemoryWithRetry(MemoryType memoryType, long taskId,
public static MemoryBlock allocateMemoryWithRetry(MemoryType memoryType, String taskId,
long size) throws MemoryException {
MemoryBlock baseBlock = null;
int tries = 0;
Expand Down
Expand Up @@ -49,7 +49,7 @@ public class UnsafeSortMemoryManager {
/**
* map to keep taskid to memory blocks
*/
private static Map<Long, Set<MemoryBlock>> taskIdToMemoryBlockMap;
private static Map<String, Set<MemoryBlock>> taskIdToMemoryBlockMap;

/**
* singleton instance
Expand Down Expand Up @@ -142,7 +142,7 @@ public synchronized void allocateDummyMemory(long size) {
}
}

public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
public synchronized void freeMemory(String taskId, MemoryBlock memoryBlock) {
if (taskIdToMemoryBlockMap.containsKey(taskId)) {
taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
}
Expand All @@ -164,7 +164,7 @@ public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
* when in case of task failure we need to clear all the memory occupied
* @param taskId
*/
public synchronized void freeMemoryAll(long taskId) {
public synchronized void freeMemoryAll(String taskId) {
Set<MemoryBlock> memoryBlockSet = null;
memoryBlockSet = taskIdToMemoryBlockMap.remove(taskId);
long occuppiedMemory = 0;
Expand Down Expand Up @@ -196,7 +196,7 @@ public synchronized void freeMemoryAll(long taskId) {
* @param memoryRequested
* @return memory block
*/
public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) {
public synchronized MemoryBlock allocateMemoryLazy(String taskId, long memoryRequested) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId);
if (null == listOfMemoryBlock) {
Expand All @@ -210,7 +210,8 @@ public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryReque
/**
* It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
*/
public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException {
public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
throws MemoryException {
MemoryBlock baseBlock = null;
int tries = 0;
while (tries < 100) {
Expand All @@ -232,7 +233,7 @@ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws
return baseBlock;
}

private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) {
private synchronized MemoryBlock allocateMemory(String taskId, long memoryRequested) {
if (memoryUsed + memoryRequested <= totalMemory) {
MemoryBlock allocate = allocator.allocate(memoryRequested);
memoryUsed += allocate.size();
Expand Down
Expand Up @@ -28,13 +28,13 @@ public class CarbonTaskInfo implements Serializable {
*/
private static final long serialVersionUID = 1L;

public long taskId;
public String taskId;

public long getTaskId() {
public String getTaskId() {
return taskId;
}

public void setTaskId(long taskId) {
public void setTaskId(String taskId) {
this.taskId = taskId;
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
Expand Up @@ -3358,4 +3358,14 @@ public static int[] getInvertedReverseIndex(int[] invertedIndex) {
}
return columnIndexTemp;
}

/**
* Below method is to generateUUID (Random Based)
* later it will be extened for TimeBased,NameBased
*
* @return UUID as String
*/
public static String generateUUID() {
return UUID.randomUUID().toString();
}
}
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.util;


/**
* Class to keep all the thread local variable for task
*/
Expand All @@ -30,7 +31,7 @@ public static void setCarbonTaskInfo(CarbonTaskInfo carbonTaskInfo) {
public static CarbonTaskInfo getCarbonTaskInfo() {
if (null == threadLocal.get()) {
CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
carbonTaskInfo.setTaskId(System.nanoTime());
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
}
return threadLocal.get();
Expand Down
Expand Up @@ -71,7 +71,7 @@ abstract class CarbonRDD[T: ClassTag](
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(System.nanoTime)
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
carbonSessionInfo.getSessionParams.getAddedProps.asScala.
map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
Expand Down
Expand Up @@ -639,7 +639,7 @@ object CommonUtil {
* Method to clear the memory for a task
* if present
*/
def clearUnsafeMemory(taskId: Long) {
def clearUnsafeMemory(taskId: String) {
UnsafeMemoryManager.
INSTANCE.freeMemoryAll(taskId)
UnsafeSortMemoryManager.
Expand Down
Expand Up @@ -46,14 +46,14 @@ public class UnsafeCarbonRowPage {

private MemoryManagerType managerType;

private long taskId;
private String taskId;

private TableFieldStat tableFieldStat;
private SortStepRowHandler sortStepRowHandler;
private boolean convertNoSortFields;

public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
boolean saveToDisk, long taskId) {
boolean saveToDisk, String taskId) {
this.tableFieldStat = tableFieldStat;
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.saveToDisk = saveToDisk;
Expand Down
Expand Up @@ -93,7 +93,7 @@ public class UnsafeSortDataRows {
*/
private Semaphore semaphore;

private final long taskId;
private final String taskId;

public UnsafeSortDataRows(SortParameters parameters,
UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonTaskInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;

import org.apache.hadoop.mapreduce.RecordReader;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class CarbonReader<T> {
this.index = 0;
this.currentReader = readers.get(0);
CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
carbonTaskInfo.setTaskId(System.nanoTime());
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
}

Expand Down
Expand Up @@ -18,11 +18,7 @@
package org.apache.carbondata.store.worker;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.*;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
Expand All @@ -48,6 +44,7 @@
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonTaskInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
Expand Down Expand Up @@ -110,7 +107,7 @@ private DataMapExprWrapper chooseFGDataMap(
private List<CarbonRow> handleRequest(SearchRequest request)
throws IOException, InterruptedException {
CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
carbonTaskInfo.setTaskId(System.nanoTime());
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID());
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
TableInfo tableInfo = request.tableInfo();
CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
Expand Down

0 comments on commit d392717

Please sign in to comment.