Skip to content

Commit

Permalink
JENA-1223: Have a data version and use to verify promotion.
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Aug 13, 2016
1 parent 0142c3b commit 39a8c3e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class Transaction
private final List<BlockMgrJournal> blkMgrs = new ArrayList<>() ;
// The dataset this is a transaction over - may be a commited, pending dataset.
private final DatasetGraphTDB basedsg ;
private final long version ;

private final List<Iterator<?>> iterators ; // Tracking iterators
private DatasetGraphTxn activedsg ;
Expand All @@ -52,7 +53,7 @@ enum TxnOutcome { UNFINISHED, W_ABORTED, W_COMMITED, R_CLOSED, R_ABORTED, R_COMM

private boolean changesPending ;

public Transaction(DatasetGraphTDB dsg, ReadWrite mode, long id, String label, TransactionManager txnMgr) {
public Transaction(DatasetGraphTDB dsg, long version, ReadWrite mode, long id, String label, TransactionManager txnMgr) {
this.id = id ;
if (label == null )
label = "Txn" ;
Expand All @@ -65,6 +66,7 @@ public Transaction(DatasetGraphTDB dsg, ReadWrite mode, long id, String label, T
this.label = label ;
this.txnMgr = txnMgr ;
this.basedsg = dsg ;
this.version = version ;
this.mode = mode ;
this.journal = ( txnMgr == null ) ? null : txnMgr.getJournal() ;
activedsg = null ; // Don't know yet.
Expand Down Expand Up @@ -268,6 +270,7 @@ public void close() {
public TransactionManager getTxnMgr() { return txnMgr ; }

public DatasetGraphTxn getActiveDataset() { return activedsg ; }
public long getVersion() { return version ; }

/*package*/ void setActiveDataset(DatasetGraphTxn activedsg) {
this.activedsg = activedsg ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ private void record(Transaction txn, TxnPoint state) {
List<Transaction> commitedAwaitingFlush = new ArrayList<>() ;

static AtomicLong transactionId = new AtomicLong(1) ;
// The version of the data starting at 1. The contract is that this is
// never the same for two version (it may not be monotonic increasing).
// Currently, it is sequentially increasing.
// Set on each commit.
private AtomicLong version = new AtomicLong(0) ;

// Accessed by SysTxnState
// These must be AtomicLong
Expand Down Expand Up @@ -342,13 +347,18 @@ public DatasetGraphTxn begin(ReadWrite mode, String label) {
// // Would this block corrctly? ... drops the sync lock?
// acquireWriterLock(true) ;

// 2/ Check the database view has not moved on.
DatasetGraphTDB current = determineBaseDataset() ;
DatasetGraphTDB starting = txn.getBaseDataset() ;
// Compare by object identity.
if ( current != starting )
throw new TDBTransactionException("Dataset changed - can't promote") ;
// 2/ Check the database view has not moved on. No active writers at the moment.

if ( txn.getVersion() != version.get() )
throw new TDBTransactionException("Dataset changed - can't promote") ;

// This is an equivalent test.
// // Compare by object identity.
// DatasetGraphTDB current = determineBaseDataset() ;
// DatasetGraphTDB starting = txn.getBaseDataset() ;
// if ( current != starting )
// throw new TDBTransactionException("Dataset changed - can't promote") ;

// Need to go through begin for the writers lock.
DatasetGraphTxn dsgtxn2 = begin( ReadWrite.WRITE, txn.getLabel()) ;
return dsgtxn2 ;
Expand Down Expand Up @@ -409,7 +419,7 @@ private DatasetGraphTDB determineBaseDataset() {
return dsg ;
}
private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label) {
Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ;
Transaction txn = new Transaction(dsg, version.get(), mode, transactionId.getAndIncrement(), label, this) ;
return txn ;
}

Expand Down Expand Up @@ -452,6 +462,7 @@ public void notifyCommit(Transaction transaction) {
switch ( transaction.getMode() ) {
case READ: break ;
case WRITE:
version.incrementAndGet() ;
currentReaderView.set(null) ; // Clear the READ transaction cache.
releaseWriterLock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static void contains(NodeTable nt, Node...nodes)

Transaction createTxn(long id)
{
return new Transaction(null, ReadWrite.WRITE, id, null, null) ;
return new Transaction(null, 99, ReadWrite.WRITE, id, null, null) ;
}

@Test public void nodetrans_01()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractTestObjectFileTrans extends BaseTest
@Before
public void setup()
{
txn = new Transaction(null, ReadWrite.WRITE, ++count, null, tm) ;
txn = new Transaction(null, 5, ReadWrite.WRITE, ++count, null, tm) ;
file1 = createFile("base") ;
file2 = createFile("log") ;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ private void run_08(boolean b) {
}) ;
}

// Tests for XXX Read-committed yes/no, and whether the other transaction commits or aborts.
// Tests for XXX Read-committed yes/no (false = snapshot isolation, true = read committed),
// and whether the other transaction commits (true) or aborts (false).

@Test
public void promote_10() { promote_readCommit_txnCommit(true, true) ; }
Expand Down Expand Up @@ -297,8 +298,9 @@ private void promote_readCommit_txnCommit(boolean allowReadCommitted, boolean as
logger2.setLevel(level2);
}

// Test whether a writer casuses a snapshot isolation
// promotion to fail like it should
// Test whether an active writer causes a snapshot isolation
// promotion to fail like it should.
// No equivalent read commiter version - it would block.
@Test(expected=TDBTransactionException.class)
public void promote_clash_1() {
DatasetGraphTransaction.readCommittedPromotion = false ;
Expand All @@ -318,8 +320,9 @@ public void promote_clash_1() {
sema1.acquireUninterruptibly();
// The thread is in the write.
dsg.begin(ReadWrite.READ) ;
// If read commited this will block.
// If snapshot, this will though an exception due to the active writer.
// ++ If read commited, the promotion will block. ++
// because the other-thread writer is blocked.
// If snapshot, this will cause an exception due to the active writer.
dsg.add(q1) ;
fail("Should not be here") ;
dsg.commit() ;
Expand Down

0 comments on commit 39a8c3e

Please sign in to comment.