Skip to content

Commit

Permalink
IGNITE-136 Clear local store for entry from swap.
Browse files Browse the repository at this point in the history
  • Loading branch information
niktikhonov committed Feb 25, 2015
1 parent 6ba6090 commit f33c074
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 35 deletions.
Expand Up @@ -488,6 +488,8 @@ private void clearSwap() {
try {
GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false);

boolean isLocStore = cctx.store().isLocalStore();

if (it != null) {
// We can safely remove these values because no entries will be created for evicted partition.
while (it.hasNext()) {
Expand All @@ -498,6 +500,9 @@ private void clearSwap() {
K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader());

cctx.swap().remove(key, keyBytes);

if (isLocStore)
cctx.store().removeFromStore(null, key);
}
}
}
Expand Down Expand Up @@ -531,7 +536,7 @@ private void clearAll() {

boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED);

boolean locStore = cctx.store().isLocalStore();
boolean isLocStore = cctx.store().isLocalStore();

for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) {
GridDhtCacheEntry<K, V> cached = it.next();
Expand All @@ -540,7 +545,7 @@ private void clearAll() {
if (cached.clearInternal(clearVer, swap)) {
it.remove();

if (locStore)
if (isLocStore)
cctx.store().removeFromStore(null, cached.key());

if (!cached.isInternal()) {
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cache;

import com.google.common.collect.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.store.*;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CachePreloadMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
Expand Down Expand Up @@ -100,13 +102,16 @@ public GridCacheAbstractLocalStoreSelfTest() {
LOCAL_STORE_3.clear();
}

/**
*
*/
private CacheConfiguration cache(String gridName, String cacheName, int backups) {
CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName(cacheName);
cacheCfg.setCacheMode(getCacheMode());
cacheCfg.setAtomicityMode(getAtomicMode());
cacheCfg.setDistributionMode(getDisrtMode());
cacheCfg.setDistributionMode(getDistributionMode());
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cacheCfg.setPreloadMode(SYNC);

Expand All @@ -120,14 +125,19 @@ else if (gridName.endsWith("2"))
cacheCfg.setWriteThrough(true);
cacheCfg.setReadThrough(true);
cacheCfg.setBackups(backups);
cacheCfg.setOffHeapMaxMemory(0);
cacheCfg.setSwapEnabled(true);

if (isOffHeapTiredMode())
cacheCfg.setMemoryMode(OFFHEAP_TIERED);

return cacheCfg;
}

/**
* @return Distribution mode.
*/
protected abstract CacheDistributionMode getDisrtMode();
protected abstract CacheDistributionMode getDistributionMode();

/**
* @return Cache atomicity mode.
Expand All @@ -139,6 +149,13 @@ else if (gridName.endsWith("2"))
*/
protected abstract CacheMode getCacheMode();

/**
* @return Cache memory mode.
*/
protected boolean isOffHeapTiredMode() {
return false;
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
Expand All @@ -152,57 +169,47 @@ public void testPrimaryNode() throws Exception {

IgniteCache<Object, Object> cache = ignite1.jcache(null);

// Populate cache and check that local store has all value.
for (int i = 0; i < KEYS; i++)
cache.put(i, i);

for (int i = 0; i < KEYS; i++)
assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i);
checkLocalStore(ignite1, LOCAL_STORE_1);

final CountDownLatch startPartExchange = new CountDownLatch(1);
final AtomicBoolean eventOcr = new AtomicBoolean(true);
final CountDownLatch partExchanged = new CountDownLatch(1);

final int[] leftPartition = new int[1];

if (getCacheMode() != REPLICATED) {
ignite1.events().localListen(new IgnitePredicate<Event>() {
private AtomicInteger eventCnt = new AtomicInteger(0);

@Override public boolean apply(Event event) {
startPartExchange.countDown();

eventOcr.set(true);

if (leftPartition[0] - eventCnt.incrementAndGet() == 0)
partExchanged.countDown();

return true;
}
}, EventType.EVT_CACHE_PRELOAD_OBJECT_UNLOADED);
}, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED);
}

Ignite ignite2 = startGrid(2);

// Partition count which must be transferred to 2'nd node.
leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length;

assertEquals(Ignition.allGrids().size(), 2);

// Wait when partition unloaded.
waitExpirePartition(startPartExchange, eventOcr);
if (getCacheMode() != REPLICATED)
assert partExchanged.await(2, TimeUnit.SECONDS);

checkLocalStore(ignite1, LOCAL_STORE_1);
checkLocalStore(ignite2, LOCAL_STORE_2);
}

/**
* Wait when partition unloaded.
* @throws Exception If failed.
*/
private void waitExpirePartition(CountDownLatch startPartExchange, AtomicBoolean eventOcr) throws Exception {
if (getCacheMode() != REPLICATED) {
assert startPartExchange.await(2, TimeUnit.SECONDS);

while (true) {
if (eventOcr.get()) {
eventOcr.set(false);

TimeUnit.MILLISECONDS.sleep(100);
}
else
break;
}
}
}

public void testBackupNode() throws Exception {
Ignite ignite1 = startGrid(1);

Expand Down Expand Up @@ -240,6 +247,59 @@ public void testBackupNode() throws Exception {
checkLocalStoreForBackup(ignite2, LOCAL_STORE_2);
}

/**
* @throws Exception If failed.
*/
public void testSwap() throws Exception {
Ignite ignite1 = startGrid(1);

IgniteCache<Object, Object> cache = ignite1.jcache(null);

// Populate cache and check that local store has all value.
for (int i = 0; i < KEYS; i++)
cache.put(i, i);

checkLocalStore(ignite1, LOCAL_STORE_1);

// Push entry to swap.
for (int i = 0; i < KEYS; i++)
cache.localEvict(Lists.newArrayList(i));

for (int i = 0; i < KEYS; i++)
assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));

final CountDownLatch partExchanged = new CountDownLatch(1);

final int[] leftPartition = new int[1];

if (getCacheMode() != REPLICATED) {
ignite1.events().localListen(new IgnitePredicate<Event>() {
private AtomicInteger eventCnt = new AtomicInteger(0);

@Override public boolean apply(Event event) {
if (leftPartition[0] - eventCnt.incrementAndGet() == 0)
partExchanged.countDown();

return true;
}
}, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED);
}

Ignite ignite2 = startGrid(2);

// Partition count which must be transferred to 2'nd node.
leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length;

assertEquals(Ignition.allGrids().size(), 2);

// Wait when partition unloaded.
if (getCacheMode() != REPLICATED)
assert partExchanged.await(2, TimeUnit.SECONDS);

checkLocalStore(ignite1, LOCAL_STORE_1);
checkLocalStore(ignite2, LOCAL_STORE_2);
}

/**
* Check that local stores contains only primary entry.
*/
Expand Down
Expand Up @@ -35,7 +35,7 @@ public GridCachePartitionedLocalStoreSelfTest() {
}

/** {@inheritDoc} */
@Override protected CacheDistributionMode getDisrtMode() {
@Override protected CacheDistributionMode getDistributionMode() {
return PARTITIONED_ONLY;
}

Expand Down
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cache.*;

import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheDistributionMode.*;
import static org.apache.ignite.cache.CacheMode.*;

/**
*
*/
public class GridCachePartitionedOffHeapLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest {
/**
*
*/
public GridCachePartitionedOffHeapLocalStoreSelfTest() {
super();
}

/** {@inheritDoc} */
@Override protected CacheDistributionMode getDistributionMode() {
return PARTITIONED_ONLY;
}

/** {@inheritDoc} */
@Override protected CacheAtomicityMode getAtomicMode() {
return ATOMIC;
}

/** {@inheritDoc} */
@Override protected CacheMode getCacheMode() {
return PARTITIONED;
}

/** {@inheritDoc} */
@Override protected boolean isOffHeapTiredMode() {
return true;
}
}
Expand Up @@ -35,7 +35,7 @@ public GridCacheReplicatedLocalStoreSelfTest() {
}

/** {@inheritDoc} */
@Override protected CacheDistributionMode getDisrtMode() {
@Override protected CacheDistributionMode getDistributionMode() {
return PARTITIONED_ONLY;
}

Expand Down
Expand Up @@ -35,7 +35,7 @@ public GridCacheTxPartitionedLocalStoreSelfTest() {
}

/** {@inheritDoc} */
@Override protected CacheDistributionMode getDisrtMode() {
@Override protected CacheDistributionMode getDistributionMode() {
return PARTITIONED_ONLY;
}

Expand Down
Expand Up @@ -140,7 +140,8 @@ public static TestSuite suite() throws Exception {
suite.addTestSuite(GridCacheGlobalLoadTest.class);
suite.addTestSuite(GridCachePartitionedLocalStoreSelfTest.class);
suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class);
//suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); TODO GG-9762
suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class);
suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class);

// Heuristic exception handling. TODO IGNITE-257
// suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);
Expand Down

0 comments on commit f33c074

Please sign in to comment.