From fb172399ff2181a8f5f82e5988424defdf38da67 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Sat, 27 Jan 2018 17:11:22 +0000 Subject: [PATCH] JENA-1469: Adjust internal tracking state across promotion --- .../transaction/AbstractTestTransPromote.java | 26 ++++---- .../AbstractTestTransactionLifecycle.java | 24 +++++++- .../tdb/transaction/TransactionManager.java | 59 ++++++++++++------- 3 files changed, 72 insertions(+), 37 deletions(-) diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java index 9134668a1ea..1ca137989e9 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java @@ -244,18 +244,6 @@ private void run_08(TxnType txnType) { }) ; } - @Test - public void promote_10() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, true) ; } - - @Test - public void promote_11() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, false) ; } - - @Test - public void promote_12() { - expect(()->promote_readCommit_txnCommit(TxnType.READ_PROMOTE, true) , - getTransactionExceptionClass()) ; - } - @SafeVarargs private final void expect(Runnable runnable, Class...classes) { try { @@ -270,6 +258,18 @@ private final void expect(Runnable runnable, Class...classe } } + @Test + public void promote_10() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, true) ; } + + @Test + public void promote_11() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, false) ; } + + @Test + public void promote_12() { + expect(()->promote_readCommit_txnCommit(TxnType.READ_PROMOTE, true) , + getTransactionExceptionClass()) ; + } + @Test public void promote_13() { promote_readCommit_txnCommit(TxnType.READ_PROMOTE, false) ; } @@ -388,7 +388,7 @@ private void promote_clash_active_writer(ExecutorService executor, boolean activ } catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1) ; } } - // This would locks up because of a WRITE-WRITE deadly embrace. + // This would lock up because of a WRITE-WRITE deadly embrace. // @Test(expected=JenaTransactionException.class) // public void promote11() { test2(TxnMode.WRITE); } diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransactionLifecycle.java b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransactionLifecycle.java index c1043b899c4..ec804258ab5 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransactionLifecycle.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransactionLifecycle.java @@ -235,6 +235,17 @@ public void transaction_p05() { ds.end(); } + // JENA-1469 + @Test + public void transaction_p06() { + transaction_promote_write(TxnType.READ_COMMITTED_PROMOTE); + } + + @Test + public void transaction_p07() { + transaction_promote_write(TxnType.READ_PROMOTE); + } + @Test(expected=JenaException.class) public void transaction_err_read_promote() { assumeTrue(supportsPromote()) ; @@ -245,9 +256,18 @@ public void transaction_err_read_promote() { ds.commit(); ds.end(); } - - // XXX READ_PROMOTE -> update -> fail promote/boolean. + private void transaction_promote_write(TxnType txnType) { + Dataset ds = create() ; + ds.begin(txnType); + ds.promote(); + ds.commit(); + ds.end(); + ds.begin(TxnType.WRITE); + ds.commit(); + ds.end(); + } + // Patterns. @Test public void transaction_pattern_01() { diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java index 10e8b44c2e9..e9c4797f2f3 100644 --- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java +++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java @@ -163,7 +163,7 @@ private interface TSM { void transactionCloses(Transaction txn) ; void readerStarts(Transaction txn) ; void readerFinishes(Transaction txn) ; - void transactionPromotes(Transaction txn) ; + void transactionPromotes(Transaction txnOld, Transaction txnNew) ; void writerStarts(Transaction txn) ; void writerCommits(Transaction txn) ; void writerAborts(Transaction txn) ; @@ -175,7 +175,7 @@ class TSM_Base implements TSM { @Override public void transactionCloses(Transaction txn) {} @Override public void readerStarts(Transaction txn) {} @Override public void readerFinishes(Transaction txn) {} - @Override public void transactionPromotes(Transaction txn) {} + @Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) {} @Override public void writerStarts(Transaction txn) {} @Override public void writerCommits(Transaction txn) {} @Override public void writerAborts(Transaction txn) {} @@ -185,7 +185,8 @@ class TSM_Logger extends TSM_Base { TSM_Logger() {} @Override public void readerStarts(Transaction txn) { log("start", txn) ; } @Override public void readerFinishes(Transaction txn) { log("finish", txn) ; } - @Override public void transactionPromotes(Transaction txn) { log("promote", txn) ; } + @Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) + { log("promote(old)", txnOld) ; log("promote(new)", txnNew) ; } @Override public void writerStarts(Transaction txn) { log("begin", txn) ; } @Override public void writerCommits(Transaction txn) { log("commit", txn) ; } @Override public void writerAborts(Transaction txn) { log("abort", txn) ; } @@ -196,7 +197,8 @@ class TSM_LoggerDebug extends TSM_Base { TSM_LoggerDebug() {} @Override public void readerStarts(Transaction txn) { logInternal("start", txn) ; } @Override public void readerFinishes(Transaction txn) { logInternal("finish", txn) ; } - @Override public void transactionPromotes(Transaction txn) { logInternal("promote", txn) ; } + @Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) + { logInternal("promote(old)", txnOld) ; logInternal("promote(new)", txnNew) ; } @Override public void writerStarts(Transaction txn) { logInternal("begin", txn) ; } @Override public void writerCommits(Transaction txn) { logInternal("commit", txn) ; } @Override public void writerAborts(Transaction txn) { logInternal("abort", txn) ; } @@ -209,7 +211,10 @@ class TSM_Counters implements TSM { @Override public void transactionCloses(Transaction txn) { } @Override public void readerStarts(Transaction txn) { inc(activeReaders) ; } @Override public void readerFinishes(Transaction txn) { dec(activeReaders) ; inc(finishedReaders); } - @Override public void transactionPromotes(Transaction txn) { dec(activeReaders) ; inc(finishedReaders); inc(activeWriters); } + + @Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) + { dec(activeReaders) ; inc(finishedReaders); inc(activeWriters); } + @Override public void writerStarts(Transaction txn) { inc(activeWriters) ; } @Override public void writerCommits(Transaction txn) { dec(activeWriters) ; inc(committedWriters) ; } @Override public void writerAborts(Transaction txn) { dec(activeWriters) ; inc(abortedWriters) ; } @@ -236,6 +241,12 @@ class TSM_WriteBackEndTxn extends TSM_Base { // Currently, the writer semaphore is managed explicitly in the main code. + @Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) { + // Switch locks. + txnOld.getBaseDataset().getLock().leaveCriticalSection() ; + txnNew.getBaseDataset().getLock().enterCriticalSection(Lock.READ) ; + } + @Override public void readerFinishes(Transaction txn) { txn.getBaseDataset().getLock().leaveCriticalSection() ; @@ -342,7 +353,9 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType, acquireWriterLock(true) ; } // entry synchronized part - return begin$(txnType, originalTxnType, label) ; + DatasetGraphTxn dsgtxn = begin$(txnType, originalTxnType, label) ; + noteTxnStart(dsgtxn.getTransaction()) ; + return dsgtxn; } /** Ensure a DatasetGraphTxn is for a write transaction. @@ -377,11 +390,9 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType, // Can also promote - may need to wait for active writers. // Go through begin for the writers lock. if ( txnType == TxnType.READ_COMMITTED_PROMOTE ) { - // Full begin cycle. - DatasetGraphTxn dsgtxn2 = beginInternal(TxnType.WRITE, txnType, txn.getLabel()) ; - // Junk the old one. - noteTxnPromote(txn, dsgtxn2.getTransaction()); - return dsgtxn2 ; + acquireWriterLock(true); + // No need to sync - we just queue as a writer. + return promoteExec$(dsgtxn, txnType); } // First check, without the writer lock. Fast fail. @@ -402,25 +413,31 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType, // Potentially blocking - must be outside 'synchronized' so that any active writer // can commit/abort. Otherwise, we have deadlock. acquireWriterLock(true) ; - // Do the synchronized stuff. - return promote2$(dsgtxn, txnType) ; + return promoteSync$(dsgtxn, txnType) ; } synchronized - private DatasetGraphTxn promote2$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) { + private DatasetGraphTxn promoteSync$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) { Transaction txn = dsgtxn.getTransaction() ; // Writers may have happened between the first check of the active writers may have committed. if ( txn.getVersion() != version.get() ) { releaseWriterLock(); return null; } - // Use begin$ (not beginInternal) - we have the writers lock. + return promoteExec$(dsgtxn, originalTxnType); + } + + private DatasetGraphTxn promoteExec$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) { + // Use begin$ (not beginInternal) + // We have the writers lock. + // We keep the exclusivity lock. + Transaction txn = dsgtxn.getTransaction() ; DatasetGraphTxn dsgtxn2 = begin$(TxnType.WRITE, originalTxnType, txn.getLabel()) ; noteTxnPromote(txn, dsgtxn2.getTransaction()); return dsgtxn2 ; } - + // If DatasetGraphTransaction has a sync lock on sConn, this // does not need to be sync'ed. But it's possible to use some // of the low level objects directly so we'll play safe. @@ -450,6 +467,7 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType, txn.setActiveDataset(dsgTxn) ; // Empty for READ ; only WRITE transactions have components that need notifying. + // Promote is a WRITE transaction starting. List components = dsgTxn.getTransaction().lifecycleComponents() ; if ( mode == ReadWrite.READ ) { @@ -460,8 +478,6 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType, for ( TransactionLifecycle component : components ) component.begin(dsgTxn.getTransaction()) ; - - noteTxnStart(txn) ; return dsgTxn ; } @@ -870,10 +886,9 @@ private void noteTxnStart(Transaction transaction) { private void noteTxnPromote(Transaction transaction, Transaction transaction2) { activeTransactions.remove(transaction); activeTransactions.add(transaction2); - transactionPromotes(transaction) ; + transactionPromotes(transaction, transaction2) ; } - private void noteTxnCommit(Transaction transaction) { switch (transaction.getTxnMode()) { @@ -1011,10 +1026,10 @@ private void readerFinishes(Transaction txn) { tsm.readerFinishes(txn) ; } - private void transactionPromotes(Transaction txn) { + private void transactionPromotes(Transaction txnOld, Transaction txnNew) { for ( TSM tsm : actions ) if ( tsm != null ) - tsm.transactionPromotes(txn); + tsm.transactionPromotes(txnOld, txnNew); } private void writerStarts(Transaction txn) {