Skip to content

Commit

Permalink
#IGNITE-45 - WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Mar 3, 2015
1 parent 3468369 commit 893d0fe
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 37 deletions.
Expand Up @@ -381,6 +381,13 @@ protected GridKernalContextImpl(
* @param comp Manager to add.
*/
public void add(GridComponent comp) {
add(comp, true);
}

/**
* @param comp Manager to add.
*/
public void add(GridComponent comp, boolean addToList) {
assert comp != null;

/*
Expand Down Expand Up @@ -471,7 +478,8 @@ else if (comp instanceof ClusterProcessor)
else
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();

comps.add(comp);
if (addToList)
comps.add(comp);
}

/**
Expand Down
Expand Up @@ -740,6 +740,12 @@ public void start(final IgniteConfiguration cfg,

ackSecurity(ctx);

// Assign discovery manager to context before other processors start so they
// are able to register custom event listener.
GridManager discoMgr = new GridDiscoveryManager(ctx);

ctx.add(discoMgr, false);

// Start processors before discovery manager, so they will
// be able to start receiving messages once discovery completes.
startProcessor(ctx, new GridClockSyncProcessor(ctx), attrs);
Expand Down Expand Up @@ -776,7 +782,7 @@ public void start(final IgniteConfiguration cfg,
gw.setState(STARTED);

// Start discovery manager last to make sure that grid is fully initialized.
startManager(ctx, new GridDiscoveryManager(ctx), attrs);
startManager(ctx, discoMgr, attrs);
}
finally {
gw.writeUnlock();
Expand Down
Expand Up @@ -163,6 +163,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Metrics update worker. */
private final MetricsUpdater metricsUpdater = new MetricsUpdater();

/** Custom event listener. */
private GridPlainInClosure<Serializable> customEvtLsnr;

/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
Expand Down Expand Up @@ -304,6 +307,15 @@ public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ve
return;
}

if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
try {
customEvtLsnr.apply(data);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + data, e);
}
}

if (topVer > 0 && (type == EVT_NODE_JOINED || type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)) {
boolean set = GridDiscoveryManager.this.topVer.setIfGreater(topVer);

Expand Down Expand Up @@ -379,6 +391,13 @@ public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ve
log.debug(startInfo());
}

/**
* @param customEvtLsnr Custom event listener.
*/
public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) {
this.customEvtLsnr = customEvtLsnr;
}

/**
* @return Metrics.
*/
Expand Down Expand Up @@ -1488,17 +1507,17 @@ else if (log.isDebugEnabled())
}

case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();

customEvt.node(ctx.discovery().localNode());
customEvt.eventNode(node);
customEvt.type(type);
customEvt.topologySnapshot(topVer, null);
customEvt.data(evt.get5());
if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();

assert ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT);
customEvt.node(ctx.discovery().localNode());
customEvt.eventNode(node);
customEvt.type(type);
customEvt.topologySnapshot(topVer, null);
customEvt.data(evt.get5());

ctx.event().record(customEvt);
ctx.event().record(customEvt);
}

return;
}
Expand Down
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;

import java.io.*;

/**
* Cache start descriptor.
*/
public class DynamicCacheDescriptor implements Serializable {
/** Cache configuration. */
@GridToStringExclude
private CacheConfiguration cacheCfg;

/** Deploy filter bytes. */
@GridToStringExclude
private byte[] deployFltrBytes;

/** Cache start ID. */
private IgniteUuid startId;

/**
* @param cacheCfg Cache configuration.
* @param deployFltrBytes Deployment filter bytes.
*/
public DynamicCacheDescriptor(CacheConfiguration cacheCfg, byte[] deployFltrBytes, IgniteUuid startId) {
this.cacheCfg = cacheCfg;
this.deployFltrBytes = deployFltrBytes;
this.startId = startId;
}

/**
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration() {
return cacheCfg;
}

/**
* @return Start ID.
*/
public IgniteUuid startId() {
return startId;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", cacheCfg.getName());
}
}
Expand Up @@ -384,6 +384,16 @@ void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture<K, V> fut) {
}
}

/**
* Callback to start exchange for dynamically started cache.
*
* @param cacheDesc Cache descriptor.
*/
public void onCacheDeployed(DynamicCacheDescriptor cacheDesc) {
// TODO IGNITE-45 move to exchange future.
cctx.kernalContext().cache().onCacheStartFinished(cacheDesc);
}

