Skip to content

Commit

Permalink
DRILL-2245: Clean up query setup and execution kickoff in Foreman/Wor…
Browse files Browse the repository at this point in the history
…kManager in order to ensure consistent handling, and avoid hangs and races, with the goal of improving Drillbit robustness.

I did my best to keep these clean when I split them up, but this core commit
may depend on some minor changes in the hygiene commit that is also
associated with this bug, so either both should be applied, or neither.
The core commit should be applied first.

protocol/pom.xml
- updated protocol buffer compiler version to 2.6
- this made slight modifications to the formats of a few committed protobuf
  files

AutoCloseables
- created org.apache.drill.common.AutoCloseables to handle closing these
  quietly

BaseTestQuery, and derivatives
- factored out pieces into QueryTestUtil so they can be reused

DeferredException:
- created this so we can collect exceptions during the shutdown process

Drillbit
- uses AutoCloseables for the WorkManager and for the storeProvider
- allow start() to take a RemoteServiceSet
- private, final, formatting

Foreman
- added new state CANCELLATION_REQUESTED (via UserBitShared.proto) to represent
  the time between request of a cancellation, and acknowledgement from all
  remote endpoints running fragments on a query's behalf
- created ForemanResult to manage interleaving cleanup effects/failure with
  query result state
- does not need to implement Comparable
- does not need to implement Closeable
- thread blocking fixes
- add resultSent flag
- add code to log plan fragments with endpoint assignments
- added finals, cleaned up formatting
- do queue management in acquireQuerySemaphore; local tests pass
- rename getContext() to getQueryContext()
- retain DrillbitContext
- a couple of exception injections for testing
- minor formatting
- TODOs

FragmentContext
- added a DeferredException to collect errors during startup/shutdown sequences

FragmentExecutor
- eliminated CancelableQuery
- use the FragmentContext's DeferredException for errors
- common subexpression elimination
- cleaned up

QueryContext
- removed unnecessary functions (with some outside classes tweaked for this)
- finals, formatting

QueryManager
- merge in QueryStatus
  - affects Foreman, ../batch/ControlHandlerImpl,
    and ../../server/rest/ProfileResources
- made some methods private
- removed unused imports
- add finals and formatting
- variable renaming to improve readability
- formatting
- comments
- TODOs

QueryStatus
- getAsInfo() private
- member renaming
- member access changes
- formatting
- TODOs

QueryTestUtil, BaseTestQuery, TestDrillbitResilience
- make maxWidth a parameter to server startup

SelfCleaningRunnable
- created org.apache.drill.common.SelfCleaningRunnable

SingleRowListener
- created org.apache.drill.SingleRowListener results listener
- use in TestDrillbitResilience

TestComparisonFunctions
- fix not to close the FragmentContext multiple times

TestDrillbitResilience
- created org.apache.drill.exec.server.TestDrillbitResilience to test drillbit
  resilience in the face of exceptions and failures during queries

TestWithZookeeper
- factor out work into ZookeeperHelper so that it can be reused by
  TestDrillbitResilience

UserBitShared
- get rid of unused UNKNOWN_QUERY

WorkEventBus
- rename methods, affects Foreman and ControlHandlerImpl
- remove unused WorkerBee reference
- most members final
- formatting

WorkManager
- Closeable to AutoCloseable
- removed unused incomingFragments Set
- eliminated unnecessary eventThread and pendingTasks by posting Runnables
  directly to executor
- use SelfCleaningRunnable for Foreman management
- FragmentExecutor management uses SelfCleaningRunnable
- runningFragments to be a ConcurrentHashMap; TestTpchDistributed passes
- other improvements due to bee no longer needed in various places
- most members final
- minor formatting
- comments
- TODOs

(*) Created exception injection classes to simulate exceptions for testing
- ExceptionInjection
- ExceptionInjector
- ExceptionInjectionUtil
- TestExceptionInjection

DRILL-2245-hygiene: General code cleanup encountered while working on the rest
of this commit. This includes
- making members final whenever possible
- making members private whenever possible
- making loggers private
- removing unused imports
- removing unused private functions
- removing unused public functions
- removing unused local variables
- removing unused private members
- deleting unused files
- cleaning up formatting
  - adding spaces before braces in conditionals and loop bodies
  - breaking up overly long lines
  - removing extra blank lines

While I tried to keep this clean, this commit may have minor dependencies on
DRILL-2245-core that I missed. The intention is just to break this up for
review purposes. Either both commits should be applied, or neither.
  • Loading branch information
