Skip to content

Commit

Permalink
# Bug fix: GridCacheProcessor.publicCache() doesn't return dynamic cr…
Browse files Browse the repository at this point in the history
…eated caches. Also GridCacheProcessor was simplified.
  • Loading branch information
sevdokimov-gg committed Mar 12, 2015
1 parent 63817ab commit f08510a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 72 deletions.
Expand Up @@ -84,14 +84,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** */
private final Map<String, GridCacheAdapter<?, ?>> caches;

/** Map of proxies. */
private final Map<String, GridCache<?, ?>> proxies;

/** Map of proxies. */
private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;

/** Map of public proxies, i.e. proxies which could be returned to the user. */
private final Map<String, GridCache<?, ?>> publicProxies;
private volatile List<GridCache<?, ?>> publicProxies;

/** Map of preload finish futures grouped by preload order. */
private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
Expand Down Expand Up @@ -127,8 +124,6 @@ public GridCacheProcessor(GridKernalContext ctx) {
super(ctx);

caches = new LinkedHashMap<>();
proxies = new HashMap<>();
publicProxies = new HashMap<>();
jCacheProxies = new HashMap<>();
preloadFuts = new TreeMap<>();

Expand Down Expand Up @@ -639,8 +634,6 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
GridCacheAdapter cache = e.getValue();

proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));

jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false));
}

Expand Down Expand Up @@ -696,8 +689,6 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near

startCache(cache);

proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));

jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
}
}
Expand All @@ -708,14 +699,6 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
}
});

// Internal caches which should not be returned to user.
for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
GridCacheAdapter cache = e.getValue();

if (!sysCaches.contains(e.getKey()))
publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
}

// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
mgr.onKernalStart();
Expand Down Expand Up @@ -769,9 +752,8 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
if (ctx.config().isDaemon())
return;

for (String cacheName : stopSeq) {
for (String cacheName : stopSeq)
stopCache(caches.get(cacheName), cancel);
}

List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();

Expand Down Expand Up @@ -1991,14 +1973,20 @@ public <K, V> GridCache<K, V> cache(@Nullable String name) {
if (log.isDebugEnabled())
log.debug("Getting cache for name: " + name);

return (GridCache<K, V>)proxies.get(name);
IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name);

return jcache == null ? null : jcache.legacyProxy();
}

/**
* @return All configured cache instances.
*/
public Collection<GridCache<?, ?>> caches() {
return proxies.values();
return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?, ?>, GridCache<?, ?>>() {
@Override public GridCache<?, ?> apply(IgniteCacheProxy<?, ?> entries) {
return entries.legacyProxy();
}
});
}

/**
Expand Down Expand Up @@ -2060,12 +2048,12 @@ public <K, V> GridCache<K, V> publicCache(@Nullable String name) {
if (sysCaches.contains(name))
throw new IllegalStateException("Failed to get cache because it is system cache: " + name);

GridCache<K, V> cache = (GridCache<K, V>)publicProxies.get(name);
IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name);

if (cache == null)
if (jcache == null)
throw new IllegalArgumentException("Cache is not configured: " + name);

return cache;
return jcache.legacyProxy();
}

/**
Expand Down Expand Up @@ -2129,7 +2117,20 @@ public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) {
* @return All configured public cache instances.
*/
public Collection<GridCache<?, ?>> publicCaches() {
return publicProxies.values();
List<GridCache<?, ?>> res = publicProxies;

if (res == null) {
res = new ArrayList<>(jCacheProxies.size());

for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) {
if (!sysCaches.contains(proxy.getName()))
res.add(proxy.legacyProxy());
}

publicProxies = res;
}

return res;
}

/**
Expand Down
Expand Up @@ -63,6 +63,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** Projection. */
private GridCacheProjectionImpl<K, V> prj;

/** */
@GridToStringExclude
private GridCacheProxyImpl<K, V> legacyProxy;

/**
* Empty constructor required for {@link Externalizable}.
*/
Expand Down Expand Up @@ -92,6 +96,8 @@ public IgniteCacheProxy(
this.prj = prj;

gate = ctx.gate();

legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj);
}

