Skip to content

Commit

Permalink
DRILL-2510: Fix unclosed allocators detected in Java shutdown hook
Browse files Browse the repository at this point in the history
TestWriteToDisk
- fix warnings, including closing unclosed resources with try-with-resources

TestAllocators
- close unclosed resources
  doesn't use try-with-resources because this stresses interleaved allocation
  operations that wouldn't necessarily be nested

StackTrace
- created convenience class for tracking stack traces

Drillbit
- used StackTrace to track location of Drillbit starts for shutdown hook
  error messages so we can more easily identify errant tests from just the logs
  in future
- added an isClosed flag to detect that the Drillbit has already been closed
  to avoid complaints about double closures that happen in many tests that
  explicitly close their drillbits (the shutdown hook then closes them again,
  causing bogus complaints)
- synchronized close()
- made the id generator in the ShutdownThread an AtomicInteger
  • Loading branch information
cwestin authored and jacques-n committed Mar 21, 2015
1 parent bb1d761 commit f1b59ed
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 75 deletions.
73 changes: 73 additions & 0 deletions common/src/main/java/org/apache/drill/common/StackTrace.java
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.common;

import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;

/**
* Convenient way of obtaining and manipulating stack traces for debugging.
*/
public class StackTrace {
private final StackTraceElement[] stackTraceElements;

/**
* Constructor. Captures the current stack trace.
*/
public StackTrace() {
// skip over the first element so that we don't include this constructor call
final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
stackTraceElements = Arrays.copyOfRange(stack, 1, stack.length - 1);
}

/**
* Write the stack trace.
*
* @param writer where to write it
* @param indent how many spaces to indent each line
*/
public void write(final Writer writer, final int indent) {
// create the indentation string
final char[] indentation = new char[indent];
Arrays.fill(indentation, ' ');

try {
// write the stack trace
for(StackTraceElement ste : stackTraceElements) {
writer.write(indentation);
writer.write(ste.getClassName());
writer.write('.');
writer.write(ste.getMethodName());
writer.write(':');
writer.write(ste.getLineNumber());
writer.write('\n');
}
} catch(IOException e) {
throw new RuntimeException("couldn't write", e);
}
}

@Override
public String toString() {
final StringWriter sw = new StringWriter();
write(sw, 0);
return sw.toString();
}
}
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.drill.exec.server;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
Expand Down Expand Up @@ -57,6 +60,8 @@ public class Drillbit implements AutoCloseable {
Environment.logEnv("Drillbit environment:.", logger);
}

private boolean isClosed = false;

public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
return start(DrillConfig.create(options.getConfigLocation()), null);
}
Expand Down Expand Up @@ -237,11 +242,16 @@ public void run() throws Exception {
registrationHandle = coord.register(md);
startJetty();

Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, new StackTrace()));
}

