Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 26, 2015
1 parent 7faa36f commit e3598a9
Show file tree
Hide file tree
Showing 8 changed files with 443 additions and 108 deletions.
79 changes: 79 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/IgniteCache.java
Expand Up @@ -101,6 +101,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
* @throws CacheException If loading failed.
*/
@IgniteAsyncSupported
public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;

/**
Expand All @@ -124,6 +125,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
* @throws CacheException If loading failed.
*/
@IgniteAsyncSupported
public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;

/**
Expand Down Expand Up @@ -155,6 +157,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @throws CacheException If put operation failed.
* @throws org.apache.ignite.internal.processors.cache.CacheFlagException If projection flags validation failed.
*/
@IgniteAsyncSupported
@Nullable public V getAndPutIfAbsent(K key, V val) throws CacheException;

/**
Expand Down Expand Up @@ -295,6 +298,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @param peekModes Optional peek modes. If not provided, then total cache size is returned.
* @return Cache size across all nodes.
*/
@IgniteAsyncSupported
public int size(CachePeekMode... peekModes) throws CacheException;

/**
Expand All @@ -313,6 +317,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* will be returned for {@link EntryProcessor}s that return a
* <code>null</code> value for a key.
*/
@IgniteAsyncSupported
<T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args);

/**
Expand Down Expand Up @@ -353,4 +358,78 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @return Projection for portable objects.
*/
public <K1, V1> IgniteCache<K1, V1> keepPortable();

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public V get(K key);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public Map<K, V> getAll(Set<? extends K> keys);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean containsKey(K key);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public void put(K key, V val);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public V getAndPut(K key, V val);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public void putAll(Map<? extends K, ? extends V> map);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean putIfAbsent(K key, V val);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean remove(K key);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean remove(K key, V oldVal);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public V getAndRemove(K key);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean replace(K key, V oldVal, V newVal);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public boolean replace(K key, V val);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public V getAndReplace(K key, V val);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public void removeAll(Set<? extends K> keys);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public void removeAll();

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public void clear();

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments);

/** {@inheritDoc} */
@IgniteAsyncSupported
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args);
}
Expand Up @@ -343,6 +343,12 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K, V>> {
*/
public boolean containsKey(K key);

/**
* @param key Key.
* @return Future.
*/
public IgniteFuture<Boolean> containsKeyAsync(K key);

/**
* Returns {@code true} if this cache contains given value.
*
Expand Down
Expand Up @@ -638,6 +638,20 @@ public void onKernalStop() {
return containsKey(key, null);
}

/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
return containsKeyAsync(key, null);
}

/**
* @param key Key.
* @param filter Filter.
* @return Future.
*/
public IgniteFuture<Boolean> containsKeyAsync(K key, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) {
return new GridFinishedFuture<>(ctx.kernalContext(), containsKey(key, filter));
}

/** {@inheritDoc} */
@Override public boolean containsValue(V val) {
return containsValue(val, null);
Expand Down Expand Up @@ -3575,6 +3589,37 @@ public void localLoad(Collection<? extends K> keys) throws IgniteCheckedExceptio
}
}

/**
* @param p Predicate.
* @param args Arguments.
* @throws IgniteCheckedException If failed.
*/
void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException {
ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());

IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();

comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));
}

/**
* @param p Predicate.
* @param args Arguments.
* @throws IgniteCheckedException If failed.
*/
IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws IgniteCheckedException {
ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());

IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();

comp = comp.enableAsync();

comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));

return comp.future();
}

/** {@inheritDoc} */
@Nullable @Override public CacheEntry<K, V> randomEntry() {
GridCacheMapEntry<K, V> e = map.randomEntry();
Expand Down Expand Up @@ -5036,7 +5081,7 @@ private GlobalSizeCallable(String cacheName, boolean primaryOnly) {

/** {@inheritDoc} */
@Override public Integer apply(Object o) {
GridCache<Object, Object> cache = ((GridEx) ignite).cachex(cacheName);
GridCache<Object, Object> cache = ((GridEx)ignite).cachex(cacheName);

return primaryOnly ? cache.primarySize() : cache.size();
}
Expand Down Expand Up @@ -5367,4 +5412,74 @@ void onDone() {
ldr.addData(col);
}
}

/**
*
*/
private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable {
/** */
private static final long serialVersionUID = 0L;

/** */
private String cacheName;

/** */
private IgniteBiPredicate<K, V> p;

/** */
private Object[] args;

/** */
@IgniteInstanceResource
private Ignite ignite;

/**
* Required by {@link Externalizable}.
*/
public LoadCacheClosure() {
// No-op.
}

/**
* @param cacheName Cache name.
* @param p Predicate.
* @param args Arguments.
*/
private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args) {
this.cacheName = cacheName;
this.p = p;
this.args = args;
}

/** {@inheritDoc} */
@Override public Void call() throws Exception {
IgniteCache<K, V> cache = ignite.jcache(cacheName);

assert cache != null : cacheName;

cache.localLoadCache(p, args);

return null;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(p);

out.writeObject(args);
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
p = (IgniteBiPredicate<K, V>)in.readObject();

args = (Object[])in.readObject();
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LoadCacheClosure.class, this);
}
}
}
Expand Up @@ -612,6 +612,11 @@ private IgnitePredicate<CacheEntry<K, V>> visitor(final IgnitePredicate<CacheEnt
return cache.containsKey(key, entryFilter(true));
}

/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
return cache.containsKeyAsync(key, entryFilter(false));
}

/** {@inheritDoc} */
@Override public boolean containsValue(V val) {
return cache.containsValue(val, entryFilter(true));
Expand Down
Expand Up @@ -339,6 +339,18 @@ public GridCacheProjectionImpl<K, V> gateProjection() {
}
}

/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
return delegate.containsKeyAsync(key);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public boolean containsValue(V val) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
Expand Down

0 comments on commit e3598a9

Please sign in to comment.