Skip to content

Commit

Permalink
DRILL-5961: For long running queries (> 10 min) Drill may raise Fragm…
Browse files Browse the repository at this point in the history
…entSetupException for completed/cancelled fragments

This closes #1041
  • Loading branch information
vrozov authored and parthchandra committed Jan 12, 2018
1 parent cbb79e5 commit c5af3ae
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 84 deletions.
Expand Up @@ -39,10 +39,6 @@ public class WorkEventBus {
private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap();
private final ConcurrentMap<QueryId, FragmentStatusListener> listeners =
new ConcurrentHashMap<>(16, 0.75f, 16);
private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();

public void removeFragmentStatusListener(final QueryId queryId) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {

public void addFragmentManager(final FragmentManager fragmentManager) {
if (logger.isDebugEnabled()) {
logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
}
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
if (old != null) {
throw new IllegalStateException(
"Tried to set fragment manager when has already been set for the provided fragment handle.");
}
}

public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
synchronized (this) {
return managers.get(handle);
if (old != null) {
throw new IllegalStateException(
String.format("Manager {} for fragment {} already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
}
}

public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
synchronized (this) {
// Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
if (logger.isDebugEnabled()) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
}
return null;
}

// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
final FragmentManager m = managers.get(handle);
if (m != null) {
return m;
}
}
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
+ QueryIdHelper.getQueryIdentifier(handle));
public FragmentManager getFragmentManager(final FragmentHandle handle) {
return managers.get(handle);
}

/**
* Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
* multiple times. The manager will be removed only once (the first call).
* @param handle the handle to the fragment
*/
public void removeFragmentManager(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}

synchronized (this) {
final FragmentManager manager = managers.get(handle);
if (manager != null) {
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
} else {
logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
}
}

/**
* Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
* for fragments waiting on data (root and intermediate).
* Optionally cancels and removes fragment manager (for the corresponding the handle) from the work event bus. Currently, used
* for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
* only once (the first call).
* @param handle the handle to the fragment
* @param cancel
* @return if the fragment was found and removed from the event bus
*/
public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}

synchronized (this) {
final FragmentManager manager = managers.get(handle);
if (manager == null) {
return false;
public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
final FragmentManager manager = managers.remove(handle);
if (manager != null) {
assert !manager.isCancelled() : String.format("Fragment {} manager {} is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
if (cancel) {
manager.cancel();
}
if (logger.isDebugEnabled()) {
logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed",
QueryIdHelper.getQueryIdentifier(handle), manager);
}

manager.cancel();
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
return true;
} else if (logger.isWarnEnabled()) {
logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
return false;
}
}
Expand Up @@ -325,7 +325,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
workBus.removeFragmentManager(fragmentHandle);
if (!fragmentManager.isCancelled()) {
workBus.removeFragmentManager(fragmentHandle, false);
}
indicateIfSafeToExit();
}
});
Expand Down
Expand Up @@ -193,7 +193,7 @@ private Ack cancelFragment(final FragmentHandle handle) {

// Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
// work bus.
final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true);
if (removed) {
return Acks.OK;
}
Expand All @@ -217,7 +217,7 @@ private Ack cancelFragment(final FragmentHandle handle) {

private Ack resumeFragment(final FragmentHandle handle) {
// resume a pending fragment
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
if (manager != null) {
manager.unpause();
return Acks.OK;
Expand All @@ -237,14 +237,12 @@ private Ack resumeFragment(final FragmentHandle handle) {

private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {

final FragmentManager manager =
bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());

FragmentExecutor executor;
if (manager != null) {
manager.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
executor = bee.getFragmentRunner(finishedReceiver.getSender());
final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender());
if (executor != null) {
executor.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
Expand Down
Expand Up @@ -57,9 +57,4 @@ public FragmentExecutor getRunnable() {
public void receivingFragmentFinished(final FragmentHandle handle) {
fragmentExecutor.receivingFragmentFinished(handle);
}

@Override
public synchronized void cancel() {
super.cancel();
}
}
Expand Up @@ -225,7 +225,6 @@ public void interrupted(final InterruptedException e) {
public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down Expand Up @@ -273,7 +272,6 @@ public void successEncryption(@Injectable WorkerBee bee, @Injectable final WorkE
updateTestCluster(1, newConfig);

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down Expand Up @@ -322,7 +320,6 @@ public void successEncryptionChunkMode(@Injectable WorkerBee bee, @Injectable fi
updateTestCluster(1, newConfig);

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down
Expand Up @@ -105,7 +105,6 @@ public FragmentContext getFragmentContext(){


new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
workBus.getFragmentManager( (FragmentHandle) any); result = fman;
}};

Expand Down

0 comments on commit c5af3ae

Please sign in to comment.