diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index a8f07b5a796..7ccb64e6143 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -47,14 +47,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE); private final FragmentRoot rootOperator; - private RootExec root; private final FragmentContext context; private final WorkerBee bee; private final StatusReporter listener; - private Thread executionThread; - private AtomicBoolean closed = new AtomicBoolean(false); private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener(); + private RootExec root; + private boolean closed; + public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) { this.context = context; this.bee = bee; @@ -99,7 +99,6 @@ public void run() { context.getHandle().getMinorFragmentId() ); Thread.currentThread().setName(newThreadName); - executionThread = Thread.currentThread(); root = ImplCreator.getExec(context, rootOperator); @@ -133,12 +132,15 @@ public void run() { bee.removeFragment(context.getHandle()); context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener); + // Final check to make sure RecordBatches are cleaned up. + closeOutResources(false); + Thread.currentThread().setName(originalThread); } } private void closeOutResources(boolean throwFailure) { - if (closed.get()) { + if (closed) { return; } @@ -160,7 +162,7 @@ private void closeOutResources(boolean throwFailure) { logger.warn("Failure while closing out resources.", e); } - closed.set(true); + closed = true; } private void internalFail(Throwable excep) {