Skip to content
Permalink
Browse files
DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together
  • Loading branch information
Sorabh Hamirwasia authored and parthchandra committed Jul 12, 2018
1 parent b0314a3 commit 7655ec4f54976def63101daabf34e51697978c57
Showing 1 changed file with 28 additions and 26 deletions.
@@ -315,7 +315,9 @@ public IterOutcome innerNext() {
case START:
return load();
case LOAD:
resetSortState();
if (!this.retainInMemoryBatchesOnNone) {
resetSortState();
}
return (sortState == SortState.DONE) ? NONE : load();
case DELIVER:
return nextOutputBatch();
@@ -578,36 +580,20 @@ public static void releaseBatches(RecordBatch incoming) {
}
if (incoming instanceof ExternalSortBatch) {
ExternalSortBatch esb = (ExternalSortBatch) incoming;
esb.releaseResources();
esb.resetSortState();
}
}

private void releaseResources() {
// This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT
// then release the resources
if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
(sortState == SortState.LOAD)) {

// Close the iterator here to release any remaining resources such
// as spill files. This is important when a query has a join: the
// first branch sort may complete before the second branch starts;
// it may be quite a while after returning the last batch before the
// fragment executor calls this operator's close method.
//
// Note however, that the StreamingAgg operator REQUIRES that the sort
// retain the batches behind an SV4 when doing an in-memory sort because
// the StreamingAgg retains a reference to that data that it will use
// after receiving a NONE result code. See DRILL-5656.
//zeroResources();
if (resultsIterator != null) {
resultsIterator.close();
}
// We only zero vectors for actual output container
outputWrapperContainer.clear();
outputSV4.clear();
container.zeroVectors();
if (resultsIterator != null) {
resultsIterator.close();
}

// We only zero vectors for actual output container
outputWrapperContainer.clear();
outputSV4.clear();
container.zeroVectors();

// Close sortImpl for this boundary
if (sortImpl != null) {
sortImpl.close();
@@ -620,6 +606,20 @@ private void releaseResources() {
*/
private void resetSortState() {
sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
// This means if it has received NONE/EMIT outcome and flag to retain is false which will be the case in presence of
// StreamingAggBatch only since it will explicitly call releaseBacthes on ExternalSort when its done consuming
// all the data buffer.

// Close the iterator here to release any remaining resources such
// as spill files. This is important when a query has a join: the
// first branch sort may complete before the second branch starts;
// it may be quite a while after returning the last batch before the
// fragment executor calls this operator's close method.
//
// Note however, that the StreamingAgg operator REQUIRES that the sort
// retain the batches behind an SV4 when doing an in-memory sort because
// the StreamingAgg retains a reference to that data that it will use
// after receiving a NONE result code. See DRILL-5656.
releaseResources();

if (lastKnownOutcome == EMIT) {
@@ -674,7 +674,9 @@ private IterOutcome getFinalOutcome() {
sortState = SortState.DELIVER;
} else if (getRecordCount() == 0) { // There is no record to send downstream
outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
resetSortState();
if (!this.retainInMemoryBatchesOnNone) {
resetSortState();
}
} else if (lastKnownOutcome == EMIT) {
final boolean hasMoreRecords = outputSV4.hasNext();
sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;

0 comments on commit 7655ec4

Please sign in to comment.