From 2590dc8c47038805bdf31eb02e70d295c567dbd5 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Fri, 12 Aug 2016 14:24:42 +0100 Subject: [PATCH] Fix for snapshot-style promotion. With tests. --- .../tdb/transaction/TransactionManager.java | 55 +++- .../tdb/transaction/TestTransPromote.java | 307 +++++++++++++----- 2 files changed, 261 insertions(+), 101 deletions(-) diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java index e4dcfae1d37..1a65277807b 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java @@ -326,19 +326,34 @@ public DatasetGraphTxn begin(ReadWrite mode, String label) { if ( txn.getState() != TxnState.ACTIVE ) throw new TDBTransactionException("promote: transaction is not active") ; - DatasetGraphTDB basedsg = txn.getBaseDataset() ; - // if read commiter - pick up any currentReaderView (last commited transaction) - if ( ! readCommited ) { - // Compare by object identity. - if ( currentReaderView.get() != basedsg ) - throw new TDBTransactionException("Dataset changed - can't promote") ; - } + if ( readCommited ) { + // Read commit - pick up whatever is current at the point setup. + // Need to go through begin for the writers lock. + DatasetGraphTxn dsgtxn2 = begin( ReadWrite.WRITE, txn.getLabel()) ; + return dsgtxn2 ; + } + + // Don't promote if the database has moved on. + // 1/ No active writers. + // Ideally, wiait to see if it aborts but abort is uncommon. + // Easy implementation -- if any active writers, don't promote. + if ( activeWriters.get() > 0 ) + throw new TDBTransactionException("Dataset may be changing - active writer - can't promote") ; +// // Would this block corrctly? ... drops the sync lock? +// acquireWriterLock(true) ; + + // 2/ Check the database view has not moved on. + DatasetGraphTDB current = determineBaseDataset() ; + DatasetGraphTDB starting = txn.getBaseDataset() ; + // Compare by object identity. + if ( current != starting ) + throw new TDBTransactionException("Dataset changed - can't promote") ; // Need to go through begin for the writers lock. DatasetGraphTxn dsgtxn2 = begin( ReadWrite.WRITE, txn.getLabel()) ; return dsgtxn2 ; } - + // If DatasetGraphTransaction has a sync lock on sConn, this // does not need to be sync'ed. But it's possible to use some // of the low level object directly so we'll play safe. @@ -355,16 +370,7 @@ public DatasetGraphTxn begin(ReadWrite mode, String label) { case WRITE : System.out.print("w") ; break ; } - DatasetGraphTDB dsg = baseDataset ; - // *** But, if there are pending, committed transactions, use latest. - if ( !commitedAwaitingFlush.isEmpty() ) { - if ( DEBUG ) - System.out.print(commitedAwaitingFlush.size()) ; - dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView() ; - } else { - if ( DEBUG ) - System.out.print('_') ; - } + DatasetGraphTDB dsg = determineBaseDataset() ; Transaction txn = createTransaction(dsg, mode, label) ; log("begin$", txn) ; @@ -389,6 +395,19 @@ public DatasetGraphTxn begin(ReadWrite mode, String label) { return dsgTxn ; } + private DatasetGraphTDB determineBaseDataset() { + // if ( DEBUG ) { + // if ( !commitedAwaitingFlush.isEmpty() ) + // System.out.print(commitedAwaitingFlush.size()) ; + // } else { + // System.out.print('_') ; + // } + DatasetGraphTDB dsg = baseDataset ; + // But, if there are pending, committed transactions, use latest. + if ( !commitedAwaitingFlush.isEmpty() ) + dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView() ; + return dsg ; + } private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label) { Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ; return txn ; diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java index e51cc29df67..f843e761c71 100644 --- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java +++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.jena.tdb.transaction; +package org.apache.jena.tdb.transaction ; -import static org.junit.Assert.* ; +import static org.junit.Assert.assertEquals ; +import static org.junit.Assert.fail ; import java.util.concurrent.Semaphore ; import java.util.concurrent.atomic.AtomicInteger ; @@ -30,158 +31,298 @@ import org.apache.jena.sparql.sse.SSE ; import org.apache.jena.system.ThreadTxn ; import org.apache.jena.system.Txn ; +import org.apache.jena.tdb.TDB ; import org.apache.jena.tdb.TDBFactory ; import org.apache.jena.tdb.sys.SystemTDB ; -import org.apache.jena.tdb.transaction.DatasetGraphTransaction ; import org.apache.log4j.Level ; import org.apache.log4j.Logger ; -import org.junit.AfterClass ; -import org.junit.BeforeClass ; -import org.junit.Test ; +import org.junit.* ; -/** Tests for transactions that start read and then promote to write */ +/** Tests for transactions that start read and then promote to write */ public class TestTransPromote { // Currently, - // this feature is off and needs enabling via DatasetGraphTransaction.promotion - // promotiion is implicit whe a write happens. - - - + // this feature is off and needs enabling via DatasetGraphTransaction.promotion + // promotiion is implicit whe a write happens. + // See beforeClass / afterClass. - - private static Logger logger = Logger.getLogger(SystemTDB.errlog.getName()) ; - private static Level level ; - static boolean oldPromotion ; - - @BeforeClass static public void beforeClass() { - oldPromotion = DatasetGraphTransaction.promotion ; - DatasetGraphTransaction.promotion = true ; - level = logger.getLevel() ; - //logger.setLevel(Level.ERROR) ; + + private static Logger logger1 = Logger.getLogger(SystemTDB.errlog.getName()) ; + private static Level level1 ; + private static Logger logger2 = Logger.getLogger(TDB.logInfoName) ; + private static Level level2 ; + static boolean stdPromotion ; + static boolean stdReadCommitted ; + + @BeforeClass + static public void beforeClass() { + stdPromotion = DatasetGraphTransaction.promotion ; + stdReadCommitted = DatasetGraphTransaction.readCommittedPromotion ; + level1 = logger1.getLevel() ; + level2 = logger2.getLevel() ; + + // logger1.setLevel(Level.ERROR) ; + // logger2.setLevel(Level.ERROR) ; } - - @AfterClass static public void afterClass() { + + @AfterClass + static public void afterClass() { // Restore logging setting. - logger.setLevel(level); - DatasetGraphTransaction.promotion = oldPromotion ; + logger2.setLevel(level2) ; + logger1.setLevel(level1) ; + DatasetGraphTransaction.promotion = stdPromotion ; + DatasetGraphTransaction.readCommittedPromotion = stdReadCommitted ; + } + + @Before + public void before() { + DatasetGraphTransaction.promotion = true ; + DatasetGraphTransaction.readCommittedPromotion = true ; + } + + @After + public void after() { + DatasetGraphTransaction.promotion = true ; + DatasetGraphTransaction.readCommittedPromotion = true ; } + private static Quad q1 = SSE.parseQuad("(_ :s :p1 1)") ; private static Quad q2 = SSE.parseQuad("(_ :s :p2 2)") ; private static Quad q3 = SSE.parseQuad("(_ :s :p3 3)") ; - - protected DatasetGraph create() { return TDBFactory.createDatasetGraph() ; } - + + protected DatasetGraph create() { + return TDBFactory.createDatasetGraph() ; + } + protected static void assertCount(long expected, DatasetGraph dsg) { - dsg.begin(ReadWrite.READ); + dsg.begin(ReadWrite.READ) ; long x = Iter.count(dsg.find()) ; dsg.end() ; assertEquals(expected, x) ; } + + // "strict" = don't see intermedioate changes. + // "readCommitted" = do see + + // Subclass / parameterized - @Test public void promote_01() { + @Test public void promote_snapshot_01() { run_01(false) ; } + @Test public void promote_readCommitted_01() { run_01(true) ; } + + // READ-add + private void run_01(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; - dsg.begin(ReadWrite.READ); + + dsg.begin(ReadWrite.READ) ; dsg.add(q1) ; - dsg.commit(); + dsg.commit() ; dsg.end() ; } - @Test public void promote_02() { + @Test public void promote_snapshot_02() { run_02(false) ; } + @Test public void promote_readCommitted_02() { run_02(true) ; } + + // Previous transaction then READ-add + private void run_02(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; - dsg.begin(ReadWrite.READ); + + dsg.begin(ReadWrite.READ) ;dsg.end() ; + + dsg.begin(ReadWrite.READ) ; dsg.add(q1) ; - dsg.add(q2) ; - dsg.commit(); + dsg.commit() ; dsg.end() ; - assertCount(2, dsg) ; } + + @Test public void promote_snapshot_03() { run_03(false) ; } + @Test public void promote_readCommitted_03() { run_03(true) ; } - // Causes the warning. - @Test public void promote_03() { + private void run_03(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; - dsg.begin(ReadWrite.READ); + + dsg.begin(ReadWrite.WRITE) ;dsg.commit() ; dsg.end() ; + + dsg.begin(ReadWrite.READ) ; dsg.add(q1) ; + dsg.commit() ; + dsg.end() ; + } + + @Test public void promote_snapshot_04() { run_04(false) ; } + @Test public void promote_readCommitted_04() { run_04(true) ; } + + private void run_04(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; + DatasetGraph dsg = create() ; + dsg.begin(ReadWrite.WRITE) ;dsg.abort() ; dsg.end() ; + + dsg.begin(ReadWrite.READ) ; + dsg.add(q1) ; + dsg.commit() ; + dsg.end() ; + } + + @Test public void promote_snapshot_05() { run_05(false) ; } + @Test public void promote_readCommitted_05() { run_05(true) ; } + + private void run_05(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; + DatasetGraph dsg = create() ; + dsg.begin(ReadWrite.READ) ; + dsg.add(q1) ; + // bad - forced abort. // Causes a WARN. - logger.setLevel(Level.ERROR) ; + logger1.setLevel(Level.ERROR) ; dsg.end() ; - logger.setLevel(level) ; - + logger1.setLevel(level1) ; + assertCount(0, dsg) ; } + + @Test public void promote_snapshot_06() { run_06(false) ; } + @Test public void promote_readCommitted_06() { run_06(true) ; } - @Test public void promote_04() { + // Async writer after promotion. + private void run_06(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; AtomicInteger a = new AtomicInteger(0) ; - + Semaphore sema = new Semaphore(0) ; - Thread t = new Thread(()->{ - sema.release(); - Txn.execWrite(dsg, ()->dsg.add(q3)) ; - sema.release(); + Thread t = new Thread(() -> { + sema.release() ; + Txn.execWrite(dsg, () -> dsg.add(q3)) ; + sema.release() ; }) ; - - dsg.begin(ReadWrite.READ); + + dsg.begin(ReadWrite.READ) ; // Promote dsg.add(q1) ; - t.start(); + t.start() ; // First release. - sema.acquireUninterruptibly(); - // Thread blocked. + sema.acquireUninterruptibly() ; + // Thread blocked. dsg.add(q2) ; - dsg.commit(); + dsg.commit() ; dsg.end() ; - + // Until thread exits. - sema.acquireUninterruptibly(); + sema.acquireUninterruptibly() ; assertCount(3, dsg) ; } + + @Test public void promote_snapshot_07() { run_07(false) ; } + @Test public void promote_readCommitted_07() { run_07(true) ; } - @Test public void promote_05() { + // Async writer after promotion. + private void run_07(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; // Start long running reader. - ThreadTxn tt = ThreadTxn.threadTxnRead(dsg, ()->{ + ThreadTxn tt = ThreadTxn.threadTxnRead(dsg, () -> { long x = Iter.count(dsg.find()) ; - if ( x != 0 ) + if ( x != 0 ) throw new RuntimeException() ; }) ; - + // Start R->W here - dsg.begin(ReadWrite.READ); + dsg.begin(ReadWrite.READ) ; dsg.add(q1) ; dsg.add(q2) ; - dsg.commit(); + dsg.commit() ; dsg.end() ; - tt.run(); + tt.run() ; } - @Test public void promote_06() { - promoteRC(true) ; - } - - @Test(expected=TDBTransactionException.class) - public void promote_07() { - promoteRC(false) ; - } - - private void promoteRC(boolean allowReadCommitted) { - DatasetGraphTransaction.readCommittedPromotion = allowReadCommitted ; + @Test public void promote_snapshot_08() { run_08(false) ; } + @Test public void promote_readCommitted_08() { run_08(true) ; } + + // Async writer after promotion trasnaction ends. + private void run_08(boolean b) { + DatasetGraphTransaction.readCommittedPromotion = b ; DatasetGraph dsg = create() ; + // Start R->W here + dsg.begin(ReadWrite.READ) ; + dsg.add(q1) ; + dsg.add(q2) ; + dsg.commit() ; + dsg.end() ; + Txn.execRead(dsg, () -> { + long x = Iter.count(dsg.find()) ; + assertEquals(2, x) ; + }) ; + } + + // Tests for XXX Read-committed yes/no, and whether the other transaction commits or aborts. + + @Test + public void promote_10() { promote_readCommit_txnCommit(true, true) ; } + + @Test + public void promote_11() { promote_readCommit_txnCommit(true, false) ; } + + @Test(expected = TDBTransactionException.class) + public void promote_12() { promote_readCommit_txnCommit(false, true) ; } + + @Test + public void promote_13() { promote_readCommit_txnCommit(false, false) ; } - ThreadTxn tt = ThreadTxn.threadTxnWrite(dsg, ()->{dsg.add(q3) ;}) ; + private void promote_readCommit_txnCommit(boolean allowReadCommitted, boolean asyncCommit) { + logger2.setLevel(Level.ERROR); + DatasetGraphTransaction.readCommittedPromotion = allowReadCommitted ; + DatasetGraph dsg = create() ; - dsg.begin(ReadWrite.READ); + ThreadTxn tt = asyncCommit? + ThreadTxn.threadTxnWrite(dsg, () -> dsg.add(q3) ) : + ThreadTxn.threadTxnWriteAbort(dsg, () -> dsg.add(q3)) ; + + dsg.begin(ReadWrite.READ) ; // Other runs - tt.run(); - // Can promote if readCommited + tt.run() ; + // Can promote if readCommited // Can't promote if not readCommited dsg.add(q1) ; - assertTrue(dsg.contains(q3)) ; - dsg.commit(); + if ( ! allowReadCommitted && asyncCommit ) + fail("Should not be here") ; + + assertEquals(asyncCommit, dsg.contains(q3)) ; + dsg.commit() ; + dsg.end() ; + logger2.setLevel(level2); + } + + // Test whether a writer casuses a snapshot isolation + // promotion to fail like it should + @Test(expected=TDBTransactionException.class) + public void promote_clash_1() { + DatasetGraphTransaction.readCommittedPromotion = false ; + DatasetGraph dsg = create() ; + Semaphore sema1 = new Semaphore(0) ; + Semaphore sema2 = new Semaphore(0) ; + Runnable r = ()->{ + dsg.begin(ReadWrite.WRITE) ; + sema1.release(1); + sema2.acquireUninterruptibly(1) ; + dsg.commit() ; + dsg.end() ; + } ; + + // Create a writer that waits. + new Thread(r).start(); + sema1.acquireUninterruptibly(); + // The thread is in the write. + dsg.begin(ReadWrite.READ) ; + // If read commited this will block. + // If snapshot, this will though an exception due to the active writer. + dsg.add(q1) ; + fail("Should not be here") ; + dsg.commit() ; dsg.end() ; } - }