Minor: moved custom events processing in GridContinuousProcessor's methods.
sboikov committed Nov 29, 2017
1 parent 5fa5ae7 commit eef5afd
Showing 1 changed file with 85 additions and 47 deletions.
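The change is a pure extract-method refactoring: the bodies of the four discovery-listener callbacks below move into named processStart*/processStop* methods, and each callback keeps only an early-return guard on ctx.isStopping(). A minimal sketch of the resulting shape, using hypothetical stand-in types (Context, Message, and StopFuture here are illustrative, not Ignite's actual classes):

// Sketch of the listener shape after the refactoring; the types are
// hypothetical stand-ins, not Ignite's API.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class ListenerRefactorSketch {
    interface Context {
        boolean isStopping();
    }

    static final class Message {
        final UUID routineId;

        Message(UUID routineId) {
            this.routineId = routineId;
        }
    }

    static final class StopFuture {
        void onDone() {
            // Complete any waiters; body elided in this sketch.
        }
    }

    private final Context ctx;
    private final Map<UUID, StopFuture> stopFuts = new ConcurrentHashMap<>();

    ListenerRefactorSketch(Context ctx) {
        this.ctx = ctx;
    }

    /** After the refactoring: the callback only guards and delegates. */
    void onCustomEvent(Message msg) {
        if (ctx.isStopping())
            return;

        processStopAckRequest(msg);
    }

    /** Extracted body: the same logic the callback used to inline. */
    private void processStopAckRequest(Message msg) {
        StopFuture fut = stopFuts.remove(msg.routineId);

        if (fut != null)
            fut.onDone();
    }
}

The named methods can now be exercised and reused without going through the discovery-listener plumbing.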
@@ -176,8 +176,10 @@ public GridContinuousProcessor(GridKernalContext ctx) {
     @Override public void onCustomEvent(AffinityTopologyVersion topVer,
         ClusterNode snd,
         StartRoutineDiscoveryMessage msg) {
-        if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
-            processStartRequest(snd, msg);
+        if (ctx.isStopping())
+            return;
+
+        processStartRequest(snd, msg);
     }
 });

@@ -186,39 +188,10 @@ public GridContinuousProcessor(GridKernalContext ctx) {
     @Override public void onCustomEvent(AffinityTopologyVersion topVer,
         ClusterNode snd,
         StartRoutineAckDiscoveryMessage msg) {
-        StartFuture fut = startFuts.remove(msg.routineId());
-
-        if (fut != null) {
-            if (msg.errs().isEmpty()) {
-                LocalRoutineInfo routine = locInfos.get(msg.routineId());
-
-                // Update partition counters.
-                if (routine != null && routine.handler().isQuery()) {
-                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
-                    Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
-
-                    GridCacheAdapter<Object, Object> interCache =
-                        ctx.cache().internalCache(routine.handler().cacheName());
-
-                    GridCacheContext cctx = interCache != null ? interCache.context() : null;
-
-                    if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                        cntrsPerNode.put(ctx.localNodeId(),
-                            toCountersMap(cctx.topology().localUpdateCounters(false)));
-
-                    routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
-                }
-
-                fut.onRemoteRegistered();
-            }
-            else {
-                IgniteCheckedException firstEx = F.first(msg.errs().values());
-
-                fut.onDone(firstEx);
+        if (ctx.isStopping())
+            return;

-                stopRoutine(msg.routineId());
-            }
-        }
+        processStartAckRequest(topVer, msg);
     }
 });

@@ -227,16 +200,10 @@ public GridContinuousProcessor(GridKernalContext ctx) {
     @Override public void onCustomEvent(AffinityTopologyVersion topVer,
         ClusterNode snd,
         StopRoutineDiscoveryMessage msg) {
-        if (!snd.id().equals(ctx.localNodeId())) {
-            UUID routineId = msg.routineId();
-
-            unregisterRemote(routineId);
-        }
+        if (ctx.isStopping())
+            return;

-        for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
-            if (clientInfo.remove(msg.routineId()) != null)
-                break;
-        }
+        processStopRequest(snd, msg);
     }
 });

@@ -245,10 +212,10 @@ public GridContinuousProcessor(GridKernalContext ctx) {
     @Override public void onCustomEvent(AffinityTopologyVersion topVer,
         ClusterNode snd,
         StopRoutineAckDiscoveryMessage msg) {
-        StopFuture fut = stopFuts.remove(msg.routineId());
+        if (ctx.isStopping())
+            return;

-        if (fut != null)
-            fut.onDone();
+        processStopAckRequest(msg);
     }
 });

@@ -459,7 +426,7 @@ private Map<UUID, LocalRoutineInfo> copyLocalInfos(Map<UUID, LocalRoutineInfo> l
     /** {@inheritDoc} */
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         if (log.isDebugEnabled()) {
-            log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
+            log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
                 ", loc=" + ctx.localNodeId() +
                 ", data=" + data.joiningNodeData() +
                 ']');
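Besides relocating code, the hunk above fixes a log-level mismatch: a message built under an isDebugEnabled() guard was emitted via log.info(), so it showed up even though DEBUG output was the intent. The usual idiom pairs the guard with the same level; a small sketch using SLF4J (an assumption here — the real code uses Ignite's own IgniteLogger):

// Guard and emission level should agree: build the (string-concatenating,
// hence non-free) message only when DEBUG is enabled, and emit at DEBUG.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class JoinDataLogging {
    private static final Logger log = LoggerFactory.getLogger(JoinDataLogging.class);

    void onJoiningNodeDataReceived(String joiningNodeId, Object joiningNodeData) {
        if (log.isDebugEnabled()) {
            log.debug("onJoiningNodeDataReceived [joining=" + joiningNodeId +
                ", data=" + joiningNodeData + ']');
        }
    }
}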
@@ -975,12 +942,83 @@ private void sendNotification(UUID nodeId,
         ackC);
     }

+    /**
+     * @param msg Message.
+     */
+    private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) {
+        StopFuture fut = stopFuts.remove(msg.routineId());
+
+        if (fut != null)
+            fut.onDone();
+    }
+
+    /**
+     * @param snd Sender node.
+     * @param msg Message.
+     */
+    private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+        if (!snd.id().equals(ctx.localNodeId())) {
+            UUID routineId = msg.routineId();
+
+            unregisterRemote(routineId);
+        }
+
+        for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+            if (clientInfo.remove(msg.routineId()) != null)
+                break;
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param msg Message.
+     */
+    private void processStartAckRequest(AffinityTopologyVersion topVer,
+        StartRoutineAckDiscoveryMessage msg) {
+        StartFuture fut = startFuts.remove(msg.routineId());
+
+        if (fut != null) {
+            if (msg.errs().isEmpty()) {
+                LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+                // Update partition counters.
+                if (routine != null && routine.handler().isQuery()) {
+                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
+                    Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
+
+                    GridCacheAdapter<Object, Object> interCache =
+                        ctx.cache().internalCache(routine.handler().cacheName());
+
+                    GridCacheContext cctx = interCache != null ? interCache.context() : null;
+
+                    if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                        cntrsPerNode.put(ctx.localNodeId(),
+                            toCountersMap(cctx.topology().localUpdateCounters(false)));
+
+                    routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
+                }
+
+                fut.onRemoteRegistered();
+            }
+            else {
+                IgniteCheckedException firstEx = F.first(msg.errs().values());
+
+                fut.onDone(firstEx);
+
+                stopRoutine(msg.routineId());
+            }
+        }
+    }
+
     /**
      * @param node Sender.
      * @param req Start request.
      */
     private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
         UUID routineId = req.routineId();
+        if (node.id().equals(ctx.localNodeId()))
+            return;
+
         StartRequestData data = req.startRequestData();

         GridContinuousHandler hnd = data.handler();
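One behavioral detail is worth spelling out: the old StartRoutineDiscoveryMessage listener tested both conditions inline (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()), while after the commit the stopping check stays in the listener and the local-sender check moves inside processStartRequest. The two forms process a request under exactly the same conditions, as this self-contained check illustrates (run with java -ea to enable the assertion):

// Sanity check: pre-commit inline condition vs. post-commit split guards.
public final class GuardEquivalence {
    public static void main(String[] args) {
        for (boolean sndIsLocal : new boolean[] {false, true}) {
            for (boolean stopping : new boolean[] {false, true}) {
                // Old listener: process only when the sender is remote and we are not stopping.
                boolean oldForm = !sndIsLocal && !stopping;

                // New code: the listener returns early when stopping, then the
                // extracted method returns early for a local sender.
                boolean newForm = !stopping && !sndIsLocal;

                assert oldForm == newForm;
            }
        }

        System.out.println("Old and new guards agree on all four input combinations.");
    }
}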
