Skip to content

Commit

Permalink
#ignite-611: revert.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Mar 26, 2015
1 parent 1b02520 commit 30a6b12
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 18 deletions.
Expand Up @@ -464,6 +464,58 @@ public <K1, V1> GridCacheProjectionImpl<K1, V1> keepPortable0() {
plc); plc);
} }


/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "RedundantCast"})
public <K1, V1> CacheProjection<K1, V1> projection(
Class<? super K1> keyType,
Class<? super V1> valType
) {
if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClasses(keyType, valType);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((CacheProjection<K1, V1>)this,
(GridCacheContext<K1, V1>)ctx,
CU.typeFilter0(keyType, valType),
/*flags*/null,
/*clientId*/null,
false,
null);

return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
}

/** {@inheritDoc} */
public CacheProjection<K, V> projection(CacheEntryPredicate filter) {
if (filter == null)
return this;

if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClasses(filter);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(
this,
ctx,
filter,
null,
null,
false,
null);

return new GridCacheProxyImpl<>(ctx, prj, prj);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheConfiguration configuration() { @Override public CacheConfiguration configuration() {
return ctx.config(); return ctx.config();
Expand Down
Expand Up @@ -2189,8 +2189,9 @@ public GridCacheAdapter<Integer, String> marshallerCache() {
* @return Projection over utility cache. * @return Projection over utility cache.
*/ */
public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls, Class<V> valCls) { public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls, Class<V> valCls) {
GridCache<K, V> cache = cache(CU.UTILITY_CACHE_NAME); GridCacheAdapter<K, V> cache = internalCache(CU.UTILITY_CACHE_NAME);
return (GridCacheProjectionEx<K, V>)cache;
return (GridCacheProjectionEx<K, V>)cache.projection(keyCls, valCls);
} }


/** /**
Expand Down
Expand Up @@ -278,6 +278,65 @@ boolean isAll(K k, V v) {
return subjId; return subjId;
} }


/** {@inheritDoc} */
@SuppressWarnings( {"unchecked", "RedundantCast"})
public <K1, V1> CacheProjection<K1, V1> projection(
Class<? super K1> keyType,
Class<? super V1> valType
) {
A.notNull(keyType, "keyType", valType, "valType");

if (cctx.deploymentEnabled()) {
try {
cctx.deploy().registerClasses(keyType, valType);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>(
(CacheProjection<K1, V1>)this,
(GridCacheContext<K1, V1>)cctx,
CU.typeFilter0(keyType, valType),
flags,
subjId,
keepPortable,
expiryPlc);

return new GridCacheProxyImpl((GridCacheContext<K1, V1>)cctx, prj, prj);
}

/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
public CacheProjection<K, V> projection(CacheEntryPredicate filter) {
if (filter == null)
return new GridCacheProxyImpl<>(cctx, this, this);

if (this.filter != null)
filter = and(filter);

if (cctx.deploymentEnabled()) {
try {
cctx.deploy().registerClasses(filter);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
cctx,
filter,
flags,
subjId,
keepPortable,
expiryPlc);

return new GridCacheProxyImpl<>(cctx, prj, prj);
}


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheProjection<K, V> flagsOn(@Nullable CacheFlag[] flags) { @Override public CacheProjection<K, V> flagsOn(@Nullable CacheFlag[] flags) {
if (F.isEmpty(flags)) if (F.isEmpty(flags))
Expand Down
Expand Up @@ -56,7 +56,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap;


/** Queue header view. */ /** Queue header view. */
private GridCache<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView;


/** Query notifying about queue update. */ /** Query notifying about queue update. */
private UUID queueQryId; private UUID queueQryId;
Expand Down Expand Up @@ -85,7 +85,7 @@ public CacheDataStructuresManager() {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException { @Override protected void onKernalStart0() throws IgniteCheckedException {
try { try {
queueHdrView = cctx.grid().cachex(cctx.name()); queueHdrView = cctx.cache().projection(GridCacheQueueHeaderKey.class, GridCacheQueueHeader.class);


initFlag = true; initFlag = true;
} }
Expand Down
Expand Up @@ -121,13 +121,13 @@ private CacheConfiguration cacheConfiguration(
public void testProjectionForDefaultCache() throws Exception { public void testProjectionForDefaultCache() throws Exception {
ClusterGroup prj = ignite.cluster().forCacheNodes(null); ClusterGroup prj = ignite.cluster().forCacheNodes(null);


assert prj != null; assertNotNull(prj);
assert prj.nodes().size() == 3; assertEquals(3, prj.nodes().size());
assert prj.nodes().contains(grid(0).localNode()); assertTrue(prj.nodes().contains(grid(0).localNode()));
assert !prj.nodes().contains(grid(1).localNode()); assertFalse(prj.nodes().contains(grid(1).localNode()));
assert prj.nodes().contains(grid(2).localNode()); assertTrue(prj.nodes().contains(grid(2).localNode()));
assert prj.nodes().contains(grid(3).localNode()); assertTrue(prj.nodes().contains(grid(3).localNode()));
assert !prj.nodes().contains(grid(4).localNode()); assertTrue(prj.nodes().contains(grid(4).localNode()));
} }


/** /**
Expand Down
Expand Up @@ -298,7 +298,7 @@ public void testNearDhtKeySize() throws Exception {


info("Generating keys for test..."); info("Generating keys for test...");


GridCache<String, Integer> cache0 = cache(0); GridCacheAdapter<String, Integer> cache0 = ((IgniteKernal)grid(0)).internalCache();


for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
while (true) { while (true) {
Expand All @@ -308,7 +308,7 @@ public void testNearDhtKeySize() throws Exception {
ignite(0).affinity(null).isBackup(grid(1).localNode(), key)) { ignite(0).affinity(null).isBackup(grid(1).localNode(), key)) {
keys.add(key); keys.add(key);


assertTrue(cache0.putx(key, i)); cache0.put(key, i);


break; break;
} }
Expand All @@ -317,21 +317,43 @@ public void testNearDhtKeySize() throws Exception {


info("Finished generating keys for test."); info("Finished generating keys for test.");


GridCache<String, Integer> cache2 = cache(2); GridCacheAdapter<String, Integer> cache2 = ((IgniteKernal)grid(2)).internalCache();


assertEquals(Integer.valueOf(0), cache2.get(keys.get(0))); assertEquals(Integer.valueOf(0), cache2.get(keys.get(0)));
assertEquals(Integer.valueOf(1), cache2.get(keys.get(1))); assertEquals(Integer.valueOf(1), cache2.get(keys.get(1)));


assertEquals(0, cache0.nearSize()); assertEquals(0, cache0.nearSize());
assertEquals(5, cache0.size() - cache0.nearSize()); assertEquals(5, cache0.size() - cache0.nearSize());


GridCache<String, Integer> cache1 = cache(1); GridCacheAdapter<String, Integer> cache1 = ((IgniteKernal)grid(1)).internalCache();


assertEquals(0, cache1.nearSize()); assertEquals(0, cache1.nearSize());
assertEquals(5, cache1.size() - cache1.nearSize()); assertEquals(5, cache1.size() - cache1.nearSize());


assertEquals(nearEnabled() ? 2 : 0, cache2.nearSize()); assertEquals(nearEnabled() ? 2 : 0, cache2.nearSize());
assertEquals(0, cache2.size() - cache2.nearSize()); assertEquals(0, cache2.size() - cache2.nearSize());

CacheEntryPredicate prjFilter = new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx e) {
try {
Integer val = CU.value(e.rawGetOrUnmarshal(false), e.context(), false);

return val != null && val >= 1 && val <= 3;
}
catch (IgniteCheckedException err) {
throw new IgniteException(err);
}
}
};

assertEquals(0, cache0.projection(prjFilter).nearSize());
assertEquals(3, cache0.projection(prjFilter).size() - cache0.projection(prjFilter).nearSize());

assertEquals(0, cache1.projection(prjFilter).nearSize());
assertEquals(3, cache1.projection(prjFilter).size() - cache1.projection(prjFilter).nearSize());

assertEquals(nearEnabled() ? 1 : 0, cache2.projection(prjFilter).nearSize());
assertEquals(0, cache2.projection(prjFilter).size() - cache2.projection(prjFilter).nearSize());
} }


/** /**
Expand Down
Expand Up @@ -57,7 +57,7 @@ public class HadoopJobTracker extends HadoopComponent {
private final GridMutex mux = new GridMutex(); private final GridMutex mux = new GridMutex();


/** */ /** */
private volatile GridCacheAdapter<HadoopJobId, HadoopJobMetadata> jobMetaPrj; private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaPrj;


/** Projection with expiry policy for finished job updates. */ /** Projection with expiry policy for finished job updates. */
private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
Expand Down Expand Up @@ -108,7 +108,7 @@ public class HadoopJobTracker extends HadoopComponent {
*/ */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() { private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
GridCacheAdapter<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;


if (prj == null) { if (prj == null) {
synchronized (mux) { synchronized (mux) {
Expand All @@ -129,7 +129,8 @@ private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }


jobMetaPrj = prj = sysCache; jobMetaPrj = prj = (GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata>)
sysCache.projection(HadoopJobId.class, HadoopJobMetadata.class);


if (ctx.configuration().getFinishedJobInfoTtl() > 0) { if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
Expand Down

0 comments on commit 30a6b12

Please sign in to comment.