Skip to content

Commit

Permalink
# ignite-51-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Mar 9, 2015
1 parent 148b731 commit 1e4ce34
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 27 deletions.
Expand Up @@ -42,6 +42,9 @@ public class CacheObjectContext {
/** */
private boolean unmarshalVals;

/** */
private boolean p2pEnabled;

/**
* @param kernalCtx Kernal context.
* @param dfltAffMapper Default affinity mapper.
Expand All @@ -53,13 +56,21 @@ public CacheObjectContext(GridKernalContext kernalCtx,
boolean cpyOnGet,
boolean unmarshalVals) {
this.kernalCtx = kernalCtx;
this.p2pEnabled = kernalCtx.config().isPeerClassLoadingEnabled();
this.dfltAffMapper = dfltAffMapper;
this.cpyOnGet = cpyOnGet;
this.unmarshalVals = unmarshalVals;

proc = kernalCtx.cacheObjects();
}

/**
* @return {@code True} if peer class loading is enabled.
*/
public boolean p2pEnabled() {
return p2pEnabled;
}

/**
* @return Copy on get flag.
*/
Expand Down
Expand Up @@ -1221,7 +1221,6 @@ public Set<GridCacheEntryEx> allEntries() {
return entrySet((CacheEntryPredicate[])null);
}


/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
return map.entriesx(filter);
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.ignite.events.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
Expand Down Expand Up @@ -240,26 +241,20 @@ public void apply() {
private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) {
GridCacheAdapter<K, V> cache = cacheCtx.cache();

Collection<K> keys = new ArrayList<>();
Collection<KeyCacheObject> keys = new ArrayList<>();

for (Cache.Entry<K, V> e : cache.entrySet()) {
boolean undeploy = cacheCtx.isNear() ?
undeploy(ldr, e, cacheCtx.near()) || undeploy(ldr, e, cacheCtx.near().dht()) :
undeploy(ldr, e, cacheCtx.cache());
addEntries(ldr, keys, cache);

if (undeploy)
keys.add(e.getKey());
}
if (cache.isNear())
addEntries(ldr, keys, (((GridNearCacheAdapter)cache).dht()));

if (log.isDebugEnabled())
log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']');

for (K k : keys)
cache.clearLocally(k);
cache.clearLocally(keys, true);

if (cacheCtx.isNear())
for (K k : keys)
cacheCtx.near().dht().clearLocally(k);
cacheCtx.near().dht().clearLocally(keys, true);

GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries();

Expand Down Expand Up @@ -289,19 +284,57 @@ private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cac
globalLdr = new CacheClassLoader();
}

/**
* @param ldr Class loader.
* @param keys Keys.
* @param cache Cache.
*/
private void addEntries(ClassLoader ldr, Collection<KeyCacheObject> keys, GridCacheAdapter cache) {
GridCacheContext cacheCtx = cache.context();

for (GridCacheEntryEx e : (Collection<GridCacheEntryEx>)cache.entries()) {
boolean undeploy = cacheCtx.isNear() ?
undeploy(ldr, e, cacheCtx.near()) || undeploy(ldr, e, cacheCtx.near().dht()) :
undeploy(ldr, e, cacheCtx.cache());

if (undeploy)
keys.add(e.key());
}
}

/**
* @param ldr Class loader.
* @param e Entry.
* @param cache Cache.
* @return {@code True} if need to undeploy.
*/
private boolean undeploy(ClassLoader ldr, Cache.Entry<K, V> e, GridCacheAdapter cache) {
if (e == null)
private boolean undeploy(ClassLoader ldr, GridCacheEntryEx e, GridCacheAdapter cache) {
KeyCacheObject key = e.key();

GridCacheEntryEx entry = cache.peekEx(key);

if (entry == null)
return false;

K key0 = e.getKey();
Object key0;
Object val0;

try {
CacheObject v = entry.peek(GridCachePeekMode.GLOBAL, CU.empty0());

V val0 = e.getValue();
key0 = key.value(cache.context().cacheObjectContext(), false);

assert key0 != null : "Key cannot be null for cache entry: " + e;

val0 = CU.value(v, cache.context(), false);
}
catch (GridCacheEntryRemovedException ignore) {
return false;
}
catch (IgniteException ignore) {
// Peek can throw runtime exception if unmarshalling failed.
return true;
}

ClassLoader keyLdr = U.detectObjectClassLoader(key0);
ClassLoader valLdr = U.detectObjectClassLoader(val0);
Expand Down
Expand Up @@ -257,7 +257,13 @@ protected final void marshalInfo(GridCacheEntryInfo info, GridCacheContext ctx)
if (info.key() != null)
prepareObject(info.key().value(ctx.cacheObjectContext(), false), ctx.shared());

prepareObject(CU.value(info.value(), ctx, false), ctx.shared());
CacheObject val = info.value();

if (val != null) {
val.finishUnmarshal(ctx.cacheObjectContext(), ctx.deploy().globalLoader());

prepareObject(CU.value(val, ctx, false), ctx.shared());
}
}
}
}
Expand Down
Expand Up @@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.*;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.*;
import org.jetbrains.annotations.*;

import java.util.*;
Expand Down Expand Up @@ -60,8 +60,10 @@ public UserCacheObjectImpl() {
valBytes = ctx.processor().marshal(ctx, val);

if (ctx.unmarshalValues()) {
Object val = ctx.processor().unmarshal(ctx, valBytes,
IgniteUtils.detectClass(this.val).getClassLoader());
ClassLoader ldr = ctx.p2pEnabled() ?
IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();

Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);

return new CacheObjectImpl(val, valBytes);
}
Expand Down
Expand Up @@ -163,7 +163,7 @@ public IgniteCacheObjectProcessorImpl(GridKernalContext ctx) {
return new CacheObjectContext(ctx,
new GridCacheDefaultAffinityKeyMapper(),
ccfg != null && ccfg.isCopyOnGet(),
ccfg != null && ccfg.isQueryIndexEnabled());
ctx.config().isPeerClassLoadingEnabled() || (ccfg != null && ccfg.isQueryIndexEnabled()));
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -205,8 +205,8 @@ public void testDeployment3() throws Exception {
U.sleep(500);
}

assert g1.jcache(null).localSize() == 0;
assert g2.jcache(null).localSize() == 0;
assertEquals(0, g1.jcache(null).localSize());
assertEquals(0, g2.jcache(null).localSize());

startGrid(3);
}
Expand Down
Expand Up @@ -278,9 +278,8 @@ public static TestSuite suite() throws Exception {
suite.addTestSuite(GridCacheSyncReplicatedPreloadSelfTest.class);
// suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class); TODO: uncomment when fix GG-2239

// TODO IGNITE-51
// suite.addTestSuite(GridCacheDeploymentSelfTest.class);
// suite.addTestSuite(GridCacheDeploymentOffHeapSelfTest.class);
suite.addTestSuite(GridCacheDeploymentSelfTest.class);
suite.addTestSuite(GridCacheDeploymentOffHeapSelfTest.class);

suite.addTestSuite(GridCachePutArrayValueSelfTest.class);
suite.addTestSuite(GridCacheReplicatedUnswapAdvancedSelfTest.class);
Expand Down

0 comments on commit 1e4ce34

Please sign in to comment.