Skip to content

Commit

Permalink
IGNITE-7177 Correctly handle custom messages which do not change affi…
Browse files Browse the repository at this point in the history
…nity
  • Loading branch information
agoncharuk committed Dec 12, 2017
1 parent c10aa0c commit 7cf049e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,28 @@ private void scheduleClientChangeMessage(Map<Integer, Boolean> startedCaches, Se
cctx.time().addTimeoutObject(timeoutObj);
}

/**
* @param fut Exchange future.
* @param crd Coordinator flag.
* @param exchActions Exchange actions.
*/
public void onCustomMessageNoAffinityChange(
GridDhtPartitionsExchangeFuture fut,
boolean crd,
@Nullable final ExchangeActions exchActions
) {
final ExchangeDiscoveryEvents evts = fut.context().events();

forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) {
if (exchActions != null && exchActions.cacheGroupStopping(aff.groupId()))
return;

aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
}
});
}

/**
* Called on exchange initiated for cache start/stop request.
*
Expand All @@ -703,14 +725,7 @@ public void onCacheChangeRequest(
caches.updateCachesInfo(exchActions);

// Affinity did not change for existing caches.
forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
if (exchActions.cacheGroupStopping(aff.groupId()))
return;

aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());
}
});
onCustomMessageNoAffinityChange(fut, crd, exchActions);

for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
DynamicCacheDescriptor cacheDesc = action.descriptor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,7 @@ else if (msg instanceof DynamicCacheChangeBatch) {
exchange = onCacheChangeRequest(crdNode);
}
else if (msg instanceof SnapshotDiscoveryMessage) {
exchange = CU.clientNode(firstDiscoEvt.eventNode()) ?
onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
exchange = onCustomMessageNoAffinityChange(crdNode);
}
else {
assert affChangeMsg != null : this;
Expand Down Expand Up @@ -892,6 +890,16 @@ private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedExcep
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}

/**
* @param crd Coordinator flag.
* @return Exchange type.
*/
private ExchangeType onCustomMessageNoAffinityChange(boolean crd) {
cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions);

return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}

/**
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
Expand Down

0 comments on commit 7cf049e

Please sign in to comment.