Skip to content

Commit

Permalink
DRILL-2383: Support to inject exceptions and pauses in various compon…
Browse files Browse the repository at this point in the history
…ents of Drill

+ Controls are fired only if assertions are enabled
+ Controls can be introduced in any class that has access to FragmentContext/QueryContext
+ Controls can be fired by altering the DRILLBIT_CONTROL_INJECTIONS session option
+ Renames: SimulatedExceptions => ExecutionControls, ExceptionInjector => ExecutionControlsInjector
+ Added injection sites in Foreman, DrillSqlWorker, FragmentExecutor
+ Unit tests in TestDrillbitResilience, TestExceptionInjection and TestPauseInjection

Other commits included:
+ DRILL-2437: Moved ExecutionControls from DrillbitContext to FragmentContext/QueryContext
+ DRILL-2382: Added address and port to Injection to specify drillbit
+ DRILL-2384: Added QueryState to SingleRowListener and assert that state is COMPLETED while testing

Other edits:
+ Support for short lived session options in SessionOptionManager (using TTL in OptionValidator)
+ Introduced query count in UserSession
+ Added QueryState to queryCompleted() in UserResultsListener to check if COMPLETED/CANCELED
+ Added JSONStringValidator to TypeValidators
+ Log query id as string in DrillClient, WorkEventBus, QueryResultHandler
+ Use try..catch block only around else clause for OptionList in FragmentContext
+ Fixed drillbitContext spelling error in QueryContext
+ Fixed state transition when cancel() before run() in FragmentExecutor
+ Do not call setLocalOption twice in FallbackOptionManager
+ Show explicitly that submitWork() returns queryId in UserServer
+ Updated protocol/readme.txt to include an alternative way to generate sources
  • Loading branch information
Sudheesh Katkam authored and parthchandra committed Apr 21, 2015
1 parent 21dfe7a commit be8d953
Show file tree
Hide file tree
Showing 46 changed files with 1,682 additions and 682 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
import org.apache.drill.exec.testing.ExecutionControls;

public interface ExecConstants {
public static final String ZK_RETRY_TIMES = "drill.exec.zk.retry.count";
Expand Down Expand Up @@ -216,7 +217,7 @@ public interface ExecConstants {
public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);

public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
}
Expand Up @@ -43,6 +43,7 @@
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.Property;
Expand Down Expand Up @@ -326,7 +327,7 @@ public void submissionFailed(UserException ex) {
}

@Override
public void queryCompleted() {
public void queryCompleted(QueryState state) {
future.set(results);
}

Expand All @@ -352,7 +353,9 @@ public List<QueryDataBatch> getResults() throws RpcException{

@Override
public void queryIdArrived(QueryId queryId) {
logger.debug( "Query ID arrived: {}", queryId );
if (logger.isDebugEnabled()) {
logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId));
}
}

}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void submissionFailed(UserException ex) {
}

