Skip to content

Commit

Permalink
ignite-5852 : Introduced BaselineTopology mechanics.
Browse files Browse the repository at this point in the history
  • Loading branch information
ilantukh committed Oct 6, 2017
1 parent 0919d9f commit b32976e
Show file tree
Hide file tree
Showing 28 changed files with 1,930 additions and 967 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,13 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_WAL_SERIALIZER_VERSION = "IGNITE_WAL_SERIALIZER_VERSION";

/**
* If the property is set Ignite will use legacy node comparator (based on node order) inste
*
* Default value is {@code false}.
*/
public static final String IGNITE_USE_LEGACY_NODE_COMPARATOR = "IGNITE_USE_LEGACY_NODE_COMPARATOR";

/**
* Enforces singleton.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessorImpl;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessorImpl;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal;

import java.util.Collection;
import java.util.Set;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
Expand Down Expand Up @@ -174,4 +175,12 @@ public <K, V> IgniteBiTuple<IgniteCache<K, V>, Boolean> getOrCreateCache0(CacheC
* @param rebalanceEnabled rebalance enabled flag.
*/
public void rebalanceEnabled(boolean rebalanceEnabled);

/**
* Changes Ignite grid state to active or inactive and sets it's baseline topology.
*
* @param active If {@code True} start activation process. If {@code False} start deactivation process.
* @param nodes Baseline topology nodes.
*/
public void activeEx(boolean active, Collection<ClusterNode> nodes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -119,6 +120,7 @@
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessorImpl;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
Expand Down Expand Up @@ -941,7 +943,7 @@ public void start(
startProcessor(new GridAffinityProcessor(ctx));
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridClusterStateProcessor(ctx));
startProcessor(createComponent(GridClusterStateProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
startProcessor(new ClientListenerProcessor(ctx));
Expand Down Expand Up @@ -3489,6 +3491,21 @@ public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean c
}
}

/** {@inheritDoc} */
@Override public void activeEx(boolean active, Collection<ClusterNode> nodes) {
guard();

try {
context().state().changeGlobalState(active, nodes).get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}

/** {@inheritDoc} */
@Override public void resetLostPartitions(Collection<String> cacheNames) {
CU.validateCacheNames(cacheNames);
Expand Down Expand Up @@ -3944,6 +3961,9 @@ private static <T extends GridComponent> T createComponent(Class<T> cls, GridKer
if (cls.equals(DiscoveryNodeValidationProcessor.class))
return (T)new OsDiscoveryNodeValidationProcessor(ctx);

if (cls.equals(GridClusterStateProcessor.class))
return (T)new GridClusterStateProcessorImpl(ctx);

Class<T> implCls = null;

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.ignite.internal.cluster;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;

/**
* Representation of cluster node that isn't currently present in cluster.
*/
public class DetachedClusterNode implements ClusterNode {
/** Consistent ID. */
private final Object consistentId;

/** Node attributes. */
private final Map<String, Object> attributes;

/**
* @param consistentId Consistent ID.
* @param attributes Node attributes.
*/
public DetachedClusterNode(Object consistentId, Map<String, Object> attributes) {
this.consistentId = consistentId;
this.attributes = attributes;
}

/** {@inheritDoc} */
@Override public UUID id() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public Object consistentId() {
return consistentId;
}

/** {@inheritDoc} */
@Nullable @Override public <T> T attribute(String name) {
return (T)attributes.get(name);
}

/** {@inheritDoc} */
@Override public ClusterMetrics metrics() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public Map<String, Object> attributes() {
return attributes;
}

/** {@inheritDoc} */
@Override public Collection<String> addresses() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public Collection<String> hostNames() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public long order() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public IgniteProductVersion version() {
throw new UnsupportedOperationException("Not implemented");
}

/** {@inheritDoc} */
@Override public boolean isLocal() {
return false;
}

/** {@inheritDoc} */
@Override public boolean isDaemon() {
return false;
}

/** {@inheritDoc} */
@Override public boolean isClient() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.ignite.internal.cluster;

import java.io.Serializable;
import java.util.Comparator;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;

/**
* Node order comparator.
*/
public class NodeOrderComparator implements Comparator<ClusterNode>, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** */
private static final Comparator<ClusterNode> INSTANCE = new NodeOrderComparator();

public static final Comparator<ClusterNode> getInstance() {
return IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_USE_LEGACY_NODE_COMPARATOR) ?
NodeOrderLegacyComparator.INSTANCE : INSTANCE;
}

/**
* Private constructor. Don't create this class, use {@link #getInstance()}.
*/
private NodeOrderComparator() {

}

/** {@inheritDoc} */
@Override public int compare(ClusterNode n1, ClusterNode n2) {
Object consId1 = n1.consistentId();
Object consId2 = n2.consistentId();

if (consId1 instanceof Comparable && consId2 instanceof Comparable) {
return ((Comparable)consId1).compareTo(consId2);
}

return consId1.toString().compareTo(consId2.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.ignite.internal;
package org.apache.ignite.internal.cluster;

import java.io.Serializable;
import java.util.Comparator;
Expand All @@ -24,17 +25,17 @@
/**
* Node order comparator.
*/
public class GridNodeOrderComparator implements Comparator<ClusterNode>, Serializable {
public class NodeOrderLegacyComparator implements Comparator<ClusterNode>, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** */
public static final Comparator<ClusterNode> INSTANCE = new GridNodeOrderComparator();
public static final Comparator<ClusterNode> INSTANCE = new NodeOrderLegacyComparator();

/**
* Private constructor. Don't create this class, use {@link #INSTANCE}.
*/
private GridNodeOrderComparator() {
private NodeOrderLegacyComparator() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
Expand All @@ -79,6 +79,7 @@
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessorImpl;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
Expand Down Expand Up @@ -2224,7 +2225,7 @@ public void reconnect() {
Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());

Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance());

for (ClusterNode node : allNodes) {
assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
Expand Down

0 comments on commit b32976e

Please sign in to comment.