Skip to content

Commit

Permalink
DRILL-2697: Pause sites wait indefinitely for a resume signal DrillC…
Browse files Browse the repository at this point in the history
…lient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the Control layer.

+ Better error messages and more tests in TestDrillbitResilience and TestPauseInjection
+ Added execution controls to operator context
+ Removed ControlMessageHandler interface, renamed ControlHandlerImpl to ControlMessageHandler
+ Added CountDownLatchInjection, useful in cases like PartitionedSender that spawns multiple threads
  • Loading branch information
Sudheesh Katkam authored and vkorukanti committed May 10, 2015
1 parent 4e59633 commit f8e5e61
Show file tree
Hide file tree
Showing 34 changed files with 1,106 additions and 454 deletions.
Expand Up @@ -312,10 +312,18 @@ private UserBitShared.UserCredentials getUserCredentials() {
} }


public DrillRpcFuture<Ack> cancelQuery(QueryId id) { public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); if(logger.isDebugEnabled()) {
logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
}
return client.send(RpcType.CANCEL_QUERY, id, Ack.class); return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
} }


/**
 * Sends a request to resume the query identified by {@code queryId}.
 *
 * The request goes out through {@code client} as a {@code RESUME_PAUSED_QUERY} message whose body is the query id
 * and whose expected response type is {@link Ack}.
 *
 * @param queryId id of the query to resume
 * @return the future produced by {@code client.send} for the {@link Ack} response
 */
public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
// Guarded so that the readable query id is only built when debug logging is enabled.
if(logger.isDebugEnabled()) {
logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId));
}
return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
}


/** /**
* Submits a Logical plan for direct execution (bypasses parsing) * Submits a Logical plan for direct execution (bypasses parsing)
Expand Down
Expand Up @@ -23,6 +23,7 @@


import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.testing.ExecutionControls;


public abstract class OperatorContext { public abstract class OperatorContext {


Expand All @@ -36,6 +37,8 @@ public abstract class OperatorContext {


public abstract OperatorStats getStats(); public abstract OperatorStats getStats();


public abstract ExecutionControls getExecutionControls();

public static int getChildCount(PhysicalOperator popConfig) { public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator(); Iterator<PhysicalOperator> iter = popConfig.iterator();
int i = 0; int i = 0;
Expand Down
Expand Up @@ -24,11 +24,13 @@
import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperator;


import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.carrotsearch.hppc.LongObjectOpenHashMap;
import org.apache.drill.exec.testing.ExecutionControls;


class OperatorContextImpl extends OperatorContext implements AutoCloseable { class OperatorContextImpl extends OperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);


private final BufferAllocator allocator; private final BufferAllocator allocator;
private final ExecutionControls executionControls;
private boolean closed = false; private boolean closed = false;
private PhysicalOperator popConfig; private PhysicalOperator popConfig;
private OperatorStats stats; private OperatorStats stats;
Expand All @@ -42,13 +44,15 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context,


OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
this.stats = context.getStats().getOperatorStats(def, allocator); this.stats = context.getStats().getOperatorStats(def, allocator);
executionControls = context.getExecutionControls();
} }


public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException { public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
this.applyFragmentLimit=applyFragmentLimit; this.applyFragmentLimit=applyFragmentLimit;
this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
this.popConfig = popConfig; this.popConfig = popConfig;
this.stats = stats; this.stats = stats;
executionControls = context.getExecutionControls();
} }


public DrillBuf replace(DrillBuf old, int newSize) { public DrillBuf replace(DrillBuf old, int newSize) {
Expand All @@ -70,6 +74,10 @@ public DrillBuf getManagedBuffer(int size) {
return newBuf; return newBuf;
} }


/**
 * Returns the execution controls this operator context obtained from its fragment context at construction time.
 *
 * @return the execution controls captured by the constructor
 */
@Override
public ExecutionControls getExecutionControls() {
  return executionControls;
}

