Skip to content

Commit

Permalink
IGNITE-891 - Cache store improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Kulichenko committed May 19, 2015
1 parent 79258ba commit 16f045f
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 63 deletions.
Expand Up @@ -160,7 +160,7 @@ public boolean removeAll(@Nullable IgniteInternalTx tx, Collection<Object> keys)
* @param commit Commit.
* @throws IgniteCheckedException If failed.
*/
public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException;

/**
* End session initiated by write-behind store.
Expand Down
Expand Up @@ -637,8 +637,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
}

/** {@inheritDoc} */
@Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
throws IgniteCheckedException {
@Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;

Expand Down Expand Up @@ -700,7 +699,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
}

/** {@inheritDoc} */
@Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
@Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException {
assert store != null;

sessionInit0(tx);
Expand All @@ -711,10 +710,11 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx,
lsnr.onSessionEnd(locSes, commit);
}

store.sessionEnd(commit);
if (!sesHolder.get().storeEnded(store))
store.sessionEnd(commit);
}
finally {
if (sesHolder != null) {
if (last && sesHolder != null) {
sesHolder.set(null);

tx.removeMeta(SES_ATTR);
Expand Down Expand Up @@ -752,7 +752,6 @@ private void handleClassCastException(ClassCastException e) throws IgniteChecked
*/
private void sessionInit0(@Nullable IgniteInternalTx tx) {
assert sesHolder != null;
assert sesHolder.get() == null;

SessionData ses;

Expand Down Expand Up @@ -794,7 +793,8 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws
lsnr.onSessionEnd(locSes, !threwEx);
}

store.sessionEnd(!threwEx);
if (!sesHolder.get().storeEnded(store))
store.sessionEnd(!threwEx);
}
}
catch (Exception e) {
Expand Down Expand Up @@ -840,6 +840,9 @@ private static class SessionData {
/** */
private boolean started;

/** */
private final Set<CacheStore> endedStores = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());

/**
* @param tx Current transaction.
* @param cacheName Cache name.
Expand Down Expand Up @@ -893,6 +896,14 @@ private boolean started() {
return started;
}

/**
* @param store Cache store.
* @return Whether session already ended on this store instance.
*/
private boolean storeEnded(CacheStore store) {
return !endedStores.add(store);
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SessionData.class, this, "tx", CU.txString(tx));
Expand Down
Expand Up @@ -531,11 +531,13 @@ protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws Ign
}

// Batch-process puts if cache ID has changed.
if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) {
writeStore.putAll(this, putMap);
if (writeStore != null && writeStore != cacheCtx.store()) {
if (putMap != null && !putMap.isEmpty()) {
writeStore.putAll(this, putMap);

// Reset.
putMap.clear();
// Reset.
putMap.clear();
}

writeStore = null;
}
Expand Down Expand Up @@ -574,11 +576,13 @@ else if (op == DELETE) {
writeStore = null;
}

if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) {
writeStore.removeAll(this, rmvCol);
if (writeStore != null && writeStore != cacheCtx.store()) {
if (rmvCol != null && !rmvCol.isEmpty()) {
writeStore.removeAll(this, rmvCol);

// Reset.
rmvCol.clear();
// Reset.
rmvCol.clear();
}

writeStore = null;
}
Expand Down Expand Up @@ -623,8 +627,7 @@ else if (log.isDebugEnabled())
}

// Commit while locks are held.
for (CacheStoreManager store : stores)
store.sessionEnd(this, true);
sessionEnd(stores, true);
}
catch (IgniteCheckedException ex) {
commitError(ex);
Expand All @@ -649,6 +652,10 @@ else if (log.isDebugEnabled())

throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
}
finally {
if (isRollbackOnly())
sessionEnd(stores, false);
}
}
}