/**
* @return {@code True} if topology has changed.
*/
Expand Down
Expand Up @@ -44,16 +44,20 @@
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.lifecycle.*;
import org.apache.ignite.spi.*;
import org.jetbrains.annotations.*;

import javax.cache.configuration.*;
import javax.cache.integration.*;
import javax.management.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
Expand Down Expand Up @@ -102,6 +106,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;

/** Pending cache starts. */
private ConcurrentMap<String, IgniteInternalFuture> pendingStarts = new ConcurrentHashMap<>();

/** Dynamic caches. */
private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>();

/**
* @param ctx Kernal context.
*/
Expand Down Expand Up @@ -558,6 +568,13 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near

maxPreloadOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());

ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() {
@Override public void apply(Serializable evt) {
if (evt instanceof DynamicCacheDescriptor)
onCacheDeploymentRequested((DynamicCacheDescriptor)evt);
}
});

// Internal caches which should not be returned to user.
IgfsConfiguration[] igfsCfgs = ctx.grid().configuration().getIgfsConfiguration();

Expand Down Expand Up @@ -915,6 +932,23 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
log.debug("Started cache processor.");
}

/**
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param startDesc Cache start descriptor.
*/
public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
CacheConfiguration ccfg = startDesc.cacheConfiguration();

DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());

if (fut != null && fut.startId().equals(startDesc.startId())) {
fut.onDone();

pendingStarts.remove(ccfg.getName(), fut);
}
}

/**
* Creates shared context.
*
Expand Down Expand Up @@ -971,6 +1005,89 @@ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx)
attrs.put(ATTR_CACHE_INTERCEPTORS, interceptors);
}

/**
* Dynamically starts cache.
*
* @param ccfg Cache configuration.
* @param nodeFilter Node filter to select nodes on which the cache should be deployed.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> startCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) {
if (nodeFilter == null)
nodeFilter = F.alwaysTrue();

DynamicCacheStartFuture fut = new DynamicCacheStartFuture(ctx, IgniteUuid.fromUuid(ctx.localNodeId()));

try {
byte[] filterBytes = ctx.config().getMarshaller().marshal(nodeFilter);

for (CacheConfiguration ccfg0 : ctx.config().getCacheConfiguration()) {
if (ccfg0.getName().equals(ccfg.getName()))
return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already configured): " + ccfg.getName()));
}

if (caches.containsKey(ccfg.getName()))
return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already started): " + ccfg.getName()));

IgniteInternalFuture<?> old = pendingStarts.putIfAbsent(ccfg.getName(), fut);

if (old != null)
return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already started): " + ccfg.getName()));

ctx.discovery().sendCustomEvent(new DynamicCacheDescriptor(ccfg, filterBytes, fut.startId()));

return fut;
}
catch (IgniteCheckedException e) {
fut.onDone(e);

// Safety.
pendingStarts.remove(ccfg.getName(), fut);

return fut;
}
}

/**
* Callback invoked from discovery thread when cache deployment request is received.
*
* @param startDesc Cache start descriptor.
*/
private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) {
// TODO IGNITE-45 remove debug
U.debug(log, "Received start notification: " + startDesc);

CacheConfiguration ccfg = startDesc.cacheConfiguration();

// Check if cache with the same name was concurrently started form different node.
if (dynamicCaches.containsKey(ccfg.getName())) {
// If local node initiated start, fail the start future.
DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());

if (startFut != null && startFut.startId().equals(startDesc.startId())) {
assert !startFut.syncNotify();

startFut.onDone(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already started): " + ccfg.getName()));

pendingStarts.remove(ccfg.getName(), startFut);
}

return;
}

DynamicCacheDescriptor old = dynamicCaches.put(ccfg.getName(), startDesc);

assert old == null : "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';

// TODO IGNITE-45 create cache context here.

sharedCtx.exchange().onCacheDeployed(startDesc);
}

/**
* Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
*
Expand Down Expand Up @@ -1855,6 +1972,32 @@ private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object...objs)
return ret;
}

/**
*
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
private static class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
/** Start ID. */
private IgniteUuid startId;

/**
* @param ctx Kernal context.
*/
private DynamicCacheStartFuture(GridKernalContext ctx, IgniteUuid startId) {
// Start future can be completed from discovery thread, notification must NOT be sync.
super(ctx, false);

this.startId = startId;
}

/**
* @return Start ID.
*/
private IgniteUuid startId() {
return startId;
}
}

/**
*
*/
Expand Down

0 comments on commit 893d0fe

Please sign in to comment.