Skip to content

Commit

Permalink
Ignite-54-55 Implemented basic removeAll()
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Vinogradov committed Jan 25, 2015
1 parent 8970463 commit bcff8d8
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 38 deletions.
Expand Up @@ -1048,6 +1048,23 @@ public IgniteFuture<?> putAllAsync(@Nullable Map<? extends K, ? extends V> m,
*/
public Set<K> keySet();

/**
* Set of keys cached on this node. You can remove elements from this set, but you cannot add elements
* to this set. All removal operation will be reflected on the cache itself.
* <p>
* Iterator over this set will not fail if set was concurrently updated
* by another thread. This means that iterator may or may not return latest
* keys depending on whether they were added before or after current
* iterator position.
* <p>
* NOTE: this operation is not distributed and returns only the keys cached on this node.
*
* @param filter Optional filter to check prior to getting key form cache. Note
* that filter is checked atomically together with get operation.
* @return Key set for this cache projection.
*/
public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter);

/**
* Set of keys for which this node is primary.
* This set is dynamic and may change with grid topology changes.
Expand Down
Expand Up @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
/** */
private static final long serialVersionUID = 0L;

/** removeAll() batch size. */
private static final long REMOVE_ALL_BATCH_SIZE = 100L;

/** clearAll() split threshold. */
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;

Expand Down Expand Up @@ -3151,22 +3154,38 @@ public String toString() {

/** {@inheritDoc} */
@Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
ctx.denyOnLocalRead();
try {
if (F.isEmptyOrNulls(filter))
filter = ctx.trueArray();

if (F.isEmptyOrNulls(filter))
filter = ctx.trueArray();
long topVer;

final IgnitePredicate<CacheEntry<K, V>>[] p = filter;
do {
topVer = ctx.affinity().affinityTopologyVersion();

syncOp(new SyncInOp(false) {
@Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx, keySet(p), null, false, null).get();
}
// Send job to all nodes.
Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();

@Override public String toString() {
return "removeAll [filter=" + Arrays.toString(p) + ']';
}
});
IgniteFuture<Object> fut = null;

if (!nodes.isEmpty())
fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true);

if (fut != null)
fut.get();

} while (ctx.affinity().affinityTopologyVersion() > topVer);
}
catch (ClusterGroupEmptyException ignore) {
if (log.isDebugEnabled())
log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
}
catch (ComputeTaskTimeoutException e) {
U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " +
"'networkTimeout' configuration property) [cacheName=" + name() + "]");

throw e;
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -4951,6 +4970,97 @@ protected AsyncInOp(Collection<? extends K> keys) {
public abstract IgniteFuture<?> inOp(IgniteTxLocalAdapter<K, V> tx);
}

/**
* Internal callable which performs remove all primary key mappings
* operation on a cache with the given name.
*/
@GridInternal
private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
/** */
private static final long serialVersionUID = 0L;

/** Cache name. */
private String cacheName;

/** Topology version. */
private long topVer;

/** Remove batch size. */
private long rmvBatchSz;

/** Filters. */
private IgnitePredicate<CacheEntry<K, V>>[] filter;

/** Injected grid instance. */
@IgniteInstanceResource
private Ignite ignite;

/**
* Empty constructor for serialization.
*/
public GlobalRemoveAllCallable() {
// No-op.
}

/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param rmvBatchSz Remove batch size.
* @param filter Filter.
*/
private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<CacheEntry<K, V>> ... filter) {
this.cacheName = cacheName;
this.topVer = topVer;
this.rmvBatchSz = rmvBatchSz;
this.filter = filter;
}

/**
* {@inheritDoc}
*/
@Override public Object call() throws Exception {
Set<K> keys = new HashSet<>();

final GridKernal grid = (GridKernal) ignite;

final GridCache<K,V> cache = grid.cachex(cacheName);

final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context();

assert cache != null;

for (K k : cache.keySet(filter)) {
if (ctx.affinity().primary(ctx.localNode(), k, topVer))
keys.add(k);
if (keys.size() >= rmvBatchSz) {
cache.removeAll(keys);

keys.clear();
}
}
cache.removeAll(keys);

return null;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
out.writeLong(topVer);
out.writeLong(rmvBatchSz);
out.writeObject(filter);

}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cacheName = U.readString(in);
topVer = in.readLong();
rmvBatchSz = in.readLong();
filter = (IgnitePredicate<CacheEntry<K, V>>[]) in.readObject();
}
}