cwestin authored and jacques-n committed Mar 19, 2015
1 parent ff9882b commit 2da618c
Show file tree
Hide file tree
Showing 93 changed files with 4,644 additions and 3,122 deletions.
45 changes: 45 additions & 0 deletions common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.common;

import org.slf4j.Logger;

/**
* Utilities for AutoCloseable classes.
*/
public class AutoCloseables {
/**
* Close an {@link AutoCloseable}, catching and logging any exceptions at
* INFO level.
*
* <p>This can be dangerous if there is any possibility of recovery. See
* the <a href="https://code.google.com/p/guava-libraries/issues/detail?id=1118">
* notes regarding the deprecation of Guava's
* {@link com.google.common.io.Closeables#closeQuietly}</a>.
*
* @param ac the AutoCloseable to close
* @param logger the logger to use to record the exception if there was one
*/
public static void close(final AutoCloseable ac, final Logger logger) {
try {
ac.close();
} catch(Exception e) {
logger.info("Failure on close(): " + e);
}
}
}
121 changes: 121 additions & 0 deletions common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.common;

import com.google.common.base.Preconditions;

/**
* Collects one or more exceptions that may occur, using
* <a href="http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html#suppressed-exceptions">
* suppressed exceptions</a>.
* When this AutoCloseable is closed, if there was an exception added, it will be thrown. If more than one
* exception was added, then all but the first will be added to the first as suppressed
* exceptions.
*
* <p>This class is thread safe.
*/
public class DeferredException implements AutoCloseable {
private Exception exception = null;
private boolean isClosed = false;

/**
* Add an exception. If this is the first exception added, it will be the one
* that is thrown when this is closed. If not the first exception, then it will
* be added to the suppressed exceptions on the first exception.
*
* @param exception the exception to add
*/
public void addException(final Exception exception) {
Preconditions.checkNotNull(exception);

synchronized(this) {
Preconditions.checkState(!isClosed);

if (this.exception == null) {
this.exception = exception;
} else {
this.exception.addSuppressed(exception);
}
}
}

public void addThrowable(final Throwable throwable) {
Preconditions.checkNotNull(throwable);

if (throwable instanceof Exception) {
addException((Exception) throwable);
return;
}

addException(new RuntimeException(throwable));
}

/**
* Get the deferred exception, if there is one. Note that if this returns null,
* the result could change at any time.
*
* @return the deferred exception, or null
*/
public Exception getException() {
synchronized(this) {
return exception;
}
}

/**
* Close the given AutoCloseable, suppressing any exceptions that are thrown.
* If an exception is thrown, the rules for {@link #addException(Exception)}
* are followed.
*
* @param autoCloseable the AutoCloseable to close; may be null
*/
public void suppressingClose(final AutoCloseable autoCloseable) {
synchronized(this) {
/*
* For the sake of detecting code that doesn't follow the conventions,
* we want this to complain whether the closeable exists or not.
*/
Preconditions.checkState(!isClosed);

if (autoCloseable == null) {
return;
}

try {
autoCloseable.close();
} catch(Exception e) {
addException(e);
}
}
}

@Override
public void close() throws Exception {
synchronized(this) {
Preconditions.checkState(!isClosed);

try {
if (exception != null) {
throw exception;
}
} finally {
isClosed = true;
}
}
}
}
Expand Up @@ -15,15 +15,38 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.drill.exec.work; package org.apache.drill.common;


import org.apache.drill.exec.proto.BitControl.FragmentStatus; /**
* A wrapper for Runnables that provides a hook to do cleanup.
*/
public abstract class SelfCleaningRunnable implements Runnable {
private final Runnable runnable;

/**
* Constructor.
*
* @param runnable the Runnable to wrap
*/
public SelfCleaningRunnable(final Runnable runnable) {
this.runnable = runnable;
}


public interface StatusProvider { @Override
public void run() {
try {
runnable.run();
} finally {
cleanup();
}
}


/** /**
* Provides the current status of the FragmentExecutor's work. * Cleanup.
* @return Status if currently. Null if in another state. *
* <p>Derived classes should put any necessary cleanup in here. This
* is guaranteed to be called, even if the wrapped Runnable throws an
* exception.
*/ */
public FragmentStatus getStatus(); protected abstract void cleanup();
} }
Expand Up @@ -46,8 +46,7 @@
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;