@Override
public void queryCompleted() {
public void queryCompleted(QueryState state) {
allocator.close();
latch.countDown();
System.out.println("Total rows returned: " + count.get());
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.work.batch.IncomingBuffers;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -63,7 +64,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {

private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
private final UserClientConnection connection;
private final UserClientConnection connection; // is null if attached to non-root fragment
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
Expand All @@ -73,6 +74,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final OptionManager fragmentOptions;
private final BufferManager bufferManager;
private ExecutorState executorState;
private final ExecutionControls executionControls;

private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
Expand All @@ -98,17 +100,19 @@ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragm
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());

try {
final OptionList list;
if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
list = new OptionList();
} else {
final OptionList list;
if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
list = new OptionList();
} else {
try {
list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
} catch (final Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
} catch (final Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);

executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());

// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
Expand Down Expand Up @@ -288,6 +292,10 @@ public void setFragmentLimit(final long limit) {
allocator.setFragmentLimit(limit);
}

public ExecutionControls getExecutionControls() {
return executionControls;
}

@Override
public void close() {
waitForSendComplete();
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.store.PartitionExplorerImpl;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.testing.ExecutionControls;

// TODO except for a couple of tests, this is only created by Foreman
// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
Expand All @@ -52,6 +53,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
private final OptionManager queryOptions;
private final PlannerSettings plannerSettings;
private final DrillOperatorTable table;
private final ExecutionControls executionControls;

private final BufferAllocator allocator;
private final BufferManager bufferManager;
Expand All @@ -65,10 +67,11 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
*/
private boolean closed = false;

public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
this.drillbitContext = drllbitContext;
public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
this.drillbitContext = drillbitContext;
this.session = session;
queryOptions = new QueryOptionManager(session.getOptions());
executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
table = new DrillOperatorTable(getFunctionRegistry());
Expand All @@ -78,7 +81,7 @@ public QueryContext(final UserSession session, final DrillbitContext drllbitCont
queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);

try {
allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
allocator = drillbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
} catch (OutOfMemoryException e) {
throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
Expand All @@ -87,7 +90,6 @@ public QueryContext(final UserSession session, final DrillbitContext drllbitCont
bufferManager = new BufferManager(this.allocator, null);
}


public PlannerSettings getPlannerSettings() {
return plannerSettings;
}
Expand Down Expand Up @@ -120,6 +122,10 @@ public OptionManager getOptions() {
return queryOptions;
}

public ExecutionControls getExecutionControls() {
return executionControls;
}

public DrillbitEndpoint getCurrentEndpoint() {
return drillbitContext.getEndpoint();
}
Expand Down
Expand Up @@ -34,9 +34,11 @@
import org.apache.drill.exec.record.RecordBatch.IterOutcome;

import com.google.common.base.Preconditions;
import org.apache.drill.exec.testing.ExecutionControlsInjector;

public class ScreenCreator implements RootCreator<Screen>{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ScreenCreator.class);



Expand Down Expand Up @@ -107,6 +109,7 @@ public boolean innerNext() {
materializer = new VectorRecordMaterializer(context, incoming);
//$FALL-THROUGH$
case OK:
injector.injectPause(context.getExecutionControls(), "sending-data", logger);
QueryWritableBatch batch = materializer.convertNext();
updateStats(batch);
stats.startWait();
Expand Down Expand Up @@ -139,6 +142,7 @@ public void stop() {
if (!oContext.isClosed()) {
internalStop();
}
injector.injectPause(context.getExecutionControls(), "send-complete", logger);
}

RecordBatch getIncoming() {
Expand Down
Expand Up @@ -45,7 +45,9 @@
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.rules.ReduceExpressionsRule;
Expand All @@ -60,7 +62,8 @@
import org.eigenbase.sql.parser.SqlParser;

public class DrillSqlWorker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DrillSqlWorker.class);

private final Planner planner;
private final HepPlanner hepPlanner;
Expand Down Expand Up @@ -119,6 +122,7 @@ public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationExce
public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
SqlNode sqlNode;
try {
injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
sqlNode = planner.parse(sql);
} catch (SqlParseException e) {
throw new QueryInputException("Failure parsing SQL. " + e.getMessage(), e);
Expand Down
Expand Up @@ -45,13 +45,17 @@ public class WorkEventBus {
.build();

public void removeFragmentStatusListener(final QueryId queryId) {
logger.debug("Removing fragment status listener for queryId {}.", queryId);
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
}
listeners.remove(queryId);
}

public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener)
throws ForemanSetupException {
logger.debug("Adding fragment status listener for queryId {}.", queryId);
if (logger.isDebugEnabled()) {
logger.debug("Adding fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
}
final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
if (old != null) {
throw new ForemanSetupException (
Expand All @@ -69,7 +73,9 @@ public void statusUpdate(final FragmentStatus status) {
}

public void addFragmentManager(final FragmentManager fragmentManager) {
logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
if (logger.isDebugEnabled()) {
logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
}
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
if (old != null) {
throw new IllegalStateException(
Expand All @@ -84,7 +90,9 @@ public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
// check if this was a recently canceled fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
if (logger.isDebugEnabled()) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
}
return null;
}

Expand All @@ -98,7 +106,9 @@ public FragmentManager getFragmentManager(final FragmentHandle handle) throws Fr
}

public void removeFragmentManager(final FragmentHandle handle) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void resultArrived( ByteBuf pBody ) throws RpcException {
// A successful completion/canceled case--pass on via resultArrived

try {
resultsListener.queryCompleted();
resultsListener.queryCompleted(queryState);
} catch ( Exception e ) {
resultsListener.submissionFailed(UserException.systemError(e).build());
}
Expand Down Expand Up @@ -198,8 +199,8 @@ private void failAll() {
private static class BufferingResultsListener implements UserResultsListener {

private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
private volatile boolean finished = false;
private volatile UserException ex;
private volatile QueryState queryState;
private volatile UserResultsListener output;
private volatile ConnectionThrottle throttle;

Expand All @@ -212,20 +213,22 @@ public boolean transferTo(UserResultsListener l) {
if (ex != null) {
l.submissionFailed(ex);
return true;
} else if (finished) {
l.queryCompleted();
} else if (queryState != null) {
l.queryCompleted(queryState);
return true;
}

return finished;
return false;
}
}

@Override
public void queryCompleted() {
finished = true;
public void queryCompleted(QueryState state) {
assert queryState == null;
this.queryState = state;
synchronized (this) {
if (output != null) {
output.queryCompleted();
output.queryCompleted(state);
}
}
}
Expand All @@ -245,7 +248,11 @@ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {

@Override
public void submissionFailed(UserException ex) {
finished = true;
assert queryState == null;
// there is one case when submissionFailed() is called even though the query didn't fail on the server side
// it happens when UserResultsListener.batchArrived() throws an exception that will be passed to
// submissionFailed() by QueryResultHandler.dataArrived()
queryState = QueryState.FAILED;
synchronized (this) {
if (output == null) {
this.ex = ex;
Expand All @@ -255,10 +262,6 @@ public void submissionFailed(UserException ex) {
}
}

public boolean isFinished() {
return finished;
}

@Override
public void queryIdArrived(QueryId queryId) {
}
Expand All @@ -281,8 +284,10 @@ public void failed(RpcException ex) {
@Override
public void success(QueryId queryId, ByteBuf buf) {
resultsListener.queryIdArrived(queryId);
logger.debug("Received QueryId {} successfully. Adding results listener {}.",
queryId, resultsListener);
if (logger.isDebugEnabled()) {
logger.debug("Received QueryId {} successfully. Adding results listener {}.",
QueryIdHelper.getQueryId(queryId), resultsListener);
}
UserResultsListener oldListener =
queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);

Expand Down

0 comments on commit be8d953

Please sign in to comment.