Skip to content

Commit

Permalink
# IGNITE-54-55 Minor changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
sevdokimov-gg committed Feb 3, 2015
1 parent f6fa027 commit 4ebc46f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
Expand Up @@ -3400,14 +3400,10 @@ public String toString() {
// Send job to all nodes. // Send job to all nodes.
Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes(); Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes();


IgniteInternalFuture<Object> fut = null; if (!nodes.isEmpty()) {

ctx.closures().callAsyncNoFailover(BROADCAST,
if (!nodes.isEmpty()) new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE), nodes, true).get();
fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE), nodes, true); }

if (fut != null)
fut.get();

} while (ctx.affinity().affinityTopologyVersion() > topVer); } while (ctx.affinity().affinityTopologyVersion() > topVer);
} }
catch (ClusterGroupEmptyException ignore) { catch (ClusterGroupEmptyException ignore) {
Expand Down Expand Up @@ -5236,7 +5232,6 @@ public GlobalRemoveAllCallable() {
* @param cacheName Cache name. * @param cacheName Cache name.
* @param topVer Topology version. * @param topVer Topology version.
* @param rmvBatchSz Remove batch size. * @param rmvBatchSz Remove batch size.
* @param filter Filter.
*/ */
private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz) { private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz) {
this.cacheName = cacheName; this.cacheName = cacheName;
Expand All @@ -5248,25 +5243,30 @@ private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz)
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override public Object call() throws Exception { @Override public Object call() throws Exception {
Set<K> keys = new HashSet<>(); Collection<K> keys = new ArrayList<>();


final IgniteKernal grid = (IgniteKernal) ignite; final IgniteKernal grid = (IgniteKernal) ignite;


final GridCache<K,V> cache = grid.cachex(cacheName); final GridCache<K,V> cache = grid.cachex(cacheName);


final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context(); final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context();


if (ctx.affinity().affinityTopologyVersion() != topVer)
return null; // Ignore this remove request because remove request will be sent again.

assert cache != null; assert cache != null;


for (K k : cache.keySet()) { for (K k : cache.keySet()) {
if (ctx.affinity().primary(ctx.localNode(), k, topVer)) if (ctx.affinity().primary(ctx.localNode(), k, topVer))
keys.add(k); keys.add(k);

if (keys.size() >= rmvBatchSz) { if (keys.size() >= rmvBatchSz) {
cache.removeAll(keys); cache.removeAll(keys);


keys.clear(); keys.clear();
} }
} }

cache.removeAll(keys); cache.removeAll(keys);


return null; return null;
Expand Down
Expand Up @@ -124,9 +124,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
// removeAll() removes mapping only when it presents at a primary node. // removeAll() removes mapping only when it presents at a primary node.
// To remove all mappings used force remove by key. // To remove all mappings used force remove by key.
if (cache.size() > 0) { if (cache.size() > 0) {
for (Object k : cache.keySet()) { for (Object k : cache.keySet())
cache.remove(k); cache.remove(k);
}
} }


if (offheapTiered(cache)) { if (offheapTiered(cache)) {
Expand Down
Expand Up @@ -101,8 +101,11 @@ public class GridCachePartitionedMultiThreadedPutGetSelfTest extends GridCommonA
@Override protected void afterTest() throws Exception { @Override protected void afterTest() throws Exception {
super.afterTest(); super.afterTest();


if (GRID_CNT > 0)
grid(0).cache(null).removeAll();

for (int i = 0; i < GRID_CNT; i++) { for (int i = 0; i < GRID_CNT; i++) {
grid(i).cache(null).removeAll(); grid(0).cache(null).clearLocally();


assert grid(i).cache(null).isEmpty(); assert grid(i).cache(null).isEmpty();
} }
Expand Down

0 comments on commit 4ebc46f

Please sign in to comment.