/**
* Internal callable which performs {@link org.apache.ignite.cache.CacheProjection#clearAll()}
* operation on a cache with the given name.
Expand Down
Expand Up @@ -914,6 +914,11 @@ private IgnitePredicate<CacheEntry<K, V>> visitor(final IgnitePredicate<CacheEnt
return cache.keySet(entryFilter(true));
}

/** {@inheritDoc} */
@Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
return cache.keySet(filter);
}

/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
return cache.primaryKeySet(entryFilter(true));
Expand Down
Expand Up @@ -992,6 +992,18 @@ public GridCacheProjectionImpl<K, V> gateProjection() {
}
}

/** {@inheritDoc} */
@Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
return delegate.keySet(filter);
}
finally {
gate.leave(prev);
}
}

/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
Expand Down
Expand Up @@ -620,25 +620,6 @@ public void removeAll(IgnitePredicate<CacheEntry<K, V>>... filter) {
}
}

/**
* @param keys Keys to remove.
*/
public void removeAll(Collection<? extends K> keys) {
try {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

try {
delegate.removeAll(keys);
}
finally {
gate.leave(prev);
}
}
catch (IgniteCheckedException e) {
throw cacheException(e);
}
}

/** {@inheritDoc} */
@Override public void removeAll() {
// TODO IGNITE-1.
Expand Down
Expand Up @@ -513,7 +513,8 @@ public void near(GridNearAtomicCache<K, V> near) {

/** {@inheritDoc} */
@Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
removeAllAsync(filter).get();
super.removeAll(filter); // TODO: IGNITE-?? Fix asynс cleanup
//removeAllAsync(filter).get();
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -100,7 +100,7 @@ public static <K, V> IgniteDataLoadCacheUpdater<K, V> groupLocked() {
* @param putMap Entries to put.
* @throws IgniteCheckedException If failed.
*/
protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol,
protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
Map<K, V> putMap) throws IgniteCheckedException {
assert rmvCol != null || putMap != null;

Expand Down Expand Up @@ -154,7 +154,7 @@ private static class Batched<K, V> implements IgniteDataLoadCacheUpdater<K, V> {
assert !F.isEmpty(entries);

Map<K, V> putAll = null;
Collection<K> rmvAll = null;
Set<K> rmvAll = null;

for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
Expand All @@ -165,7 +165,7 @@ private static class Batched<K, V> implements IgniteDataLoadCacheUpdater<K, V> {

if (val == null) {
if (rmvAll == null)
rmvAll = new ArrayList<>();
rmvAll = new HashSet<>();

rmvAll.add(key);
}
Expand Down Expand Up @@ -195,7 +195,7 @@ private static class BatchedSorted<K, V> implements IgniteDataLoadCacheUpdater<K
assert !F.isEmpty(entries);

Map<K, V> putAll = null;
Collection<K> rmvAll = null;
Set<K> rmvAll = null;

for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
Expand Down Expand Up @@ -240,7 +240,7 @@ private static class GroupLocked<K, V> implements IgniteDataLoadCacheUpdater<K,
Map<Integer, Integer> partsCounts = new HashMap<>();

// Group by partition ID.
Map<Integer, Collection<K>> rmvPartMap = null;
Map<Integer, Set<K>> rmvPartMap = null;
Map<Integer, Map<K, V>> putPartMap = null;

Ignite ignite = cache.unwrap(Ignite.class);
Expand All @@ -264,7 +264,7 @@ private static class GroupLocked<K, V> implements IgniteDataLoadCacheUpdater<K,
if (rmvPartMap == null)
rmvPartMap = new HashMap<>();

F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key);
F.addIfAbsent(rmvPartMap, part, F.<K>newSet()).add(key);
}
else {
if (putPartMap == null)
Expand Down
Expand Up @@ -2486,6 +2486,25 @@ public void testRemoveAll() throws Exception {
cache().removeAll();

assert cache().isEmpty();
long entryCount = hugeRemoveAllEntryCount();

for (int i = 0; i < entryCount; i++)
cache().put(String.valueOf(i), i);

for (int i = 0; i < entryCount; i++)
assertEquals(Integer.valueOf(i), cache().get(String.valueOf(i)));

cache().removeAll();

for (int i = 0; i < entryCount; i++)
assertNull(cache().get(String.valueOf(i)));
}

/**
* Provides count on entities to be removed in removeAll() test
*/
protected long hugeRemoveAllEntryCount(){
return 1000L;
}

/**
Expand Down

0 comments on commit bcff8d8

Please sign in to comment.