public BufferAllocator getAllocator() { public BufferAllocator getAllocator() {
if (allocator == null) { if (allocator == null) {
throw new UnsupportedOperationException("Operator context does not have an allocator"); throw new UnsupportedOperationException("Operator context does not have an allocator");
Expand Down
Expand Up @@ -52,7 +52,6 @@ public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch




static class ScreenRoot extends BaseRootExec { static class ScreenRoot extends BaseRootExec {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming; private final RecordBatch incoming;
private final FragmentContext context; private final FragmentContext context;
private final AccountingUserConnection userConnection; private final AccountingUserConnection userConnection;
Expand Down Expand Up @@ -136,6 +135,11 @@ RecordBatch getIncoming() {
} }




/**
 * Closes this root, first passing through the "send-complete" pause injection site.
 *
 * The injector is given the fragment context's execution controls and this class's logger; afterwards closing is
 * delegated to the superclass.
 *
 * @throws Exception as declared by the overridden {@code close}
 */
@Override
public void close() throws Exception {
// Pause site "send-complete" runs before the superclass close.
// NOTE(review): presumably this only blocks when a matching pause is configured in the execution controls - confirm.
injector.injectPause(context.getExecutionControls(), "send-complete", logger);
super.close();
}
} }




Expand Down
Expand Up @@ -40,16 +40,18 @@ public static RpcConfig getMapping(DrillConfig config) {
.name("CONTROL") .name("CONTROL")
.timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT)) .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
.add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
.add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
.add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.build(); .build();
} }


public static int RPC_VERSION = 3; public static int RPC_VERSION = 3;


public static final Response OK = new Response(RpcType.ACK, Acks.OK); public static final Response OK = new Response(RpcType.ACK, Acks.OK);
public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
} }
Expand Up @@ -17,12 +17,9 @@
*/ */
package org.apache.drill.exec.rpc.control; package org.apache.drill.exec.rpc.control;


import java.util.Collection;

import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments; import org.apache.drill.exec.proto.BitControl.InitializeFragments;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
Expand Down Expand Up @@ -56,7 +53,12 @@ public void sendFragments(RpcOutcomeListener<Ack> outcomeListener, InitializeFra
} }


public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){ public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
CancelFragment b = new CancelFragment(outcomeListener, handle); final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_CANCEL_FRAGMENT);
manager.runCommand(b);
}

public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
manager.runCommand(b); manager.runCommand(b);
} }


Expand Down Expand Up @@ -114,17 +116,19 @@ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection
} }
} }


public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> { public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
final FragmentHandle handle; final FragmentHandle handle;
final RpcType type;


public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) { public SignalFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle, RpcType type) {
super(listener); super(listener);
this.handle = handle; this.handle = handle;
this.type = type;
} }


@Override @Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class); connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
} }


} }
Expand All @@ -139,7 +143,7 @@ public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragme


@Override @Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class); connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
} }


} }
Expand Down
Expand Up @@ -36,11 +36,12 @@ public static RpcConfig getMapping(DrillConfig config) {
return RpcConfig.newBuilder() return RpcConfig.newBuilder()
.name("USER") .name("USER")
.timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT)) .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
.add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit. .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
.add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
.add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
.add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
.add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.build(); .build();
} }


Expand Down
Expand Up @@ -113,6 +113,15 @@ protected Response handle(UserClientConnection connection, int rpcType, ByteBuf
throw new RpcException("Failure while decoding QueryId body.", e); throw new RpcException("Failure while decoding QueryId body.", e);
} }


case RpcType.RESUME_PAUSED_QUERY_VALUE:
try {
final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
final Ack ack = worker.resumeQuery(queryId);
return new Response(RpcType.ACK, ack);
} catch (final InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding QueryId body.", e);
}

default: default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType)); throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
} }
Expand Down
Expand Up @@ -42,13 +42,16 @@
import org.apache.drill.exec.store.pojo.Writers.NIntWriter; import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter; import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter; import org.apache.drill.exec.store.pojo.Writers.StringWriter;
import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;