/**
Expand Down Expand Up @@ -258,8 +264,8 @@ public GridCacheGateway<K, V> gate() {
}
}

private IgniteBiPredicate<K,V> acceptAll() {
return new IgniteBiPredicate<K,V>() {
private IgniteBiPredicate<K, V> acceptAll() {
return new IgniteBiPredicate<K, V>() {
@Override public boolean apply(K k, V v) {
return true;
}
Expand All @@ -272,12 +278,12 @@ private IgniteBiPredicate<K,V> acceptAll() {
* @return Cursor.
*/
@SuppressWarnings("unchecked")
private QueryCursor<Entry<K,V>> doQuery(Query filter, @Nullable ClusterGroup grp) {
final CacheQuery<Map.Entry<K,V>> qry;
final CacheQueryFuture<Map.Entry<K,V>> fut;
private QueryCursor<Entry<K, V>> doQuery(Query filter, @Nullable ClusterGroup grp) {
final CacheQuery<Map.Entry<K, V>> qry;
final CacheQueryFuture<Map.Entry<K, V>> fut;

if (filter instanceof ScanQuery) {
IgniteBiPredicate<K,V> p = ((ScanQuery)filter).getFilter();
IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();

qry = delegate.queries().createScanQuery(p != null ? p : acceptAll());

Expand Down Expand Up @@ -323,8 +329,8 @@ else if (filter instanceof SqlQuery) {
else
throw new IgniteException("Unsupported query predicate: " + filter);

return new QueryCursorImpl<>(new ClIter<Map.Entry<K,V>,Cache.Entry<K,V>>(fut) {
@Override protected Cache.Entry<K,V> convert(Map.Entry<K,V> e) {
return new QueryCursorImpl<>(new ClIter<Map.Entry<K, V>, Cache.Entry<K, V>>(fut) {
@Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}
});
Expand All @@ -337,7 +343,7 @@ else if (filter instanceof SqlQuery) {
* @param loc Local flag.
* @return Initial iteration cursor.
*/
private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) {
private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) {
if (qry.getInitialQuery() instanceof ContinuousQuery)
throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
"continuous query. Use SCAN or SQL query for initial iteration.");
Expand Down Expand Up @@ -398,7 +404,7 @@ private ClusterGroup projection(boolean loc) {

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public QueryCursor<Entry<K,V>> query(Query qry) {
@Override public QueryCursor<Entry<K, V>> query(Query qry) {
A.notNull(qry, "qry");

GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
Expand Down Expand Up @@ -431,7 +437,7 @@ private ClusterGroup projection(boolean loc) {
try {
validate(qry);

CacheQuery<List<?>> q = ((GridCacheQueriesEx<K,V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(), false);
CacheQuery<List<?>> q = ((GridCacheQueriesEx<K, V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(), false);

if (qry.getPageSize() > 0)
q.pageSize(qry.getPageSize());
Expand Down Expand Up @@ -459,8 +465,8 @@ private ClusterGroup projection(boolean loc) {
* @param p Query.
* @return Cursor.
*/
private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) {
return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
private QueryCursor<Entry<K, V>> doLocalQuery(SqlQuery p) {
return new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(
ctx.name(), p.getType(), p.getSql(), p.getArgs()));
}

Expand All @@ -486,7 +492,7 @@ private void validate(Query qry) {

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public QueryCursor<Entry<K,V>> localQuery(Query qry) {
@Override public QueryCursor<Entry<K, V>> localQuery(Query qry) {
A.notNull(qry, "qry");

GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
Expand All @@ -498,7 +504,7 @@ private void validate(Query qry) {
return doLocalQuery((SqlQuery)qry);

if (qry instanceof ContinuousQuery)
return queryContinuous((ContinuousQuery<K,V>)qry, true);
return queryContinuous((ContinuousQuery<K, V>)qry, true);

return doQuery(qry, projection(true));
}
Expand Down Expand Up @@ -1144,8 +1150,8 @@ public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {

/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

Expand All @@ -1169,8 +1175,8 @@ public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {

/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
IgniteEntryProcessor<K, V, T> entryProcessor,
Object... args) {
IgniteEntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

Expand Down Expand Up @@ -1317,39 +1323,25 @@ else if (clazz.isAssignableFrom(IgniteEx.class))
}

/**
* Creates projection that will operate with portable objects.
* <p>
* Projection returned by this method will force cache not to deserialize portable objects,
* so keys and values will be returned from cache API methods without changes. Therefore,
* signature of the projection can contain only following types:
* <ul>
* <li>{@code PortableObject} for portable classes</li>
* <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li>
* <li>Arrays of primitives (byte[], int[], ...)</li>
* <li>{@link String} and array of {@link String}s</li>
* <li>{@link UUID} and array of {@link UUID}s</li>
* <li>{@link Date} and array of {@link Date}s</li>
* <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li>
* <li>Enums and array of enums</li>
* <li>
* Maps, collections and array of objects (but objects inside
* them will still be converted if they are portable)
* </li>
* </ul>
* <p>
* For example, if you use {@link Integer} as a key and {@code Value} class as a value
* (which will be stored in portable format), you should acquire following projection
* to avoid deserialization:
* Creates projection that will operate with portable objects. <p> Projection returned by this method will force
* cache not to deserialize portable objects, so keys and values will be returned from cache API methods without
* changes. Therefore, signature of the projection can contain only following types: <ul> <li>{@code PortableObject}
* for portable classes</li> <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li>
* <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String} and array of {@link String}s</li>
* <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date} and array of {@link Date}s</li> <li>{@link
* java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums and array of enums</li> <li> Maps,
* collections and array of objects (but objects inside them will still be converted if they are portable) </li>
* </ul> <p> For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be
* stored in portable format), you should acquire following projection to avoid deserialization:
* <pre>
* CacheProjection<Integer, GridPortableObject> prj = cache.keepPortable();
*
* // Value is not deserialized and returned in portable format.
* GridPortableObject po = prj.get(1);
* </pre>
* <p>
* Note that this method makes sense only if cache is working in portable mode
* ({@code CacheConfiguration#isPortableEnabled()} returns {@code true}. If not,
* this method is no-op and will return current projection.
* <p> Note that this method makes sense only if cache is working in portable mode ({@code
* CacheConfiguration#isPortableEnabled()} returns {@code true}. If not, this method is no-op and will return
* current projection.
*
* @return Projection for portable objects.
*/
Expand Down Expand Up @@ -1433,6 +1425,14 @@ private <R> void setFuture(IgniteInternalFuture<R> fut) {
curFut.set(new IgniteFutureImpl<>(fut));
}

/**
* @return Legacy proxy.
*/
@NotNull
public GridCacheProxyImpl<K, V> legacyProxy() {
return legacyProxy;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx);
Expand All @@ -1452,6 +1452,8 @@ private <R> void setFuture(IgniteInternalFuture<R> fut) {
prj = (GridCacheProjectionImpl<K, V>)in.readObject();

gate = ctx.gate();

legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj);
}

/** {@inheritDoc} */
Expand Down

0 comments on commit f08510a

Please sign in to comment.