Skip to content

Commit

Permalink
JENA-1469: Merge commit 'refs/pull/346/head' of github.com:apache/jena
Browse files Browse the repository at this point in the history
This closes #346.
  • Loading branch information
afs committed Jan 30, 2018
2 parents ebb1088 + fb17239 commit 6228bbc
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,6 @@ private void run_08(TxnType txnType) {
}) ;
}

@Test
public void promote_10() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, true) ; }

@Test
public void promote_11() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, false) ; }

@Test
public void promote_12() {
expect(()->promote_readCommit_txnCommit(TxnType.READ_PROMOTE, true) ,
getTransactionExceptionClass()) ;
}

@SafeVarargs
private final void expect(Runnable runnable, Class<? extends Exception>...classes) {
try {
Expand All @@ -270,6 +258,18 @@ private final void expect(Runnable runnable, Class<? extends Exception>...classe
}
}

@Test
public void promote_10() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, true) ; }

@Test
public void promote_11() { promote_readCommit_txnCommit(TxnType.READ_COMMITTED_PROMOTE, false) ; }

@Test
public void promote_12() {
expect(()->promote_readCommit_txnCommit(TxnType.READ_PROMOTE, true) ,
getTransactionExceptionClass()) ;
}

@Test
public void promote_13() { promote_readCommit_txnCommit(TxnType.READ_PROMOTE, false) ; }

Expand Down Expand Up @@ -388,7 +388,7 @@ private void promote_clash_active_writer(ExecutorService executor, boolean activ
} catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1) ; }
}

// This would locks up because of a WRITE-WRITE deadly embrace.
// This would lock up because of a WRITE-WRITE deadly embrace.
// @Test(expected=JenaTransactionException.class)
// public void promote11() { test2(TxnMode.WRITE); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ public void transaction_p05() {
ds.end();
}

// JENA-1469
@Test
public void transaction_p06() {
transaction_promote_write(TxnType.READ_COMMITTED_PROMOTE);
}

@Test
public void transaction_p07() {
transaction_promote_write(TxnType.READ_PROMOTE);
}

