Skip to content

Commit

Permalink
IGNITE-8673 Fixed ClusterNode#isClient method. Fixes #4104
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduard Shangareev authored and agoncharuk committed Aug 14, 2018
1 parent a145603 commit 2fc9073
Show file tree
Hide file tree
Showing 26 changed files with 122 additions and 125 deletions.
8 changes: 2 additions & 6 deletions modules/core/src/main/java/org/apache/ignite/Ignition.java
Expand Up @@ -141,30 +141,26 @@ public static boolean isDaemon() {
}

/**
* Sets client mode static flag.
* Sets client mode thread-local flag.
* <p>
* This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
* is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
* It is recommended to use {@link DiscoverySpi} in client mode too.
*
* @param clientMode Client mode flag.
* @see IgniteConfiguration#isClientMode()
* @see TcpDiscoverySpi#setForceServerMode(boolean)
*/
public static void setClientMode(boolean clientMode) {
IgnitionEx.setClientMode(clientMode);
}

/**
* Gets client mode static flag.
* Gets client mode thread-local flag.
* <p>
* This flag used when node is started if {@link IgniteConfiguration#isClientMode()}
* is {@code null}. When {@link IgniteConfiguration#isClientMode()} is set this flag is ignored.
* It is recommended to use {@link DiscoverySpi} in client mode too.
*
* @return Client mode flag.
* @see IgniteConfiguration#isClientMode()
* @see TcpDiscoverySpi#setForceServerMode(boolean)
*/
public static boolean isClientMode() {
return IgnitionEx.isClientMode();
Expand Down
Expand Up @@ -245,17 +245,11 @@ public interface ClusterNode extends BaselineNode {
public boolean isDaemon();

/**
* Tests whether or not this node is connected to cluster as a client.
* <p>
* Do not confuse client in terms of
* discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
* {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
* while topology clients connect to topology in a different way.
* Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}).
*
* @return {@code True if client}.
*
* @return {@code True} if this node is a client node, {@code false} otherwise.
* @see IgniteConfiguration#isClientMode()
* @see Ignition#isClientMode()
* @see DiscoverySpi#isClientMode()
*/
public boolean isClient();
}
Expand Up @@ -67,11 +67,11 @@ public class DefaultCommunicationFailureResolver implements CommunicationFailure
@Nullable private ClusterPart findLargestConnectedCluster(CommunicationFailureContext ctx) {
List<ClusterNode> srvNodes = ctx.topologySnapshot()
.stream()
.filter(node -> !CU.clientNode(node))
.filter(node -> !node.isClient())
.collect(Collectors.toList());

// Exclude client nodes from analysis.
ClusterGraph graph = new ClusterGraph(ctx, CU::clientNode);
ClusterGraph graph = new ClusterGraph(ctx, ClusterNode::isClient);

List<BitSet> components = graph.findConnectedComponents();

Expand Down Expand Up @@ -153,7 +153,7 @@ private void keepCluster(CommunicationFailureContext ctx, ClusterPart clusterPar
ClusterNode node = allNodes.get(idx);

// Client nodes will be processed separately.
if (CU.clientNode(node))
if (node.isClient())
continue;

if (!clusterPart.srvNodesSet.get(idx))
Expand All @@ -164,7 +164,7 @@ private void keepCluster(CommunicationFailureContext ctx, ClusterPart clusterPar
for (int idx = 0; idx < allNodes.size(); idx++) {
ClusterNode node = allNodes.get(idx);

if (CU.clientNode(node) && !clusterPart.connectedClients.contains(node))
if (node.isClient() && !clusterPart.connectedClients.contains(node))
ctx.killNode(node);
}
}
Expand All @@ -182,7 +182,7 @@ private Set<ClusterNode> findConnectedClients(CommunicationFailureContext ctx, B
List<ClusterNode> allNodes = ctx.topologySnapshot();

for (ClusterNode node : allNodes) {
if (!CU.clientNode(node))
if (!node.isClient())
continue;

boolean hasConnections = true;
Expand Down
Expand Up @@ -1110,7 +1110,7 @@ public void start(
catch (IgniteNeedReconnectException e) {
ClusterNode locNode = ctx.discovery().localNode();

assert CU.clientNode(locNode);
assert locNode.isClient();

if (!locNode.isClient())
throw new IgniteCheckedException("Client node in forceServerMode " +
Expand Down
Expand Up @@ -130,6 +130,7 @@
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -207,7 +208,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Predicate filtering client nodes. */
private static final IgnitePredicate<ClusterNode> FILTER_CLI = new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return CU.clientNode(n);
return n.isClient();
}
};

Expand Down Expand Up @@ -2313,8 +2314,12 @@ public void failNode(UUID nodeId, @Nullable String warning) {
public boolean reconnectSupported() {
DiscoverySpi spi = getSpi();

return ctx.discovery().localNode().isClient() &&
(spi instanceof IgniteDiscoverySpi) &&
ClusterNode clusterNode = ctx.discovery().localNode();

boolean client = (clusterNode instanceof TcpDiscoveryNode) ?
(((TcpDiscoveryNode) clusterNode).clientRouterNodeId() != null) : clusterNode.isClient();

return client && (spi instanceof IgniteDiscoverySpi) &&
((IgniteDiscoverySpi)spi).clientReconnectSupported();
}

Expand Down Expand Up @@ -2374,7 +2379,7 @@ public void reconnect() {
if (!node.isLocal())
rmtNodes.add(node);

if (!CU.clientNode(node)) {
if (!node.isClient()) {
srvNodes.add(node);

if (minSrvVer == null)
Expand Down
Expand Up @@ -59,11 +59,4 @@ public interface IgniteClusterNode extends ClusterNode {
* @param cacheMetrics Cache metrics.
*/
public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics);

/**
* Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}).
*
* @return {@code True if client}.
*/
public boolean isCacheClient();
}
Expand Up @@ -168,7 +168,7 @@ void onDiscoveryEvent(int type,
!DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg))
return;

if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) ||
if ((!node.isClient() && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) ||
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
synchronized (mux) {
assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0 :
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

Expand Down Expand Up @@ -127,15 +126,15 @@ void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cac

ClusterNode node = evt.eventNode();

if (!CU.clientNode(node)) {
if (!node.isClient()) {
lastSrvEvt = evt;

srvEvtTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0);

if (evt.type()== EVT_NODE_JOINED)
srvJoin = true;
else if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
srvLeft = !CU.clientNode(node);
srvLeft = !node.isClient();
}
}

Expand All @@ -151,15 +150,15 @@ public List<DiscoveryEvent> events() {
* @return {@code True} if given event is {@link EventType#EVT_NODE_FAILED} or {@link EventType#EVT_NODE_LEFT}.
*/
public static boolean serverLeftEvent(DiscoveryEvent evt) {
return ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()));
return ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !evt.eventNode().isClient());
}

/**
* @param evt Event.
* @return {@code True} if given event is {@link EventType#EVT_NODE_JOINED}.
*/
public static boolean serverJoinEvent(DiscoveryEvent evt) {
return (evt.type() == EVT_NODE_JOINED && !CU.clientNode(evt.eventNode()));
return (evt.type() == EVT_NODE_JOINED && !evt.eventNode().isClient());
}

/**
Expand Down
Expand Up @@ -1969,7 +1969,7 @@ public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, Grid
", mergedFut=" + fut.initialVersion() +
", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
", evtNode=" + fut.firstEvent().eventNode().id()+
", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
", evtNodeClient=" + fut.firstEvent().eventNode().isClient() + ']');
}

DiscoveryEvent evt = fut.firstEvent();
Expand Down Expand Up @@ -2060,7 +2060,7 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu
", mergedFut=" + fut.initialVersion() +
", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
", evtNode=" + fut.firstEvent().eventNode().id() +
", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
", evtNodeClient=" + fut.firstEvent().eventNode().isClient() + ']');
}

addDiscoEvtForTest(fut.firstEvent());
Expand Down Expand Up @@ -2342,7 +2342,7 @@ boolean hasPendingServerExchange() {
// because only current exchange future can have multiple discovery events (exchange merge).
ClusterNode triggeredBy = ((GridDhtPartitionsExchangeFuture) task).firstEvent().eventNode();

if (!CU.clientNode(triggeredBy))
if (!triggeredBy.isClient())
return true;
}
}
Expand Down
Expand Up @@ -3630,7 +3630,7 @@ private IgniteNodeValidationResult validateRestartingCaches(ClusterNode node) {
* @return Validation result or {@code null} in case of success.
*/
@Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) {
if (!CU.clientNode(node)) {
if (!node.isClient()) {
for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
CacheConfiguration cfg = desc.cacheConfiguration();

Expand Down
Expand Up @@ -1351,37 +1351,13 @@ public static <T extends CachePluginConfiguration> List<T> cachePluginConfigurat
return res;
}

/**
* @param node Node.
* @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
*/
public static boolean clientNode(ClusterNode node) {
if (node instanceof IgniteClusterNode)
return ((IgniteClusterNode)node).isCacheClient();
else
return clientNodeDirect(node);
}

/**
* @param node Node.
* @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
*/
@SuppressWarnings("ConstantConditions")
public static boolean clientNodeDirect(ClusterNode node) {
Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);

assert clientModeAttr != null : node;

return clientModeAttr != null && clientModeAttr;
}

/**
* @param node Node.
* @param filter Node filter.
* @return {@code True} if node is not client node and pass given filter.
*/
public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
return !node.isDaemon() && !clientNode(node) && filter.apply(node);
return !node.isDaemon() && !node.isClient() && filter.apply(node);
}

/**
Expand Down
Expand Up @@ -920,7 +920,7 @@ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
GridDhtPartitionTopology top = null;

if (req.firstClientRequest()) {
assert CU.clientNode(nearNode);
assert nearNode.isClient();

top = topology();

Expand Down
Expand Up @@ -727,7 +727,7 @@ else if (msg instanceof WalStateAbstractMessage)
}
}
else {
if (CU.clientNode(firstDiscoEvt.eventNode()))
if (firstDiscoEvt.eventNode().isClient())
exchange = onClientNodeEvent(crdNode);
else
exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
Expand All @@ -737,7 +737,7 @@ else if (msg instanceof WalStateAbstractMessage)
onLeft();
}
else {
exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
exchange = firstDiscoEvt.eventNode().isClient() ? onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
}
}
Expand Down Expand Up @@ -1113,7 +1113,7 @@ private ExchangeType onAffinityChangeRequest(boolean crd) throws IgniteCheckedEx
* @return Exchange type.
*/
private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
assert CU.clientNode(firstDiscoEvt.eventNode()) : this;
assert firstDiscoEvt.eventNode().isClient() : this;

if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
Expand All @@ -1134,7 +1134,7 @@ private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedExceptio
* @return Exchange type.
*/
private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
assert !CU.clientNode(firstDiscoEvt.eventNode()) : this;
assert !firstDiscoEvt.eventNode().isClient() : this;

if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
Expand Down Expand Up @@ -1965,7 +1965,7 @@ private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartiti

boolean wait = false;

if (CU.clientNode(node)) {
if (node.isClient()) {
if (msg != null)
waitAndReplyToNode(nodeId, msg);
}
Expand Down Expand Up @@ -2237,7 +2237,7 @@ public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleM
}

if (finishState0 == null) {
assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this;
assert firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient() : this;

ClusterNode node = cctx.node(nodeId);

Expand Down Expand Up @@ -3170,7 +3170,7 @@ public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitions
*/
public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
assert !cctx.kernalContext().clientNode() || msg.restoreState();
assert !node.isDaemon() && !CU.clientNode(node) : node;
assert !node.isDaemon() && !node.isClient() : node;

initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
Expand Down
Expand Up @@ -231,7 +231,7 @@ private Collection<ClusterNode> aliveNodesForTopologyVer(AffinityTopologyVersion
Collection<ClusterNode> histNodes = discovery.topology(topVer.topologyVersion());

if (histNodes != null)
return histNodes.stream().filter(n -> !CU.clientNode(n) && !n.isDaemon() && discovery.alive(n))
return histNodes.stream().filter(n -> !n.isClient() && !n.isDaemon() && discovery.alive(n))
.collect(Collectors.toList());
else
throw new IgniteException("Topology " + topVer + " not found in discovery history "
Expand Down
Expand Up @@ -362,7 +362,7 @@ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(final

if (req.firstClientRequest()) {
assert req.concurrency() == OPTIMISTIC : req;
assert CU.clientNode(nearNode) : nearNode;
assert nearNode.isClient() : nearNode;

top = firstEntry.context().topology();

Expand Down
Expand Up @@ -519,7 +519,7 @@ private void checkConfigurationConsistency(IgniteSpiContext spiCtx, ClusterNode
if (!enabled)
return;

if (!checkClient && (CU.clientNode(getLocalNode()) || CU.clientNode(node)))
if (!checkClient && (getLocalNode().isClient() || node.isClient()))
return;

String clsAttr = createSpiAttributeName(IgniteNodeAttributes.ATTR_SPI_CLASS);
Expand Down
Expand Up @@ -3513,7 +3513,7 @@ protected void processClientCreationError(

if (!commErrResolve && enableForcibleNodeKill) {
if (ctx.node(node.id()) != null
&& (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
&& (node.isClient() || !getLocalNode().isClient()) &&
connectionError(errs)) {
String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
"cluster [" + "rmtNode=" + node + ']';
Expand Down

0 comments on commit 2fc9073

Please sign in to comment.