Expand Down Expand Up @@ -984,13 +991,12 @@ else if (op == READ) {
cctx.tm().resetContext();
}
}
else {
else if (!internal()) {
Collection<CacheStoreManager> stores = stores();

if (stores != null && !stores.isEmpty() && !internal()) {
if (stores != null && !stores.isEmpty()) {
try {
for (CacheStoreManager store : stores)
store.sessionEnd(this, true);
sessionEnd(stores, true);
}
catch (IgniteCheckedException e) {
commitError(e);
Expand Down Expand Up @@ -1091,13 +1097,11 @@ public Collection<GridCacheVersion> rolledbackVersions() {

cctx.tm().rollbackTx(this);

Collection<CacheStoreManager> stores = stores();
if (!internal()) {
Collection<CacheStoreManager> stores = stores();

if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht())) {
if (!internal()) {
for (CacheStoreManager store : stores)
store.sessionEnd(this, false);
}
if (stores != null && !stores.isEmpty() && (near() || F.first(stores).isWriteToStoreFromDht()))
sessionEnd(stores, false);
}
}
catch (Error | IgniteCheckedException | RuntimeException e) {
Expand All @@ -1108,6 +1112,21 @@ public Collection<GridCacheVersion> rolledbackVersions() {
}
}

/**
* @param stores Store managers.
* @param commit Commit flag.
* @throws IgniteCheckedException In case of error.
*/
private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
Iterator<CacheStoreManager> it = stores.iterator();

while (it.hasNext()) {
CacheStoreManager store = it.next();

store.sessionEnd(this, commit, !it.hasNext());
}
}

/**
* Checks if there is a cached or swapped value for
* {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
Expand Down
Expand Up @@ -20,14 +20,17 @@
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;
import org.apache.ignite.transactions.*;

import javax.cache.configuration.*;
import javax.cache.integration.*;
import java.io.*;
import java.sql.*;
import java.util.concurrent.atomic.*;

/**
Expand All @@ -37,6 +40,9 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

/** */
protected static final String URL = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1";

/** */
protected static final AtomicInteger loadCacheCnt = new AtomicInteger();

Expand All @@ -52,6 +58,12 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm
/** */
protected static final AtomicInteger reuseCnt = new AtomicInteger();

/** */
protected static final AtomicBoolean write = new AtomicBoolean();

/** */
protected static final AtomicBoolean fail = new AtomicBoolean();

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
Expand All @@ -77,11 +89,22 @@ public abstract class CacheStoreSessionListenerAbstractSelfTest extends GridComm

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
try (Connection conn = DriverManager.getConnection(URL)) {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table1");
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS Table2");

conn.createStatement().executeUpdate("CREATE TABLE Table1 (key INT, value INT)");
conn.createStatement().executeUpdate("CREATE TABLE Table2 (key INT, value INT)");
}

loadCacheCnt.set(0);
loadCnt.set(0);
writeCnt.set(0);
deleteCnt.set(0);
reuseCnt.set(0);

write.set(false);
fail.set(false);
}

/**
Expand Down Expand Up @@ -173,6 +196,94 @@ public void testCrossCacheTransaction() throws Exception {
assertEquals(3, reuseCnt.get());
}

/**
* @throws Exception If failed.
*/
public void testCommit() throws Exception {
write.set(true);

CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);

try (
IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
) {
try (Transaction tx = ignite(0).transactions().txStart()) {
cache1.put(1, 1);
cache2.put(2, 2);

tx.commit();
}
}

try (Connection conn = DriverManager.getConnection(URL)) {
checkTable(conn, 1, false);
checkTable(conn, 2, false);
}
}

/**
* @throws Exception If failed.
*/
public void testRollback() throws Exception {
write.set(true);
fail.set(true);

CacheConfiguration<Integer, Integer> cfg1 = cacheConfiguration("cache1", CacheAtomicityMode.TRANSACTIONAL);
CacheConfiguration<Integer, Integer> cfg2 = cacheConfiguration("cache2", CacheAtomicityMode.TRANSACTIONAL);

try (
IgniteCache<Integer, Integer> cache1 = ignite(0).createCache(cfg1);
IgniteCache<Integer, Integer> cache2 = ignite(0).createCache(cfg2)
) {
try (Transaction tx = ignite(0).transactions().txStart()) {
cache1.put(1, 1);
cache2.put(2, 2);

tx.commit();

assert false : "Exception was not thrown.";
}
catch (IgniteException e) {
CacheWriterException we = X.cause(e, CacheWriterException.class);

assertNotNull(we);

assertEquals("Expected failure.", we.getMessage());
}
}

try (Connection conn = DriverManager.getConnection(URL)) {
checkTable(conn, 1, true);
checkTable(conn, 2, true);
}
}

/**
* @param conn Connection.
* @param idx Table index.
* @param empty If table expected to be empty.
* @throws Exception In case of error.
*/
private void checkTable(Connection conn, int idx, boolean empty) throws Exception {
ResultSet rs = conn.createStatement().executeQuery("SELECT key, value FROM Table" + idx);

int cnt = 0;

while (rs.next()) {
int key = rs.getInt(1);
int val = rs.getInt(2);

assertEquals(idx, key);
assertEquals(idx, val);

cnt++;
}

assertEquals(empty ? 0 : 1, cnt);
}

/**
* @param name Cache name.
* @param atomicity Atomicity mode.
Expand Down

0 comments on commit 16f045f

Please sign in to comment.