@Override
public void close() {
public synchronized void close() {
// avoid complaints about double closing
if (isClosed) {
return;
}

if (coord != null && registrationHandle != null) {
coord.unregister(registrationHandle);
}
Expand All @@ -267,22 +277,44 @@ public void close() {
Closeables.closeQuietly(context);

logger.info("Shutdown completed.");
isClosed = true;
}

/**
* Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
* occur during closure, as well as the location the drillbit was started from.
*/
private static class ShutdownThread extends Thread {
private static int idCounter = 0;
private final static AtomicInteger idCounter = new AtomicInteger(0);
private final Drillbit drillbit;

ShutdownThread( Drillbit drillbit ) {
private final StackTrace stackTrace;

/**
* Constructor.
*
* @param drillbit the drillbit to close down
* @param stackTrace the stack trace from where the Drillbit was started;
* use new StackTrace() to generate this
*/
public ShutdownThread(final Drillbit drillbit, final StackTrace stackTrace) {
this.drillbit = drillbit;
idCounter++;
setName("Drillbit-ShutdownHook#" + idCounter );
this.stackTrace = stackTrace;
/*
* TODO should we try to determine a test class name?
* See https://blogs.oracle.com/tor/entry/how_to_determine_the_junit
*/

setName("Drillbit-ShutdownHook#" + idCounter.getAndIncrement());
}

@Override
public void run() {
logger.info("Received shutdown request.");
drillbit.close();
try {
drillbit.close();
} catch(Exception e) {
throw new RuntimeException("Caught exception closing Drillbit started from\n" + stackTrace, e);
}
}
}

Expand Down
Expand Up @@ -47,69 +47,87 @@

import com.google.common.collect.Lists;

public class TestWriteToDisk extends ExecTest{

public class TestWriteToDisk extends ExecTest {
@Test
@SuppressWarnings("static-method")
public void test() throws Exception {
List<ValueVector> vectorList = Lists.newArrayList();
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
DrillConfig config = DrillConfig.create();
Drillbit bit = new Drillbit(config, serviceSet);
bit.run();
DrillbitContext context = bit.getContext();
final List<ValueVector> vectorList = Lists.newArrayList();
final DrillConfig config = DrillConfig.create();
try (final RemoteServiceSet serviceSet = RemoteServiceSet
.getLocalServiceSet();
final Drillbit bit = new Drillbit(config, serviceSet)) {
bit.run();
final DrillbitContext context = bit.getContext();

MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
AllocationHelper.allocate(intVector, 4, 4);
AllocationHelper.allocate(binVector, 4, 5);
vectorList.add(intVector);
vectorList.add(binVector);
final MaterializedField intField = MaterializedField.create(
new SchemaPath("int", ExpressionPosition.UNKNOWN),
Types.required(TypeProtos.MinorType.INT));
final MaterializedField binField = MaterializedField.create(
new SchemaPath("binary", ExpressionPosition.UNKNOWN),
Types.required(TypeProtos.MinorType.VARBINARY));
try (final IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator());
final VarBinaryVector binVector =
(VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator())) {
AllocationHelper.allocate(intVector, 4, 4);
AllocationHelper.allocate(binVector, 4, 5);
vectorList.add(intVector);
vectorList.add(binVector);

intVector.getMutator().setSafe(0, 0); binVector.getMutator().setSafe(0, "ZERO".getBytes());
intVector.getMutator().setSafe(1, 1); binVector.getMutator().setSafe(1, "ONE".getBytes());
intVector.getMutator().setSafe(2, 2); binVector.getMutator().setSafe(2, "TWO".getBytes());
intVector.getMutator().setSafe(3, 3); binVector.getMutator().setSafe(3, "THREE".getBytes());
intVector.getMutator().setValueCount(4);
binVector.getMutator().setValueCount(4);
intVector.getMutator().setSafe(0, 0);
binVector.getMutator().setSafe(0, "ZERO".getBytes());
intVector.getMutator().setSafe(1, 1);
binVector.getMutator().setSafe(1, "ONE".getBytes());
intVector.getMutator().setSafe(2, 2);
binVector.getMutator().setSafe(2, "TWO".getBytes());
intVector.getMutator().setSafe(3, 3);
binVector.getMutator().setSafe(3, "THREE".getBytes());
intVector.getMutator().setValueCount(4);
binVector.getMutator().setValueCount(4);

VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(
container.getRecordCount(), container, false);
VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(
batch, context.getAllocator());

Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/tmp/drillSerializable");
if (fs.exists(path)) {
fs.delete(path, false);
}
FSDataOutputStream out = fs.create(path);
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");

wrap.writeToStream(out);
out.close();
final VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(
context.getAllocator());
try (final FileSystem fs = FileSystem.get(conf)) {
final Path path = new Path("/tmp/drillSerializable");
if (fs.exists(path)) {
fs.delete(path, false);
}

FSDataInputStream in = fs.open(path);
VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(context.getAllocator());
newWrap.readFromStream(in);
fs.close();
try (final FSDataOutputStream out = fs.create(path)) {
wrap.writeToStream(out);
out.close();
}

VectorAccessible newContainer = newWrap.get();
for (VectorWrapper w : newContainer) {
ValueVector vv = w.getValueVector();
int values = vv.getAccessor().getValueCount();
for (int i = 0; i < values; i++) {
Object o = vv.getAccessor().getObject(i);
if (o instanceof byte[]) {
System.out.println(new String((byte[])o));
} else {
System.out.println(o);
try (final FSDataInputStream in = fs.open(path)) {
newWrap.readFromStream(in);
}
}

final VectorAccessible newContainer = newWrap.get();
for (VectorWrapper<?> w : newContainer) {
try (ValueVector vv = w.getValueVector()) {
int values = vv.getAccessor().getValueCount();
for (int i = 0; i < values; i++) {
final Object o = vv.getAccessor().getObject(i);
if (o instanceof byte[]) {
System.out.println(new String((byte[]) o));
} else {
System.out.println(o);
}
}
}
}
}
}
}

}
Expand Up @@ -57,24 +57,16 @@ public class TestAllocators {
}
};

static String planFile="/physical_allocator_test.json";

BufferAllocator rootAllocator;
DrillConfig config;
Drillbit bit;
RemoteServiceSet serviceSet;
DrillbitContext bitContext;
private final static String planFile="/physical_allocator_test.json";

@Test
public void testAllocators() throws Exception {

// Setup a drillbit (initializes a root allocator)

config = DrillConfig.create(TEST_CONFIGURATIONS);
serviceSet = RemoteServiceSet.getLocalServiceSet();
bit = new Drillbit(config, serviceSet);
final DrillConfig config = DrillConfig.create(TEST_CONFIGURATIONS);
final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
final Drillbit bit = new Drillbit(config, serviceSet);
bit.run();
bitContext = bit.getContext();
final DrillbitContext bitContext = bit.getContext();
FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);

Expand Down Expand Up @@ -185,7 +177,11 @@ public void testAllocators() throws Exception {
b31a.release();
oContext31.close();

}
fragmentContext1.close();
fragmentContext2.close();
fragmentContext3.close();

bit.close();
serviceSet.close();
}
}

0 comments on commit f1b59ed

Please sign in to comment.