Skip to content

Commit

Permalink
IGNITE-45 - Client mode fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Mar 11, 2015
1 parent 52e4a96 commit 9373ed3
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 27 deletions.
Expand Up @@ -71,10 +71,12 @@ public DynamicCacheChangeRequest(String cacheName) {
* Constructor creates near cache start request. * Constructor creates near cache start request.
* *
* @param clientNodeId Client node ID. * @param clientNodeId Client node ID.
* @param startCfg Start cache configuration.
* @param nearCacheCfg Near cache configuration. * @param nearCacheCfg Near cache configuration.
*/ */
public DynamicCacheChangeRequest(UUID clientNodeId, NearCacheConfiguration nearCacheCfg) { public DynamicCacheChangeRequest(UUID clientNodeId, CacheConfiguration startCfg, NearCacheConfiguration nearCacheCfg) {
this.clientNodeId = clientNodeId; this.clientNodeId = clientNodeId;
this.startCfg = startCfg;
this.nearCacheCfg = nearCacheCfg; this.nearCacheCfg = nearCacheCfg;
} }


Expand All @@ -96,7 +98,7 @@ public void deploymentId(IgniteUuid deploymentId) {
* @return {@code True} if this is a start request. * @return {@code True} if this is a start request.
*/ */
public boolean isStart() { public boolean isStart() {
return startCfg != null; return clientNodeId == null && startCfg != null;
} }


/** /**
Expand All @@ -106,6 +108,13 @@ public boolean isClientStart() {
return clientNodeId != null; return clientNodeId != null;
} }


/**
* @return {@code True} if this is a stop request.
*/
public boolean isStop() {
return clientNodeId == null && startCfg == null;
}

/** /**
* @return Cache name. * @return Cache name.
*/ */
Expand Down
Expand Up @@ -99,6 +99,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/ */
private ExchangeFutureSet exchFuts = new ExchangeFutureSet(); private ExchangeFutureSet exchFuts = new ExchangeFutureSet();


public static volatile boolean stop = false;

/** Discovery listener. */ /** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) { @Override public void onEvent(Event evt) {
Expand Down
Expand Up @@ -1244,39 +1244,71 @@ public CacheMode cacheMode(String cacheName) {
* @param req Request to check. * @param req Request to check.
* @return {@code True} if change request was registered to apply. * @return {@code True} if change request was registered to apply.
*/ */
@SuppressWarnings("IfMayBeConditional")
public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) { public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());


return desc != null && desc.deploymentId().equals(req.deploymentId()) && desc.cancelled() != req.isStart(); if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
if (req.isStart() || req.isClientStart())
return !desc.cancelled();
else
return desc.cancelled();
}

return false;
} }


/** /**
* @param req Start request. * @param req Start request.
*/ */
public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException {
assert req.isStart(); assert req.isStart() || req.isClientStart();


IgnitePredicate nodeFilter = req.startCacheConfiguration().getNodeFilter(); IgnitePredicate nodeFilter = req.startCacheConfiguration().getNodeFilter();


if (nodeFilter.apply(ctx.discovery().localNode())) { ClusterNode locNode = ctx.discovery().localNode();
GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());


cacheCtx.dynamicDeploymentId(req.deploymentId()); if (req.isStart()) {
if (nodeFilter.apply(locNode)) {
GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());


sharedCtx.addCacheContext(cacheCtx); cacheCtx.dynamicDeploymentId(req.deploymentId());

sharedCtx.addCacheContext(cacheCtx);

startCache(cacheCtx.cache());
onKernalStart(cacheCtx.cache());

caches.put(cacheCtx.name(), cacheCtx.cache());
}
}
else if (req.isClientStart()) {
if (req.clientNodeId().equals(locNode.id())) {
if (nodeFilter.apply(locNode)) {
U.warn(log, "Requested to start client cache on affinity node (will ignore): " + req);

return;
}

GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());


startCache(cacheCtx.cache()); cacheCtx.dynamicDeploymentId(req.deploymentId());
onKernalStart(cacheCtx.cache());


caches.put(cacheCtx.name(), cacheCtx.cache()); sharedCtx.addCacheContext(cacheCtx);

startCache(cacheCtx.cache());
onKernalStart(cacheCtx.cache());

caches.put(cacheCtx.name(), cacheCtx.cache());
}
} }
} }


