From c60eed556f7421ca36660845453c4734aebc5125 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Thu, 23 Jun 2016 21:09:19 +0300 Subject: [PATCH 01/10] IGNITE-1088 H2 based cache store for multi JVM tests --- .../GridCacheAbstractFullApiSelfTest.java | 144 ++-- .../GridCacheAbstractMetricsSelfTest.java | 2 +- .../cache/GridCacheAbstractSelfTest.java | 624 +++++++++++++++--- .../GridCacheInterceptorAbstractSelfTest.java | 2 +- ...CacheNearOnlyMultiNodeFullApiSelfTest.java | 2 +- ...OffHeapTieredMultiNodeFullApiSelfTest.java | 2 +- 6 files changed, 626 insertions(+), 150 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 59d3170aae8f5..b973125abc77e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -61,6 +61,7 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -222,9 +223,11 @@ protected CacheMemoryMode memoryMode() { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { + initStoreStrategy(); if (cacheStartType() == CacheStartMode.STATIC) super.beforeTestsStarted(); else { + initStoreStrategy(); cacheCfgMap = Collections.synchronizedMap(new HashMap()); if (cacheStartType() == CacheStartMode.NODES_THEN_CACHES) { @@ -272,12 +275,12 @@ protected CacheMemoryMode memoryMode() { * @throws Exception if failed. */ public void testWriteThroughTx() { - if(isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); + /*if(isMultiJvm()) + fail("https://issues.apache.org/jira/browse/IGNITE-1088");*/ String key = "writeThroughKey"; - map.remove(key); + storeStrategy.removeFromStore(key); try (final Transaction transaction = grid(0).transactions().txStart()) { IgniteCache cache = jcache(0); @@ -294,7 +297,7 @@ public void testWriteThroughTx() { transaction.commit(); } - assertEquals(2, map.get(key)); + assertEquals(2, storeStrategy.getFromStore(key)); } /** @@ -310,11 +313,11 @@ public void testNoReadThroughTx() { IgniteCache cache = jcache(0); - resetStore(); + storeStrategy.resetStore(); cache.put(key, 1); - putToStore(key, 2); + storeStrategy.putToStore(key, 2); try (final Transaction transaction = grid(0).transactions().txStart()) { Integer old = cache.get(key); @@ -328,7 +331,7 @@ public void testNoReadThroughTx() { transaction.commit(); } - assertEquals(0, reads.get()); + assertEquals(0, storeStrategy.getReads()); } /** {@inheritDoc} */ @@ -564,9 +567,6 @@ public void testRemoveInExplicitLocks() throws Exception { * @throws Exception If failed. */ public void testRemoveAllSkipStore() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - IgniteCache jcache = jcache(); jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3)); @@ -2068,7 +2068,7 @@ public void testGetAndPutIfAbsent() throws Exception { // Check db. if (!isMultiJvm()) { - putToStore("key3", 3); + storeStrategy.putToStore("key3", 3); assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4)); @@ -2140,7 +2140,7 @@ public void testGetAndPutIfAbsentAsync() throws Exception { // Check db. if (!isMultiJvm()) { - putToStore("key3", 3); + storeStrategy.putToStore("key3", 3); cacheAsync.getAndPutIfAbsent("key3", 4); @@ -2189,7 +2189,7 @@ public void testPutIfAbsent() throws Exception { // Check db. if (!isMultiJvm()) { - putToStore("key3", 3); + storeStrategy.putToStore("key3", 3); assertFalse(cache.putIfAbsent("key3", 4)); } @@ -2262,7 +2262,7 @@ private void checkPutxIfAbsentAsync(boolean inTx) throws Exception { // Check db. if (!isMultiJvm()) { - putToStore("key3", 3); + storeStrategy.putToStore("key3", 3); cacheAsync.putIfAbsent("key3", 4); @@ -2366,7 +2366,7 @@ public void testGetAndReplace() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - putToStore("key2", 5); + storeStrategy.putToStore("key2", 5); info("key2 5 -> 6"); @@ -2425,7 +2425,7 @@ public void testReplace() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - putToStore("key2", 5); + storeStrategy.putToStore("key2", 5); assert cache.replace("key2", 6); @@ -2501,7 +2501,7 @@ public void testGetAndReplaceAsync() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - putToStore("key2", 5); + storeStrategy.putToStore("key2", 5); cacheAsync.replace("key2", 5, 6); @@ -2563,7 +2563,7 @@ public void testReplacexAsync() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - putToStore("key2", 5); + storeStrategy.putToStore("key2", 5); cacheAsync.replace("key2", 6); @@ -2701,7 +2701,7 @@ public void testRemoveLoad() throws Exception { jcache().removeAll(keys); for (String key : keys) - putToStore(key, Integer.parseInt(key)); + storeStrategy.putToStore(key, Integer.parseInt(key)); for (int g = 0; g < gridCount(); g++) grid(g).cache(null).localLoadCache(null); @@ -3764,7 +3764,7 @@ private void checkTtl(boolean inTx, boolean oldEntry) throws Exception { } // Avoid reloading from store. - map.remove(key); + storeStrategy.removeFromStore(key); assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @SuppressWarnings("unchecked") @@ -4813,7 +4813,7 @@ public void testWithSkipStore() throws Exception { List keys = primaryKeysForCache(cache, 10); for (int i = 0; i < keys.size(); ++i) - putToStore(keys.get(i), i); + storeStrategy.putToStore(keys.get(i), i); assertFalse(cacheSkipStore.iterator().hasNext()); @@ -4851,7 +4851,7 @@ public void testWithSkipStore() throws Exception { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertEquals(i, map.get(key)); + assertEquals(i, storeStrategy.getFromStore(key)); } for (int i = 0; i < keys.size(); ++i) { @@ -4860,13 +4860,13 @@ public void testWithSkipStore() throws Exception { Integer val1 = -1; cacheSkipStore.put(key, val1); - assertEquals(i, map.get(key)); + assertEquals(i, storeStrategy.getFromStore(key)); assertEquals(val1, cacheSkipStore.get(key)); Integer val2 = -2; assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2))); - assertEquals(i, map.get(key)); + assertEquals(i, storeStrategy.getFromStore(key)); assertEquals(val2, cacheSkipStore.get(key)); } @@ -4875,7 +4875,7 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } for (String key : keys) { @@ -4883,37 +4883,37 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); - map.put(key, 0); + storeStrategy.putToStore(key, 0); Integer val = -1; assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); cache.remove(key); - map.put(key, 0); + storeStrategy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); cache.remove(key); - map.put(key, 0); + storeStrategy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); cache.remove(key); } assertFalse(cacheSkipStore.iterator().hasNext()); - assertTrue(map.size() == 0); + assertTrue(storeStrategy.getStoreSize() == 0); assertTrue(cache.size(ALL) == 0); // putAll/removeAll from multiple nodes. @@ -4928,7 +4928,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } cache.putAll(data); @@ -4936,7 +4936,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4944,7 +4944,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cacheSkipStore.putAll(data); @@ -4952,7 +4952,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4960,7 +4960,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -4968,24 +4968,24 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } - assertTrue(map.size() == 0); + assertTrue(storeStrategy.getStoreSize() == 0); // Miscellaneous checks. String newKey = "New key"; - assertFalse(map.containsKey(newKey)); + assertFalse(storeStrategy.isInStore(newKey)); cacheSkipStore.put(newKey, 1); - assertFalse(map.containsKey(newKey)); + assertFalse(storeStrategy.isInStore(newKey)); cache.put(newKey, 1); - assertTrue(map.containsKey(newKey)); + assertTrue(storeStrategy.isInStore(newKey)); Iterator> it = cacheSkipStore.iterator(); @@ -4995,20 +4995,20 @@ public void testWithSkipStore() throws Exception { String rmvKey = entry.getKey(); - assertTrue(map.containsKey(rmvKey)); + assertTrue(storeStrategy.isInStore(rmvKey)); it.remove(); assertNull(cacheSkipStore.get(rmvKey)); - assertTrue(map.containsKey(rmvKey)); + assertTrue(storeStrategy.isInStore(rmvKey)); assertTrue(cache.size(ALL) == 0); assertTrue(cacheSkipStore.size(ALL) == 0); cache.remove(rmvKey); - assertTrue(map.size() == 0); + assertTrue(storeStrategy.getStoreSize() == 0); } /** @@ -5035,7 +5035,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cacheSkipStore.removeAll(); @@ -5043,7 +5043,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cache.removeAll(); @@ -5051,7 +5051,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } } @@ -5127,7 +5127,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } tx.commit(); @@ -5136,10 +5136,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } - assertEquals(0, map.size()); + assertEquals(0, storeStrategy.getStoreSize()); // cacheSkipStore putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5153,10 +5153,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } - map.putAll(data); + storeStrategy.putAllToStore(data); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { cacheSkipStore.removeAll(data.keySet()); @@ -5167,12 +5167,12 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); cache.remove(key); } - assertTrue(map.size() == 0); + assertTrue(storeStrategy.getStoreSize() == 0); // cache putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5181,7 +5181,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5189,13 +5189,13 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } tx.commit(); } - assertTrue(map.size() == 0); + assertTrue(storeStrategy.getStoreSize() == 0); // putAll(..) from both cacheSkipStore and cache. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5216,7 +5216,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } tx.commit(); @@ -5227,7 +5227,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } for (int i = keys.size() / 2; i < keys.size(); i++) { @@ -5235,7 +5235,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStrategy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5243,16 +5243,16 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStrategy.isInStore(key)); } // Check that read-through is disabled when cacheSkipStore is used. for (int i = 0; i < keys.size(); i++) - putToStore(keys.get(i), i); + storeStrategy.putToStore(keys.get(i), i); assertTrue(cacheSkipStore.size(ALL) == 0); assertTrue(cache.size(ALL) == 0); - assertTrue(map.size() != 0); + assertTrue(storeStrategy.getStoreSize() != 0); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { assertTrue(cacheSkipStore.getAll(data.keySet()).size() == 0); @@ -5275,7 +5275,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStrategy.putToStore(key, 0); assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); } @@ -5284,7 +5284,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5294,7 +5294,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStrategy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); } @@ -5303,7 +5303,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5313,7 +5313,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStrategy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); } @@ -5322,7 +5322,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStrategy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5342,7 +5342,7 @@ private void checkEmpty(IgniteCache cache, IgniteCache map = new ConcurrentHashMap8<>(); - - /** Reads counter. */ - protected static final AtomicInteger reads = new AtomicInteger(); - - /** Writes counter. */ - protected static final AtomicInteger writes = new AtomicInteger(); - - /** Removes counter. */ - protected static final AtomicInteger removes = new AtomicInteger(); - /** VM ip finder for TCP discovery. */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + protected static TestCacheStoreStrategy storeStrategy; + /** * @return Grids count to start. */ @@ -97,6 +110,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { assert cnt >= 1 : "At least one grid must be started"; + initStoreStrategy(); + startGrids(cnt); awaitPartitionMapExchange(); @@ -106,7 +121,13 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { @Override protected void afterTestsStopped() throws Exception { stopAllGrids(); - map.clear(); + storeStrategy.resetStore(); + } + + /** Initialize {@link #storeStrategy} with respect to the nature of the test */ + void initStoreStrategy() throws IgniteCheckedException { + if (storeStrategy == null) + storeStrategy = isMultiJvm() ? new H2CacheStoreStrategy() : new MapCacheStoreStrategy(); } /** {@inheritDoc} */ @@ -188,28 +209,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { assert jcache().unwrap(Ignite.class).transactions().tx() == null; assertEquals("Cache is not empty", 0, jcache().localSize(CachePeekMode.ALL)); - resetStore(); - } - - /** - * Cleans up cache store. - */ - protected void resetStore() { - map.clear(); - - reads.set(0); - writes.set(0); - removes.set(0); - } - - /** - * Put entry to cache store. - * - * @param key Key. - * @param val Value. - */ - protected void putToStore(Object key, Object val) { - map.put(key, val); + storeStrategy.resetStore(); } /** {@inheritDoc} */ @@ -241,13 +241,15 @@ protected void putToStore(Object key, Object val) { protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { CacheConfiguration cfg = defaultCacheConfiguration(); - CacheStore store = cacheStore(); + Factory> storeFactory = storeStrategy.getStoreFactory(); + CacheStore store = storeFactory.create(); if (store != null) { - cfg.setCacheStoreFactory(new TestStoreFactory()); + cfg.setCacheStoreFactory(storeFactory); cfg.setReadThrough(true); cfg.setWriteThrough(true); cfg.setLoadPreviousValue(true); + storeStrategy.updateCacheConfiguration(cfg); } cfg.setSwapEnabled(swapEnabled()); @@ -302,37 +304,6 @@ protected CacheWriteSynchronizationMode writeSynchronization() { return FULL_SYNC; } - /** - * @return Write through storage emulator. - */ - protected static CacheStore cacheStore() { - return new CacheStoreAdapter() { - @Override public void loadCache(IgniteBiInClosure clo, - Object... args) { - for (Map.Entry e : map.entrySet()) - clo.apply(e.getKey(), e.getValue()); - } - - @Override public Object load(Object key) { - reads.incrementAndGet(); - - return map.get(key); - } - - @Override public void write(javax.cache.Cache.Entry e) { - writes.incrementAndGet(); - - map.put(e.getKey(), e.getValue()); - } - - @Override public void delete(Object key) { - removes.incrementAndGet(); - - map.remove(key); - } - }; - } - /** * @return {@code true} if swap should be enabled. */ @@ -575,12 +546,517 @@ public SumReducer() { } } - /** - * Serializable factory. - */ - protected static class TestStoreFactory implements Factory { - @Override public CacheStore create() { - return cacheStore(); + /** Interface for cache store backend manipulation and stats routines */ + protected interface TestCacheStoreStrategy { + + /** */ + void afterTestsStopped(); + + /** */ + int getReads(); + + /** */ + int getWrites(); + + /** */ + int getRemoves(); + + /** */ + int getStoreSize(); + + /** */ + void resetStore(); + + /** + * Put entry to cache store + * + * @param key Key. + * @param val Value. + */ + void putToStore(Object key, Object val); + + /** */ + void putAllToStore(Map data); + + /** */ + Object getFromStore(Object key); + + /** */ + void removeFromStore(Object key); + + /** */ + boolean isInStore(Object key); + + /** */ + void updateCacheConfiguration(CacheConfiguration configuration); + + /** + * @return Factory for write-through storage emulator + */ + Factory> getStoreFactory(); + } + + /** {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} */ + protected static class MapCacheStoreStrategy implements TestCacheStoreStrategy { + + /** Removes counter. */ + private final static AtomicInteger removes = new AtomicInteger(); + + /** Writes counter. */ + private final static AtomicInteger writes = new AtomicInteger(); + + /** Reads counter. */ + private final static AtomicInteger reads = new AtomicInteger(); + + /** Store map. */ + private final static Map map = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override public void afterTestsStopped() { + resetStore(); + } + + /** {@inheritDoc} */ + @Override public int getReads() { + return reads.get(); + } + + /** {@inheritDoc} */ + @Override public int getWrites() { + return writes.get(); + } + + /** {@inheritDoc} */ + @Override public int getRemoves() { + return removes.get(); + } + + /** {@inheritDoc} */ + @Override public int getStoreSize() { + return map.size(); + } + + /** {@inheritDoc} */ + @Override public void resetStore() { + map.clear(); + + reads.set(0); + writes.set(0); + removes.set(0); + } + + /** {@inheritDoc} */ + @Override public void putToStore(Object key, Object val) { + map.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void putAllToStore(Map data) { + map.putAll(data); + } + + /** {@inheritDoc} */ + @Override public Object getFromStore(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void removeFromStore(Object key) { + map.remove(key); + } + + /** {@inheritDoc} */ + @Override public boolean isInStore(Object key) { + return map.containsKey(key); + } + + @Override + public void updateCacheConfiguration(CacheConfiguration configuration) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Factory> getStoreFactory() { + return FactoryBuilder.factoryOf(MapCacheStore.class); + } + + /** Serializable {@link #map} backed cache store factory */ + private static class MapStoreFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new MapCacheStore(); + } + } + + /** {@link CacheStore} backed by {@link #map} */ + public static class MapCacheStore extends CacheStoreAdapter { + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (Map.Entry e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + reads.incrementAndGet(); + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + writes.incrementAndGet(); + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + removes.incrementAndGet(); + map.remove(key); + } + } + } + + protected static class H2CacheStoreStrategy implements TestCacheStoreStrategy { + private final JdbcConnectionPool dataSrc; + + /** Create table script. */ + private static final String CREATE_CACHE_TABLE = + "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; + + private static final String CREATE_STATS_TABLE = + "create table if not exists STATS(id bigint not null, reads int not null, writes int not null, " + + "removes int not null, PRIMARY KEY(id));"; + + private static final String POPULATE_STATS_TABLE = + "delete from STATS;\n" + + "insert into STATS(id, reads, writes, removes) values(1, 0, 0, 0);"; + + /** */ + public H2CacheStoreStrategy() throws IgniteCheckedException { + try { + Server.createTcpServer("-tcpDaemon").start(); + dataSrc = createDataSource(); + + try (Connection conn = connection()) { + RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); + RunScript.execute(conn, new StringReader(CREATE_STATS_TABLE)); + RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); + } + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void afterTestsStopped() { + + } + + /** {@inheritDoc} */ + @Override public int getReads() { + return querySingleInt("select reads from STATS limit 0,1;", "Failed to query number of reads from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getWrites() { + return querySingleInt("select writes from STATS limit 0,1;", "Failed to query number of writes from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getRemoves() { + return querySingleInt("select removes from STATS limit 0,1;", "Failed to query number of removals from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getStoreSize() { + return querySingleInt("select count(*) from CACHE;", "Failed to query number of rows from CACHE table"); + } + + /** {@inheritDoc} */ + @Override public void resetStore() { + try (Connection conn = connection()) { + RunScript.execute(conn, new StringReader("delete from CACHE;")); + RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); + } + catch (SQLException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void putToStore(Object key, Object val) { + try { + H2CacheStore.putToDb(connection(), key, val); + } + catch (SQLException e) { + e.printStackTrace(); + } + } + + /** {@inheritDoc} */ + @Override public void putAllToStore(Map data) { + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = connection(); + stmt = conn.prepareStatement(H2CacheStore.INSERT); + for (Map.Entry e : data.entrySet()) { + byte[] v = H2CacheStore.serialize(e.getValue()); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(e.getKey()))); + stmt.setBinaryStream(2, new ByteArrayInputStream(v)); + stmt.setBinaryStream(3, new ByteArrayInputStream(v)); + stmt.addBatch(); + } + stmt.executeBatch(); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + H2CacheStore.end(stmt, conn); + } + } + + /** {@inheritDoc} */ + @Override public Object getFromStore(Object key) { + try { + return H2CacheStore.getFromDb(connection(), key); + } + catch (SQLException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void removeFromStore(Object key) { + try { + H2CacheStore.removeFromDb(connection(), key); + } + catch (SQLException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isInStore(Object key) { + return getFromStore(key) != null; + } + + private Connection connection() throws SQLException { + return dataSrc.getConnection(); + } + + private int querySingleInt(String query, String errorMsg) { + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = connection(); + stmt = conn.prepareStatement(query); + ResultSet rs = stmt.executeQuery(); + if (rs.next()) + return rs.getInt(1); + else + throw new IgniteException(errorMsg); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + H2CacheStore.end(stmt, conn); + } + } + + /** */ + private static JdbcConnectionPool createDataSource() { + return JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL", "sa", ""); + } + + /** {@inheritDoc} */ + @Override public void updateCacheConfiguration(CacheConfiguration configuration) { + configuration.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); + } + + /** {@inheritDoc} */ + @Override public Factory> getStoreFactory() { + return new H2StoreFactory(); + } + + } + + /** Serializable H2 backed cache store factory */ + private static class H2StoreFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new H2CacheStore(); + } + } + + private static class H2CacheStoreSessionListenerFactory implements Factory { + @Override public CacheStoreSessionListener create() { + CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); + lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL;DB_CLOSE_DELAY=-1", "sa", "")); + return lsnr; + } + } + + + public static class H2CacheStore extends CacheStoreAdapter { + /** Store session */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** Template for an insert statement */ + private static final String INSERT = + "insert into CACHE(k, v) values(?, ?) on duplicate key update v = ?;"; + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + try { + Object res = getFromDb(ses.attachment(), key); + updateStats("reads"); + return res; + } + catch (SQLException e) { + throw new CacheLoaderException("Failed to load object [key=" + key + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + try { + putToDb(ses.attachment(), entry.getKey(), entry.getValue()); + updateStats("writes"); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to write object [key=" + entry.getKey() + ", " + + "val=" + entry.getValue() + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + try { + removeFromDb(ses.attachment(), key); + updateStats("removes"); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to delete object [key=" + key + ']', e); + } + } + + /** + * Select from H2 and deserialize from bytes the value pointed at by key + * @param conn {@link Connection} to use + * @param key key to llok for the value by + * @return Stored object or null if the key is missing from DB + * @throws SQLException + */ + static Object getFromDb(Connection conn, Object key) throws SQLException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("select v from CACHE where k = ?"); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? H2CacheStore.deserialize(IOUtils.toByteArray(rs.getBinaryStream(1))) : null; + } + catch (IOException e) { + throw new IgniteException(e); + } + finally { + end(stmt, conn); + } + } + + static void putToDb(Connection conn, Object key, Object val) throws SQLException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(H2CacheStore.INSERT); + byte[] v = H2CacheStore.serialize(val); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + stmt.setBinaryStream(2, new ByteArrayInputStream(v)); + stmt.setBinaryStream(3, new ByteArrayInputStream(v)); + stmt.executeUpdate(); + } + finally { + end(stmt, conn); + } + } + + static void removeFromDb(Connection conn, Object key) throws SQLException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("delete from CACHE where k = ?"); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + stmt.executeUpdate(); + } + finally { + end(stmt, conn); + } + } + + /** + * Increment stored stats for given field + * @param field field name + */ + private void updateStats(String field) { + Connection conn = ses.attachment(); + assert conn != null; + Statement stmt = null; + try { + stmt = conn.createStatement(); + stmt.executeUpdate("update STATS set " + field + " = " + field + " + 1;"); + } + catch (SQLException e) { + throw new IgniteException("Failed to update H2 store usage stats", e); + } + finally { + end(stmt, conn); + } + } + + /** + * Quietly close statement and connection + * @param stmt {@link Statement} to close + * @param conn {@link Connection} to close + */ + private static void end(Statement stmt, Connection conn) { + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + + /** + * Turn given arbitrary {@link Object} to byte array + * @param obj {@link Object} to serialize + * @return bytes representation of given {@link Object} + */ + static byte[] serialize(Object obj) { + try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { + try (ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(obj); + } + return b.toByteArray(); + } + catch (Exception e) { + throw new IgniteException("Failed to serialize object to byte array [obj=" + obj, e); + } + } + + /** + * Deserialize an object from its byte array representation + * @param bytes byte array representation of the {@link Object} + * @return deserialized {@link Object} + */ + public static Object deserialize(byte[] bytes) { + try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) { + try (ObjectInputStream o = new ObjectInputStream(b)) { + return o.readObject(); + } + } + catch (Exception e) { + throw new IgniteException("Failed to deserialize object from byte array", e); + } } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index f50a3e0eb44de..5e86962d80847 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -1444,7 +1444,7 @@ private void checkCacheValue(Object key, @Nullable Object expVal) throws Excepti interceptor.disabled = true; if (storeEnabled()) - assertEquals("Unexpected store value", expVal, map.get(key)); + assertEquals("Unexpected store value", expVal, storeStrategy.getFromStore(key)); try { for (int i = 0; i < gridCount(); i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java index eaab103baf40f..a522cda485ad8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java @@ -390,7 +390,7 @@ else if (i == nearIdx) { } // Avoid reloading from store. - map.remove(key); + storeStrategy.removeFromStore(key); assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @SuppressWarnings("unchecked") diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java index cbcc7393d0bda..39ab7ce77b287 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java @@ -68,6 +68,6 @@ public void testPut() throws Exception { assertEquals(5, primaryCache.localPeek(key, CachePeekMode.ONHEAP).intValue()); assertNull(primaryCache.localPeek(key, CachePeekMode.OFFHEAP)); assertEquals(5, cache.get(key).intValue()); - assertEquals(5, map.get(key)); + assertEquals(5, storeStrategy.getFromStore(key)); } } \ No newline at end of file From 9cc5a2cb77224b9136f929f3e47d9aa6aa91dc01 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Fri, 24 Jun 2016 17:26:21 +0300 Subject: [PATCH 02/10] IGNITE-1088 H2 cache store based tests fix; code style/javadoc fixes --- .../GridCacheAbstractFullApiSelfTest.java | 150 ++++++------ .../GridCacheAbstractMetricsSelfTest.java | 2 +- .../cache/GridCacheAbstractSelfTest.java | 220 ++++++++++++------ .../GridCacheInterceptorAbstractSelfTest.java | 2 +- ...CacheNearOnlyMultiNodeFullApiSelfTest.java | 2 +- ...OffHeapTieredMultiNodeFullApiSelfTest.java | 2 +- 6 files changed, 215 insertions(+), 163 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index b973125abc77e..bc2593138fbe1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -61,7 +61,6 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; -import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -275,12 +274,9 @@ protected CacheMemoryMode memoryMode() { * @throws Exception if failed. */ public void testWriteThroughTx() { - /*if(isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088");*/ - String key = "writeThroughKey"; - storeStrategy.removeFromStore(key); + storeStgy.removeFromStore(key); try (final Transaction transaction = grid(0).transactions().txStart()) { IgniteCache cache = jcache(0); @@ -297,7 +293,7 @@ public void testWriteThroughTx() { transaction.commit(); } - assertEquals(2, storeStrategy.getFromStore(key)); + assertEquals(2, storeStgy.getFromStore(key)); } /** @@ -306,18 +302,15 @@ public void testWriteThroughTx() { * @throws Exception if failed. */ public void testNoReadThroughTx() { - if(isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - String key = "writeThroughKey"; IgniteCache cache = jcache(0); - storeStrategy.resetStore(); + storeStgy.resetStore(); cache.put(key, 1); - storeStrategy.putToStore(key, 2); + storeStgy.putToStore(key, 2); try (final Transaction transaction = grid(0).transactions().txStart()) { Integer old = cache.get(key); @@ -331,7 +324,7 @@ public void testNoReadThroughTx() { transaction.commit(); } - assertEquals(0, storeStrategy.getReads()); + assertEquals(0, storeStgy.getReads()); } /** {@inheritDoc} */ @@ -2068,7 +2061,7 @@ public void testGetAndPutIfAbsent() throws Exception { // Check db. if (!isMultiJvm()) { - storeStrategy.putToStore("key3", 3); + storeStgy.putToStore("key3", 3); assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4)); @@ -2140,7 +2133,7 @@ public void testGetAndPutIfAbsentAsync() throws Exception { // Check db. if (!isMultiJvm()) { - storeStrategy.putToStore("key3", 3); + storeStgy.putToStore("key3", 3); cacheAsync.getAndPutIfAbsent("key3", 4); @@ -2189,7 +2182,7 @@ public void testPutIfAbsent() throws Exception { // Check db. if (!isMultiJvm()) { - storeStrategy.putToStore("key3", 3); + storeStgy.putToStore("key3", 3); assertFalse(cache.putIfAbsent("key3", 4)); } @@ -2262,7 +2255,7 @@ private void checkPutxIfAbsentAsync(boolean inTx) throws Exception { // Check db. if (!isMultiJvm()) { - storeStrategy.putToStore("key3", 3); + storeStgy.putToStore("key3", 3); cacheAsync.putIfAbsent("key3", 4); @@ -2366,7 +2359,7 @@ public void testGetAndReplace() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - storeStrategy.putToStore("key2", 5); + storeStgy.putToStore("key2", 5); info("key2 5 -> 6"); @@ -2425,7 +2418,7 @@ public void testReplace() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - storeStrategy.putToStore("key2", 5); + storeStgy.putToStore("key2", 5); assert cache.replace("key2", 6); @@ -2501,7 +2494,7 @@ public void testGetAndReplaceAsync() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - storeStrategy.putToStore("key2", 5); + storeStgy.putToStore("key2", 5); cacheAsync.replace("key2", 5, 6); @@ -2563,7 +2556,7 @@ public void testReplacexAsync() throws Exception { assert cache.get("key") == 4; if (!isMultiJvm()) { - storeStrategy.putToStore("key2", 5); + storeStgy.putToStore("key2", 5); cacheAsync.replace("key2", 6); @@ -2688,9 +2681,6 @@ public void testDeletedEntriesFlag() throws Exception { * @throws Exception If failed. */ public void testRemoveLoad() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - int cnt = 10; Set keys = new HashSet<>(); @@ -2701,7 +2691,7 @@ public void testRemoveLoad() throws Exception { jcache().removeAll(keys); for (String key : keys) - storeStrategy.putToStore(key, Integer.parseInt(key)); + storeStgy.putToStore(key, Integer.parseInt(key)); for (int g = 0; g < gridCount(); g++) grid(g).cache(null).localLoadCache(null); @@ -3764,7 +3754,7 @@ private void checkTtl(boolean inTx, boolean oldEntry) throws Exception { } // Avoid reloading from store. - storeStrategy.removeFromStore(key); + storeStgy.removeFromStore(key); assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @SuppressWarnings("unchecked") @@ -4803,9 +4793,6 @@ protected void testGlobalClearKey(boolean async, Collection keysToRmv) t * @throws Exception If failed. */ public void testWithSkipStore() throws Exception { - if(isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - IgniteCache cache = grid(0).cache(null); IgniteCache cacheSkipStore = cache.withSkipStore(); @@ -4813,7 +4800,7 @@ public void testWithSkipStore() throws Exception { List keys = primaryKeysForCache(cache, 10); for (int i = 0; i < keys.size(); ++i) - storeStrategy.putToStore(keys.get(i), i); + storeStgy.putToStore(keys.get(i), i); assertFalse(cacheSkipStore.iterator().hasNext()); @@ -4851,7 +4838,7 @@ public void testWithSkipStore() throws Exception { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertEquals(i, storeStrategy.getFromStore(key)); + assertEquals(i, storeStgy.getFromStore(key)); } for (int i = 0; i < keys.size(); ++i) { @@ -4860,13 +4847,13 @@ public void testWithSkipStore() throws Exception { Integer val1 = -1; cacheSkipStore.put(key, val1); - assertEquals(i, storeStrategy.getFromStore(key)); + assertEquals(i, storeStgy.getFromStore(key)); assertEquals(val1, cacheSkipStore.get(key)); Integer val2 = -2; assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2))); - assertEquals(i, storeStrategy.getFromStore(key)); + assertEquals(i, storeStgy.getFromStore(key)); assertEquals(val2, cacheSkipStore.get(key)); } @@ -4875,7 +4862,7 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } for (String key : keys) { @@ -4883,37 +4870,37 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); Integer val = -1; assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); cache.remove(key); - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); cache.remove(key); - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); cache.remove(key); } assertFalse(cacheSkipStore.iterator().hasNext()); - assertTrue(storeStrategy.getStoreSize() == 0); + assertTrue(storeStgy.getStoreSize() == 0); assertTrue(cache.size(ALL) == 0); // putAll/removeAll from multiple nodes. @@ -4928,7 +4915,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } cache.putAll(data); @@ -4936,7 +4923,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4944,7 +4931,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.putAll(data); @@ -4952,7 +4939,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4960,7 +4947,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -4968,24 +4955,24 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } - assertTrue(storeStrategy.getStoreSize() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // Miscellaneous checks. String newKey = "New key"; - assertFalse(storeStrategy.isInStore(newKey)); + assertFalse(storeStgy.isInStore(newKey)); cacheSkipStore.put(newKey, 1); - assertFalse(storeStrategy.isInStore(newKey)); + assertFalse(storeStgy.isInStore(newKey)); cache.put(newKey, 1); - assertTrue(storeStrategy.isInStore(newKey)); + assertTrue(storeStgy.isInStore(newKey)); Iterator> it = cacheSkipStore.iterator(); @@ -4995,29 +4982,26 @@ public void testWithSkipStore() throws Exception { String rmvKey = entry.getKey(); - assertTrue(storeStrategy.isInStore(rmvKey)); + assertTrue(storeStgy.isInStore(rmvKey)); it.remove(); assertNull(cacheSkipStore.get(rmvKey)); - assertTrue(storeStrategy.isInStore(rmvKey)); + assertTrue(storeStgy.isInStore(rmvKey)); assertTrue(cache.size(ALL) == 0); assertTrue(cacheSkipStore.size(ALL) == 0); cache.remove(rmvKey); - assertTrue(storeStrategy.getStoreSize() == 0); + assertTrue(storeStgy.getStoreSize() == 0); } /** * @throws Exception If failed. */ public void testWithSkipStoreRemoveAll() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373. return; @@ -5035,7 +5019,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(); @@ -5043,7 +5027,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(); @@ -5051,7 +5035,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } } @@ -5127,7 +5111,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); @@ -5136,10 +5120,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } - assertEquals(0, storeStrategy.getStoreSize()); + assertEquals(0, storeStgy.getStoreSize()); // cacheSkipStore putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5153,10 +5137,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } - storeStrategy.putAllToStore(data); + storeStgy.putAllToStore(data); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { cacheSkipStore.removeAll(data.keySet()); @@ -5167,12 +5151,12 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); cache.remove(key); } - assertTrue(storeStrategy.getStoreSize() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // cache putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5181,7 +5165,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5189,13 +5173,13 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); } - assertTrue(storeStrategy.getStoreSize() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // putAll(..) from both cacheSkipStore and cache. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5216,7 +5200,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); @@ -5227,7 +5211,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } for (int i = keys.size() / 2; i < keys.size(); i++) { @@ -5235,7 +5219,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(storeStrategy.isInStore(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5243,16 +5227,16 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key: keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(storeStrategy.isInStore(key)); + assertFalse(storeStgy.isInStore(key)); } // Check that read-through is disabled when cacheSkipStore is used. for (int i = 0; i < keys.size(); i++) - storeStrategy.putToStore(keys.get(i), i); + storeStgy.putToStore(keys.get(i), i); assertTrue(cacheSkipStore.size(ALL) == 0); assertTrue(cache.size(ALL) == 0); - assertTrue(storeStrategy.getStoreSize() != 0); + assertTrue(storeStgy.getStoreSize() != 0); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { assertTrue(cacheSkipStore.getAll(data.keySet()).size() == 0); @@ -5275,7 +5259,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); } @@ -5284,7 +5268,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5294,7 +5278,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); } @@ -5303,7 +5287,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5313,7 +5297,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - storeStrategy.putToStore(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); } @@ -5322,7 +5306,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, storeStrategy.getFromStore(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5342,7 +5326,7 @@ private void checkEmpty(IgniteCache cache, IgniteCache> storeFactory = storeStrategy.getStoreFactory(); + Factory> storeFactory = storeStgy.getStoreFactory(); CacheStore store = storeFactory.create(); if (store != null) { @@ -249,7 +250,7 @@ protected CacheConfiguration cacheConfiguration(String gridName) throws Exceptio cfg.setReadThrough(true); cfg.setWriteThrough(true); cfg.setLoadPreviousValue(true); - storeStrategy.updateCacheConfiguration(cfg); + storeStgy.updateCacheConfiguration(cfg); } cfg.setSwapEnabled(swapEnabled()); @@ -547,24 +548,30 @@ public SumReducer() { } /** Interface for cache store backend manipulation and stats routines */ - protected interface TestCacheStoreStrategy { - - /** */ - void afterTestsStopped(); - - /** */ + public interface TestCacheStoreStrategy { + /** + * @return Number of reads to store + */ int getReads(); - /** */ + /** + * @return Number of writes to store + */ int getWrites(); - /** */ + /** + * @return Number of removals from store + */ int getRemoves(); - /** */ + /** + * @return Total number of items in the store + */ int getStoreSize(); - /** */ + /** + * Clear store contents + */ void resetStore(); /** @@ -575,30 +582,42 @@ protected interface TestCacheStoreStrategy { */ void putToStore(Object key, Object val); - /** */ + /** + * @param data items to put to store + */ void putAllToStore(Map data); - /** */ + /** + * @param key to look for + * @return {@link Object} pointed to by given key or null if no object is present + */ Object getFromStore(Object key); - /** */ + /** + * @param key to look for + */ void removeFromStore(Object key); - /** */ + /** + * @param key to look for + * @return true if object pointed to by key is in store, false otherwise + */ boolean isInStore(Object key); - /** */ + /** + * Called from {@link #cacheConfiguration(String)}, this method allows implementations to tune cache config + * @param configuration {@link CacheConfiguration} to tune + */ void updateCacheConfiguration(CacheConfiguration configuration); /** - * @return Factory for write-through storage emulator + * @return {@link Factory} for write-through storage emulator */ Factory> getStoreFactory(); } /** {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} */ - protected static class MapCacheStoreStrategy implements TestCacheStoreStrategy { - + private static class MapCacheStoreStrategy implements TestCacheStoreStrategy { /** Removes counter. */ private final static AtomicInteger removes = new AtomicInteger(); @@ -611,11 +630,6 @@ protected static class MapCacheStoreStrategy implements TestCacheStoreStrategy { /** Store map. */ private final static Map map = new ConcurrentHashMap8<>(); - /** {@inheritDoc} */ - @Override public void afterTestsStopped() { - resetStore(); - } - /** {@inheritDoc} */ @Override public int getReads() { return reads.get(); @@ -670,8 +684,8 @@ protected static class MapCacheStoreStrategy implements TestCacheStoreStrategy { return map.containsKey(key); } - @Override - public void updateCacheConfiguration(CacheConfiguration configuration) { + /** {@inheritDoc} */ + @Override public void updateCacheConfiguration(CacheConfiguration configuration) { // No-op. } @@ -717,26 +731,30 @@ public static class MapCacheStore extends CacheStoreAdapter { } } + /** {@link TestCacheStoreStrategy} backed by H2 in-memory database */ protected static class H2CacheStoreStrategy implements TestCacheStoreStrategy { + /** Pool to get {@link Connection}s from */ private final JdbcConnectionPool dataSrc; - /** Create table script. */ + /** Script that creates CACHE table */ private static final String CREATE_CACHE_TABLE = "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; + /** Script that creates STATS table */ private static final String CREATE_STATS_TABLE = "create table if not exists STATS(id bigint not null, reads int not null, writes int not null, " + "removes int not null, PRIMARY KEY(id));"; + /** Script that populates STATS table */ private static final String POPULATE_STATS_TABLE = "delete from STATS;\n" + "insert into STATS(id, reads, writes, removes) values(1, 0, 0, 0);"; /** */ - public H2CacheStoreStrategy() throws IgniteCheckedException { + H2CacheStoreStrategy() throws IgniteCheckedException { try { Server.createTcpServer("-tcpDaemon").start(); - dataSrc = createDataSource(); + dataSrc = H2CacheStoreSessionListenerFactory.createDataSource(); try (Connection conn = connection()) { RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); @@ -749,11 +767,6 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { } } - /** {@inheritDoc} */ - @Override public void afterTestsStopped() { - - } - /** {@inheritDoc} */ @Override public int getReads() { return querySingleInt("select reads from STATS limit 0,1;", "Failed to query number of reads from STATS table"); @@ -787,11 +800,16 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void putToStore(Object key, Object val) { + Connection conn = null; try { - H2CacheStore.putToDb(connection(), key, val); + conn = connection(); + H2CacheStore.putToDb(conn, key, val); } catch (SQLException e) { - e.printStackTrace(); + throw new IgniteException(e); + } + finally { + U.closeQuiet(conn); } } @@ -815,28 +833,39 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { throw new IgniteException(e); } finally { - H2CacheStore.end(stmt, conn); + U.closeQuiet(stmt); + U.closeQuiet(conn); } } /** {@inheritDoc} */ @Override public Object getFromStore(Object key) { + Connection conn = null; try { - return H2CacheStore.getFromDb(connection(), key); + conn = connection(); + return H2CacheStore.getFromDb(conn, key); } catch (SQLException e) { throw new IgniteException(e); } + finally { + U.closeQuiet(conn); + } } /** {@inheritDoc} */ @Override public void removeFromStore(Object key) { + Connection conn = null; try { - H2CacheStore.removeFromDb(connection(), key); + conn = connection(); + H2CacheStore.removeFromDb(conn, key); } catch (SQLException e) { throw new IgniteException(e); } + finally { + U.closeQuiet(conn); + } } /** {@inheritDoc} */ @@ -844,17 +873,28 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { return getFromStore(key) != null; } + /** + * @return New {@link Connection} from {@link #dataSrc} + * @throws SQLException if failed + */ private Connection connection() throws SQLException { return dataSrc.getConnection(); } - private int querySingleInt(String query, String errorMsg) { + /** + * Retrieve single int value from {@link ResultSet} returned by given query + * @param qry Query string (fully populated, with params) + * @param errorMsg Message for {@link IgniteException} to bear in case of failure + * @return requested value + */ + private int querySingleInt(String qry, String errorMsg) { Connection conn = null; PreparedStatement stmt = null; + ResultSet rs = null; try { conn = connection(); - stmt = conn.prepareStatement(query); - ResultSet rs = stmt.executeQuery(); + stmt = conn.prepareStatement(qry); + rs = stmt.executeQuery(); if (rs.next()) return rs.getInt(1); else @@ -864,15 +904,12 @@ private int querySingleInt(String query, String errorMsg) { throw new IgniteException(e); } finally { - H2CacheStore.end(stmt, conn); + U.closeQuiet(rs); + U.closeQuiet(stmt); + U.closeQuiet(conn); } } - /** */ - private static JdbcConnectionPool createDataSource() { - return JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL", "sa", ""); - } - /** {@inheritDoc} */ @Override public void updateCacheConfiguration(CacheConfiguration configuration) { configuration.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); @@ -893,15 +930,24 @@ private static class H2StoreFactory implements Factory { + /** + * @return Connection pool + */ + static JdbcConnectionPool createDataSource() { + return JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL;DB_CLOSE_DELAY=-1", "sa", ""); + } + + /** {@inheritDoc} */ @Override public CacheStoreSessionListener create() { CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); - lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL;DB_CLOSE_DELAY=-1", "sa", "")); + lsnr.setDataSource(createDataSource()); return lsnr; } } - + /** H2 backed {@link CacheStoreAdapter} implementations */ public static class H2CacheStore extends CacheStoreAdapter { /** Store session */ @CacheStoreSessionResource @@ -911,6 +957,26 @@ public static class H2CacheStore extends CacheStoreAdapter { private static final String INSERT = "insert into CACHE(k, v) values(?, ?) on duplicate key update v = ?;"; + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + Connection conn = ses.attachment(); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery("select * from CACHE"); + while (rs.next()) + clo.apply(deserialize(rs.getBytes(1)), deserialize(rs.getBytes(2))); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + } + } + /** {@inheritDoc} */ @Override public Object load(Object key) throws CacheLoaderException { try { @@ -955,20 +1021,26 @@ public static class H2CacheStore extends CacheStoreAdapter { */ static Object getFromDb(Connection conn, Object key) throws SQLException { PreparedStatement stmt = null; + ResultSet rs = null; try { stmt = conn.prepareStatement("select v from CACHE where k = ?"); stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); - ResultSet rs = stmt.executeQuery(); - return rs.next() ? H2CacheStore.deserialize(IOUtils.toByteArray(rs.getBinaryStream(1))) : null; - } - catch (IOException e) { - throw new IgniteException(e); + rs = stmt.executeQuery(); + return rs.next() ? H2CacheStore.deserialize(rs.getBytes(1)) : null; } finally { - end(stmt, conn); + U.closeQuiet(rs); + U.closeQuiet(stmt); } } + /** + * Put key-value pair to H2 + * @param conn {@link Connection} to use + * @param key key + * @param val value + * @throws SQLException if failed + */ static void putToDb(Connection conn, Object key, Object val) throws SQLException { PreparedStatement stmt = null; try { @@ -980,10 +1052,16 @@ static void putToDb(Connection conn, Object key, Object val) throws SQLException stmt.executeUpdate(); } finally { - end(stmt, conn); + U.closeQuiet(stmt); } } + /** + * Remove given key and its value from H2 + * @param conn {@link Connection} to invoke query upon + * @param key to remove + * @throws SQLException if failed + */ static void removeFromDb(Connection conn, Object key) throws SQLException { PreparedStatement stmt = null; try { @@ -992,7 +1070,7 @@ static void removeFromDb(Connection conn, Object key) throws SQLException { stmt.executeUpdate(); } finally { - end(stmt, conn); + U.closeQuiet(stmt); } } @@ -1012,20 +1090,10 @@ private void updateStats(String field) { throw new IgniteException("Failed to update H2 store usage stats", e); } finally { - end(stmt, conn); + U.closeQuiet(stmt); } } - /** - * Quietly close statement and connection - * @param stmt {@link Statement} to close - * @param conn {@link Connection} to close - */ - private static void end(Statement stmt, Connection conn) { - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - /** * Turn given arbitrary {@link Object} to byte array * @param obj {@link Object} to serialize diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index 5e86962d80847..68bfb6f57f562 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -1444,7 +1444,7 @@ private void checkCacheValue(Object key, @Nullable Object expVal) throws Excepti interceptor.disabled = true; if (storeEnabled()) - assertEquals("Unexpected store value", expVal, storeStrategy.getFromStore(key)); + assertEquals("Unexpected store value", expVal, storeStgy.getFromStore(key)); try { for (int i = 0; i < gridCount(); i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java index a522cda485ad8..411e1bfd90a62 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java @@ -390,7 +390,7 @@ else if (i == nearIdx) { } // Avoid reloading from store. - storeStrategy.removeFromStore(key); + storeStgy.removeFromStore(key); assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @SuppressWarnings("unchecked") diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java index 39ab7ce77b287..0386510d3bc5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest.java @@ -68,6 +68,6 @@ public void testPut() throws Exception { assertEquals(5, primaryCache.localPeek(key, CachePeekMode.ONHEAP).intValue()); assertNull(primaryCache.localPeek(key, CachePeekMode.OFFHEAP)); assertEquals(5, cache.get(key).intValue()); - assertEquals(5, storeStrategy.getFromStore(key)); + assertEquals(5, storeStgy.getFromStore(key)); } } \ No newline at end of file From 719915210b83a59ee660452fdf9d2044879f3bc3 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Fri, 24 Jun 2016 20:31:49 +0300 Subject: [PATCH 03/10] IGNITE-1088 Moved test cache store related stuff to separate classes to use in other tests affected by the issue --- .../cache/GridCacheAbstractSelfTest.java | 607 ------------------ .../cache/H2CacheStoreStrategy.java | 441 +++++++++++++ .../cache/MapCacheStoreStrategy.java | 144 +++++ .../cache/TestCacheStoreStrategy.java | 92 +++ 4 files changed, 677 insertions(+), 607 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index a298a727a3509..adb0b543a864f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -17,39 +17,18 @@ package org.apache.ignite.internal.processors.cache; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.StringReader; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.configuration.Factory; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.integration.CacheLoaderException; -import javax.cache.integration.CacheWriterException; -import org.apache.commons.io.IOUtils; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreAdapter; -import org.apache.ignite.cache.store.CacheStoreSession; -import org.apache.ignite.cache.store.CacheStoreSessionListener; -import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; @@ -62,20 +41,14 @@ import org.apache.ignite.internal.util.typedef.R1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.resources.CacheStoreSessionResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; -import org.h2.jdbcx.JdbcConnectionPool; -import org.h2.tools.RunScript; -import org.h2.tools.Server; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; @@ -547,584 +520,4 @@ public SumReducer() { } } - /** Interface for cache store backend manipulation and stats routines */ - public interface TestCacheStoreStrategy { - /** - * @return Number of reads to store - */ - int getReads(); - - /** - * @return Number of writes to store - */ - int getWrites(); - - /** - * @return Number of removals from store - */ - int getRemoves(); - - /** - * @return Total number of items in the store - */ - int getStoreSize(); - - /** - * Clear store contents - */ - void resetStore(); - - /** - * Put entry to cache store - * - * @param key Key. - * @param val Value. - */ - void putToStore(Object key, Object val); - - /** - * @param data items to put to store - */ - void putAllToStore(Map data); - - /** - * @param key to look for - * @return {@link Object} pointed to by given key or null if no object is present - */ - Object getFromStore(Object key); - - /** - * @param key to look for - */ - void removeFromStore(Object key); - - /** - * @param key to look for - * @return true if object pointed to by key is in store, false otherwise - */ - boolean isInStore(Object key); - - /** - * Called from {@link #cacheConfiguration(String)}, this method allows implementations to tune cache config - * @param configuration {@link CacheConfiguration} to tune - */ - void updateCacheConfiguration(CacheConfiguration configuration); - - /** - * @return {@link Factory} for write-through storage emulator - */ - Factory> getStoreFactory(); - } - - /** {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} */ - private static class MapCacheStoreStrategy implements TestCacheStoreStrategy { - /** Removes counter. */ - private final static AtomicInteger removes = new AtomicInteger(); - - /** Writes counter. */ - private final static AtomicInteger writes = new AtomicInteger(); - - /** Reads counter. */ - private final static AtomicInteger reads = new AtomicInteger(); - - /** Store map. */ - private final static Map map = new ConcurrentHashMap8<>(); - - /** {@inheritDoc} */ - @Override public int getReads() { - return reads.get(); - } - - /** {@inheritDoc} */ - @Override public int getWrites() { - return writes.get(); - } - - /** {@inheritDoc} */ - @Override public int getRemoves() { - return removes.get(); - } - - /** {@inheritDoc} */ - @Override public int getStoreSize() { - return map.size(); - } - - /** {@inheritDoc} */ - @Override public void resetStore() { - map.clear(); - - reads.set(0); - writes.set(0); - removes.set(0); - } - - /** {@inheritDoc} */ - @Override public void putToStore(Object key, Object val) { - map.put(key, val); - } - - /** {@inheritDoc} */ - @Override public void putAllToStore(Map data) { - map.putAll(data); - } - - /** {@inheritDoc} */ - @Override public Object getFromStore(Object key) { - return map.get(key); - } - - /** {@inheritDoc} */ - @Override public void removeFromStore(Object key) { - map.remove(key); - } - - /** {@inheritDoc} */ - @Override public boolean isInStore(Object key) { - return map.containsKey(key); - } - - /** {@inheritDoc} */ - @Override public void updateCacheConfiguration(CacheConfiguration configuration) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Factory> getStoreFactory() { - return FactoryBuilder.factoryOf(MapCacheStore.class); - } - - /** Serializable {@link #map} backed cache store factory */ - private static class MapStoreFactory implements Factory> { - /** {@inheritDoc} */ - @Override public CacheStore create() { - return new MapCacheStore(); - } - } - - /** {@link CacheStore} backed by {@link #map} */ - public static class MapCacheStore extends CacheStoreAdapter { - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure clo, Object... args) { - for (Map.Entry e : map.entrySet()) - clo.apply(e.getKey(), e.getValue()); - } - - /** {@inheritDoc} */ - @Override public Object load(Object key) { - reads.incrementAndGet(); - return map.get(key); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry e) { - writes.incrementAndGet(); - map.put(e.getKey(), e.getValue()); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - removes.incrementAndGet(); - map.remove(key); - } - } - } - - /** {@link TestCacheStoreStrategy} backed by H2 in-memory database */ - protected static class H2CacheStoreStrategy implements TestCacheStoreStrategy { - /** Pool to get {@link Connection}s from */ - private final JdbcConnectionPool dataSrc; - - /** Script that creates CACHE table */ - private static final String CREATE_CACHE_TABLE = - "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; - - /** Script that creates STATS table */ - private static final String CREATE_STATS_TABLE = - "create table if not exists STATS(id bigint not null, reads int not null, writes int not null, " + - "removes int not null, PRIMARY KEY(id));"; - - /** Script that populates STATS table */ - private static final String POPULATE_STATS_TABLE = - "delete from STATS;\n" + - "insert into STATS(id, reads, writes, removes) values(1, 0, 0, 0);"; - - /** */ - H2CacheStoreStrategy() throws IgniteCheckedException { - try { - Server.createTcpServer("-tcpDaemon").start(); - dataSrc = H2CacheStoreSessionListenerFactory.createDataSource(); - - try (Connection conn = connection()) { - RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); - RunScript.execute(conn, new StringReader(CREATE_STATS_TABLE)); - RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); - } - } - catch (SQLException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public int getReads() { - return querySingleInt("select reads from STATS limit 0,1;", "Failed to query number of reads from STATS table"); - } - - /** {@inheritDoc} */ - @Override public int getWrites() { - return querySingleInt("select writes from STATS limit 0,1;", "Failed to query number of writes from STATS table"); - } - - /** {@inheritDoc} */ - @Override public int getRemoves() { - return querySingleInt("select removes from STATS limit 0,1;", "Failed to query number of removals from STATS table"); - } - - /** {@inheritDoc} */ - @Override public int getStoreSize() { - return querySingleInt("select count(*) from CACHE;", "Failed to query number of rows from CACHE table"); - } - - /** {@inheritDoc} */ - @Override public void resetStore() { - try (Connection conn = connection()) { - RunScript.execute(conn, new StringReader("delete from CACHE;")); - RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); - } - catch (SQLException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void putToStore(Object key, Object val) { - Connection conn = null; - try { - conn = connection(); - H2CacheStore.putToDb(conn, key, val); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void putAllToStore(Map data) { - Connection conn = null; - PreparedStatement stmt = null; - try { - conn = connection(); - stmt = conn.prepareStatement(H2CacheStore.INSERT); - for (Map.Entry e : data.entrySet()) { - byte[] v = H2CacheStore.serialize(e.getValue()); - stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(e.getKey()))); - stmt.setBinaryStream(2, new ByteArrayInputStream(v)); - stmt.setBinaryStream(3, new ByteArrayInputStream(v)); - stmt.addBatch(); - } - stmt.executeBatch(); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public Object getFromStore(Object key) { - Connection conn = null; - try { - conn = connection(); - return H2CacheStore.getFromDb(conn, key); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void removeFromStore(Object key) { - Connection conn = null; - try { - conn = connection(); - H2CacheStore.removeFromDb(conn, key); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public boolean isInStore(Object key) { - return getFromStore(key) != null; - } - - /** - * @return New {@link Connection} from {@link #dataSrc} - * @throws SQLException if failed - */ - private Connection connection() throws SQLException { - return dataSrc.getConnection(); - } - - /** - * Retrieve single int value from {@link ResultSet} returned by given query - * @param qry Query string (fully populated, with params) - * @param errorMsg Message for {@link IgniteException} to bear in case of failure - * @return requested value - */ - private int querySingleInt(String qry, String errorMsg) { - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - try { - conn = connection(); - stmt = conn.prepareStatement(qry); - rs = stmt.executeQuery(); - if (rs.next()) - return rs.getInt(1); - else - throw new IgniteException(errorMsg); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(rs); - U.closeQuiet(stmt); - U.closeQuiet(conn); - } - } - - /** {@inheritDoc} */ - @Override public void updateCacheConfiguration(CacheConfiguration configuration) { - configuration.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); - } - - /** {@inheritDoc} */ - @Override public Factory> getStoreFactory() { - return new H2StoreFactory(); - } - - } - - /** Serializable H2 backed cache store factory */ - private static class H2StoreFactory implements Factory> { - /** {@inheritDoc} */ - @Override public CacheStore create() { - return new H2CacheStore(); - } - } - - /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s */ - private static class H2CacheStoreSessionListenerFactory implements Factory { - /** - * @return Connection pool - */ - static JdbcConnectionPool createDataSource() { - return JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;mode=MySQL;DB_CLOSE_DELAY=-1", "sa", ""); - } - - /** {@inheritDoc} */ - @Override public CacheStoreSessionListener create() { - CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); - lsnr.setDataSource(createDataSource()); - return lsnr; - } - } - - /** H2 backed {@link CacheStoreAdapter} implementations */ - public static class H2CacheStore extends CacheStoreAdapter { - /** Store session */ - @CacheStoreSessionResource - private CacheStoreSession ses; - - /** Template for an insert statement */ - private static final String INSERT = - "insert into CACHE(k, v) values(?, ?) on duplicate key update v = ?;"; - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure clo, Object... args) { - Connection conn = ses.attachment(); - Statement stmt = null; - ResultSet rs = null; - try { - stmt = conn.createStatement(); - rs = stmt.executeQuery("select * from CACHE"); - while (rs.next()) - clo.apply(deserialize(rs.getBytes(1)), deserialize(rs.getBytes(2))); - } - catch (SQLException e) { - throw new IgniteException(e); - } - finally { - U.closeQuiet(rs); - U.closeQuiet(stmt); - } - } - - /** {@inheritDoc} */ - @Override public Object load(Object key) throws CacheLoaderException { - try { - Object res = getFromDb(ses.attachment(), key); - updateStats("reads"); - return res; - } - catch (SQLException e) { - throw new CacheLoaderException("Failed to load object [key=" + key + ']', e); - } - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry entry) throws CacheWriterException { - try { - putToDb(ses.attachment(), entry.getKey(), entry.getValue()); - updateStats("writes"); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to write object [key=" + entry.getKey() + ", " + - "val=" + entry.getValue() + ']', e); - } - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) throws CacheWriterException { - try { - removeFromDb(ses.attachment(), key); - updateStats("removes"); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to delete object [key=" + key + ']', e); - } - } - - /** - * Select from H2 and deserialize from bytes the value pointed at by key - * @param conn {@link Connection} to use - * @param key key to llok for the value by - * @return Stored object or null if the key is missing from DB - * @throws SQLException - */ - static Object getFromDb(Connection conn, Object key) throws SQLException { - PreparedStatement stmt = null; - ResultSet rs = null; - try { - stmt = conn.prepareStatement("select v from CACHE where k = ?"); - stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); - rs = stmt.executeQuery(); - return rs.next() ? H2CacheStore.deserialize(rs.getBytes(1)) : null; - } - finally { - U.closeQuiet(rs); - U.closeQuiet(stmt); - } - } - - /** - * Put key-value pair to H2 - * @param conn {@link Connection} to use - * @param key key - * @param val value - * @throws SQLException if failed - */ - static void putToDb(Connection conn, Object key, Object val) throws SQLException { - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(H2CacheStore.INSERT); - byte[] v = H2CacheStore.serialize(val); - stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); - stmt.setBinaryStream(2, new ByteArrayInputStream(v)); - stmt.setBinaryStream(3, new ByteArrayInputStream(v)); - stmt.executeUpdate(); - } - finally { - U.closeQuiet(stmt); - } - } - - /** - * Remove given key and its value from H2 - * @param conn {@link Connection} to invoke query upon - * @param key to remove - * @throws SQLException if failed - */ - static void removeFromDb(Connection conn, Object key) throws SQLException { - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement("delete from CACHE where k = ?"); - stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); - stmt.executeUpdate(); - } - finally { - U.closeQuiet(stmt); - } - } - - /** - * Increment stored stats for given field - * @param field field name - */ - private void updateStats(String field) { - Connection conn = ses.attachment(); - assert conn != null; - Statement stmt = null; - try { - stmt = conn.createStatement(); - stmt.executeUpdate("update STATS set " + field + " = " + field + " + 1;"); - } - catch (SQLException e) { - throw new IgniteException("Failed to update H2 store usage stats", e); - } - finally { - U.closeQuiet(stmt); - } - } - - /** - * Turn given arbitrary {@link Object} to byte array - * @param obj {@link Object} to serialize - * @return bytes representation of given {@link Object} - */ - static byte[] serialize(Object obj) { - try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { - try (ObjectOutputStream o = new ObjectOutputStream(b)) { - o.writeObject(obj); - } - return b.toByteArray(); - } - catch (Exception e) { - throw new IgniteException("Failed to serialize object to byte array [obj=" + obj, e); - } - } - - /** - * Deserialize an object from its byte array representation - * @param bytes byte array representation of the {@link Object} - * @return deserialized {@link Object} - */ - public static Object deserialize(byte[] bytes) { - try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) { - try (ObjectInputStream o = new ObjectInputStream(b)) { - return o.readObject(); - } - } - catch (Exception e) { - throw new IgniteException("Failed to deserialize object from byte array", e); - } - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java new file mode 100644 index 0000000000000..02718707168f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.StringReader; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cache.store.CacheStoreSession; +import org.apache.ignite.cache.store.CacheStoreSessionListener; +import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.h2.jdbcx.JdbcConnectionPool; +import org.h2.tools.RunScript; +import org.h2.tools.Server; + +/** {@link TestCacheStoreStrategy} backed by H2 in-memory database */ +public class H2CacheStoreStrategy implements TestCacheStoreStrategy { + /** Pool to get {@link Connection}s from */ + private final JdbcConnectionPool dataSrc; + + /** Script that creates CACHE table */ + private static final String CREATE_CACHE_TABLE = + "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; + + /** Script that creates STATS table */ + private static final String CREATE_STATS_TABLE = + "create table if not exists STATS(id bigint not null, reads int not null, writes int not null, " + + "removes int not null, PRIMARY KEY(id));"; + + /** Script that populates STATS table */ + private static final String POPULATE_STATS_TABLE = + "delete from STATS;\n" + + "insert into STATS(id, reads, writes, removes) values(1, 0, 0, 0);"; + + /** */ + public H2CacheStoreStrategy() throws IgniteCheckedException { + try { + Server.createTcpServer().start(); + dataSrc = H2CacheStoreSessionListenerFactory.createDataSource(); + + try (Connection conn = connection()) { + RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); + RunScript.execute(conn, new StringReader(CREATE_STATS_TABLE)); + RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); + } + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public int getReads() { + return querySingleInt("select reads from STATS limit 0,1;", "Failed to query number of reads from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getWrites() { + return querySingleInt("select writes from STATS limit 0,1;", "Failed to query number of writes from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getRemoves() { + return querySingleInt("select removes from STATS limit 0,1;", "Failed to query number of removals from STATS table"); + } + + /** {@inheritDoc} */ + @Override public int getStoreSize() { + return querySingleInt("select count(*) from CACHE;", "Failed to query number of rows from CACHE table"); + } + + /** {@inheritDoc} */ + @Override public void resetStore() { + try (Connection conn = connection()) { + RunScript.execute(conn, new StringReader("delete from CACHE;")); + RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); + } + catch (SQLException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void putToStore(Object key, Object val) { + Connection conn = null; + try { + conn = connection(); + H2CacheStore.putToDb(conn, key, val); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void putAllToStore(Map data) { + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = connection(); + stmt = conn.prepareStatement(H2CacheStore.MERGE); + for (Map.Entry e : data.entrySet()) { + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(e.getKey()))); + stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(e.getValue()))); + stmt.addBatch(); + } + stmt.executeBatch(); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public Object getFromStore(Object key) { + Connection conn = null; + try { + conn = connection(); + return H2CacheStore.getFromDb(conn, key); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void removeFromStore(Object key) { + Connection conn = null; + try { + conn = connection(); + H2CacheStore.removeFromDb(conn, key); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public boolean isInStore(Object key) { + return getFromStore(key) != null; + } + + /** + * @return New {@link Connection} from {@link #dataSrc} + * @throws SQLException if failed + */ + private Connection connection() throws SQLException { + return dataSrc.getConnection(); + } + + /** + * Retrieve single int value from {@link ResultSet} returned by given query + * @param qry Query string (fully populated, with params) + * @param errorMsg Message for {@link IgniteException} to bear in case of failure + * @return requested value + */ + private int querySingleInt(String qry, String errorMsg) { + Connection conn = null; + PreparedStatement stmt = null; + ResultSet rs = null; + try { + conn = connection(); + stmt = conn.prepareStatement(qry); + rs = stmt.executeQuery(); + if (rs.next()) + return rs.getInt(1); + else + throw new IgniteException(errorMsg); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + U.closeQuiet(conn); + } + } + + /** {@inheritDoc} */ + @Override public void updateCacheConfiguration(CacheConfiguration configuration) { + configuration.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); + } + + /** {@inheritDoc} */ + @Override public Factory> getStoreFactory() { + return new H2StoreFactory(); + } + + /** Serializable H2 backed cache store factory */ + public static class H2StoreFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new H2CacheStore(); + } + } + + /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s */ + public static class H2CacheStoreSessionListenerFactory implements Factory { + /** + * @return Connection pool + */ + static JdbcConnectionPool createDataSource() { + JdbcConnectionPool pool = JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:TestDb;LOCK_MODE=0", "sa", ""); + pool.setMaxConnections(48); + return pool; + } + + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener create() { + CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); + lsnr.setDataSource(createDataSource()); + return lsnr; + } + } + + /** H2 backed {@link CacheStoreAdapter} implementations */ + public static class H2CacheStore extends CacheStoreAdapter { + /** Store session */ + @CacheStoreSessionResource + private CacheStoreSession ses; + + /** Template for an insert statement */ + private static final String MERGE = "merge into CACHE(k, v) values(?, ?);"; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + Connection conn = ses.attachment(); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery("select * from CACHE"); + while (rs.next()) + clo.apply(deserialize(rs.getBytes(1)), deserialize(rs.getBytes(2))); + } + catch (SQLException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + } + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + try { + Object res = getFromDb(ses.attachment(), key); + updateStats("reads"); + return res; + } + catch (SQLException e) { + throw new CacheLoaderException("Failed to load object [key=" + key + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + try { + putToDb(ses.attachment(), entry.getKey(), entry.getValue()); + updateStats("writes"); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to write object [key=" + entry.getKey() + ", " + + "val=" + entry.getValue() + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + try { + removeFromDb(ses.attachment(), key); + updateStats("removes"); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to delete object [key=" + key + ']', e); + } + } + + /** + * Select from H2 and deserialize from bytes the value pointed at by key + * @param conn {@link Connection} to use + * @param key key to llok for the value by + * @return Stored object or null if the key is missing from DB + * @throws SQLException + */ + static Object getFromDb(Connection conn, Object key) throws SQLException { + PreparedStatement stmt = null; + ResultSet rs = null; + try { + stmt = conn.prepareStatement("select v from CACHE where k = ?"); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + rs = stmt.executeQuery(); + return rs.next() ? H2CacheStore.deserialize(rs.getBytes(1)) : null; + } + finally { + U.closeQuiet(rs); + U.closeQuiet(stmt); + } + } + + /** + * Put key-value pair to H2 + * @param conn {@link Connection} to use + * @param key key + * @param val value + * @throws SQLException if failed + */ + static void putToDb(Connection conn, Object key, Object val) throws SQLException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(H2CacheStore.MERGE); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + stmt.setBinaryStream(2, new ByteArrayInputStream(H2CacheStore.serialize(val))); + stmt.executeUpdate(); + } + finally { + U.closeQuiet(stmt); + } + } + + /** + * Remove given key and its value from H2 + * @param conn {@link Connection} to invoke query upon + * @param key to remove + * @throws SQLException if failed + */ + static void removeFromDb(Connection conn, Object key) throws SQLException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("delete from CACHE where k = ?"); + stmt.setBinaryStream(1, new ByteArrayInputStream(H2CacheStore.serialize(key))); + stmt.executeUpdate(); + } + finally { + U.closeQuiet(stmt); + } + } + + /** + * Increment stored stats for given field + * @param field field name + */ + private void updateStats(String field) { + Connection conn = ses.attachment(); + assert conn != null; + Statement stmt = null; + try { + stmt = conn.createStatement(); + stmt.executeUpdate("update STATS set " + field + " = " + field + " + 1;"); + } + catch (SQLException e) { + throw new IgniteException("Failed to update H2 store usage stats", e); + } + finally { + U.closeQuiet(stmt); + } + } + + /** + * Turn given arbitrary {@link Object} to byte array + * @param obj {@link Object} to serialize + * @return bytes representation of given {@link Object} + */ + static byte[] serialize(Object obj) { + try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { + try (ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(obj); + } + return b.toByteArray(); + } + catch (Exception e) { + throw new IgniteException("Failed to serialize object to byte array [obj=" + obj, e); + } + } + + /** + * Deserialize an object from its byte array representation + * @param bytes byte array representation of the {@link Object} + * @return deserialized {@link Object} + */ + public static Object deserialize(byte[] bytes) { + try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) { + try (ObjectInputStream o = new ObjectInputStream(b)) { + return o.readObject(); + } + } + catch (Exception e) { + throw new IgniteException("Failed to deserialize object from byte array", e); + } + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java new file mode 100644 index 0000000000000..537edeae4f8c2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.jsr166.ConcurrentHashMap8; + +/** {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} */ +public class MapCacheStoreStrategy implements TestCacheStoreStrategy { + /** Removes counter. */ + private final static AtomicInteger removes = new AtomicInteger(); + + /** Writes counter. */ + private final static AtomicInteger writes = new AtomicInteger(); + + /** Reads counter. */ + private final static AtomicInteger reads = new AtomicInteger(); + + /** Store map. */ + private final static Map map = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override public int getReads() { + return reads.get(); + } + + /** {@inheritDoc} */ + @Override public int getWrites() { + return writes.get(); + } + + /** {@inheritDoc} */ + @Override public int getRemoves() { + return removes.get(); + } + + /** {@inheritDoc} */ + @Override public int getStoreSize() { + return map.size(); + } + + /** {@inheritDoc} */ + @Override public void resetStore() { + map.clear(); + + reads.set(0); + writes.set(0); + removes.set(0); + } + + /** {@inheritDoc} */ + @Override public void putToStore(Object key, Object val) { + map.put(key, val); + } + + /** {@inheritDoc} */ + @Override public void putAllToStore(Map data) { + map.putAll(data); + } + + /** {@inheritDoc} */ + @Override public Object getFromStore(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void removeFromStore(Object key) { + map.remove(key); + } + + /** {@inheritDoc} */ + @Override public boolean isInStore(Object key) { + return map.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public void updateCacheConfiguration(CacheConfiguration configuration) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Factory> getStoreFactory() { + return FactoryBuilder.factoryOf(MapCacheStore.class); + } + + /** Serializable {@link #map} backed cache store factory */ + public static class MapStoreFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new MapCacheStore(); + } + } + + /** {@link CacheStore} backed by {@link #map} */ + public static class MapCacheStore extends CacheStoreAdapter { + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (Map.Entry e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + reads.incrementAndGet(); + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + writes.incrementAndGet(); + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + removes.incrementAndGet(); + map.remove(key); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java new file mode 100644 index 0000000000000..d5d0d9b091a83 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import javax.cache.configuration.Factory; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.CacheConfiguration; + +/** Interface for cache store backend manipulation and stats routines */ +public interface TestCacheStoreStrategy { + /** + * @return Number of reads to store + */ + public int getReads(); + + /** + * @return Number of writes to store + */ + public int getWrites(); + + /** + * @return Number of removals from store + */ + public int getRemoves(); + + /** + * @return Total number of items in the store + */ + public int getStoreSize(); + + /** + * Clear store contents + */ + public void resetStore(); + + /** + * Put entry to cache store + * + * @param key Key. + * @param val Value. + */ + public void putToStore(Object key, Object val); + + /** + * @param data items to put to store + */ + public void putAllToStore(Map data); + + /** + * @param key to look for + * @return {@link Object} pointed to by given key or null if no object is present + */ + public Object getFromStore(Object key); + + /** + * @param key to look for + */ + public void removeFromStore(Object key); + + /** + * @param key to look for + * @return true if object pointed to by key is in store, false otherwise + */ + public boolean isInStore(Object key); + + /** + * Called from {@link #cacheConfiguration(String)}, this method allows implementations to tune cache config + * @param configuration {@link CacheConfiguration} to tune + */ + public void updateCacheConfiguration(CacheConfiguration configuration); + + /** + * @return {@link Factory} for write-through storage emulator + */ + public Factory> getStoreFactory(); +} From d649d76a437c2110def5baf63e944a52b94ab3bf Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Wed, 29 Jun 2016 15:37:08 +0300 Subject: [PATCH 04/10] IGNITE-1088 Switched IgniteCache* tests to new store strategy and unmuted previously failing tests --- ...gniteCacheConfigVariationsFullApiTest.java | 113 +++++++++--------- ...gniteCacheReadThroughEvictionSelfTest.java | 2 +- .../configvariations/ConfigVariations.java | 5 +- ...niteCacheConfigVariationsAbstractTest.java | 67 +++-------- .../cache/IgniteCacheQueryIndexSelfTest.java | 2 +- 5 files changed, 75 insertions(+), 114 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index 5e8f16258231b..2e9a037313229 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -351,9 +351,6 @@ public void testRemoveInExplicitLocks() throws Exception { * @throws Exception If failed. */ public void testRemoveAllSkipStore() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - if (!storeEnabled()) return; @@ -3655,7 +3652,7 @@ private void checkTtl(boolean inTx, boolean oldEntry) throws Exception { } // Avoid reloading from store. - map.remove(key); + storeStgy.removeFromStore(key); assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @SuppressWarnings("unchecked") @@ -4169,12 +4166,12 @@ public void testToMap() throws Exception { for (int i = 0; i < gridCount(); i++) { for (Cache.Entry entry : jcache(i)) - map.put(entry.getKey(), entry.getValue()); + storeStgy.putToStore(entry.getKey(), entry.getValue()); } - assert map.size() == 2; - assert (Integer)map.get("key1") == 1; - assert (Integer)map.get("key2") == 2; + assert storeStgy.getStoreSize() == 2; + assert (Integer)storeStgy.getFromStore("key1") == 1; + assert (Integer)storeStgy.getFromStore("key2") == 2; } /** @@ -4777,7 +4774,7 @@ public void testWithSkipStore() throws Exception { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertEquals(i, map.get(key)); + assertEquals(i, storeStgy.getFromStore(key)); } for (int i = 0; i < keys.size(); ++i) { @@ -4786,13 +4783,13 @@ public void testWithSkipStore() throws Exception { Integer val1 = -1; cacheSkipStore.put(key, val1); - assertEquals(i, map.get(key)); + assertEquals(i, storeStgy.getFromStore(key)); assertEquals(val1, cacheSkipStore.get(key)); Integer val2 = -2; assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2))); - assertEquals(i, map.get(key)); + assertEquals(i, storeStgy.getFromStore(key)); assertEquals(val2, cacheSkipStore.get(key)); } @@ -4801,7 +4798,7 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } for (String key : keys) { @@ -4809,37 +4806,37 @@ public void testWithSkipStore() throws Exception { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); - map.put(key, 0); + storeStgy.putToStore(key, 0); Integer val = -1; assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); cache.remove(key); - map.put(key, 0); + storeStgy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); cache.remove(key); - map.put(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); assertEquals(val, cacheSkipStore.get(key)); - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); cache.remove(key); } assertFalse(cacheSkipStore.iterator().hasNext()); - assertTrue(map.size() == 0); + assertTrue(storeStgy.getStoreSize() == 0); assertTrue(cache.size(ALL) == 0); // putAll/removeAll from multiple nodes. @@ -4854,7 +4851,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } cache.putAll(data); @@ -4862,7 +4859,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4870,7 +4867,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.putAll(data); @@ -4878,7 +4875,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(data.keySet()); @@ -4886,7 +4883,7 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -4894,24 +4891,24 @@ public void testWithSkipStore() throws Exception { for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } - assertTrue(map.size() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // Miscellaneous checks. String newKey = "New key"; - assertFalse(map.containsKey(newKey)); + assertFalse(storeStgy.isInStore(newKey)); cacheSkipStore.put(newKey, 1); - assertFalse(map.containsKey(newKey)); + assertFalse(storeStgy.isInStore(newKey)); cache.put(newKey, 1); - assertTrue(map.containsKey(newKey)); + assertTrue(storeStgy.isInStore(newKey)); Iterator> it = cacheSkipStore.iterator(); @@ -4921,20 +4918,20 @@ public void testWithSkipStore() throws Exception { String rmvKey = entry.getKey(); - assertTrue(map.containsKey(rmvKey)); + assertTrue(storeStgy.isInStore(rmvKey)); it.remove(); assertNull(cacheSkipStore.get(rmvKey)); - assertTrue(map.containsKey(rmvKey)); + assertTrue(storeStgy.isInStore(rmvKey)); assertTrue(cache.size(ALL) == 0); assertTrue(cacheSkipStore.size(ALL) == 0); cache.remove(rmvKey); - assertTrue(map.size() == 0); + assertTrue(storeStgy.getStoreSize() == 0); } /** @@ -4964,7 +4961,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cacheSkipStore.removeAll(); @@ -4972,7 +4969,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(); @@ -4980,7 +4977,7 @@ public void testWithSkipStoreRemoveAll() throws Exception { for (String key : data.keySet()) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } } @@ -5054,7 +5051,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); @@ -5063,10 +5060,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } - assertEquals(0, map.size()); + assertEquals(0, storeStgy.getStoreSize()); // cacheSkipStore putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5080,10 +5077,10 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } - map.putAll(data); + storeStgy.putAllToStore(data); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { cacheSkipStore.removeAll(data.keySet()); @@ -5094,12 +5091,12 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); cache.remove(key); } - assertTrue(map.size() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // cache putAll(..)/removeAll(..) check. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5108,7 +5105,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5116,13 +5113,13 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); } - assertTrue(map.size() == 0); + assertTrue(storeStgy.getStoreSize() == 0); // putAll(..) from both cacheSkipStore and cache. try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { @@ -5143,7 +5140,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } tx.commit(); @@ -5154,7 +5151,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } for (int i = keys.size() / 2; i < keys.size(); i++) { @@ -5162,7 +5159,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertNotNull(cacheSkipStore.get(key)); assertNotNull(cache.get(key)); - assertTrue(map.containsKey(key)); + assertTrue(storeStgy.isInStore(key)); } cache.removeAll(data.keySet()); @@ -5170,7 +5167,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, for (String key : keys) { assertNull(cacheSkipStore.get(key)); assertNull(cache.get(key)); - assertFalse(map.containsKey(key)); + assertFalse(storeStgy.isInStore(key)); } // Check that read-through is disabled when cacheSkipStore is used. @@ -5179,7 +5176,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, assertTrue(cacheSkipStore.size(ALL) == 0); assertTrue(cache.size(ALL) == 0); - assertTrue(map.size() != 0); + assertTrue(storeStgy.getStoreSize() != 0); try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { assertTrue(cacheSkipStore.getAll(data.keySet()).size() == 0); @@ -5202,7 +5199,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val))); } @@ -5211,7 +5208,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5221,7 +5218,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStgy.putToStore(key, 0); assertTrue(cacheSkipStore.putIfAbsent(key, val)); } @@ -5230,7 +5227,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5240,7 +5237,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) { for (String key : data.keySet()) { - map.put(key, 0); + storeStgy.putToStore(key, 0); assertNull(cacheSkipStore.getAndPut(key, val)); } @@ -5249,7 +5246,7 @@ private void checkSkipStoreWithTransaction(IgniteCache cache, } for (String key : data.keySet()) { - assertEquals(0, map.get(key)); + assertEquals(0, storeStgy.getFromStore(key)); assertEquals(val, cacheSkipStore.get(key)); assertEquals(val, cache.get(key)); @@ -5268,7 +5265,7 @@ private void checkEmpty(IgniteCache cache, IgniteCache CACHE_STORE_PARAM = Parameters.complexParameter( - Parameters.parameter("setCacheStoreFactory", Parameters.factory(IgniteCacheConfigVariationsAbstractTest.TestStoreFactory.class)), + Parameters.parameter("setCacheStoreFactory", Parameters.factory(MapCacheStoreStrategy.MapStoreFactory.class)), Parameters.parameter("setReadThrough", true), Parameters.parameter("setWriteThrough", true), Parameters.parameter("setCacheStoreSessionListenerFactories", noopCacheStoreSessionListenerFactory()) @@ -71,7 +72,7 @@ public class ConfigVariations { /** */ private static final ConfigParameter SIMPLE_CACHE_STORE_PARAM = Parameters.complexParameter( - Parameters.parameter("setCacheStoreFactory", Parameters.factory(IgniteCacheConfigVariationsAbstractTest.TestStoreFactory.class)), + Parameters.parameter("setCacheStoreFactory", Parameters.factory(MapCacheStoreStrategy.MapStoreFactory.class)), Parameters.parameter("setReadThrough", true), Parameters.parameter("setWriteThrough", true) ); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java index bcea6031017a0..403835aae7a50 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java @@ -17,9 +17,7 @@ package org.apache.ignite.testframework.junits; -import java.util.Map; import javax.cache.Cache; -import javax.cache.configuration.Factory; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -29,8 +27,6 @@ import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; @@ -38,15 +34,16 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.H2CacheStoreStrategy; +import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy; +import org.apache.ignite.internal.processors.cache.TestCacheStoreStrategy; import org.apache.ignite.internal.util.lang.GridAbsPredicateX; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.configvariations.CacheStartMode; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; @@ -62,8 +59,8 @@ public abstract class IgniteCacheConfigVariationsAbstractTest extends IgniteConf /** Test timeout. */ private static final long TEST_TIMEOUT = 30 * 1000; - /** Store map. */ - protected static final Map map = new ConcurrentHashMap8<>(); + /** */ + protected static TestCacheStoreStrategy storeStgy; /** {@inheritDoc} */ @Override protected long getTestTimeout() { @@ -72,6 +69,7 @@ public abstract class IgniteCacheConfigVariationsAbstractTest extends IgniteConf /** {@inheritDoc} */ @Override protected final void beforeTestsStarted() throws Exception { + initStoreStrategy(); assert testsCfg != null; assert !testsCfg.withClients() || testsCfg.gridCount() >= 3; @@ -135,6 +133,12 @@ else if (cacheStartMode == null || cacheStartMode == CacheStartMode.DYNAMIC) { } } + /** Initialize {@link #storeStgy} with respect to the nature of the test */ + void initStoreStrategy() throws IgniteCheckedException { + if (storeStgy == null) + storeStgy = isMultiJvm() ? new H2CacheStoreStrategy() : new MapCacheStoreStrategy(); + } + /** * Starts caches dynamically. * @@ -189,7 +193,7 @@ private void startCachesDinamically() throws Exception { } } - map.clear(); + storeStgy.resetStore(); super.afterTestsStopped(); } @@ -313,7 +317,7 @@ private void startCachesDinamically() throws Exception { if (cacheIsNotEmptyMsg == null) assertEquals("Cache is not empty", 0, jcache().localSize(CachePeekMode.ALL)); - resetStore(); + storeStgy.resetStore(); // Restore cache if current cache has garbage. if (cacheIsNotEmptyMsg != null) { @@ -350,13 +354,6 @@ private void startCachesDinamically() throws Exception { assertEquals(0, jcache().size()); } - /** - * Cleans up cache store. - */ - protected void resetStore() { - map.clear(); - } - /** * Put entry to cache store. * @@ -367,7 +364,7 @@ protected void putToStore(Object key, Object val) { if (!storeEnabled()) throw new IllegalStateException("Failed to put to store because store is disabled."); - map.put(key, val); + storeStgy.putToStore(key, val); } /** @@ -431,31 +428,6 @@ protected boolean swapEnabled() { return cacheConfiguration().isSwapEnabled(); } - /** - * @return Write through storage emulator. - */ - public static CacheStore cacheStore() { - return new CacheStoreAdapter() { - @Override public void loadCache(IgniteBiInClosure clo, - Object... args) { - for (Map.Entry e : map.entrySet()) - clo.apply(e.getKey(), e.getValue()); - } - - @Override public Object load(Object key) { - return map.get(key); - } - - @Override public void write(Cache.Entry e) { - map.put(e.getKey(), e.getValue()); - } - - @Override public void delete(Object key) { - map.remove(key); - } - }; - } - /** * @return {@code true} if near cache should be enabled. */ @@ -572,13 +544,4 @@ protected static boolean offheapTiered(IgniteCache cache) { protected static boolean containsKey(IgniteCache cache, Object key) throws Exception { return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.OFFHEAP) != null : cache.containsKey(key); } - - /** - * Serializable factory. - */ - public static class TestStoreFactory implements Factory { - @Override public CacheStore create() { - return cacheStore(); - } - } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java index d834eb3060f97..73b665ed2f614 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryIndexSelfTest.java @@ -74,7 +74,7 @@ public void testWithoutStoreLoad() throws Exception { */ public void testWithStoreLoad() throws Exception { for (int i = 0; i < ENTRY_CNT; i++) - putToStore(i, new CacheValue(i)); + storeStgy.putToStore(i, new CacheValue(i)); IgniteCache cache0 = grid(0).cache(null); From da9b24ea29c34d3906009f973994674b04b018c6 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Wed, 29 Jun 2016 16:45:10 +0300 Subject: [PATCH 05/10] Compilation fix (trivial) --- .../internal/processors/cache/H2CacheStoreStrategy.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java index 02718707168f2..e15afa423b159 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java @@ -293,7 +293,8 @@ public static class H2CacheStore extends CacheStoreAdapter { /** {@inheritDoc} */ @Override public Object load(Object key) throws CacheLoaderException { try { - Object res = getFromDb(ses.attachment(), key); + Connection conn = ses.attachment(); + Object res = getFromDb(conn, key); updateStats("reads"); return res; } @@ -305,7 +306,8 @@ public static class H2CacheStore extends CacheStoreAdapter { /** {@inheritDoc} */ @Override public void write(Cache.Entry entry) throws CacheWriterException { try { - putToDb(ses.attachment(), entry.getKey(), entry.getValue()); + Connection conn = ses.attachment(); + putToDb(conn, entry.getKey(), entry.getValue()); updateStats("writes"); } catch (SQLException e) { @@ -317,7 +319,8 @@ public static class H2CacheStore extends CacheStoreAdapter { /** {@inheritDoc} */ @Override public void delete(Object key) throws CacheWriterException { try { - removeFromDb(ses.attachment(), key); + Connection conn = ses.attachment(); + removeFromDb(conn, key); updateStats("removes"); } catch (SQLException e) { From c34f48288f5ce6b7e83bc3ba61169b4badad4e30 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Wed, 29 Jun 2016 18:53:46 +0300 Subject: [PATCH 06/10] H2 version raise --- parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parent/pom.xml b/parent/pom.xml index d78deacf5b701..66d35d2484eac 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -69,7 +69,7 @@ 18.0 14.0.1 16.0.1 - 1.3.175 + 1.4.179 2.4.1 4.5.1 4.4.3 From 835b1b8ae0d8590e986c4b3ae4d2f36b29662bd5 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Wed, 29 Jun 2016 19:10:36 +0300 Subject: [PATCH 07/10] H2CacheStoreStrategy - raised max connections number for test pool --- .../ignite/internal/processors/cache/H2CacheStoreStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java index e15afa423b159..6f9f54a7da336 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java @@ -249,7 +249,7 @@ public static class H2CacheStoreSessionListenerFactory implements Factory Date: Wed, 29 Jun 2016 19:35:48 +0300 Subject: [PATCH 08/10] H2CacheStoreStrategy - store stats in separate tables for better concurrency --- .../cache/H2CacheStoreStrategy.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java index 6f9f54a7da336..49d3f79f9031c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java @@ -57,14 +57,17 @@ public class H2CacheStoreStrategy implements TestCacheStoreStrategy { "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; /** Script that creates STATS table */ - private static final String CREATE_STATS_TABLE = - "create table if not exists STATS(id bigint not null, reads int not null, writes int not null, " + - "removes int not null, PRIMARY KEY(id));"; + private static final String CREATE_STATS_TABLES = + "create table if not exists READS(id bigint auto_increment);\n" + + "create table if not exists WRITES(id bigint auto_increment);\n" + + "create table if not exists REMOVES(id bigint auto_increment);"; + /** Script that populates STATS table */ private static final String POPULATE_STATS_TABLE = - "delete from STATS;\n" + - "insert into STATS(id, reads, writes, removes) values(1, 0, 0, 0);"; + "delete from READS;\n" + + "delete from WRITES;\n" + + "delete from REMOVES;"; /** */ public H2CacheStoreStrategy() throws IgniteCheckedException { @@ -74,7 +77,7 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { try (Connection conn = connection()) { RunScript.execute(conn, new StringReader(CREATE_CACHE_TABLE)); - RunScript.execute(conn, new StringReader(CREATE_STATS_TABLE)); + RunScript.execute(conn, new StringReader(CREATE_STATS_TABLES)); RunScript.execute(conn, new StringReader(POPULATE_STATS_TABLE)); } } @@ -85,17 +88,21 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public int getReads() { - return querySingleInt("select reads from STATS limit 0,1;", "Failed to query number of reads from STATS table"); + return queryStats("reads"); } /** {@inheritDoc} */ @Override public int getWrites() { - return querySingleInt("select writes from STATS limit 0,1;", "Failed to query number of writes from STATS table"); + return queryStats("writes"); } /** {@inheritDoc} */ @Override public int getRemoves() { - return querySingleInt("select removes from STATS limit 0,1;", "Failed to query number of removals from STATS table"); + return queryStats("removes"); + } + + private int queryStats(String table) { + return querySingleInt("select count(*) from " + table, "Failed to query store stats [table=" + table + "]"); } /** {@inheritDoc} */ @@ -389,16 +396,16 @@ static void removeFromDb(Connection conn, Object key) throws SQLException { } /** - * Increment stored stats for given field - * @param field field name + * Increment stored stats for given operation + * @param tableName field name */ - private void updateStats(String field) { + private void updateStats(String tableName) { Connection conn = ses.attachment(); assert conn != null; Statement stmt = null; try { stmt = conn.createStatement(); - stmt.executeUpdate("update STATS set " + field + " = " + field + " + 1;"); + stmt.executeUpdate("insert into " + tableName + " default values"); } catch (SQLException e) { throw new IgniteException("Failed to update H2 store usage stats", e); From f9fa1742aeb87da2353cacf5a5641e1fadfceb8d Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Thu, 30 Jun 2016 13:09:57 +0300 Subject: [PATCH 09/10] Enabled few tests for multi JVM mode --- .../cache/IgniteCacheConfigVariationsFullApiTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index 2e9a037313229..7e6d0150cc99c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -2568,9 +2568,6 @@ public void testDeletedEntriesFlag() throws Exception { * @throws Exception If failed. */ public void testRemoveLoad() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - if (!storeEnabled()) return; @@ -4723,9 +4720,6 @@ protected void testGlobalClearKey(boolean async, Collection keysToRmv) t * @throws Exception If failed. */ public void testWithSkipStore() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - if (!storeEnabled()) return; @@ -4938,9 +4932,6 @@ public void testWithSkipStore() throws Exception { * @throws Exception If failed. */ public void testWithSkipStoreRemoveAll() throws Exception { - if (isMultiJvm()) - fail("https://issues.apache.org/jira/browse/IGNITE-1088"); - if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373. return; From 2776f9ff571a8eb0260931630d4b7a9539bb349d Mon Sep 17 00:00:00 2001 From: sboikov Date: Fri, 1 Jul 2016 10:00:42 +0300 Subject: [PATCH 10/10] ignite-1088 Minor --- .../GridCacheAbstractFullApiSelfTest.java | 10 +- .../cache/GridCacheAbstractSelfTest.java | 10 +- .../cache/H2CacheStoreStrategy.java | 97 +++++++++++-------- .../cache/MapCacheStoreStrategy.java | 7 +- .../cache/TestCacheStoreStrategy.java | 36 ++++--- .../configvariations/ConfigVariations.java | 1 - parent/pom.xml | 2 +- 7 files changed, 93 insertions(+), 70 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index bc2593138fbe1..dff4d4327a033 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -223,10 +223,10 @@ protected CacheMemoryMode memoryMode() { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { initStoreStrategy(); + if (cacheStartType() == CacheStartMode.STATIC) super.beforeTestsStarted(); else { - initStoreStrategy(); cacheCfgMap = Collections.synchronizedMap(new HashMap()); if (cacheStartType() == CacheStartMode.NODES_THEN_CACHES) { @@ -269,9 +269,7 @@ protected CacheMemoryMode memoryMode() { } /** - * Checks that skipStore flag gets overriden inside a transaction. - * - * @throws Exception if failed. + * Checks that skipStore flag gets overridden inside a transaction. */ public void testWriteThroughTx() { String key = "writeThroughKey"; @@ -297,9 +295,7 @@ public void testWriteThroughTx() { } /** - * Checks that skipStore flag gets overriden inside a transaction. - * - * @throws Exception if failed. + * Checks that skipStore flag gets overridden inside a transaction. */ public void testNoReadThroughTx() { String key = "writeThroughKey"; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index adb0b543a864f..dcf5d98d57fba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -95,10 +95,15 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { @Override protected void afterTestsStopped() throws Exception { stopAllGrids(); - storeStgy.resetStore(); + if (storeStgy != null) + storeStgy.resetStore(); } - /** Initialize {@link #storeStgy} with respect to the nature of the test */ + /** + * Initializes {@link #storeStgy} with respect to the nature of the test. + * + * @throws IgniteCheckedException If failed. + */ void initStoreStrategy() throws IgniteCheckedException { if (storeStgy == null) storeStgy = isMultiJvm() ? new H2CacheStoreStrategy() : new MapCacheStoreStrategy(); @@ -216,6 +221,7 @@ protected CacheConfiguration cacheConfiguration(String gridName) throws Exceptio CacheConfiguration cfg = defaultCacheConfiguration(); Factory> storeFactory = storeStgy.getStoreFactory(); + CacheStore store = storeFactory.create(); if (store != null) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java index 49d3f79f9031c..ccb299438ce86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/H2CacheStoreStrategy.java @@ -47,29 +47,33 @@ import org.h2.tools.RunScript; import org.h2.tools.Server; -/** {@link TestCacheStoreStrategy} backed by H2 in-memory database */ +/** + * {@link TestCacheStoreStrategy} backed by H2 in-memory database. + */ public class H2CacheStoreStrategy implements TestCacheStoreStrategy { - /** Pool to get {@link Connection}s from */ + /** Pool to get {@link Connection}s from. */ private final JdbcConnectionPool dataSrc; - /** Script that creates CACHE table */ + /** Script that creates CACHE table. */ private static final String CREATE_CACHE_TABLE = "create table if not exists CACHE(k binary not null, v binary not null, PRIMARY KEY(k));"; - /** Script that creates STATS table */ + /** Script that creates STATS table. */ private static final String CREATE_STATS_TABLES = "create table if not exists READS(id bigint auto_increment);\n" + "create table if not exists WRITES(id bigint auto_increment);\n" + "create table if not exists REMOVES(id bigint auto_increment);"; - /** Script that populates STATS table */ private static final String POPULATE_STATS_TABLE = "delete from READS;\n" + "delete from WRITES;\n" + "delete from REMOVES;"; - /** */ + + /** + * @throws IgniteCheckedException If failed. + */ public H2CacheStoreStrategy() throws IgniteCheckedException { try { Server.createTcpServer().start(); @@ -101,8 +105,12 @@ public H2CacheStoreStrategy() throws IgniteCheckedException { return queryStats("removes"); } - private int queryStats(String table) { - return querySingleInt("select count(*) from " + table, "Failed to query store stats [table=" + table + "]"); + /** + * @param tbl Table name. + * @return Update statistics. + */ + private int queryStats(String tbl) { + return querySingleInt("select count(*) from " + tbl, "Failed to query store stats [table=" + tbl + "]"); } /** {@inheritDoc} */ @@ -203,10 +211,11 @@ private Connection connection() throws SQLException { } /** - * Retrieve single int value from {@link ResultSet} returned by given query - * @param qry Query string (fully populated, with params) - * @param errorMsg Message for {@link IgniteException} to bear in case of failure - * @return requested value + * Retrieves single int value from {@link ResultSet} returned by given query. + * + * @param qry Query string (fully populated, with params). + * @param errorMsg Message for {@link IgniteException} to bear in case of failure. + * @return Requested value */ private int querySingleInt(String qry, String errorMsg) { Connection conn = null; @@ -232,8 +241,8 @@ private int querySingleInt(String qry, String errorMsg) { } /** {@inheritDoc} */ - @Override public void updateCacheConfiguration(CacheConfiguration configuration) { - configuration.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); + @Override public void updateCacheConfiguration(CacheConfiguration cfg) { + cfg.setCacheStoreSessionListenerFactories(new H2CacheStoreSessionListenerFactory()); } /** {@inheritDoc} */ @@ -241,7 +250,7 @@ private int querySingleInt(String qry, String errorMsg) { return new H2StoreFactory(); } - /** Serializable H2 backed cache store factory */ + /** Serializable H2 backed cache store factory. */ public static class H2StoreFactory implements Factory> { /** {@inheritDoc} */ @Override public CacheStore create() { @@ -249,7 +258,7 @@ public static class H2StoreFactory implements Factory } } - /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s */ + /** Serializable {@link Factory} producing H2 backed {@link CacheStoreSessionListener}s. */ public static class H2CacheStoreSessionListenerFactory implements Factory { /** * @return Connection pool @@ -280,6 +289,8 @@ public static class H2CacheStore extends CacheStoreAdapter { /** {@inheritDoc} */ @Override public void loadCache(IgniteBiInClosure clo, Object... args) { Connection conn = ses.attachment(); + assert conn != null; + Statement stmt = null; ResultSet rs = null; try { @@ -336,11 +347,12 @@ public static class H2CacheStore extends CacheStoreAdapter { } /** - * Select from H2 and deserialize from bytes the value pointed at by key - * @param conn {@link Connection} to use - * @param key key to llok for the value by - * @return Stored object or null if the key is missing from DB - * @throws SQLException + * Selects from H2 and deserialize from bytes the value pointed by key. + * + * @param conn {@link Connection} to use. + * @param key Key to look for. + * @return Stored object or null if the key is missing from DB. + * @throws SQLException If failed. */ static Object getFromDb(Connection conn, Object key) throws SQLException { PreparedStatement stmt = null; @@ -358,11 +370,12 @@ static Object getFromDb(Connection conn, Object key) throws SQLException { } /** - * Put key-value pair to H2 - * @param conn {@link Connection} to use - * @param key key - * @param val value - * @throws SQLException if failed + * Puts key-value pair to H2. + * + * @param conn {@link Connection} to use. + * @param key Key. + * @param val Value. + * @throws SQLException If failed. */ static void putToDb(Connection conn, Object key, Object val) throws SQLException { PreparedStatement stmt = null; @@ -378,10 +391,11 @@ static void putToDb(Connection conn, Object key, Object val) throws SQLException } /** - * Remove given key and its value from H2 - * @param conn {@link Connection} to invoke query upon - * @param key to remove - * @throws SQLException if failed + * Removes given key and its value from H2. + * + * @param conn {@link Connection} to invoke query upon. + * @param key Key to remove. + * @throws SQLException if failed. */ static void removeFromDb(Connection conn, Object key) throws SQLException { PreparedStatement stmt = null; @@ -396,16 +410,17 @@ static void removeFromDb(Connection conn, Object key) throws SQLException { } /** - * Increment stored stats for given operation - * @param tableName field name + * Increments stored stats for given operation. + * + * @param tblName Table name */ - private void updateStats(String tableName) { + private void updateStats(String tblName) { Connection conn = ses.attachment(); assert conn != null; Statement stmt = null; try { stmt = conn.createStatement(); - stmt.executeUpdate("insert into " + tableName + " default values"); + stmt.executeUpdate("insert into " + tblName + " default values"); } catch (SQLException e) { throw new IgniteException("Failed to update H2 store usage stats", e); @@ -416,9 +431,10 @@ private void updateStats(String tableName) { } /** - * Turn given arbitrary {@link Object} to byte array - * @param obj {@link Object} to serialize - * @return bytes representation of given {@link Object} + * Turns given arbitrary object to byte array. + * + * @param obj Object to serialize + * @return Bytes representation of given object. */ static byte[] serialize(Object obj) { try (ByteArrayOutputStream b = new ByteArrayOutputStream()) { @@ -433,9 +449,10 @@ static byte[] serialize(Object obj) { } /** - * Deserialize an object from its byte array representation - * @param bytes byte array representation of the {@link Object} - * @return deserialized {@link Object} + * Deserializes an object from its byte array representation. + * + * @param bytes Byte array representation of the object. + * @return Deserialized object. */ public static Object deserialize(byte[] bytes) { try (ByteArrayInputStream b = new ByteArrayInputStream(bytes)) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java index 537edeae4f8c2..800d781bbeafc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java @@ -28,7 +28,9 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.jsr166.ConcurrentHashMap8; -/** {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} */ +/** + * {@link TestCacheStoreStrategy} implemented as a wrapper around {@link #map} + */ public class MapCacheStoreStrategy implements TestCacheStoreStrategy { /** Removes counter. */ private final static AtomicInteger removes = new AtomicInteger(); @@ -97,7 +99,7 @@ public class MapCacheStoreStrategy implements TestCacheStoreStrategy { } /** {@inheritDoc} */ - @Override public void updateCacheConfiguration(CacheConfiguration configuration) { + @Override public void updateCacheConfiguration(CacheConfiguration cfg) { // No-op. } @@ -116,7 +118,6 @@ public static class MapStoreFactory implements Factory { - /** {@inheritDoc} */ @Override public void loadCache(IgniteBiInClosure clo, Object... args) { for (Map.Entry e : map.entrySet()) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java index d5d0d9b091a83..9ee174a36ea71 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java @@ -22,35 +22,37 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; -/** Interface for cache store backend manipulation and stats routines */ +/** + * Interface for cache store backend manipulation and stats routines. + */ public interface TestCacheStoreStrategy { /** - * @return Number of reads to store + * @return Number of reads to store. */ public int getReads(); /** - * @return Number of writes to store + * @return Number of writes to store. */ public int getWrites(); /** - * @return Number of removals from store + * @return Number of removals from store. */ public int getRemoves(); /** - * @return Total number of items in the store + * @return Total number of items in the store. */ public int getStoreSize(); /** - * Clear store contents + * Clear store contents. */ public void resetStore(); /** - * Put entry to cache store + * Put entry to cache store. * * @param key Key. * @param val Value. @@ -58,13 +60,13 @@ public interface TestCacheStoreStrategy { public void putToStore(Object key, Object val); /** - * @param data items to put to store + * @param data Items to put to store. */ public void putAllToStore(Map data); /** - * @param key to look for - * @return {@link Object} pointed to by given key or null if no object is present + * @param key Key to look for. + * @return {@link Object} pointed to by given key or {@code null} if no object is present. */ public Object getFromStore(Object key); @@ -74,19 +76,21 @@ public interface TestCacheStoreStrategy { public void removeFromStore(Object key); /** - * @param key to look for - * @return true if object pointed to by key is in store, false otherwise + * @param key to look for. + * @return {@code True} if object pointed to by key is in store, false otherwise. */ public boolean isInStore(Object key); /** - * Called from {@link #cacheConfiguration(String)}, this method allows implementations to tune cache config - * @param configuration {@link CacheConfiguration} to tune + * Called from {@link GridCacheAbstractSelfTest#cacheConfiguration(String)}, + * this method allows implementations to tune cache config. + * + * @param cfg {@link CacheConfiguration} to tune. */ - public void updateCacheConfiguration(CacheConfiguration configuration); + public void updateCacheConfiguration(CacheConfiguration cfg); /** - * @return {@link Factory} for write-through storage emulator + * @return {@link Factory} for write-through storage emulator. */ public Factory> getStoreFactory(); } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java index 24a2d6efecb64..4666581bead00 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; -import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest; import static org.apache.ignite.internal.util.lang.GridFunc.asArray; diff --git a/parent/pom.xml b/parent/pom.xml index 66d35d2484eac..d78deacf5b701 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -69,7 +69,7 @@ 18.0 14.0.1 16.0.1 - 1.4.179 + 1.3.175 2.4.1 4.5.1 4.4.3