DRILL-3732: Drill leaks memory if external sort hits out of disk space exception

- ExternalSortBatch.mergeAndSpill() cleans up all of its data if an error occurs while it is spilling to disk (the cleanup pattern is sketched below)
- made BatchGroup implement AutoCloseable so it can easily be closed with AutoCloseables.close() if an error occurs
- added an injection site that fires while the external sort is spilling to disk
- added a unit test that forces a two-batch query to spill to disk and injects an exception while it does so

this closes #147
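
In outline, the fix applies a throw-flag cleanup pattern: assume the spill failed until the try block runs to completion, and release the partially written output only on failure. Below is a minimal sketch of that pattern, assuming Drill's BatchGroup, UserException, and AutoCloseables classes are on the classpath; doSpill() is a hypothetical stand-in for the real copier loop in mergeAndSpill().

import java.io.IOException;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SpillSketch {
  private static final Logger logger = LoggerFactory.getLogger(SpillSketch.class);

  BatchGroup spill(BatchGroup newGroup) {
    boolean threw = true; // assume failure until the try block completes
    try {
      doSpill(newGroup);  // may throw at any point while writing to disk
      threw = false;      // must remain the last statement of the try block
    } catch (IOException e) {
      throw UserException.resourceError(e)
          .message("External Sort encountered an error while spilling to disk")
          .build(logger);
    } finally {
      if (threw) {
        // the spill failed: close the partial output quietly so the original
        // exception, not a secondary cleanup error, reaches the caller
        AutoCloseables.close(newGroup, logger);
      }
    }
    return newGroup;
  }

  // hypothetical placeholder for the copier loop that writes batches to disk
  private void doSpill(BatchGroup group) throws IOException { }
}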
adeneche committed Sep 21, 2015
1 parent 092903d commit 3c89b30
Showing 5 changed files with 161 additions and 13 deletions.
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -39,7 +39,7 @@

import com.google.common.base.Stopwatch;

-public class BatchGroup implements VectorAccessible {
+public class BatchGroup implements VectorAccessible, AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);

private VectorContainer currentContainer;
@@ -142,7 +142,8 @@ public VectorContainer getContainer() {
return currentContainer;
}

-public void cleanup() throws IOException {
+@Override
+public void close() throws IOException {
currentContainer.zeroVectors();
if (sv2 != null) {
sv2.clear();
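Note: because BatchGroup now implements AutoCloseable, it also composes with try-with-resources. A hypothetical caller could write the fragment below; consume() is illustrative, and the constructor call matches the one used in mergeAndSpill() further down.

// close() runs automatically even if consuming the spilled data throws;
// BatchGroup.close() declares IOException, so the caller must handle it.
try (BatchGroup group = new BatchGroup(c1, fs, outputFile, oContext.getAllocator())) {
  consume(group); // hypothetical consumer of the spilled batches
} catch (IOException e) {
  throw new RuntimeException("failed to close batch group", e);
}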
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -127,6 +128,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {

public static final String INTERRUPTION_AFTER_SORT = "after-sort";
public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+public static final String INTERRUPTION_WHILE_SPILLING = "spilling";


public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -164,11 +166,11 @@ public SelectionVector4 getSelectionVector4() {
return sv4;
}

-private void cleanupBatchGroups(Collection<BatchGroup> groups) {
+private void closeBatchGroups(Collection<BatchGroup> groups) {
for (BatchGroup group: groups) {
try {
-group.cleanup();
-} catch (IOException e) {
+group.close();
+} catch (Exception e) {
// collect all failure and make sure to cleanup all remaining batches
// Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
// where it would have been passed to context.fail()
@@ -183,11 +185,11 @@ private void cleanupBatchGroups(Collection<BatchGroup> groups) {
public void close() {
try {
if (batchGroups != null) {
-cleanupBatchGroups(batchGroups);
+closeBatchGroups(batchGroups);
batchGroups = null;
}
if (spilledBatchGroups != null) {
-cleanupBatchGroups(spilledBatchGroups);
+closeBatchGroups(spilledBatchGroups);
spilledBatchGroups = null;
}
} finally {
@@ -534,7 +536,7 @@ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws Schem

String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());

+boolean threw = true; // true if an exception is thrown in the try block below
logger.info("Merging and spilling to {}", outputFile);
try {
while ((count = copier.next(targetRecordCount)) > 0) {
@@ -543,13 +545,20 @@ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws Schem
// note that addBatch also clears the outputContainer
newGroup.addBatch(outputContainer);
}
+injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
newGroup.closeOutputStream();
-for (BatchGroup group : batchGroupList) {
-group.cleanup();
-}
-hyperBatch.clear();
+threw = false; // this should always be the last statement of this try block to make sure we cleanup properly
} catch (IOException e) {
-throw new RuntimeException(e);
+throw UserException.resourceError(e)
+.message("External Sort encountered an error while spilling to disk")
+.build(logger);
+} finally {
+hyperBatch.clear();
+cleanAfterMergeAndSpill(batchGroupList, threw);
+if (threw) {
+// we only need to cleanup newGroup if spill failed
+AutoCloseables.close(newGroup, logger);
+}
}
takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
long bufSize = getBufferSize(c1);
@@ -559,6 +568,25 @@ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws Schem
return newGroup;
}

+/**
+ * Make sure we cleanup properly after merge and spill.<br>If there was any error during the spilling,
+ * we cleanup the resources silently, otherwise we throw any exception we hit during the cleanup
+ *
+ * @param batchGroups spilled batch groups
+ * @param silently true to log any exception that happens during cleanup, false to throw it
+ */
+private void cleanAfterMergeAndSpill(final List<BatchGroup> batchGroups, boolean silently) {
+try {
+AutoCloseables.close(batchGroups.toArray(new BatchGroup[batchGroups.size()]));
+} catch (Exception e) {
+if (silently) {
+logger.warn("Error while cleaning up after merge and spill", e);
+} else {
+throw new RuntimeException("Error while cleaning up after merge and spill", e);
+}
+}
+}
+
private void takeOwnership(VectorAccessible batch) {
for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
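Two cleanup idioms above are worth noting. closeBatchGroups() keeps closing the remaining groups after one of them fails, and cleanAfterMergeAndSpill() closes a whole list in a single call by handing AutoCloseables.close() an array. Here is a condensed sketch of the close-everything, remember-the-first-failure idiom; the names are illustrative, and the real closeBatchGroups() reports collected failures through the fragment context instead of rethrowing.

Exception first = null;
for (BatchGroup group : groups) {
  try {
    group.close();            // keep going even if one group fails to close
  } catch (Exception e) {
    if (first == null) {
      first = e;              // remember the first failure...
    } else {
      first.addSuppressed(e); // ...and attach later ones as suppressed
    }
  }
}
if (first != null) {
  throw new RuntimeException("failed to close one or more batch groups", first);
}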
79 changes: 79 additions & 0 deletions exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;

import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Properties;

/**
* Testing External Sort's spilling to disk.
* <br>
* This class changes the following Drill property to force external sort to spill after the 2nd batch:
* {@link ExecConstants#EXTERNAL_SORT_SPILL_THRESHOLD} = 1
* <br>
* {@link ExecConstants#EXTERNAL_SORT_SPILL_GROUP_SIZE} = 1
*/
public class TestSortSpillWithException extends BaseTestQuery {
private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";

@BeforeClass
public static void initCluster() {
// make sure memory sorter outputs 20 rows per batch
final Properties props = cloneDefaultTestConfigProperties();
props.put(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, "1");
props.put(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, "1");

updateTestCluster(1, DrillConfig.create(props));
}

@Test
public void testSpilLeak() throws Exception {
// inject exception in sort while spilling
final String controls = Controls.newBuilder()
.addExceptionOnBit(
ExternalSortBatch.class,
ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
IOException.class,
bits[0].getContext().getEndpoint())
.build();
ControlsInjectionUtil.setControls(client, controls);
// run a simple order by query
try {
test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
fail("Query should have failed!");
} catch (UserRemoteException e) {
assertEquals(ErrorType.RESOURCE, e.getErrorType());
assertTrue("Incorrect error message",
e.getMessage().contains("External Sort encountered an error while spilling to disk"));
}
}
}
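
For reference, the two halves of the fault-injection handshake exercised by this test sit roughly a hundred lines apart in the diff; both fragments below are taken verbatim from the changes above.

// Operator side (ExternalSortBatch.mergeAndSpill): a named site that throws
// IOException when the matching control is armed, and is a no-op otherwise.
injector.injectChecked(context.getExecutionControls(),
    INTERRUPTION_WHILE_SPILLING, IOException.class);

// Test side: arm that site on the first drillbit, then run the query and
// assert that the failure surfaces as a RESOURCE error rather than a leak.
final String controls = Controls.newBuilder()
    .addExceptionOnBit(ExternalSortBatch.class,
        ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
        IOException.class,
        bits[0].getContext().getEndpoint())
    .build();
ControlsInjectionUtil.setControls(client, controls);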
20 changes: 20 additions & 0 deletions exec/java-exec/src/test/resources/xsort/2batches/0.data.json
@@ -0,0 +1,20 @@
{ "employee_id":24 }
{ "employee_id":36 }
{ "employee_id":11 }
{ "employee_id":27 }
{ "employee_id":23 }
{ "employee_id":25 }
{ "employee_id":8 }
{ "employee_id":12 }
{ "employee_id":29 }
{ "employee_id":33 }
{ "employee_id":19 }
{ "employee_id":22 }
{ "employee_id":31 }
{ "employee_id":30 }
{ "employee_id":35 }
{ "employee_id":5 }
{ "employee_id":4 }
{ "employee_id":16 }
{ "employee_id":13 }
{ "employee_id":9 }
20 changes: 20 additions & 0 deletions exec/java-exec/src/test/resources/xsort/2batches/1.data.json
@@ -0,0 +1,20 @@
{ "employee_id":20 }
{ "employee_id":7 }
{ "employee_id":38 }
{ "employee_id":3 }
{ "employee_id":15 }
{ "employee_id":2 }
{ "employee_id":39 }
{ "employee_id":37 }
{ "employee_id":10 }
{ "employee_id":18 }
{ "employee_id":17 }
{ "employee_id":32 }
{ "employee_id":34 }
{ "employee_id":1 }
{ "employee_id":21 }
{ "employee_id":14 }
{ "employee_id":6 }
{ "employee_id":26 }
{ "employee_id":0 }
{ "employee_id":28 }