@Test(expected=JenaException.class)
public void transaction_err_read_promote() {
assumeTrue(supportsPromote()) ;
Expand All @@ -245,9 +256,18 @@ public void transaction_err_read_promote() {
ds.commit();
ds.end();
}

// XXX READ_PROMOTE -> update -> fail promote/boolean.

private void transaction_promote_write(TxnType txnType) {
Dataset ds = create() ;
ds.begin(txnType);
ds.promote();
ds.commit();
ds.end();
ds.begin(TxnType.WRITE);
ds.commit();
ds.end();
}

// Patterns.
@Test
public void transaction_pattern_01() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private interface TSM {
void transactionCloses(Transaction txn) ;
void readerStarts(Transaction txn) ;
void readerFinishes(Transaction txn) ;
void transactionPromotes(Transaction txn) ;
void transactionPromotes(Transaction txnOld, Transaction txnNew) ;
void writerStarts(Transaction txn) ;
void writerCommits(Transaction txn) ;
void writerAborts(Transaction txn) ;
Expand All @@ -175,7 +175,7 @@ class TSM_Base implements TSM {
@Override public void transactionCloses(Transaction txn) {}
@Override public void readerStarts(Transaction txn) {}
@Override public void readerFinishes(Transaction txn) {}
@Override public void transactionPromotes(Transaction txn) {}
@Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) {}
@Override public void writerStarts(Transaction txn) {}
@Override public void writerCommits(Transaction txn) {}
@Override public void writerAborts(Transaction txn) {}
Expand All @@ -185,7 +185,8 @@ class TSM_Logger extends TSM_Base {
TSM_Logger() {}
@Override public void readerStarts(Transaction txn) { log("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { log("finish", txn) ; }
@Override public void transactionPromotes(Transaction txn) { log("promote", txn) ; }
@Override public void transactionPromotes(Transaction txnOld, Transaction txnNew)
{ log("promote(old)", txnOld) ; log("promote(new)", txnNew) ; }
@Override public void writerStarts(Transaction txn) { log("begin", txn) ; }
@Override public void writerCommits(Transaction txn) { log("commit", txn) ; }
@Override public void writerAborts(Transaction txn) { log("abort", txn) ; }
Expand All @@ -196,7 +197,8 @@ class TSM_LoggerDebug extends TSM_Base {
TSM_LoggerDebug() {}
@Override public void readerStarts(Transaction txn) { logInternal("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { logInternal("finish", txn) ; }
@Override public void transactionPromotes(Transaction txn) { logInternal("promote", txn) ; }
@Override public void transactionPromotes(Transaction txnOld, Transaction txnNew)
{ logInternal("promote(old)", txnOld) ; logInternal("promote(new)", txnNew) ; }
@Override public void writerStarts(Transaction txn) { logInternal("begin", txn) ; }
@Override public void writerCommits(Transaction txn) { logInternal("commit", txn) ; }
@Override public void writerAborts(Transaction txn) { logInternal("abort", txn) ; }
Expand All @@ -209,7 +211,10 @@ class TSM_Counters implements TSM {
@Override public void transactionCloses(Transaction txn) { }
@Override public void readerStarts(Transaction txn) { inc(activeReaders) ; }
@Override public void readerFinishes(Transaction txn) { dec(activeReaders) ; inc(finishedReaders); }
@Override public void transactionPromotes(Transaction txn) { dec(activeReaders) ; inc(finishedReaders); inc(activeWriters); }

@Override public void transactionPromotes(Transaction txnOld, Transaction txnNew)
{ dec(activeReaders) ; inc(finishedReaders); inc(activeWriters); }

@Override public void writerStarts(Transaction txn) { inc(activeWriters) ; }
@Override public void writerCommits(Transaction txn) { dec(activeWriters) ; inc(committedWriters) ; }
@Override public void writerAborts(Transaction txn) { dec(activeWriters) ; inc(abortedWriters) ; }
Expand All @@ -236,6 +241,12 @@ class TSM_WriteBackEndTxn extends TSM_Base {

// Currently, the writer semaphore is managed explicitly in the main code.

@Override public void transactionPromotes(Transaction txnOld, Transaction txnNew) {
// Switch locks.
txnOld.getBaseDataset().getLock().leaveCriticalSection() ;
txnNew.getBaseDataset().getLock().enterCriticalSection(Lock.READ) ;
}

@Override
public void readerFinishes(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
Expand Down Expand Up @@ -342,7 +353,9 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType,
acquireWriterLock(true) ;
}
// entry synchronized part
return begin$(txnType, originalTxnType, label) ;
DatasetGraphTxn dsgtxn = begin$(txnType, originalTxnType, label) ;
noteTxnStart(dsgtxn.getTransaction()) ;
return dsgtxn;
}

/** Ensure a DatasetGraphTxn is for a write transaction.
Expand Down Expand Up @@ -377,11 +390,9 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType,
// Can also promote - may need to wait for active writers.
// Go through begin for the writers lock.
if ( txnType == TxnType.READ_COMMITTED_PROMOTE ) {
// Full begin cycle.
DatasetGraphTxn dsgtxn2 = beginInternal(TxnType.WRITE, txnType, txn.getLabel()) ;
// Junk the old one.
noteTxnPromote(txn, dsgtxn2.getTransaction());
return dsgtxn2 ;
acquireWriterLock(true);
// No need to sync - we just queue as a writer.
return promoteExec$(dsgtxn, txnType);
}

// First check, without the writer lock. Fast fail.
Expand All @@ -402,25 +413,31 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType,
// Potentially blocking - must be outside 'synchronized' so that any active writer
// can commit/abort. Otherwise, we have deadlock.
acquireWriterLock(true) ;

// Do the synchronized stuff.
return promote2$(dsgtxn, txnType) ;
return promoteSync$(dsgtxn, txnType) ;
}

synchronized
private DatasetGraphTxn promote2$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) {
private DatasetGraphTxn promoteSync$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) {
Transaction txn = dsgtxn.getTransaction() ;
// Writers may have happened between the first check of the active writers may have committed.
if ( txn.getVersion() != version.get() ) {
releaseWriterLock();
return null;
}
// Use begin$ (not beginInternal) - we have the writers lock.
return promoteExec$(dsgtxn, originalTxnType);
}

private DatasetGraphTxn promoteExec$(DatasetGraphTxn dsgtxn, TxnType originalTxnType) {
// Use begin$ (not beginInternal)
// We have the writers lock.
// We keep the exclusivity lock.
Transaction txn = dsgtxn.getTransaction() ;
DatasetGraphTxn dsgtxn2 = begin$(TxnType.WRITE, originalTxnType, txn.getLabel()) ;
noteTxnPromote(txn, dsgtxn2.getTransaction());
return dsgtxn2 ;
}

// If DatasetGraphTransaction has a sync lock on sConn, this
// does not need to be sync'ed. But it's possible to use some
// of the low level objects directly so we'll play safe.
Expand Down Expand Up @@ -450,6 +467,7 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType,
txn.setActiveDataset(dsgTxn) ;

// Empty for READ ; only WRITE transactions have components that need notifying.
// Promote is a WRITE transaction starting.
List<TransactionLifecycle> components = dsgTxn.getTransaction().lifecycleComponents() ;

if ( mode == ReadWrite.READ ) {
Expand All @@ -460,8 +478,6 @@ private DatasetGraphTxn beginInternal(TxnType txnType, TxnType originalTxnType,

for ( TransactionLifecycle component : components )
component.begin(dsgTxn.getTransaction()) ;

noteTxnStart(txn) ;
return dsgTxn ;
}

Expand Down Expand Up @@ -870,10 +886,9 @@ private void noteTxnStart(Transaction transaction) {
private void noteTxnPromote(Transaction transaction, Transaction transaction2) {
activeTransactions.remove(transaction);
activeTransactions.add(transaction2);
transactionPromotes(transaction) ;
transactionPromotes(transaction, transaction2) ;
}


private void noteTxnCommit(Transaction transaction) {
switch (transaction.getTxnMode())
{
Expand Down Expand Up @@ -1011,10 +1026,10 @@ private void readerFinishes(Transaction txn) {
tsm.readerFinishes(txn) ;
}

private void transactionPromotes(Transaction txn) {
private void transactionPromotes(Transaction txnOld, Transaction txnNew) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionPromotes(txn);
tsm.transactionPromotes(txnOld, txnNew);
}

private void writerStarts(Transaction txn) {
Expand Down

0 comments on commit 6228bbc

Please sign in to comment.