Skip to content

Commit

Permalink
PIG-4564: Pig can deadlock in POPartialAgg if there is a bag
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1681399 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jianyong Dai committed May 23, 2015
1 parent 9ecf5ec commit ad8fc7e
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 106 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -82,6 +82,8 @@ PIG-4333: Split BigData tests into multiple groups (rohini)

BUG FIXES

PIG-4564: Pig can deadlock in POPartialAgg if there is a bag (rohini via daijy)

PIG-4569: Fix e2e test Rank_1 failure (rohini)

PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for strings (xplenty via rohini)
Expand Down
270 changes: 164 additions & 106 deletions src/org/apache/pig/impl/util/SpillableMemoryManager.java
Expand Up @@ -50,7 +50,11 @@ public class SpillableMemoryManager implements NotificationListener {

private final Log log = LogFactory.getLog(getClass());

LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
// References to spillables with size
private LinkedList<SpillablePtr> spillablesSR = null;

private Object spillLock = new Object();

// if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
Expand All @@ -62,15 +66,15 @@ public class SpillableMemoryManager implements NotificationListener {

// this will keep track of memory freed across spills
// and between GC invocations
private static long accumulatedFreeSize = 0L;
private long accumulatedFreeSize = 0L;

// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
private static double memoryThresholdFraction = 0.7;
private double memoryThresholdFraction = 0.7;

// fraction of biggest heap for which we want to get
// "collection threshold exceeded" notifications
private static double collectionMemoryThresholdFraction = 0.5;
private double collectionMemoryThresholdFraction = 0.5;

// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
Expand All @@ -80,10 +84,12 @@ public class SpillableMemoryManager implements NotificationListener {

// fraction of the total heap used for the threshold to determine
// if we want to perform an extra gc before the spill
private static double extraGCThresholdFraction = 0.05;
private static long extraGCSpillSizeThreshold = 0L;
private double extraGCThresholdFraction = 0.05;
private long extraGCSpillSizeThreshold = 0L;

private volatile boolean blockRegisterOnSpill = false;

private static volatile SpillableMemoryManager manager;
private static volatile SpillableMemoryManager manager = new SpillableMemoryManager();

private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
Expand Down Expand Up @@ -129,9 +135,6 @@ private SpillableMemoryManager() {
}

/**
 * Returns the SpillableMemoryManager held in the static {@code manager} field,
 * constructing and storing a new one first if that field is still null.
 *
 * NOTE(review): this listing also shows a declaration of {@code manager} that is
 * initialized eagerly with {@code new SpillableMemoryManager()}; if that declaration
 * is the one in effect, the null check below never fires - confirm.
 *
 * @return the shared manager instance
 */
public static SpillableMemoryManager getInstance() {
if (manager == null) {
manager = new SpillableMemoryManager();
}
return manager;
}

Expand Down Expand Up @@ -187,119 +190,135 @@ public void handleNotification(Notification n, Object o) {
}

}
clearSpillables();
if (toFree < 0) {
log.debug("low memory handler returning " +
"because there is nothing to free");
return;
}
synchronized(spillables) {
Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {

/**
* We don't lock anything, so this sort may not be stable if a WeakReference suddenly
* becomes null, but it will be close enough.
* Also between the time we sort and we use these spillables, they
* may actually change in size - so this is just best effort
*/
@Override
public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
Spillable o1 = o1Ref.get();
Spillable o2 = o2Ref.get();
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) {
return 1;

// Use a separate spillLock to block multiple handleNotification calls
synchronized (spillLock) {
synchronized(spillables) {
spillablesSR = new LinkedList<SpillablePtr>();
for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
Spillable s = i.next().get();
if (s == null) {
i.remove();
continue;
}
if (o2 == null) {
// Create a list with spillable size for stable sorting. Refer PIG-4012
spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
}
log.debug("Spillables list size: " + spillablesSR.size());
Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
@Override
public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
Spillable o1 = o1Ref.get();
Spillable o2 = o2Ref.get();
long o1Size = o1.getMemorySize();
long o2Size = o2.getMemorySize();

if (o1Size == o2Size) {
return 0;
}
if (o1Size < o2Size) {
return 1;
}
return -1;
}
long o1Size = o1.getMemorySize();
long o2Size = o2.getMemorySize();
});
// Block new bags from being registered
blockRegisterOnSpill = true;
}

if (o1Size == o2Size) {
return 0;
}
if (o1Size < o2Size) {
return 1;
}
return -1;
}
});
long estimatedFreed = 0;
int numObjSpilled = 0;
boolean invokeGC = false;
boolean extraGCCalled = false;
for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
WeakReference<Spillable> weakRef = i.next();
Spillable s = weakRef.get();
// Still need to check for null here, even after we removed
// above, because the reference may have gone bad on us
// since the last check.
if (s == null) {
i.remove();
continue;
}
long toBeFreed = s.getMemorySize();
log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
// Don't keep trying if the rest of files are too small
if (toBeFreed < spillFileSizeThreshold) {
log.debug("spilling small files - getting out of memory handler");
break ;
}
// If single Spillable is bigger than the threshold,
// we force GC to make sure we really need to keep this
// object before paying for the expensive spill().
// Done at most once per handleNotification.
// Do not invoke extraGC for GroupingSpillable. Its size will always exceed
// extraGCSpillSizeThreshold and the data is always strong referenced.
if( !extraGCCalled && extraGCSpillSizeThreshold != 0
&& toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) {
log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
// this extra assignment to null is needed so that gc can free the
// spillable if nothing else is pointing at it
s = null;
System.gc();
extraGCCalled = true;
// checking again to see if this reference is still valid
s = weakRef.get();
try {
long estimatedFreed = 0;
int numObjSpilled = 0;
boolean invokeGC = false;
boolean extraGCCalled = false;
boolean isGroupingSpillable = false;
for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
SpillablePtr sPtr = i.next();
Spillable s = sPtr.get();
// Still need to check for null here, even after we removed
// above, because the reference may have gone bad on us
// since the last check.
if (s == null) {
i.remove();
accumulatedFreeSize = 0;
invokeGC = false;
continue;
}
long toBeFreed = sPtr.getMemorySize();
log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
// Don't keep trying if the rest of files are too small
if (toBeFreed < spillFileSizeThreshold) {
log.debug("spilling small files - getting out of memory handler");
break ;
}
isGroupingSpillable = (s instanceof GroupingSpillable);
// If single Spillable is bigger than the threshold,
// we force GC to make sure we really need to keep this
// object before paying for the expensive spill().
// Done at most once per handleNotification.
// Do not invoke extraGC for GroupingSpillable. Its size will always exceed
// extraGCSpillSizeThreshold and the data is always strong referenced.
if( !extraGCCalled && extraGCSpillSizeThreshold != 0
&& toBeFreed > extraGCSpillSizeThreshold && !isGroupingSpillable) {
log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
// this extra assignment to null is needed so that gc can free the
// spillable if nothing else is pointing at it
s = null;
System.gc();
extraGCCalled = true;
// checking again to see if this reference is still valid
s = sPtr.get();
if (s == null) {
i.remove();
accumulatedFreeSize = 0;
invokeGC = false;
continue;
}
}
// Unblock registering of new bags temporarily as aggregation
// of POPartialAgg requires new record to be loaded.
blockRegisterOnSpill = !isGroupingSpillable;
try {
s.spill();
} finally {
blockRegisterOnSpill = true;
}

numObjSpilled++;
estimatedFreed += toBeFreed;
accumulatedFreeSize += toBeFreed;
// This should significantly reduce the number of small files
// in case that we have a lot of nested bags
if (accumulatedFreeSize > gcActivationSize) {
invokeGC = true;
}

if (estimatedFreed > toFree) {
log.debug("Freed enough space - getting out of memory handler");
invokeGC = true;
break;
}
}
s.spill();
numObjSpilled++;
estimatedFreed += toBeFreed;
accumulatedFreeSize += toBeFreed;
// This should significantly reduce the number of small files
// in case that we have a lot of nested bags
if (accumulatedFreeSize > gcActivationSize) {
invokeGC = true;
spillablesSR = null;
/* Poke the GC again to see if we successfully freed enough memory */
if(invokeGC) {
System.gc();
// now that we have invoked the GC, reset accumulatedFreeSize
accumulatedFreeSize = 0;
}

if (estimatedFreed > toFree) {
log.debug("Freed enough space - getting out of memory handler");
invokeGC = true;
break;
if(estimatedFreed > 0){
String msg = "Spilled an estimate of " + estimatedFreed +
" bytes from " + numObjSpilled + " objects. " + info.getUsage();;
log.info(msg);
}
} finally {
blockRegisterOnSpill = false;
}
/* Poke the GC again to see if we successfully freed enough memory */
if(invokeGC) {
System.gc();
// now that we have invoked the GC, reset accumulatedFreeSize
accumulatedFreeSize = 0;
}
if(estimatedFreed > 0){
String msg = "Spilled an estimate of " + estimatedFreed +
" bytes from " + numObjSpilled + " objects. " + info.getUsage();;
log.info(msg);
}

}

}

