Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ public class WorkEventBus {
private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap();
private final ConcurrentMap<QueryId, FragmentStatusListener> listeners =
new ConcurrentHashMap<>(16, 0.75f, 16);
private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();

public void removeFragmentStatusListener(final QueryId queryId) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {

public void addFragmentManager(final FragmentManager fragmentManager) {
if (logger.isDebugEnabled()) {
logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
}
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
if (old != null) {
throw new IllegalStateException(
"Tried to set fragment manager when has already been set for the provided fragment handle.");
}
}

public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
synchronized (this) {
return managers.get(handle);
if (old != null) {
throw new IllegalStateException(
String.format("Manager {} for fragment {} already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
}
}

public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
synchronized (this) {
// Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
if (logger.isDebugEnabled()) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
}
return null;
}

// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
final FragmentManager m = managers.get(handle);
if (m != null) {
return m;
}
}
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
+ QueryIdHelper.getQueryIdentifier(handle));
public FragmentManager getFragmentManager(final FragmentHandle handle) {
return managers.get(handle);
}

/**
* Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
* multiple times. The manager will be removed only once (the first call).
* @param handle the handle to the fragment
*/
public void removeFragmentManager(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}

synchronized (this) {
final FragmentManager manager = managers.get(handle);
if (manager != null) {
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
} else {
logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
}
}

/**
* Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
* for fragments waiting on data (root and intermediate).
* Optionally cancels and removes fragment manager (for the corresponding the handle) from the work event bus. Currently, used
* for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
* only once (the first call).
* @param handle the handle to the fragment
* @param cancel
* @return if the fragment was found and removed from the event bus
*/
public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}

synchronized (this) {
final FragmentManager manager = managers.get(handle);
if (manager == null) {
return false;
public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
final FragmentManager manager = managers.remove(handle);
if (manager != null) {
assert !manager.isCancelled() : String.format("Fragment {} manager {} is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
if (cancel) {
manager.cancel();
}
if (logger.isDebugEnabled()) {
logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed",
QueryIdHelper.getQueryIdentifier(handle), manager);
}

manager.cancel();
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
return true;
} else if (logger.isWarnEnabled()) {
logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is redundant. Doesn't seem like we ever return false, and the callers never seem to check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ControlMessageHandler checks the result on line 196.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
workBus.removeFragmentManager(fragmentHandle);
if (!fragmentManager.isCancelled()) {
workBus.removeFragmentManager(fragmentHandle, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why you don't want to cancel here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If cleanup is not called as a result of FragmentManager cancellation, it is part of the regular cleanup after run is complete.

}
indicateIfSafeToExit();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private Ack cancelFragment(final FragmentHandle handle) {

// Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
// work bus.
final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true);
if (removed) {
return Acks.OK;
}
Expand All @@ -217,7 +217,7 @@ private Ack cancelFragment(final FragmentHandle handle) {

private Ack resumeFragment(final FragmentHandle handle) {
// resume a pending fragment
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
if (manager != null) {
manager.unpause();
return Acks.OK;
Expand All @@ -237,14 +237,12 @@ private Ack resumeFragment(final FragmentHandle handle) {

private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {

final FragmentManager manager =
bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());

FragmentExecutor executor;
if (manager != null) {
manager.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
executor = bee.getFragmentRunner(finishedReceiver.getSender());
final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender());
if (executor != null) {
executor.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,4 @@ public FragmentExecutor getRunnable() {
public void receivingFragmentFinished(final FragmentHandle handle) {
fragmentExecutor.receivingFragmentFinished(handle);
}

@Override
public synchronized void cancel() {
super.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ public void interrupted(final InterruptedException e) {
public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down Expand Up @@ -272,7 +271,6 @@ public void successEncryption(@Injectable WorkerBee bee, @Injectable final WorkE
updateTestCluster(1, newConfig);

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down Expand Up @@ -321,7 +319,6 @@ public void successEncryptionChunkMode(@Injectable WorkerBee bee, @Injectable fi
updateTestCluster(1, newConfig);

new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public FragmentContext getFragmentContext(){


new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
workBus.getFragmentManager( (FragmentHandle) any); result = fman;
}};

Expand Down