Skip to content

Commit

Permalink
IGNITE-5392 - Joining node must accept cluster active status
Browse files Browse the repository at this point in the history
  • Loading branch information
dgovorukhin authored and agoncharuk committed Jun 11, 2017
1 parent f9ed612 commit bdbba0e
Show file tree
Hide file tree
Showing 73 changed files with 4,031 additions and 516 deletions.
Expand Up @@ -69,7 +69,7 @@ enum DiscoveryDataExchangeType {
* *
* @throws IgniteCheckedException Throws in case of any errors. * @throws IgniteCheckedException Throws in case of any errors.
*/ */
public void start(boolean activeOnStart) throws IgniteCheckedException; public void start() throws IgniteCheckedException;


/** /**
* Stops grid component. * Stops grid component.
Expand All @@ -86,7 +86,7 @@ enum DiscoveryDataExchangeType {
* *
* @throws IgniteCheckedException Thrown in case of any errors. * @throws IgniteCheckedException Thrown in case of any errors.
*/ */
public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException; public void onKernalStart() throws IgniteCheckedException;


/** /**
* Callback to notify that kernal is about to stop. * Callback to notify that kernal is about to stop.
Expand Down
Expand Up @@ -51,7 +51,7 @@ public PluginProvider plugin() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Expand All @@ -61,7 +61,7 @@ public PluginProvider plugin() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { @Override public void onKernalStart() throws IgniteCheckedException {
plugin.onIgniteStart(); plugin.onIgniteStart();
} }


Expand Down
Expand Up @@ -926,7 +926,8 @@ public void start(
startProcessor(new GridAffinityProcessor(ctx)); startProcessor(new GridAffinityProcessor(ctx));
startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));startProcessor(new GridClusterStateProcessor(ctx)); startProcessor(new GridClusterStateProcessor(ctx));
startProcessor(new GridCacheProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx)); startProcessor(new GridQueryProcessor(ctx));
startProcessor(new SqlListenerProcessor(ctx)); startProcessor(new SqlListenerProcessor(ctx));
startProcessor(new GridServiceProcessor(ctx)); startProcessor(new GridServiceProcessor(ctx));
Expand Down Expand Up @@ -991,10 +992,10 @@ public void start(
ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions()); ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions());


// Notify discovery manager the first to make sure that topology is discovered. // Notify discovery manager the first to make sure that topology is discovered.
ctx.discovery().onKernalStart(activeOnStart); ctx.discovery().onKernalStart();


// Notify IO manager the second so further components can send and receive messages. // Notify IO manager the second so further components can send and receive messages.
ctx.io().onKernalStart(activeOnStart); ctx.io().onKernalStart();


// Start plugins. // Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) for (PluginProvider provider : ctx.plugins().allProviders())
Expand All @@ -1017,7 +1018,7 @@ public void start(


if (!skipDaemon(comp)) { if (!skipDaemon(comp)) {
try { try {
comp.onKernalStart(activeOnStart); comp.onKernalStart();
} }
catch (IgniteNeedReconnectException e) { catch (IgniteNeedReconnectException e) {
assert ctx.discovery().reconnectSupported(); assert ctx.discovery().reconnectSupported();
Expand Down Expand Up @@ -1749,7 +1750,7 @@ private void startManager(GridManager mgr) throws IgniteCheckedException {


try { try {
if (!skipDaemon(mgr)) if (!skipDaemon(mgr))
mgr.start(cfg.isActiveOnStart()); mgr.start();
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
U.error(log, "Failed to start manager: " + mgr, e); U.error(log, "Failed to start manager: " + mgr, e);
Expand All @@ -1767,7 +1768,7 @@ private void startProcessor(GridProcessor proc) throws IgniteCheckedException {


try { try {
if (!skipDaemon(proc)) if (!skipDaemon(proc))
proc.start(cfg.isActiveOnStart()); proc.start();
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to start processor: " + proc, e); throw new IgniteCheckedException("Failed to start processor: " + proc, e);
Expand Down
Expand Up @@ -362,7 +362,7 @@ protected final String stopInfo() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public final void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { @Override public final void onKernalStart() throws IgniteCheckedException {
for (final IgniteSpi spi : spis) { for (final IgniteSpi spi : spis) {
try { try {
spi.onContextInitialized(new IgniteSpiContext() { spi.onContextInitialized(new IgniteSpiContext() {
Expand Down
Expand Up @@ -102,7 +102,7 @@ public GridCheckpointManager(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
for (CheckpointSpi spi : getSpis()) { for (CheckpointSpi spi : getSpis()) {
spi.setCheckpointListener(new CheckpointListener() { spi.setCheckpointListener(new CheckpointListener() {
@Override public void onCheckpointRemoved(String key) { @Override public void onCheckpointRemoved(String key) {
Expand Down
Expand Up @@ -47,7 +47,7 @@ public GridCollisionManager(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
startSpi(); startSpi();


if (enabled()) { if (enabled()) {
Expand Down
Expand Up @@ -261,7 +261,7 @@ public void resetMetrics() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
assertParameter(discoDelay > 0, "discoveryStartupDelay > 0"); assertParameter(discoDelay > 0, "discoveryStartupDelay > 0");


startSpi(); startSpi();
Expand Down
Expand Up @@ -91,7 +91,7 @@ public GridDeploymentManager(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
GridProtocolHandler.registerDeploymentManager(this); GridProtocolHandler.registerDeploymentManager(this);


assertParameter(ctx.config().getDeploymentMode() != null, "ctx.config().getDeploymentMode() != null"); assertParameter(ctx.config().getDeploymentMode() != null, "ctx.config().getDeploymentMode() != null");
Expand Down
Expand Up @@ -73,6 +73,7 @@
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
Expand Down Expand Up @@ -463,7 +464,7 @@ private void updateClientNodes(UUID leftNodeId) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
long totSysMemory = -1; long totSysMemory = -1;


try { try {
Expand Down Expand Up @@ -741,19 +742,36 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
} }


@Override public void onExchange(DiscoveryDataBag dataBag) { @Override public void onExchange(DiscoveryDataBag dataBag) {
assert dataBag != null;
assert dataBag.joiningNodeId() != null;

if (ctx.localNodeId().equals(dataBag.joiningNodeId())) { if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
//NodeAdded msg reached joining node after round-trip over the ring // NodeAdded msg reached joining node after round-trip over the ring.
GridClusterStateProcessor stateProc = ctx.state();

stateProc.onGridDataReceived(dataBag.gridDiscoveryData(
stateProc.discoveryDataType().ordinal()));

for (GridComponent c : ctx.components()) { for (GridComponent c : ctx.components()) {
if (c.discoveryDataType() != null) if (c.discoveryDataType() != null && c != stateProc)
c.onGridDataReceived(dataBag.gridDiscoveryData(c.discoveryDataType().ordinal())); c.onGridDataReceived(dataBag.gridDiscoveryData(c.discoveryDataType().ordinal()));
} }
} }
else { else {
//discovery data from newly joined node has to be applied to the current old node // Discovery data from newly joined node has to be applied to the current old node.
GridClusterStateProcessor stateProc = ctx.state();

JoiningNodeDiscoveryData data0 = dataBag.newJoinerDiscoveryData(
stateProc.discoveryDataType().ordinal());

assert data0 != null;

stateProc.onJoiningNodeDataReceived(data0);

for (GridComponent c : ctx.components()) { for (GridComponent c : ctx.components()) {
if (c.discoveryDataType() != null) { if (c.discoveryDataType() != null && c != stateProc) {
JoiningNodeDiscoveryData data = JoiningNodeDiscoveryData data = dataBag.newJoinerDiscoveryData(
dataBag.newJoinerDiscoveryData(c.discoveryDataType().ordinal()); c.discoveryDataType().ordinal());


if (data != null) if (data != null)
c.onJoiningNodeDataReceived(data); c.onJoiningNodeDataReceived(data);
Expand Down Expand Up @@ -1222,17 +1240,6 @@ private void checkAttributes(Iterable<ClusterNode> nodes) throws IgniteCheckedEx
", rmtAddrs=" + U.addressesAsString(n) + ']'); ", rmtAddrs=" + U.addressesAsString(n) + ']');
} }


boolean rmtActiveOnStart = n.attribute(ATTR_ACTIVE_ON_START);

if (locActiveOnStart != rmtActiveOnStart) {
throw new IgniteCheckedException("Remote node has active on start flag different from local " +
"[locId8=" + U.id8(locNode.id()) +
", locActiveOnStart=" + locActiveOnStart +
", rmtId8=" + U.id8(n.id()) +
", rmtActiveOnStart=" + rmtActiveOnStart +
", rmtAddrs=" + U.addressesAsString(n) + ']');
}

Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);


if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) { if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) {
Expand Down
Expand Up @@ -273,7 +273,7 @@ private void leaveBusy() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
Map<IgnitePredicate<? extends Event>, int[]> evtLsnrs = ctx.config().getLocalEventListeners(); Map<IgnitePredicate<? extends Event>, int[]> evtLsnrs = ctx.config().getLocalEventListeners();


if (evtLsnrs != null) { if (evtLsnrs != null) {
Expand Down
Expand Up @@ -40,7 +40,7 @@ public GridFailoverManager(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
startSpi(); startSpi();


if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down
Expand Up @@ -48,7 +48,7 @@ public GridIndexingManager(GridKernalContext ctx) {
/** /**
* @throws IgniteCheckedException Thrown in case of any errors. * @throws IgniteCheckedException Thrown in case of any errors.
*/ */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
startSpi(); startSpi();


if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down
Expand Up @@ -45,7 +45,7 @@ public GridLoadBalancerManager(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
startSpi(); startSpi();


if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
Expand Down Expand Up @@ -187,7 +188,6 @@ public void initializeForCache(CacheGroupDescriptor grpDesc,
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException; public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException;

/** /**
* @param grpId Cache group ID. * @param grpId Cache group ID.
* @return {@code True} if index store for given cache group existed before node started. * @return {@code True} if index store for given cache group existed before node started.
Expand Down
Expand Up @@ -55,7 +55,7 @@ protected GridProcessorAdapter(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { @Override public void onKernalStart() throws IgniteCheckedException {
// No-op. // No-op.
} }


Expand Down Expand Up @@ -100,7 +100,7 @@ protected GridProcessorAdapter(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
// No-op. // No-op.
} }


Expand Down
Expand Up @@ -133,7 +133,7 @@ public GridAffinityProcessor(GridKernalContext ctx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void start(boolean activeOnStart) throws IgniteCheckedException { @Override public void start() throws IgniteCheckedException {
ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
} }


Expand Down
Expand Up @@ -349,11 +349,11 @@ private void updateCachesInfo(ExchangeActions exchActions) {
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
* @return {@code True} if client-only exchange is needed. * @return {@code True} if client-only exchange is needed.
*/ */
public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut, public boolean onCacheChangeRequest(
final GridDhtPartitionsExchangeFuture fut,
boolean crd, boolean crd,
final ExchangeActions exchActions) final ExchangeActions exchActions
throws IgniteCheckedException ) throws IgniteCheckedException {
{
assert exchActions != null && !exchActions.empty() : exchActions; assert exchActions != null && !exchActions.empty() : exchActions;


updateCachesInfo(exchActions); updateCachesInfo(exchActions);
Expand All @@ -377,7 +377,19 @@ public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,


NearCacheConfiguration nearCfg = null; NearCacheConfiguration nearCfg = null;


if (cctx.localNodeId().equals(req.initiatingNodeId())) { if (exchActions.newClusterState() == ClusterState.ACTIVE) {
if (CU.isSystemCache(req.cacheName()))
startCache = true;
else if (!cctx.localNode().isClient()) {
startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());

nearCfg = req.nearCacheConfiguration();
}
else // Only static cache configured on client must be started.
startCache = cctx.kernalContext().state().isLocalConfigure(req.cacheName());
}
else if (cctx.localNodeId().equals(req.initiatingNodeId())) {
startCache = true; startCache = true;


nearCfg = req.nearCacheConfiguration(); nearCfg = req.nearCacheConfiguration();
Expand All @@ -388,9 +400,19 @@ public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
} }


try { try {
// Save configuration before cache started.
if (cctx.pageStore() != null && !cctx.localNode().isClient())
cctx.pageStore().storeCacheData(
cacheDesc.groupDescriptor(),
new StoredCacheData(req.startCacheConfiguration())
);

if (startCache) { if (startCache) {
cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion()); cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());


if (exchActions.newClusterState() == null)
cctx.kernalContext().state().onCacheStart(req);

if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName()); U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
Expand Down Expand Up @@ -845,9 +867,11 @@ else if (grpHolder.client() && grp != null) {
* @param descs Cache descriptors. * @param descs Cache descriptors.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void initStartedCaches(boolean crd, public void initStartedCaches(
boolean crd,
final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsExchangeFuture fut,
Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException { Collection<DynamicCacheDescriptor> descs
) throws IgniteCheckedException {
for (DynamicCacheDescriptor desc : descs) { for (DynamicCacheDescriptor desc : descs) {
CacheGroupDescriptor grpDesc = desc.groupDescriptor(); CacheGroupDescriptor grpDesc = desc.groupDescriptor();


Expand Down
Expand Up @@ -56,7 +56,7 @@ Map<Integer, CacheGroupInfo> clientCacheGroups() {
/** /**
* @return Information about caches started on re-joining client node. * @return Information about caches started on re-joining client node.
*/ */
Map<String, CacheInfo> clientCaches() { public Map<String, CacheInfo> clientCaches() {
return clientCaches; return clientCaches;
} }


Expand Down
Expand Up @@ -177,6 +177,13 @@ public UUID receivedFrom() {
return rcvdFrom; return rcvdFrom;
} }


/**
* @return Flags.
*/
public long flags() {
return flags;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName()); return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
Expand Down

0 comments on commit bdbba0e

Please sign in to comment.