Skip to content

Commit

Permalink
Continuous query compatibility fix (topVer can be null for old CacheC…
Browse files Browse the repository at this point in the history
…ontinuousQueryEntry).
  • Loading branch information
sboikov committed Feb 25, 2016
1 parent 10214cc commit dee6190
Showing 1 changed file with 23 additions and 20 deletions.
Expand Up @@ -737,6 +737,12 @@ public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Null
public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) { public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
assert entry != null; assert entry != null;


if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
assert entry.updateCounter() == 0L : entry;

return F.asList(entry);
}

List<CacheContinuousQueryEntry> entries; List<CacheContinuousQueryEntry> entries;


synchronized (pendingEvts) { synchronized (pendingEvts) {
Expand Down Expand Up @@ -991,28 +997,25 @@ private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<A
routineId, routineId,
t.get1()); t.get1());


Collection<ClusterNode> nodes = new HashSet<>(); for (AffinityTopologyVersion topVer : t.get2()) {

for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
for (AffinityTopologyVersion topVer : t.get2()) if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)); try {

cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
for (ClusterNode node : nodes) { }
if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) { catch (ClusterTopologyCheckedException e) {
try { IgniteLogger log = ctx.log(getClass());
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
IgniteLogger log = ctx.log(getClass());


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Failed to send acknowledge message, node left " + log.debug("Failed to send acknowledge message, node left " +
"[msg=" + msg + ", node=" + node + ']'); "[msg=" + msg + ", node=" + node + ']');
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
IgniteLogger log = ctx.log(getClass()); IgniteLogger log = ctx.log(getClass());


U.error(log, "Failed to send acknowledge message " + U.error(log, "Failed to send acknowledge message " +
"[msg=" + msg + ", node=" + node + ']', e); "[msg=" + msg + ", node=" + node + ']', e);
}
} }
} }
} }
Expand Down

0 comments on commit dee6190

Please sign in to comment.