public void clearSpillables() {
Expand All @@ -321,15 +340,54 @@ public void clearSpillables() {
* @param s the spillable to track.
*/
public void registerSpillable(Spillable s) {
// Trims dead references from the head of the list, waits out any spill that is
// currently blocking registration (bounded by the timeout below), then appends a
// weak reference to s.
synchronized(spillables) {
synchronized (spillables) {
// Cleaning the entire list is too expensive. Just trim off the front while
// we can.
WeakReference<Spillable> first = spillables.peek();
while (first != null && first.get() == null) {
spillables.remove();
first = spillables.peek();
}

if (blockRegisterOnSpill) {
// When the spill is happening we do not want to register new bags
// save for exceptions like POPartialAgg. So block here.
// blockRegisterOnSpill is set to false in the finally block after spill.
// But just in case adding a safeguard of 5 min timeout (assuming a large
// spill completes within 5 mins) instead of infinitely blocking
// in case there are missed corner cases causing deadlock.
// NOTE(review): this wait runs while the spillables monitor is held, so any
// other code synchronizing on spillables is blocked for its duration.
try {
// 6000 polls x 50 ms sleep = 300,000 ms, i.e. the 5 minute cap described above.
int i = 6000;
for (; i > 0 && blockRegisterOnSpill; i--) {
Thread.sleep(50);
}
// i reaches 0 only when every poll was used up, i.e. the wait timed out.
if (i == 0) {
log.warn("Spill took more than 5 mins. This needs investigation");
}
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged; the thread's interrupt status
// is not restored here.
log.warn("Interrupted exception in registerSpillable while blocked on spill", e);
}
// Reached however the wait ended (flag cleared, timeout, or interrupt): clear the
// flag so this registration proceeds.
blockRegisterOnSpill = false;
}
spillables.add(new WeakReference<Spillable>(s));
}
}

/**
 * Immutable pairing of a weakly-referenced {@link Spillable} with a memory size
 * supplied at construction time.
 *
 * The size is a snapshot: {@link #getMemorySize()} returns the recorded value and
 * never re-queries the spillable, so it stays constant even if the spillable's
 * actual size changes later. The spillable itself is held only through a
 * {@link WeakReference}, so this holder does not keep it alive.
 */
private static class SpillablePtr {
    // Both fields are assigned exactly once in the constructor; final makes the
    // snapshot immutable and safely visible to any thread that obtains the holder.
    private final WeakReference<Spillable> spillable;
    private final long size;

    /**
     * @param target the spillable to reference weakly
     * @param recordedSize the memory size to record for {@code target}
     */
    public SpillablePtr(Spillable target, long recordedSize) {
        this.spillable = new WeakReference<Spillable>(target);
        this.size = recordedSize;
    }

    /**
     * @return the referenced spillable, or {@code null} if it has already been
     *         garbage collected
     */
    public Spillable get() {
        return spillable.get();
    }

    /**
     * @return the size recorded at construction time (not a live measurement)
     */
    public long getMemorySize() {
        return size;
    }
}
}

0 comments on commit ad8fc7e

Please sign in to comment.