public final class DrillConfig extends NestedConfig{ public final class DrillConfig extends NestedConfig{

// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final ImmutableList<String> startupArguments; private final ImmutableList<String> startupArguments;
@SuppressWarnings("restriction") private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory(); @SuppressWarnings("restriction") private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
Expand Down Expand Up @@ -219,14 +218,12 @@ public String toString() {
return this.root().render(); return this.root().render();
} }


public static void main(String[] args) throws Exception{ public static void main(String[] args) throws Exception {
//"-XX:MaxDirectMemorySize" //"-XX:MaxDirectMemorySize"
DrillConfig config = DrillConfig.create(); DrillConfig config = DrillConfig.create();

} }


public static long getMaxDirectMemory() { public static long getMaxDirectMemory() {
return MAX_DIRECT_MEMORY; return MAX_DIRECT_MEMORY;
} }

} }
Expand Up @@ -202,4 +202,8 @@ public interface ExecConstants {


public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable"; public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false); public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);

public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
} }
Expand Up @@ -98,7 +98,7 @@ public DrillClient(DrillConfig config, ClusterCoordinator coordinator) {
public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) { public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) {
this.ownsZkConnection = coordinator == null; this.ownsZkConnection = coordinator == null;
this.ownsAllocator = allocator == null; this.ownsAllocator = allocator == null;
this.allocator = allocator == null ? new TopLevelAllocator(config) : allocator; this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator;
this.config = config; this.config = config;
this.clusterCoordinator = coordinator; this.clusterCoordinator = coordinator;
this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
Expand Down Expand Up @@ -131,7 +131,7 @@ public void setSupportComplexTypes(boolean supportComplexTypes) {
/** /**
* Connects the client to a Drillbit server * Connects the client to a Drillbit server
* *
* @throws IOException * @throws RpcException
*/ */
public void connect() throws RpcException { public void connect() throws RpcException {
connect(null, new Properties()); connect(null, new Properties());
Expand Down Expand Up @@ -176,7 +176,7 @@ public synchronized void connect(String connect, Properties props) throws RpcExc
connected = true; connected = true;
} }


protected EventLoopGroup createEventLoop(int size, String prefix) { protected static EventLoopGroup createEventLoop(int size, String prefix) {
return TransportCheck.createEventLoopGroup(size, prefix); return TransportCheck.createEventLoopGroup(size, prefix);
} }


Expand Down Expand Up @@ -204,12 +204,8 @@ public synchronized boolean reconnect() {


private void connect(DrillbitEndpoint endpoint) throws RpcException { private void connect(DrillbitEndpoint endpoint) throws RpcException {
FutureHandler f = new FutureHandler(); FutureHandler f = new FutureHandler();
try { client.connect(f, endpoint, props, getUserCredentials());
client.connect(f, endpoint, props, getUserCredentials()); f.checkedGet();
f.checkedGet();
} catch (InterruptedException e) {
throw new RpcException(e);
}
} }


public BufferAllocator getAllocator() { public BufferAllocator getAllocator() {
Expand All @@ -219,6 +215,7 @@ public BufferAllocator getAllocator() {
/** /**
* Closes this client's connection to the server * Closes this client's connection to the server
*/ */
@Override
public void close() { public void close() {
if (this.client != null) { if (this.client != null) {
this.client.close(); this.client.close();
Expand Down Expand Up @@ -286,15 +283,13 @@ public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
* Submits a Logical plan for direct execution (bypasses parsing) * Submits a Logical plan for direct execution (bypasses parsing)
* *
* @param plan the plan to execute * @param plan the plan to execute
* @return a handle for the query result
* @throws RpcException
*/ */
public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) {
client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build()); client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
} }


private class ListHoldingResultsListener implements UserResultsListener { private class ListHoldingResultsListener implements UserResultsListener {
private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); private Vector<QueryResultBatch> results = new Vector<>();
private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create(); private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
private UserProtos.RunQuery query ; private UserProtos.RunQuery query ;


Expand Down
Expand Up @@ -294,14 +294,11 @@ public Class<?> getImplementationClass(
if (templateDefinition.getExternalInterface().isAssignableFrom(c)) { if (templateDefinition.getExternalInterface().isAssignableFrom(c)) {
logger.debug("Done compiling (bytecode size={}, time:{} millis).", DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 1000000); logger.debug("Done compiling (bytecode size={}, time:{} millis).", DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 1000000);
return c; return c;
} else {
throw new ClassTransformationException("The requested class did not implement the expected interface.");
} }

throw new ClassTransformationException("The requested class did not implement the expected interface.");
} catch (CompileException | IOException | ClassNotFoundException e) { } catch (CompileException | IOException | ClassNotFoundException e) {
throw new ClassTransformationException(String.format("Failure generating transformation classes for value: \n %s", entireClass), e); throw new ClassTransformationException(String.format("Failure generating transformation classes for value: \n %s", entireClass), e);
} }

} }

} }

0 comments on commit 2da618c

Please sign in to comment.