Skip to content

Commit

Permalink
# IGNITE-831 Create DiscoverySpiCustomMessage.
Browse files Browse the repository at this point in the history
  • Loading branch information
sevdokimov-gg committed May 6, 2015
1 parent d0dac7d commit 359680d
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 41 deletions.
Expand Up @@ -34,15 +34,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
/** /**
* Built-in event type: custom event sent. * Built-in event type: custom event sent.
* <br> * <br>
* Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}. * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}.
* <p> * <p>
* *
* @see DiscoveryCustomEvent * @see DiscoveryCustomEvent
*/ */
public static final int EVT_DISCOVERY_CUSTOM_EVT = 18; public static final int EVT_DISCOVERY_CUSTOM_EVT = 18;


/** */ /** */
private Serializable data; private DiscoveryCustomMessage customMsg;


/** Affinity topology version. */ /** Affinity topology version. */
private AffinityTopologyVersion affTopVer; private AffinityTopologyVersion affTopVer;
Expand All @@ -57,15 +57,15 @@ public DiscoveryCustomEvent() {
/** /**
* @return Data. * @return Data.
*/ */
public Serializable data() { public DiscoveryCustomMessage customMessage() {
return data; return customMsg;
} }


/** /**
* @param data New data. * @param customMsg New customMessage.
*/ */
public void data(Serializable data) { public void customMessage(DiscoveryCustomMessage customMsg) {
this.data = data; this.customMsg = customMsg;
} }


/** /**
Expand Down
Expand Up @@ -34,11 +34,6 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
this.delegate = delegate; this.delegate = delegate;
} }


/** {@inheritDoc} */
@Override public boolean forwardMinorVersion() {
return delegate.forwardMinorVersion();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public DiscoverySpiCustomMessage newMessageOnRingEnd() { @Nullable @Override public DiscoverySpiCustomMessage newMessageOnRingEnd() {
DiscoveryCustomMessage res = delegate.newMessageOnRingEnd(); DiscoveryCustomMessage res = delegate.newMessageOnRingEnd();
Expand Down
Expand Up @@ -344,8 +344,11 @@ private void updateClientNodes(UUID leftNodeId) {
ClusterNode node, ClusterNode node,
Collection<ClusterNode> topSnapshot, Collection<ClusterNode> topSnapshot,
Map<Long, Collection<ClusterNode>> snapshots, Map<Long, Collection<ClusterNode>> snapshots,
@Nullable Serializable data @Nullable DiscoverySpiCustomMessage spiCustomMsg
) { ) {
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
: ((CustomMessageWrapper)spiCustomMsg).delegate();

final ClusterNode locNode = localNode(); final ClusterNode locNode = localNode();


if (snapshots != null) if (snapshots != null)
Expand All @@ -356,7 +359,7 @@ private void updateClientNodes(UUID leftNodeId) {
if (type == EVT_NODE_METRICS_UPDATED) if (type == EVT_NODE_METRICS_UPDATED)
verChanged = false; verChanged = false;
else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
if (data != null && ((DiscoverySpiCustomMessage)data).forwardMinorVersion()) { if (customMsg != null && customMsg.forwardMinorVersion()) {
minorTopVer++; minorTopVer++;


verChanged = true; verChanged = true;
Expand All @@ -380,9 +383,7 @@ else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
} }


if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
if (data != null) { if (customMsg != null) {
DiscoveryCustomMessage customMsg = ((CustomMessageWrapper)data).delegate();

for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);


Expand Down Expand Up @@ -435,7 +436,7 @@ else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
return; return;
} }


discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data); discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
} }
}); });