public class PojoRecordReader<T> extends AbstractRecordReader { public class PojoRecordReader<T> extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
private static final ExecutionControlsInjector injector =
ExecutionControlsInjector.getInjector(PojoRecordReader.class);


public final int forJsonIgnore = 1; public final int forJsonIgnore = 1;


Expand All @@ -64,16 +67,9 @@ public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator) {
this.iterator = iterator; this.iterator = iterator;
} }


public OperatorContext getOperatorContext() {
return operatorContext;
}

public void setOperatorContext(OperatorContext operatorContext) {
this.operatorContext = operatorContext;
}

@Override @Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
operatorContext = context;
try { try {
Field[] fields = pojoClass.getDeclaredFields(); Field[] fields = pojoClass.getDeclaredFields();
List<PojoWriter> writers = Lists.newArrayList(); List<PojoWriter> writers = Lists.newArrayList();
Expand Down Expand Up @@ -147,7 +143,7 @@ private void setValueCount(int i) {
@Override @Override
public int next() { public int next() {
boolean allocated = false; boolean allocated = false;

injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
try { try {
int i =0; int i =0;
outside: outside:
Expand Down
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.testing;

/**
 * Handle to a countdown latch that has been injected for testing purposes. Latches of this kind are tracked
 * internally and are configured through the
 * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
 *
 * <p>The typical use is a thread that fans out into several worker threads. The parent thread calls
 * {@link #initialize} with the number of countdowns it expects and then waits on the latch. Each child thread calls
 * {@link #countDown} on the same latch (that is, the same site class and the same descriptor). Once the expected
 * number of countdowns has been reached, the parent thread proceeds.
 */
public interface CountDownLatchInjection {

  /**
   * Sets up the underlying latch.
   *
   * @param count how many times {@link #countDown} has to be invoked before threads blocked in {@link #await}
   *              are allowed through
   */
  void initialize(int count);

  /**
   * Blocks the calling thread until the latch reaches zero, unless the thread is
   * {@link Thread#interrupt interrupted} first.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  void await() throws InterruptedException;

  /**
   * Blocks the calling thread until the latch reaches zero and does not give up on interruption: an interruption
   * is logged as a warning and the wait continues.
   */
  void awaitUninterruptibly();

  /**
   * Lowers the latch count by one; when the count reaches zero, every waiting thread is released.
   */
  void countDown();
}
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.testing;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.drill.common.concurrent.ExtendedLatch;

import java.util.concurrent.CountDownLatch;

/**
* See {@link org.apache.drill.exec.testing.CountDownLatchInjection} Degenerates to
* {@link org.apache.drill.exec.testing.PauseInjection#pause}, if initialized to zero count. In any case, this injection
* provides more control than PauseInjection.
*/
@JsonAutoDetect(fieldVisibility = Visibility.ANY)
public class CountDownLatchInjectionImpl extends Injection implements CountDownLatchInjection {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountDownLatchInjectionImpl.class);

private ExtendedLatch latch = null;

@JsonCreator // ensures instances are created only through JSON
private CountDownLatchInjectionImpl(@JsonProperty("address") final String address,
@JsonProperty("port") final int port,
@JsonProperty("siteClass") final String siteClass,
@JsonProperty("desc") final String desc) throws InjectionConfigurationException {
super(address, port, siteClass, desc, 0, 1);
}

@Override
protected boolean injectNow() {
return true;
}

@Override
public void initialize(final int count) {
Preconditions.checkArgument(latch == null, "Latch can be initialized only once at %s in %s.", desc,
siteClass.getSimpleName());
Preconditions.checkArgument(count > 0, "Count has to be a positive integer at %s in %s.", desc,
siteClass.getSimpleName());
latch = new ExtendedLatch(count);
}

@Override
public void await() throws InterruptedException {
Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
try {
latch.await();
} catch (final InterruptedException e) {
logger.warn("Interrupted while awaiting in %s at %s.", siteClass.getSimpleName(), desc);
throw e;
}
}

@Override
public void awaitUninterruptibly() {
Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
latch.awaitUninterruptibly();
}

@Override
public void countDown() {
Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
latch.countDown();
}
}

0 comments on commit f8e5e61

Please sign in to comment.