/** /**
* @param req Stop request. * @param req Stop request.
*/ */
public void prepareCacheStop(DynamicCacheChangeRequest req) { public void prepareCacheStop(DynamicCacheChangeRequest req) {
assert !req.isStart(); assert req.isStop();


// Break the proxy before exchange future is done. // Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName()); IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(req.cacheName());
Expand Down Expand Up @@ -1306,7 +1338,7 @@ public void prepareCacheStop(DynamicCacheChangeRequest req) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void onExchangeDone(DynamicCacheChangeRequest req) { public void onExchangeDone(DynamicCacheChangeRequest req) {
if (req.isStart()) { if (req.isStart() || req.isClientStart()) {
GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());


if (cache != null) if (cache != null)
Expand Down Expand Up @@ -1474,19 +1506,19 @@ public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<Dynam
continue; continue;


if (req.isStart()) { if (req.isStart()) {
if (caches.containsKey(req.cacheName())) { if (registeredCaches.containsKey(req.cacheName())) {
fut.onDone(new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to start cache " + fut.onDone(new IgniteCheckedException("Failed to start cache " +
"(a cache with the same name is already started): " + req.cacheName()))); "(a cache with the same name is already started): " + req.cacheName()));
} }
} }
else { else if (!req.isClientStart()) {
GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());


if (cache == null) if (desc == null)
// No-op. // No-op.
fut.onDone(); fut.onDone();
else { else {
IgniteUuid dynamicDeploymentId = cache.context().dynamicDeploymentId(); IgniteUuid dynamicDeploymentId = desc.deploymentId();


assert dynamicDeploymentId != null; assert dynamicDeploymentId != null;


Expand Down Expand Up @@ -2032,10 +2064,15 @@ public <K, V> IgniteCache<K, V> publicJCache(@Nullable String name) {
IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(name); IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(name);


if (cache == null) { if (cache == null) {
if (!registeredCaches.containsKey(name)) DynamicCacheDescriptor desc = registeredCaches.get(name);

if (desc == null || desc.cancelled())
throw new IllegalArgumentException("Cache is not started: " + name); throw new IllegalArgumentException("Cache is not started: " + name);


DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(), null); DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(),
desc.cacheConfiguration(), null);

req.deploymentId(desc.deploymentId());


F.first(initiateCacheChanges(F.asList(req))).get(); F.first(initiateCacheChanges(F.asList(req))).get();


Expand Down
Expand Up @@ -427,6 +427,8 @@ public void init() throws IgniteInterruptedCheckedException {
assert oldestNode.get() != null; assert oldestNode.get() != null;


if (init.compareAndSet(false, true)) { if (init.compareAndSet(false, true)) {
U.debug(log, "Initializing exchange future: " + reqs);

if (isDone()) if (isDone())
return; return;


Expand Down Expand Up @@ -578,7 +580,7 @@ private boolean canSkipExchange() {
*/ */
private void startCaches() throws IgniteCheckedException { private void startCaches() throws IgniteCheckedException {
for (DynamicCacheChangeRequest req : reqs) { for (DynamicCacheChangeRequest req : reqs) {
if (req.isStart()) if (req.isStart() || req.isClientStart())
ctx.cache().prepareCacheStart(req); ctx.cache().prepareCacheStart(req);
} }
} }
Expand All @@ -588,7 +590,7 @@ private void startCaches() throws IgniteCheckedException {
*/ */
private void stopCaches() { private void stopCaches() {
for (DynamicCacheChangeRequest req : reqs) { for (DynamicCacheChangeRequest req : reqs) {
if (!req.isStart()) if (req.isStop())
ctx.cache().prepareCacheStop(req); ctx.cache().prepareCacheStop(req);
} }
} }
Expand Down Expand Up @@ -689,8 +691,14 @@ private boolean spreadPartitions() {


if (!F.isEmpty(reqs)) { if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) { for (DynamicCacheChangeRequest req : reqs) {
if (req.isStart() && F.eq(cacheCtx.name(), req.cacheName())) if (F.eq(cacheCtx.name(), req.cacheName())) {
cacheCtx.preloader().onInitialExchangeComplete(err); if (req.isStart())
cacheCtx.preloader().onInitialExchangeComplete(err);
else if (req.isClientStart()) {
if (req.clientNodeId().equals(ctx.localNodeId()))
cacheCtx.preloader().onInitialExchangeComplete(err);
}
}
} }
} }
} }
Expand Down
Expand Up @@ -120,6 +120,8 @@ public void testStartStopCacheMultithreadedSameNode() throws Exception {
try { try {
fut.get(); fut.get();


info("Succeeded: " + System.identityHashCode(fut));

succeeded++; succeeded++;
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand Down Expand Up @@ -179,6 +181,8 @@ public void testStartCacheMultithreadedDifferentNodes() throws Exception {
try { try {
fut.get(); fut.get();


info("Succeeded: " + System.identityHashCode(fut));

succeeded++; succeeded++;
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand Down Expand Up @@ -389,7 +393,7 @@ public void testDeployFilter() throws Exception {
else else
GridTestUtils.assertThrows(log, new Callable<Object>() { GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception { @Override public Object call() throws Exception {
return kernal0.jcache(DYNAMIC_CACHE_NAME); return kernal0.cache(DYNAMIC_CACHE_NAME);
} }
}, IllegalArgumentException.class, null); }, IllegalArgumentException.class, null);
} }
Expand Down Expand Up @@ -424,4 +428,49 @@ public void testFailWhenConfiguredCacheExists() throws Exception {
} }
}, IgniteCheckedException.class, null); }, IgniteCheckedException.class, null);
} }

/**
* @throws Exception If failed.
*/
public void _testClientCache() throws Exception {
try {
testAttribute = false;

startGrid(nodeCount());

final IgniteKernal kernal = (IgniteKernal)grid(0);

CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

ccfg.setName(DYNAMIC_CACHE_NAME);

ccfg.setNodeFilter(NODE_FILTER);

kernal.context().cache().dynamicStartCache(ccfg).get();

GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
IgniteKernal ignite = (IgniteKernal)grid(nodeCount());

return ignite.cache(DYNAMIC_CACHE_NAME);
}
}, IllegalArgumentException.class, null);

GridCachePartitionExchangeManager.stop = true;

// Should obtain client cache on new node.
IgniteCache<Object, Object> clientCache = ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME);

clientCache.put("1", "1");

for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).jcache(DYNAMIC_CACHE_NAME).get("1"));

kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get();
}
finally {
stopGrid(nodeCount());
}
}
} }

0 comments on commit 9373ed3

Please sign in to comment.