Expand Down Expand Up @@ -1567,8 +1568,8 @@ public void scheduleSegmentCheck() {
/** Worker for discovery events. */ /** Worker for discovery events. */
private class DiscoveryWorker extends GridWorker { private class DiscoveryWorker extends GridWorker {
/** Event queue. */ /** Event queue. */
private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts = private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
new LinkedBlockingQueue<>(); DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();


/** Node segmented event fired flag. */ /** Node segmented event fired flag. */
private boolean nodeSegFired; private boolean nodeSegFired;
Expand Down Expand Up @@ -1634,7 +1635,7 @@ void addEvent(
AffinityTopologyVersion topVer, AffinityTopologyVersion topVer,
ClusterNode node, ClusterNode node,
Collection<ClusterNode> topSnapshot, Collection<ClusterNode> topSnapshot,
@Nullable Serializable data @Nullable DiscoveryCustomMessage data
) { ) {
assert node != null : data; assert node != null : data;


Expand Down Expand Up @@ -1675,7 +1676,8 @@ private String quietNode(ClusterNode node) {
/** @throws InterruptedException If interrupted. */ /** @throws InterruptedException If interrupted. */
@SuppressWarnings("DuplicateCondition") @SuppressWarnings("DuplicateCondition")
private void body0() throws InterruptedException { private void body0() throws InterruptedException {
GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take(); GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
DiscoveryCustomMessage> evt = evts.take();


int type = evt.get1(); int type = evt.get1();


Expand Down Expand Up @@ -1793,7 +1795,7 @@ else if (log.isDebugEnabled())
customEvt.type(type); customEvt.type(type);
customEvt.topologySnapshot(topVer.topologyVersion(), null); customEvt.topologySnapshot(topVer.topologyVersion(), null);
customEvt.affinityTopologyVersion(topVer); customEvt.affinityTopologyVersion(topVer);
customEvt.data(evt.get5()); customEvt.customMessage(evt.get5());


ctx.event().record(customEvt); ctx.event().record(customEvt);
} }
Expand Down
Expand Up @@ -150,8 +150,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else { else {
DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;


if (customEvt.data() instanceof DynamicCacheChangeBatch) { if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data(); DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();


Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());


Expand Down
Expand Up @@ -25,13 +25,6 @@
* *
*/ */
public interface DiscoverySpiCustomMessage extends Serializable { public interface DiscoverySpiCustomMessage extends Serializable {
/**
* Whether or not minor version of topology should be increased on message receive.
*
* @return {@code true} if minor topology version should be increased.
*/
public boolean forwardMinorVersion();

/** /**
* Called when message passed the ring. * Called when message passed the ring.
*/ */
Expand Down
Expand Up @@ -18,10 +18,9 @@
package org.apache.ignite.spi.discovery; package org.apache.ignite.spi.discovery;


import org.apache.ignite.cluster.*; import org.apache.ignite.cluster.*;
import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import java.io.*;
import java.util.*; import java.util.*;


/** /**
Expand All @@ -47,5 +46,5 @@ public void onDiscovery(
ClusterNode node, ClusterNode node,
Collection<ClusterNode> topSnapshot, Collection<ClusterNode> topSnapshot,
@Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable Map<Long, Collection<ClusterNode>> topHist,
@Nullable Serializable data); @Nullable DiscoverySpiCustomMessage data);
} }
Expand Up @@ -1332,7 +1332,7 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {


if (node != null && node.visible()) { if (node != null && node.visible()) {
try { try {
Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());


notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj);
} }
Expand Down Expand Up @@ -1433,7 +1433,7 @@ private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection
* @param top Topology snapshot. * @param top Topology snapshot.
*/ */
private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top, private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
@Nullable Serializable data) { @Nullable DiscoverySpiCustomMessage data) {
DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr; DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;


if (lsnr != null) { if (lsnr != null) {
Expand Down
Expand Up @@ -4524,7 +4524,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {


if (node != null) { if (node != null) {
try { try {
Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());


lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(), msg.topologyVersion(),
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.ignite.testframework.config.*; import org.apache.ignite.testframework.config.*;
import org.apache.ignite.testframework.junits.*; import org.apache.ignite.testframework.junits.*;
import org.apache.ignite.testframework.junits.spi.*; import org.apache.ignite.testframework.junits.spi.*;
import org.jetbrains.annotations.*;


import javax.management.*; import javax.management.*;
import java.io.*; import java.io.*;
Expand Down Expand Up @@ -132,7 +133,7 @@ public boolean isMetricsUpdated() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot,
Map<Long, Collection<ClusterNode>> topHist, Serializable data) { Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) {
if (type == EVT_NODE_METRICS_UPDATED) if (type == EVT_NODE_METRICS_UPDATED)
isMetricsUpdate = true; isMetricsUpdate = true;
} }
Expand Down Expand Up @@ -205,7 +206,7 @@ public void testLocalHeartbeat() throws Exception {
DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() { DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() {
@Override public void onDiscovery(int type, long topVer, ClusterNode node, @Override public void onDiscovery(int type, long topVer, ClusterNode node,
Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist,
Serializable data) { @Nullable DiscoverySpiCustomMessage data) {
// If METRICS_UPDATED came from local node // If METRICS_UPDATED came from local node
if (type == EVT_NODE_METRICS_UPDATED if (type == EVT_NODE_METRICS_UPDATED
&& node.id().equals(spi.getLocalNode().id())) && node.id().equals(spi.getLocalNode().id()))
Expand Down Expand Up @@ -369,7 +370,8 @@ protected long getMaxMetricsWaitTime() {
spi.setListener(new DiscoverySpiListener() { spi.setListener(new DiscoverySpiListener() {
@SuppressWarnings({"NakedNotify"}) @SuppressWarnings({"NakedNotify"})
@Override public void onDiscovery(int type, long topVer, ClusterNode node, @Override public void onDiscovery(int type, long topVer, ClusterNode node,
Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, Serializable data) { Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist,
@Nullable DiscoverySpiCustomMessage data) {
info("Discovery event [type=" + type + ", node=" + node + ']'); info("Discovery event [type=" + type + ", node=" + node + ']');


synchronized (mux) { synchronized (mux) {
Expand Down

0 comments on commit 359680d

Please sign in to comment.