Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Nearly finished with testing Transactions, I need to test a couple er…

…ror scenarios related to transaction durability.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
commit d9ad002ba97e270f1d2b0dd2973ccd8c5a7e968b 1 parent f3a3613
@gburgett authored
Showing with 1,278 additions and 129 deletions.
  1. +24 −0 java/XFlat/src/org/gburgett/xflat/db/EngineBase.java
  2. +19 −1 java/XFlat/src/org/gburgett/xflat/db/EngineTransactionManager.java
  3. +5 −0 java/XFlat/src/org/gburgett/xflat/db/IdAccessor.java
  4. +19 −15 java/XFlat/src/org/gburgett/xflat/db/ShardedEngineBase.java
  5. +1 −1  java/XFlat/src/org/gburgett/xflat/db/TableMetadata.java
  6. +12 −8 java/XFlat/src/org/gburgett/xflat/db/TableMetadataFactory.java
  7. +57 −24 java/XFlat/src/org/gburgett/xflat/db/XFlatDatabase.java
  8. +136 −44 java/XFlat/src/org/gburgett/xflat/engine/CachedDocumentEngine.java
  9. +220 −19 java/XFlat/src/org/gburgett/xflat/transaction/ThreadContextTransactionManager.java
  10. +1 −1  java/XFlat/src/org/gburgett/xflat/transaction/Transaction.java
  11. +5 −1 java/XFlat/src/org/gburgett/xflat/transaction/TransactionException.java
  12. +289 −12 java/XFlat/test/org/gburgett/xflat/db/EngineTestsBase.java
  13. +372 −0 java/XFlat/test/org/gburgett/xflat/db/EngineTransactionManagerTestBase.java
  14. +6 −3 java/XFlat/test/org/gburgett/xflat/engine/IdShardedEngineTest.java
  15. +30 −0 java/XFlat/test/org/gburgett/xflat/transaction/FakeThreadContextTransactionManager.java
  16. +30 −0 java/XFlat/test/org/gburgett/xflat/transaction/ThreadContextTransactionManagerTest.java
  17. +52 −0 java/XFlat/test/org/gburgett/xflat/util/FakeDocumentFileWrapper.java
View
24 java/XFlat/src/org/gburgett/xflat/db/EngineBase.java
@@ -363,6 +363,7 @@ protected void setId(Element row, String id){
row.setAttribute("id", id, XFlatDatabase.xFlatNs);
}
+ //<editor-fold desc="transactions">
/**
* Checks whether this engine has any transactional updates in an uncommitted
@@ -373,6 +374,29 @@ protected void setId(Element row, String id){
protected abstract boolean hasUncomittedData();
/**
+ * Called when a transaction is committed to write the committed data to disk.
+ * After this method returns, the data should be stored in non-volatile storage.
+ * @param tx
+ */
+ public void commit(Transaction tx){
+
+ }
+
+ /**
+ * Called when a transaction is committed to revert the given transaction ID.
+ * This may be called even if a transaction was previously committed in this engine,
+ * because it was not fully committed across all engines.
+ * @param tx
+ * @param isRecovering true if this transaction is being reverted during recovery
+ * at startup.
+ */
+ public void revert(long tx, boolean isRecovering){
+
+ }
+
+ //</editor-fold>
+
+ /**
* Represents one row in the database. The row contains a set of
* {@link RowData} which represents the committed and uncommitted data in
* the row. The row data is mapped by its transaction ID.
View
20 java/XFlat/src/org/gburgett/xflat/db/EngineTransactionManager.java
@@ -12,7 +12,7 @@
*
* @author Gordon
*/
-public abstract class EngineTransactionManager implements TransactionManager {
+public abstract class EngineTransactionManager implements TransactionManager, AutoCloseable {
/**
* Gets a new commit ID for a transactionless write operation.
@@ -121,5 +121,23 @@ protected long generateNewId(){
return id;
}
+
+ /**
+ * Returns true if any transactions are currently open.
+ * @return
+ */
+ public abstract boolean anyOpenTransactions();
+
+ /**
+ * Attempts to recover from an unexpected shutdown if necessary.
+ * @param db
+ */
+ public abstract void recover(XFlatDatabase db);
+ /**
+ * Closes any resources in use by this transaction manager in preparation
+ * for shutdown. Any exceptions at this point should be logged but not
+ * rethrown since this is only used during shutdown.
+ */
+ public abstract void close();
}
View
5 java/XFlat/src/org/gburgett/xflat/db/IdAccessor.java
@@ -50,6 +50,11 @@ private IdAccessor(Class<T> pojoType, PropertyDescriptor idProperty, Field idFie
* @return The accessor, which may have already been created and cached.
*/
public static <U> IdAccessor<U> forClass(Class<U> pojoType){
+ if(pojoType.isPrimitive() ||
+ String.class.equals(pojoType)){
+ return null;
+ }
+
IdAccessor<U> ret = (IdAccessor<U>)cachedAccessors.get(pojoType);
if(ret != null){
return ret;
View
34 java/XFlat/src/org/gburgett/xflat/db/ShardedEngineBase.java
@@ -123,7 +123,7 @@ private EngineBase getEngine(Interval<T> interval){
File file = new File(directory, name + ".xml");
this.knownShards.put(interval, file);
- metadata = this.getMetadataFactory().makeTableMetadata(name, file);
+ metadata = this.getMetadataFactory().makeTableMetadata(this.getTableName(), file);
metadata.config = TableConfig.Default; //not even really used for our purposes
TableMetadata weWereLate = openShards.putIfAbsent(interval, metadata);
@@ -154,21 +154,12 @@ private EngineBase getEngine(Interval<T> interval){
throw new XflatException("Attempt to read or write to an engine in an uninitialized state");
}
- //all operations are writes for the purposes of the sharded engine.
- //NOT TRUE! Need to fix this!
- ensureWriteReady();
try{
-
- try{
- return action.act(getEngine(range));
- }
- catch(EngineStateException ex){
- //try one more time with a potentially new engine, if we still fail then let it go
- return action.act(getEngine(range));
- }
-
- }finally{
- writeComplete();
+ return action.act(getEngine(range));
+ }
+ catch(EngineStateException ex){
+ //try one more time with a potentially new engine, if we still fail then let it go
+ return action.act(getEngine(range));
}
}
@@ -214,6 +205,19 @@ else if(state == EngineState.Running){
}
@Override
+ public void revert(long txId, boolean isRecovering){
+ if(!isRecovering){
+ //the individual shard engines will also have been registered.
+ return;
+ }
+
+ //we will need to revert over all known shards in order to recover.
+ for(Interval<T> interval : this.knownShards.keySet()){
+ this.getEngine(interval).revert(txId, isRecovering);
+ }
+ }
+
+ @Override
protected boolean spinUp() {
if(!this.state.compareAndSet(EngineState.Uninitialized, EngineState.SpinningUp)){
return false;
View
2  java/XFlat/src/org/gburgett/xflat/db/TableMetadata.java
@@ -51,7 +51,7 @@ public String getName(){
public boolean canSpinDown(){
EngineBase engine = this.engine.get();
- return lastActivity + 3000 < System.currentTimeMillis() && engine == null || !engine.hasUncomittedData();
+ return lastActivity + config.getInactivityShutdownMs() < System.currentTimeMillis() && engine == null || !engine.hasUncomittedData();
}
public EngineBase getEngine(){
View
20 java/XFlat/src/org/gburgett/xflat/db/TableMetadataFactory.java
@@ -133,20 +133,23 @@ private TableMetadata makeNewTableMetadata(String name, File engineFile, TableCo
Class<? extends IdGenerator> generatorClass = config.getIdGenerator();
if(generatorClass != null){
ret.idGenerator = makeIdGenerator(generatorClass);
- if(!ret.idGenerator.supports(idType)){
+ if(idType != null && !ret.idGenerator.supports(idType)){
throw new XflatException("Id Generator " + generatorClass.getName() +
" does not support type " + idType);
}
}
else {
+
//pick using our strategy
- for(Class<? extends IdGenerator> g : dbConfig.getIdGeneratorStrategy()){
- IdGenerator gen = makeIdGenerator(g);
- if(gen.supports(idType)){
- ret.idGenerator = gen;
- break;
+ if(idType != null)
+ for(Class<? extends IdGenerator> g : dbConfig.getIdGeneratorStrategy()){
+ IdGenerator gen = makeIdGenerator(g);
+ if(gen.supports(idType)){
+ ret.idGenerator = gen;
+ break;
+ }
}
- }
+
if(ret.idGenerator == null){
throw new XflatException("Could not pick id generator for type " + idType);
}
@@ -185,7 +188,8 @@ private TableMetadata makeTableMetadataFromDocument(String name, File engineFile
}
}
ret.idGenerator = makeIdGenerator(generatorClass);
- if(!ret.idGenerator.supports(idType)){
+
+ if(idType != null && !ret.idGenerator.supports(idType)){
throw new XflatException("Id Generator " + generatorClass + " does not support " +
" ID type " + idType);
}
View
81 java/XFlat/src/org/gburgett/xflat/db/XFlatDatabase.java
@@ -34,6 +34,7 @@
import org.gburgett.xflat.convert.converters.StringConverters;
import org.gburgett.xflat.transaction.ThreadContextTransactionManager;
import org.gburgett.xflat.transaction.TransactionManager;
+import org.gburgett.xflat.util.DocumentFileWrapper;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.Namespace;
@@ -203,7 +204,7 @@ public void Initialize(){
this.executorService = new ScheduledThreadPoolExecutor(this.config.getThreadCount());
if(this.transactionManager == null){
- this.transactionManager = new ThreadContextTransactionManager();
+ this.transactionManager = new ThreadContextTransactionManager(new DocumentFileWrapper(new File(directory, "xflat_transaction")));
}
this.InitializeScheduledTasks();
@@ -211,6 +212,9 @@ public void Initialize(){
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
+ //recover transactional state if necessary
+ this.transactionManager.recover(this);
+
}catch(Exception ex){
this.state.set(DatabaseState.Uninitialized);
throw new XflatException("Initialization error", ex);
@@ -224,10 +228,14 @@ public void Initialize(){
*/
public void shutdown(){
try{
- this.shutdown(0);
+ this.doShutdown(0);
}catch(TimeoutException ex){
throw new RuntimeException("A timeout occured that should never have happened", ex);
}
+ finally{
+ //close all resources
+ this.getEngineTransactionManager().close();
+ }
}
/**
@@ -239,6 +247,16 @@ public void shutdown(){
* the timeout expired.
*/
public void shutdown(int timeout) throws TimeoutException{
+ try{
+ this.doShutdown(timeout);
+ }
+ finally{
+ //close all resources
+ this.getEngineTransactionManager().close();
+ }
+ }
+
+ private void doShutdown(int timeout) throws TimeoutException{
if(!this.state.compareAndSet(DatabaseState.Running, DatabaseState.ShuttingDown)){
return;
}
@@ -398,6 +416,18 @@ public void extendConversionService(PojoConverter extender){
@Override
public <T> Table<T> getTable(Class<T> type, String name){
+
+ TableMetadata table = getMetadata(type, name);
+
+ TableBase ret = table.getTable(type);
+ return (Table<T>)ret;
+ }
+
+ public EngineBase getEngine(String name){
+ return getMetadata(null, name).provideEngine();
+ }
+
+ private TableMetadata getMetadata(Class<?> type, String name){
if(this.state.get() == DatabaseState.Uninitialized){
throw new IllegalStateException("Database has not been initialized");
}
@@ -408,34 +438,38 @@ public void extendConversionService(PojoConverter extender){
if(name == null || name.startsWith("xflat_")){
throw new IllegalArgumentException("Table name cannot be null or start with 'xflat_': " + name);
}
-
- if(!this.getConversionService().canConvert(type, Element.class) ||
- !this.getConversionService().canConvert(Element.class, type)){
-
- try {
- //try to load the pojo converter
- loadPojoConverter();
-
- } catch (Exception ex) {
- throw new UnsupportedOperationException("No conversion available between " +
- type + " and " + Element.class, ex);
- }
-
- //check again
+ if(type != null){
if(!this.getConversionService().canConvert(type, Element.class) ||
- !this.getConversionService().canConvert(Element.class, type)){
- throw new UnsupportedOperationException("No conversion available between " +
- type + " and " + Element.class);
+ !this.getConversionService().canConvert(Element.class, type)){
+
+ try {
+ //try to load the pojo converter
+ loadPojoConverter();
+
+ } catch (Exception ex) {
+ throw new UnsupportedOperationException("No conversion available between " +
+ type + " and " + Element.class, ex);
+ }
+
+ //check again
+ if(!this.getConversionService().canConvert(type, Element.class) ||
+ !this.getConversionService().canConvert(Element.class, type)){
+ throw new UnsupportedOperationException("No conversion available between " +
+ type + " and " + Element.class);
+ }
}
}
+
//see if we have a cached engine already
TableMetadata table = this.tables.get(name);
if(table == null){
TableConfig tblConfig = this.tableConfigs.get(name);
Class<?> idType = String.class;
- IdAccessor accessor = IdAccessor.forClass(type);
- if(accessor.hasId()){
- idType = accessor.getIdType();
+ if(type != null){
+ IdAccessor accessor = IdAccessor.forClass(type);
+ if(accessor != null && accessor.hasId()){
+ idType = accessor.getIdType();
+ }
}
table = this.metadataFactory.makeTableMetadata(name, new File(getDirectory(), name + ".xml"), tblConfig, idType);
@@ -447,8 +481,7 @@ public void extendConversionService(PojoConverter extender){
}
}
- TableBase ret = table.getTable(type);
- return (Table<T>)ret;
+ return table;
}
private AtomicBoolean pojoConverterLoaded = new AtomicBoolean(false);
View
180 java/XFlat/src/org/gburgett/xflat/engine/CachedDocumentEngine.java
@@ -52,6 +52,8 @@
private ConcurrentMap<String, Row> uncommittedRows = null;
+ private final Object syncRoot = new Object();
+
private DocumentFileWrapper file;
public DocumentFileWrapper getFile(){
return file;
@@ -97,7 +99,9 @@ public void insertRow(String id, Element data) throws DuplicateKeyException {
if(chosen == null || chosen.data == null){
//we're good to insert our transactional data
row.rowData.put(txId, rData);
- this.uncommittedRows.put(id, row);
+
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(id, row);
}
else{
throw new DuplicateKeyException(id);
@@ -155,7 +159,6 @@ public void replaceRow(String id, Element data) throws KeyNotFoundException {
Transaction tx = this.getTransactionManager().getTransaction();
long txId = getTxId(tx);
-
Row row = this.cache.get(id);
if(row == null){
throw new KeyNotFoundException(id);
@@ -173,6 +176,8 @@ public void replaceRow(String id, Element data) throws KeyNotFoundException {
newData.commitId = txId;
}
row.rowData.put(txId, newData);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(id, row);
}
setLastActivity(System.currentTimeMillis());
@@ -218,7 +223,8 @@ public boolean update(String id, XpathUpdate update) throws KeyNotFoundException
if(ret){
//no need to put a new version if no data was modified
row.rowData.put(txId, newData);
- this.uncommittedRows.put(id, row);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(id, row);
}
}
}
@@ -279,7 +285,7 @@ public int update(XpathQuery query, XpathUpdate update) {
if(updates > 0){
//no need to put a new version if no data was modified
row.rowData.put(txId, newData);
- if(newData.commitId == -1)
+ if(newData.commitId == -1 && (tx != null || this.getTransactionManager().anyOpenTransactions()))
this.uncommittedRows.put(row.rowId, row);
}
@@ -330,12 +336,14 @@ public boolean upsertRow(String id, Element data) {
//takes care of the "or update"
existingRow.rowData.put(txId, newData);
- this.uncommittedRows.put(id, existingRow);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(id, existingRow);
}
}
else{
didInsert = true;
- this.uncommittedRows.put(id, newRow);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(id, newRow);
}
}
@@ -377,7 +385,8 @@ public void deleteRow(String id) throws KeyNotFoundException {
//a RowData that is null means it was deleted.
row.rowData.put(txId, newData);
- this.uncommittedRows.put(row.rowId, row);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(row.rowId, row);
}
setLastActivity(System.currentTimeMillis());
@@ -418,7 +427,8 @@ public int deleteAll(XpathQuery query) {
newData.commitId = txId;
}
row.rowData.put(txId, newData);
- this.uncommittedRows.put(row.rowId, row);
+ if(tx != null || this.getTransactionManager().anyOpenTransactions())
+ this.uncommittedRows.put(row.rowId, row);
numRemoved++;
}
@@ -438,40 +448,30 @@ public int deleteAll(XpathQuery query) {
//</editor-fold>
+
+
private void update(){
-
- Set<Row> rowsToRemove = new HashSet<>();
- Set<Long> remainingTransactions = new HashSet<>();
-
- Iterator<Row> it = this.uncommittedRows.values().iterator();
- while(it.hasNext()){
- Row row = it.next();
- synchronized(row){
- it.remove();
-
- if(row.cleanup()){
- rowsToRemove.add(row);
- }
- else{
- //remember the remaining transactions
- for(RowData data : row.rowData.values()){
- if(data.commitId == -1){
- remainingTransactions.add(data.transactionId);
- }
- }
+ synchronized(syncRoot){
+ if(this.currentlyCommitting.get() != -1){
+ if(this.getTransactionManager().isTransactionCommitted(this.currentlyCommitting.get()) == -1 &&
+ !this.getTransactionManager().isTransactionReverted(this.currentlyCommitting.get())){
+ //the transaction is neither committed nor reverted, it is in the process of committing.
+ //We'll have to come back to this update later when it is finished.
+ return;
}
}
- }
-
- if(rowsToRemove.size() > 0){
- //we have to lock the table in order to actually remove any rows.
- try{
- this.getTableLock();
-
- for(Row row : rowsToRemove){
- //doublecheck - do another cleanup, don't want to be sloppy here.
+
+ Set<Row> rowsToRemove = new HashSet<>();
+ Set<Long> remainingTransactions = new HashSet<>();
+
+ Iterator<Row> it = this.uncommittedRows.values().iterator();
+ while(it.hasNext()){
+ Row row = it.next();
+ synchronized(row){
+ it.remove();
+
if(row.cleanup()){
- this.cache.remove(row.rowId);
+ rowsToRemove.add(row);
}
else{
//remember the remaining transactions
@@ -483,15 +483,107 @@ private void update(){
}
}
}
- finally{
- this.releaseTableLock();
+
+ if(rowsToRemove.size() > 0){
+ //we have to lock the table in order to actually remove any rows.
+ try{
+ this.getTableLock();
+
+ for(Row row : rowsToRemove){
+ //doublecheck - do another cleanup, don't want to be sloppy here.
+ if(row.cleanup()){
+ this.cache.remove(row.rowId);
+ }
+ else{
+ //remember the remaining transactions
+ for(RowData data : row.rowData.values()){
+ if(data.commitId == -1){
+ remainingTransactions.add(data.transactionId);
+ }
+ }
+ }
+ }
+ }
+ finally{
+ this.releaseTableLock();
+ }
}
+
+ //unbind the engine from all transactions except the remaining transactions
+ this.getTransactionManager().unbindEngineExceptFrom(this, remainingTransactions);
}
-
- //unbind the engine from all transactions except the remaining transactions
- this.getTransactionManager().unbindEngineExceptFrom(this, remainingTransactions);
}
+ private AtomicLong currentlyCommitting = new AtomicLong(-1);
+
+ @Override
+ public void commit(Transaction tx){
+ synchronized(syncRoot){
+ if(!currentlyCommitting.compareAndSet(-1, tx.getTransactionId())){
+ //see if this transaction is completely finished committing, or if it reverted
+ if(this.getTransactionManager().isTransactionCommitted(tx.getTransactionId()) == -1){
+ throw new IllegalStateException("Cannot commit two transactions simultaneously");
+ }
+ else{
+ //the transaction successfully committed, we can move on.
+ currentlyCommitting.set(-1);
+ }
+ }
+
+ Iterator<Row> it = this.uncommittedRows.values().iterator();
+ while(it.hasNext()){
+ Row row = it.next();
+
+ this.log.info("committing row " + row.rowId);
+ synchronized(row){
+ //don't remove the row, only do that in cleanup.
+ //We don't want to cleanup cause we still might need the old data,
+ //just set the transaction status to committed.
+
+ RowData got = row.rowData.get(tx.getTransactionId());
+ if(got != null){
+ got.commitId = tx.getCommitId();
+ }
+ }
+ }
+
+ //we must immediately dump the cache, we cannot say we are committed
+ //until the data is on disk.
+ dumpCacheNow();
+ }
+ }
+
+ @Override
+ public void revert(long txId, boolean isRecovering){
+ synchronized(syncRoot){
+ boolean mustDump = false;
+
+ Iterator<Row> it = this.uncommittedRows.values().iterator();
+ while(it.hasNext()){
+ Row row = it.next();
+ synchronized(row){
+ //remove the row data, since it's now uncommitted.
+
+ RowData got = row.rowData.remove(txId);
+ if(got.commitId != -1){
+ //this transaction was persisted to the DB. We're going to need
+ //to dump the cache at the end.
+ mustDump = true;
+ }
+ }
+ }
+
+ if(mustDump){
+ this.dumpCacheNow();
+ }
+ //else we can leave dumping the cache for the cleanup task.
+
+ //reset the currently committing if that was set
+ currentlyCommitting.compareAndSet(txId, -1);
+ }
+ }
+
+
@Override
protected boolean spinUp() {
if(!this.state.compareAndSet(EngineState.Uninitialized, EngineState.SpinningUp)){
@@ -834,7 +926,7 @@ private void dumpCacheNow(){
@Override
protected boolean hasUncomittedData() {
- return this.uncommittedRows == null ? true : this.uncommittedRows.isEmpty();
+ return this.uncommittedRows == null ? false : !this.uncommittedRows.isEmpty();
}
View
239 java/XFlat/src/org/gburgett/xflat/transaction/ThreadContextTransactionManager.java
@@ -4,17 +4,29 @@
*/
package org.gburgett.xflat.transaction;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.LogFactory;
+import org.gburgett.xflat.XflatException;
+import org.gburgett.xflat.convert.ConversionException;
+import org.gburgett.xflat.convert.Converter;
import org.gburgett.xflat.db.EngineBase;
import org.gburgett.xflat.db.EngineTransactionManager;
+import org.gburgett.xflat.db.XFlatDatabase;
+import org.gburgett.xflat.util.DocumentFileWrapper;
+import org.jdom2.Document;
+import org.jdom2.Element;
+import org.jdom2.JDOMException;
/**
*
@@ -22,14 +34,24 @@
*/
public class ThreadContextTransactionManager extends EngineTransactionManager {
- private Map<Thread, ThreadedTransaction> currentTransactions = new ConcurrentHashMap<>();
+ private Map<Long, ThreadedTransaction> currentTransactions = new ConcurrentHashMap<>();
private Map<Long, ThreadedTransaction> committedTransactions = new ConcurrentHashMap<>();
+ private DocumentFileWrapper journalWrapper;
+ private Document transactionJournal = null;
+
+ public ThreadContextTransactionManager(DocumentFileWrapper wrapper){
+ this.journalWrapper = wrapper;
+ }
+
+ protected Long getContextId(){
+ return Thread.currentThread().getId();
+ }
@Override
public Transaction getTransaction() {
- return currentTransactions.get(Thread.currentThread());
+ return currentTransactions.get(getContextId());
}
@Override
@@ -39,12 +61,12 @@ public Transaction openTransaction() {
@Override
public Transaction openTransaction(TransactionOptions options) {
- if(currentTransactions.get(Thread.currentThread()) != null){
+ if(currentTransactions.get(getContextId()) != null){
throw new IllegalStateException("Transaction already open on current thread.");
}
ThreadedTransaction ret = new ThreadedTransaction(generateNewId(), options);
- if(currentTransactions.put(Thread.currentThread(), ret) != null){
+ if(currentTransactions.put(getContextId(), ret) != null){
//how could this happen? I dunno, programs surprise me all the time.
throw new IllegalStateException("Transaction already open on current thread");
}
@@ -57,7 +79,7 @@ public long isTransactionCommitted(long transactionId) {
ThreadedTransaction tx = committedTransactions.get(transactionId);
return tx == null ? -1 : tx.getCommitId();
}
-
+
@Override
public boolean isTransactionReverted(long transactionId) {
//if we find it in the current transactions, check the transaction
@@ -97,16 +119,22 @@ public long getLowestOpenTransaction() {
@Override
public void bindEngineToCurrentTransaction(EngineBase engine) {
- ThreadedTransaction tx = currentTransactions.get(Thread.currentThread());
+ ThreadedTransaction tx = currentTransactions.get(getContextId());
if(tx == null){
return;
}
+ //we can get away with just adding it in an unsynchronized context because
+ //this is never going to be called at the same time as unbind, since unbind
+ //always happens in the context of a commit or revert (which is the same thread as this)
+ //or another thread's cleanup after the transaction is closed.
tx.boundEngines.add(engine);
+
+ LogFactory.getLog(getClass()).trace("engines bound to TX " + tx.getTransactionId() + ": " + tx.boundEngines.size());
}
@Override
- public void unbindEngineFromTransaction(EngineBase engine, Long transactionId) {
+ public synchronized void unbindEngineFromTransaction(EngineBase engine, Long transactionId) {
ThreadedTransaction tx = null;
for(ThreadedTransaction t : this.currentTransactions.values()){
if(t.getTransactionId() == transactionId){
@@ -133,7 +161,7 @@ public void unbindEngineFromTransaction(EngineBase engine, Long transactionId) {
}
@Override
- public void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transactionIds) {
+ public synchronized void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transactionIds) {
for(ThreadedTransaction tx : this.currentTransactions.values()){
if(transactionIds.contains(tx.getTransactionId())){
continue;
@@ -159,8 +187,127 @@ public void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transacti
}
}
}
+
+ @Override
+ public boolean anyOpenTransactions() {
+ return !this.currentTransactions.isEmpty();
+ }
+
+ private void loadJournal() throws IOException, JDOMException{
+ transactionJournal = journalWrapper.readFile();
+ if(transactionJournal == null){
+ transactionJournal = new Document();
+ transactionJournal.setRootElement(new Element("transactionJournal"));
+ }
+ }
+ private synchronized void commit(ThreadedTransaction tx){
+ //journal the entry so we can recover if catastrophic failure occurs
+ TransactionJournalEntry entry = new TransactionJournalEntry();
+ entry.txId = tx.id;
+ entry.commitId = tx.commitId;
+ for(EngineBase e : tx.boundEngines){
+ entry.tableNames.add(e.getTableName());
+ }
+
+ try {
+ if(transactionJournal == null){
+ loadJournal();
+ }
+ transactionJournal.getRootElement().addContent(toElement.convert(entry));
+ journalWrapper.writeFile(transactionJournal);
+ } catch (ConversionException | IOException | JDOMException ex) {
+ throw new TransactionException("Unable to commit, could not access journal file " + journalWrapper, ex);
+ }
+
+ //commit all, and if any fail revert all.
+ try{
+ for(EngineBase e : tx.boundEngines){
+ LogFactory.getLog(getClass()).trace("committing to bound engine " + e.getTableName());
+ e.commit(tx);
+ }
+ }catch(Exception ex){
+ try{
+ //uncommit
+ tx.commitId = -1;
+ revert(tx.boundEngines, tx.getTransactionId(), false);
+ }catch(TransactionException ex2){
+ throw new TransactionException("Unable to commit, " + ex2.getMessage(), ex);
+ }
+ throw new TransactionException("Unable to commit", ex);
+ }
+
+ //we're all committed, so we can finally say so.
+ committedTransactions.put(tx.id, tx);
+ }
+ private void revert(Iterable<EngineBase> boundEngines, long txId, boolean isRecovering){
+ Set<String> failedReverts = null;
+ Exception last = null;
+ for(EngineBase e : boundEngines){
+
+ try{
+ e.revert(txId, isRecovering);
+ }catch(Exception ex){
+ LogFactory.getLog(getClass()).error(ex);
+ if(failedReverts == null)
+ failedReverts = new HashSet<>();
+ failedReverts.add(e.getTableName());
+ last = ex;
+ }
+ }
+ if(failedReverts != null && failedReverts.size() > 0){
+ StringBuilder msg = new StringBuilder("Unable to revert all bound engines, the data in the following engines may be corrupt: ");
+ for(String s : failedReverts){
+ msg.append(s).append(", ");
+ }
+ throw new TransactionException(msg.toString(), last);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void recover(XFlatDatabase db) {
+ //open the journal
+
+ try {
+ if(transactionJournal == null){
+ loadJournal();
+ }
+ } catch (IOException | JDOMException ex) {
+ throw new XflatException("Unable to recover, could not access journal file " + journalWrapper, ex);
+ }
+
+ try{
+ Iterator<Element> children = transactionJournal.getRootElement().getChildren().iterator();
+ while(children.hasNext()){
+ TransactionJournalEntry entry;
+ try {
+ entry = fromElement.convert(children.next());
+ } catch (ConversionException ex) {
+ //entry is corrupt, remove and continue
+ children.remove();
+ continue;
+ }
+
+ List<EngineBase> toRevert = new ArrayList<>();
+ for(String table : entry.tableNames){
+ toRevert.add(db.getEngine(table));
+ }
+
+ //revert the transaction in all the engines
+ revert(toRevert, entry.txId, true);
+
+ //save the journal after each successful revert
+ this.journalWrapper.writeFile(transactionJournal);
+ }
+ }catch(TransactionException | IOException ex){
+ throw new XflatException("Unable to recover", ex);
+ }
+ }
/**
* A Transaction that is meant to exist within the context of one thread.
@@ -178,7 +325,10 @@ public void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transacti
private AtomicReference<Set<TransactionListener>> listeners = new AtomicReference<>(null);
- final Set<EngineBase> boundEngines = new ConcurrentSkipListSet<>();
+ //we can get away with this being an unsynchronized HashSet because it will only ever be added to by one
+ //thread, and then only so long as the transaction is open, and then will be removed from
+ //by a different thread, but only one at a time, synchronized elsewhere, and after all adds are finished.
+ final Set<EngineBase> boundEngines = new HashSet<>();
private long commitId = -1;
@Override
@@ -204,20 +354,19 @@ public void commit() throws TransactionException {
}
commitId = generateNewId();
- committedTransactions.put(id, this);
+ ThreadContextTransactionManager.this.commit(this);
+ //soon as commit returns, we are committed.
+ this.isCompleted.set(true);
}
@Override
- public void rollback() {
+ public void revert() {
if(this.isCompleted.get()){
throw new IllegalTransactionStateException("Cannot rollback a completed transaction");
}
- doRollback();
- }
-
- private void doRollback(){
- //do nothing
+ ThreadContextTransactionManager.this.revert(this.boundEngines, this.id, false);
+ this.isCompleted.set(true);
}
@Override
@@ -233,7 +382,8 @@ public long getTransactionId() {
@Override
public void close() {
if(isCompleted.compareAndSet(false, true)){
- doRollback();
+ //we completed in the close, need to revert.
+ ThreadContextTransactionManager.this.revert(this.boundEngines, this.id, false);
}
//remove the transaction from the current transactions map
@@ -252,7 +402,7 @@ public boolean isCommitted() {
@Override
public boolean isReverted() {
- return isCompleted.get() && commitId > -1;
+ return isCompleted.get() && commitId == -1;
}
@Override
@@ -278,4 +428,55 @@ public void removeTransactionListener(TransactionListener listener) {
}
}
+
+ private class TransactionJournalEntry{
+ public long txId;
+ public long commitId;
+
+ public List<String> tableNames = new ArrayList<>();
+ }
+
+ private Converter<TransactionJournalEntry, Element> toElement = new Converter<TransactionJournalEntry, Element>(){
+ @Override
+ public Element convert(TransactionJournalEntry source) throws ConversionException {
+ Element ret = new Element("entry");
+ ret.setAttribute("txId", Long.toString(source.txId));
+ ret.setAttribute("commit", Long.toString(source.commitId));
+
+ for(String s : source.tableNames){
+ ret.addContent(new Element("table").setText(s));
+ }
+
+ return ret;
+ }
+ };
+
+ private Converter<Element, TransactionJournalEntry> fromElement = new Converter<Element, TransactionJournalEntry>(){
+ @Override
+ public TransactionJournalEntry convert(Element source) throws ConversionException {
+ TransactionJournalEntry ret = new TransactionJournalEntry();
+
+ try{
+ String txId = source.getAttributeValue("txId");
+ if(txId == null){
+ throw new ConversionException("txId attribute required");
+ }
+ ret.txId = Long.parseLong(txId);
+
+ String commitId = source.getAttributeValue("commit");
+ if(commitId != null){
+ ret.commitId = Long.parseLong(commitId);
+ }
+
+ for(Element e : source.getChildren("table")){
+ ret.tableNames.add(e.getText());
+ }
+ }
+ catch(NumberFormatException ex){
+ throw new ConversionException("Conversion failure", ex);
+ }
+
+ return ret;
+ }
+ };
}
View
2  java/XFlat/src/org/gburgett/xflat/transaction/Transaction.java
@@ -14,7 +14,7 @@
void commit() throws TransactionException;
- void rollback();
+ void revert();
void setRollbackOnly();
View
6 java/XFlat/src/org/gburgett/xflat/transaction/TransactionException.java
@@ -16,7 +16,11 @@
*
* @param msg the detail message.
*/
- TransactionException(String msg) {
+ protected TransactionException(String msg, Exception inner) {
+ super(msg, inner);
+ }
+
+ protected TransactionException(String msg) {
super(msg);
}
}
View
301 java/XFlat/test/org/gburgett/xflat/db/EngineTestsBase.java
@@ -6,6 +6,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -24,10 +25,12 @@
import org.gburgett.xflat.convert.DefaultConversionService;
import org.gburgett.xflat.convert.converters.JDOMConverters;
import org.gburgett.xflat.convert.converters.StringConverters;
-import org.gburgett.xflat.db.EngineState;
import org.gburgett.xflat.query.XpathQuery;
import org.gburgett.xflat.query.XpathUpdate;
-import org.gburgett.xflat.transaction.ThreadContextTransactionManager;
+import org.gburgett.xflat.transaction.FakeThreadContextTransactionManager;
+import org.gburgett.xflat.transaction.Transaction;
+import org.gburgett.xflat.transaction.TransactionOptions;
+import org.gburgett.xflat.util.FakeDocumentFileWrapper;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -48,7 +51,6 @@
*/
public abstract class EngineTestsBase<TEngine extends EngineBase> {
- protected static ScheduledExecutorService executorService;
protected static ConversionService conversionService;
protected static org.jdom2.xpath.XPathFactory xpath = org.jdom2.xpath.XPathFactory.instance();
@@ -57,7 +59,6 @@
@BeforeClass
public static void setUpClass() {
- executorService = new ScheduledThreadPoolExecutor(4);
conversionService = new DefaultConversionService();
StringConverters.registerTo(conversionService);
JDOMConverters.registerTo(conversionService);
@@ -144,7 +145,7 @@ public void spinDownComplete(EngineBase.SpinDownEvent event) {
if(synchronous){
ScheduledFuture<?> timeout =
- executorService.schedule(new Runnable(){
+ ctx.executorService.schedule(new Runnable(){
@Override
public void run() {
if(engine.getState() == EngineState.SpunDown){
@@ -176,7 +177,8 @@ public void run() {
throw new TimeoutException("spin down timed out with engine state " + didTimeOut.get());
}
else{
- timeout.cancel(true);
+ if(timeout != null)
+ timeout.cancel(true);
}
}
@@ -193,7 +195,7 @@ protected void verifySpinDownComplete(TestContext ctx) throws InterruptedExcepti
private TEngine setupEngine(TestContext ctx){
TEngine instance = this.createInstance(ctx);
- instance.setExecutorService(executorService);
+ instance.setExecutorService(ctx.executorService);
instance.setConversionService(conversionService);
instance.setTransactionManager(ctx.transactionManager);
@@ -248,6 +250,7 @@ protected Element findId(Iterable<Element> rows, String id){
//<editor-fold desc="tests">
+ //<editor-fold desc="transactionless">
@Test
public void testInsert_NoValuesYet_Inserts() throws Exception {
System.out.println("testInsert_NoValuesYet_Inserts");
@@ -288,8 +291,6 @@ public void testInsert_HasValues_Inserts() throws Exception {
TestContext ctx = getContext();
-
-
Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
new Element("data").setText("other text data")
);
@@ -868,8 +869,6 @@ public void testUpdate_MatchingRowHasUpdateableField_FieldIsUpdated() throws Exc
TestContext ctx = getContext();
-
-
Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
new Element("other").setText("other text data"),
new Element("third")
@@ -1205,6 +1204,247 @@ public void testDeleteAll_MatchesMultiple_MultipleDeleted() throws Exception {
//</editor-fold>
+ //<editor-fold desc="transactional">
+
+ @Test
+ public void testUpdate_InTransaction_RevertRemovesData() throws Exception {
+ System.out.println("testUpdate_InTransaction_RevertRemovesData");
+
+ TestContext ctx = getContext();
+
+ Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
+ new Element("third")
+ .setAttribute("fooInt", "17")
+ .setText("third text data")
+ );
+
+ prepFileContents(ctx, inFile);
+ spinUp(ctx);
+
+ XpathQuery query = XpathQuery.eq(xpath.compile("*/@fooInt"), 17);
+ XpathUpdate update = XpathUpdate.set(xpath.compile("third"), "updated text");
+
+ //ACT
+ try(Transaction tx = ctx.transactionManager.openTransaction()){
+
+ int result = ctx.instance.update(query, update);
+
+ assertEquals("Should update in TX", 1, result);
+
+ Element row = ctx.instance.readRow("0");
+ assertEquals("Should update in TX", "updated text", row.getValue());
+
+ tx.revert();
+
+ row = ctx.instance.readRow("0");
+ assertEquals("Should have reverted TX", "third text data", row.getValue());
+ }
+
+ spinDown(ctx);
+
+ Document doc = getFileContents(ctx);
+ List<Element> children = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
+
+ assertEquals("Should have reverted data", "third text data", children.get(0).getChild("third").getText());
+ }
+
+ @Test
+ public void testReplaceRow_InTransaction_CommitModifiesData() throws Exception {
+ System.out.println("testReplaceRow_InTransaction_CommitModifiesData");
+ TestContext ctx = getContext();
+
+ Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
+ new Element("third")
+ .setAttribute("fooInt", "17")
+ .setText("third text data")
+ );
+
+ prepFileContents(ctx, inFile);
+ ctx.executorService = mock(ScheduledExecutorService.class);
+
+ spinUp(ctx);
+
+ Element fourth = new Element("fourth")
+ .setAttribute("fooInt", "17")
+ .setText("fourth text data");
+
+ //ACT
+ try(Transaction tx = ctx.transactionManager.openTransaction()){
+
+ ctx.instance.replaceRow("0", fourth);
+
+ Element row = ctx.instance.readRow("0");
+ assertEquals("Should update in TX", "fourth text data", row.getValue());
+
+ tx.commit();
+
+ row = ctx.instance.readRow("0");
+ assertEquals("Should have not reverted TX", "fourth text data", row.getValue());
+ }
+
+ spinDown(ctx);
+
+ Document doc = getFileContents(ctx);
+ List<Element> children = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
+
+ assertEquals("Should have committed data", "fourth text data", children.get(0).getChild("fourth").getText());
+ }
+
+ @Test
+ public void testDeleteRow_InTransaction_RevertReturnsRow() throws Exception {
+ System.out.println("testDeleteRow_InTransaction_RevertReturnsRow");
+
+ TestContext ctx = getContext();
+
+ Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
+ new Element("third")
+ .setAttribute("fooInt", "17")
+ .setText("third text data")
+ );
+
+ prepFileContents(ctx, inFile);
+ spinUp(ctx);
+
+ //ACT
+ try(Transaction tx = ctx.transactionManager.openTransaction()){
+
+ ctx.instance.deleteRow("0");
+
+ Element row = ctx.instance.readRow("0");
+ assertNull("Should delete in TX", row);
+
+ tx.revert();
+
+ row = ctx.instance.readRow("0");
+ assertNotNull("Should have reverted TX", row);
+ assertEquals("Should have reverted TX", "third text data", row.getValue());
+ }
+
+ spinDown(ctx);
+
+ Document doc = getFileContents(ctx);
+ List<Element> children = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
+
+ assertEquals("Should have reverted data", "third text data", children.get(0).getChild("third").getText());
+ }
+
+ @Test
+ public void testInsert_InTransaction_HasReadIsolation() throws Exception {
+ System.out.println("testInsert_InTransaction_HasReadIsolation");
+
+ TestContext ctx = getContext();
+
+ prepFileContents(ctx, null);
+ spinUp(ctx);
+
+ Element outsideTransaction;
+ Element insideTransaction;
+ try(Transaction tx = ctx.transactionManager.openTransaction()){
+
+ Element rowData = new Element("data").setText("some text data");
+
+ //ACT
+ ctx.instance.insertRow("1", rowData);
+
+ //swap out the transaction
+ ctx.transactionManager.setContextId(1L);
+ outsideTransaction = ctx.instance.readRow("1");
+
+ //swap the transaction back
+ ctx.transactionManager.setContextId(0L);
+ insideTransaction = ctx.instance.readRow("1");
+
+ tx.commit();
+ }
+
+ assertNull("Outside the TX, should have no data", outsideTransaction);
+
+ assertEquals("Inside the TX, should have the data", "data", insideTransaction.getName());
+ assertEquals("Inside the TX, should have the data", "some text data", insideTransaction.getText());
+
+ Element fromEngine = ctx.instance.readRow("1");
+ assertEquals("Should have committed TX data", "data", fromEngine.getName());
+ assertEquals("Should have committed TX data", "some text data", fromEngine.getText());
+
+
+ spinDown(ctx);
+ }
+
+ @Test
+ public void testQuery_InTransaction_HasReadIsolation() throws Exception {
+ System.out.println("testQuery_InTransaction_HasReadIsolation");
+
+ TestContext ctx = getContext();
+
+ Document inFile = Utils.makeDocument(ctx.instance.getTableName(),
+ new Element("other")
+ .setAttribute("fooInt", "17")
+ .setText("other text data"),
+ new Element("third")
+ .setAttribute("fooInt", "17")
+ .setText("third text data"),
+ new Element("fourth")
+ .setAttribute("fooInt", "18")
+ .setText("fourth text data")
+ );
+ prepFileContents(ctx, inFile);
+
+ spinUp(ctx);
+
+ XpathQuery query = XpathQuery.eq(xpath.compile("*/@fooInt"), 17);
+
+
+ List<Element> fromCursor = new ArrayList<>();
+ try(Transaction tx = ctx.transactionManager.openTransaction(TransactionOptions.Default.setReadOnly(true))){
+
+ Element rowData = new Element("data")
+ .setAttribute("fooInt", "17")
+ .setText("some text data");
+
+ //now that we've opened the transaction, switch contexts and insert
+ ctx.transactionManager.setContextId(1L);
+ ctx.instance.insertRow("4", rowData);
+
+ //ACT
+ //switch back and query
+ ctx.transactionManager.setContextId(0L);
+ try(Cursor<Element> cursor = ctx.instance.queryTable(query)){
+ Iterator<Element> it = cursor.iterator();
+
+ fromCursor.add(it.next());
+
+
+ //switch contexts and insert again
+ ctx.transactionManager.setContextId(1L);
+ ctx.instance.insertRow("5", new Element("fifth")
+ .setAttribute("fooInt", "17")
+ .setText("fifth text data"));
+
+
+ ctx.transactionManager.setContextId(0L);
+ while(it.hasNext()){
+ fromCursor.add(it.next());
+ }
+ }
+ }
+
+ assertEquals("Should have cursored over 2 items", 2, fromCursor.size());
+ assertThat("should have correct 2 items", fromCursor,
+ Matchers.containsInAnyOrder(
+ Matchers.allOf(
+ isNamed("other"), hasText("other text data")
+ ),
+ Matchers.allOf(
+ isNamed("third"), hasText("third text data")
+ )));
+
+ spinDown(ctx);
+ }
+
+ //</editor-fold>
+
+ //</editor-fold>
+
private Matcher<Element> hasChildThat(final String childName, final Matcher<Element> matcher){
return new TypeSafeMatcher<Element>(){
@Override
@@ -1250,6 +1490,38 @@ public void describeTo(Description description) {
};
}
+ private Matcher<Element> isNamed(final String name){
+ return new TypeSafeMatcher<Element>(){
+ @Override
+ protected boolean matchesSafely(Element item) {
+ return item.getName().equals(name);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("is named ")
+ .appendText(name);
+ }
+
+ };
+ }
+
+ private Matcher<Element> hasText(final String text){
+ return new TypeSafeMatcher<Element>(){
+ @Override
+ protected boolean matchesSafely(Element item) {
+ return item.getText().equals(text);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("has text ")
+ .appendText(text);
+ }
+
+ };
+ }
+
protected class TestContext{
public TEngine instance;
@@ -1259,10 +1531,15 @@ public void describeTo(Description description) {
public long id;
- public EngineTransactionManager transactionManager = new ThreadContextTransactionManager();
+ public AtomicReference<Document> transactionJournal = new AtomicReference<>(new Document().setRootElement(new Element("transactionJournal")));
+
+ public FakeThreadContextTransactionManager transactionManager = new FakeThreadContextTransactionManager(new FakeDocumentFileWrapper(transactionJournal));
public final Map<String, Object> additionalContext;
+
+ public ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2);
+
public TestContext(){
this.id = Thread.currentThread().getId();
additionalContext = new HashMap<>();
View
372 java/XFlat/test/org/gburgett/xflat/db/EngineTransactionManagerTestBase.java
@@ -0,0 +1,372 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.gburgett.xflat.transaction.Transaction;
+import org.gburgett.xflat.transaction.TransactionException;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ *
+ * @author Gordon
+ */
+public abstract class EngineTransactionManagerTestBase {
+
+ public EngineTransactionManagerTestBase() {
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+
+ protected abstract EngineTransactionManager getInstance();
+
+ @Test
+ public void testBeginTransaction_TransactionIsOpen() throws Exception {
+ System.out.println("testBeginTransaction_TransactionIsOpen");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+
+ assertFalse("TX should not be committed", tx.isCommitted());
+ assertEquals("TX should not be committed", -1, instance.isTransactionCommitted(tx.getTransactionId()));
+ assertFalse("TX should not be reverted", tx.isReverted());
+ assertFalse("TX should not be reverted", instance.isTransactionReverted(tx.getTransactionId()));
+ assertTrue("Should be one open TX", instance.anyOpenTransactions());
+ assertEquals(tx.getTransactionId(), instance.getLowestOpenTransaction());
+
+
+
+ tx.commit();
+
+ assertTrue("TX should be committed", tx.isCommitted());
+ assertEquals("TX should be committed", tx.getCommitId(), instance.isTransactionCommitted(tx.getTransactionId()));
+ assertFalse("TX should not be reverted", tx.isReverted());
+ assertFalse("TX should not be reverted", instance.isTransactionReverted(tx.getTransactionId()));
+ assertThat("TX should have higher commit ID", tx.getCommitId(), Matchers.greaterThan(tx.getTransactionId()));
+
+ tx.close();
+
+ assertFalse("Should be no open TX", instance.anyOpenTransactions());
+ assertEquals(Long.MAX_VALUE, instance.getLowestOpenTransaction());
+
+ }
+ }
+
+ @Test
+ public void testRevertTransaction_TransactionIsReverted() throws Exception {
+ System.out.println("testRevertTransaction_TransactionIsReverted");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+
+ tx.revert();
+
+ assertFalse("TX should not be committed", tx.isCommitted());
+ assertEquals("TX should not be committed", -1, instance.isTransactionCommitted(tx.getTransactionId()));
+ assertTrue("TX should be reverted", tx.isReverted());
+ assertTrue("TX should be reverted", instance.isTransactionReverted(tx.getTransactionId()));
+ assertEquals("TX should have no commit ID", -1, tx.getCommitId());
+
+ tx.close();
+
+ assertFalse("Should be no open TX", instance.anyOpenTransactions());
+ assertEquals(Long.MAX_VALUE, instance.getLowestOpenTransaction());
+ }
+ }
+
+ @Test
+ public void testGetTransaction_GetsCurrentTransaction() throws Exception {
+ System.out.println("testGetTransaction_GetsCurrentTransaction");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+
+ Transaction current = instance.getTransaction();
+
+ assertEquals("Should be same transaction", tx.getTransactionId(), current.getTransactionId());
+
+ tx.close();
+
+ assertNull("getTransaction should be null after closing", instance.getTransaction());
+
+ }
+
+ }
+
+ @Test
+ public void testTransactionlessCommitId_BeforeAndAfterTx_LTAndGTTxId() throws Exception {
+ System.out.println("testTransactionlessCommitId_BeforeAndAfterTx_LTAndGTTxId");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ long id1 = instance.transactionlessCommitId();
+
+ Transaction tx = instance.openTransaction();
+
+ long id2 = instance.transactionlessCommitId();
+
+ tx.close();
+
+ assertThat(id1, Matchers.lessThan(tx.getTransactionId()));
+ assertThat(id2, Matchers.greaterThan(tx.getTransactionId()));
+ }
+ }
+
+ @Test
+ public void testTransactionIDs_HeavyUse_ThreadSafe() throws Exception {
+ System.out.println("testTransactionIDs_HeavyUse_ThreadSafe");
+
+ try(final EngineTransactionManager instance = getInstance())
+ {
+ final Set<List<Long>> ids = Collections.synchronizedSet(new HashSet<List<Long>>());
+
+ Runnable r = new Runnable(){
+ @Override
+ public void run() {
+ List<Long> idList = new ArrayList<>(500);
+
+ for(int i = 0; i < 500; i++){
+ idList.add(instance.transactionlessCommitId());
+ }
+
+ ids.add(idList);
+ }
+ };
+
+ Thread th1 = new Thread(r);
+ Thread th2 = new Thread(r);
+ Thread th3 = new Thread(r);
+
+ long start = instance.transactionlessCommitId();
+
+ th1.start();
+ th2.start();
+ th3.start();
+ r.run();
+
+ th1.join();
+ th2.join();
+ th3.join();
+
+ long end = instance.transactionlessCommitId();
+
+
+ int maxUniquifier = 0;
+
+ Set<Long> finalIds = new HashSet<>();
+ for(List<Long> idList : ids){
+ for(Long l : idList){
+ assertTrue("Duplicate IDs generated", finalIds.add(l));
+ assertThat("id not greater than start", l, Matchers.greaterThan(start));
+ assertThat("id not less than end", l, Matchers.lessThan(end));
+
+ int i = (int)(l.longValue() & 0xFFFFL);
+ maxUniquifier = i > maxUniquifier ? i : maxUniquifier;
+ }
+ }
+
+ System.out.println("Max uniquifier: " + maxUniquifier);
+ }
+ }
+
+ @Test
+ public void testBindEngine_BoundEngineNotifiedOfRevert() throws Exception {
+ System.out.println("testBindEngine_BoundEngineNotifiedOfRevert");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+ Transaction tx = instance.openTransaction();
+
+ EngineBase e = mock(EngineBase.class);
+
+ instance.bindEngineToCurrentTransaction(e);
+
+ tx.revert();
+
+ verify(e).revert(tx.getTransactionId(), false);
+
+ tx.close();
+ }
+ }
+
+ @Test
+ public void testBindEngine_BoundEngineNotifiedOfCommit() throws Exception {
+ System.out.println("testBindEngine_BoundEngineNotifiedOfCommit");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+
+ EngineBase e = mock(EngineBase.class);
+
+ instance.bindEngineToCurrentTransaction(e);
+
+ tx.commit();
+
+ verify(e).commit(tx);
+
+ tx.close();
+ }
+ }
+
+ @Test
+ public void testBindEngine_ExceptionDuringCommit_BoundEngineReverted() throws Exception {
+ System.out.println("testBindEngine_ExceptionDuringCommit_BoundEngineReverted");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+
+ EngineBase e = mock(EngineBase.class);
+ doThrow(new TransactionException("Test"){})
+ .when(e).commit(any(Transaction.class));
+
+
+ instance.bindEngineToCurrentTransaction(e);
+
+ try{
+ tx.commit();
+ fail("Did not throw TransactionException");
+ }
+ catch(TransactionException ex){
+ //expected
+ }
+
+ verify(e).revert(tx.getTransactionId(), false);
+
+ tx.close();
+ }
+ }
+
+ @Test
+ public void testMultipleBoundEngines_SecondEngineThrows_FirstBoundEngineRevertedAfterCommit() throws Exception {
+ System.out.println("testMultipleBoundEngines_SecondEngineThrows_FirstBoundEngineRevertedAfterCommit");
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+ Transaction tx = instance.openTransaction();
+
+ final AtomicReference<EngineBase> committedEngine = new AtomicReference<>(null);
+
+ Answer a = new Answer(){
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ EngineBase eng = (EngineBase)invocation.getMock();
+ if(!committedEngine.compareAndSet(null, eng)){
+ //the second one should throw
+ throw new TransactionException("Test"){};
+ }
+ return null;
+ }
+ };
+
+ EngineBase e = mock(EngineBase.class);
+ EngineBase e2 = mock(EngineBase.class);
+ doAnswer(a)
+ .when(e).commit(any(Transaction.class));
+ doAnswer(a)
+ .when(e2).commit(any(Transaction.class));
+
+ instance.bindEngineToCurrentTransaction(e);
+ instance.bindEngineToCurrentTransaction(e2);
+
+ try{
+ tx.commit();
+ fail("Did not throw TransactionException");
+ }
+ catch(TransactionException ex){
+ //expected
+ }
+
+ verify(e).revert(tx.getTransactionId(), false);
+ verify(e2).revert(tx.getTransactionId(), false);
+
+ verify(committedEngine.get()).commit(tx);
+
+ tx.close();
+
+ }
+ }
+
+ @Test
+ public void testBoundEngineFails_InstanceClosed_Recovers() throws Exception {
+ System.out.println("testBoundEngineFails_InstanceClosed_Recovers");
+
+ long txId;
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+
+ Transaction tx = instance.openTransaction();
+ txId = tx.getTransactionId();
+
+ EngineBase e = mock(EngineBase.class);
+ doThrow(new TransactionException("Test"){})
+ .when(e).commit(any(Transaction.class));
+ doThrow(new Error("Expected"))
+ .when(e).revert(any(Long.class), anyBoolean());
+ doReturn("Name")
+ .when(e).getTableName();
+
+ instance.bindEngineToCurrentTransaction(e);
+
+ try{
+ tx.commit();
+ fail("Did not throw TransactionException");
+ }
+ catch(Error err){
+ if(!"Expected".equals(err.getMessage()))
+ throw err;
+
+ //expected
+ }
+
+ tx.close();
+ }
+
+ EngineBase e = mock(EngineBase.class);
+
+ XFlatDatabase db = mock(XFlatDatabase.class);
+ when(db.getEngine("Name"))
+ .thenReturn(e);
+
+ try(EngineTransactionManager instance = getInstance())
+ {
+ instance.recover(db);
+ }
+
+ //verify recovered
+ verify(e).revert(txId, true);
+ }
+
+}
View
9 java/XFlat/test/org/gburgett/xflat/engine/IdShardedEngineTest.java
@@ -11,6 +11,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.gburgett.xflat.ShardsetConfig;
@@ -28,6 +29,7 @@
import org.gburgett.xflat.query.IntervalProvider;
import org.gburgett.xflat.transaction.ThreadContextTransactionManager;
import org.gburgett.xflat.util.DocumentFileWrapper;
+import org.gburgett.xflat.util.FakeDocumentFileWrapper;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
@@ -56,7 +58,7 @@ protected IdShardedEngine createInstance(TestContext ctx) {
final Map<String, Document> docs = new ConcurrentHashMap<>();
ctx.additionalContext.put("docs", docs);
- XFlatDatabase db = new XFlatDatabase(workspace, executorService);
+ XFlatDatabase db = new XFlatDatabase(workspace, ctx.executorService);
db.extendConversionService(new PojoConverter(){
@Override
public ConversionService extend(ConversionService service) {
@@ -106,8 +108,8 @@ public void writeFile(String fileName, Document doc){
return ret;
}
});
- db.setTransactionManager(new ThreadContextTransactionManager());
-
+ db.setTransactionManager(ctx.transactionManager);
+
IntervalProvider provider = NumericIntervalProvider.forInteger(1, 100);
ctx.additionalContext.put("rangeProvider", provider);
ShardsetConfig cfg = ShardsetConfig.create(XpathQuery.Id, Integer.class, provider);
@@ -132,6 +134,7 @@ public void writeFile(String fileName, Document doc){
IdShardedEngine ret = new IdShardedEngine(file, name, cfg);
setMetadataFactory(ret, new TableMetadataFactory(db, file));
+
return ret;
}
View
30 java/XFlat/test/org/gburgett/xflat/transaction/FakeThreadContextTransactionManager.java
@@ -0,0 +1,30 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.transaction;
+
+import org.gburgett.xflat.util.DocumentFileWrapper;
+
+/**
+ * Overrides the ThreadContextTransactionManager so the context ID can be swapped.
+ * The context ID is normally bound to the current thread's ID, this fake transaction manager
+ * can swap that out within a single thread.
+ * @author Gordon
+ */
+public class FakeThreadContextTransactionManager extends ThreadContextTransactionManager {
+ private long currentContextId = 0;
+
+ public FakeThreadContextTransactionManager(DocumentFileWrapper wrapper){
+ super(wrapper);
+ }
+
+ @Override
+ public Long getContextId(){
+ return currentContextId;
+ }
+
+ public void setContextId(Long contextId){
+ this.currentContextId = contextId;
+ }
+}
View
30 java/XFlat/test/org/gburgett/xflat/transaction/ThreadContextTransactionManagerTest.java
@@ -0,0 +1,30 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.transaction;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.gburgett.xflat.db.EngineTransactionManagerTestBase;
+import org.gburgett.xflat.util.FakeDocumentFileWrapper;
+import org.jdom2.Document;
+import org.junit.After;
+
+/**
+ *
+ * @author Gordon
+ */
+public class ThreadContextTransactionManagerTest extends EngineTransactionManagerTestBase {
+
+ private AtomicReference<Document> doc = new AtomicReference<>(null);
+
+ @After
+ public void tearDown(){
+ doc.set(null);
+ }
+
+ @Override
+ public ThreadContextTransactionManager getInstance(){
+ return new ThreadContextTransactionManager(new FakeDocumentFileWrapper(doc));
+ }
+}
View
52 java/XFlat/test/org/gburgett/xflat/util/FakeDocumentFileWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.util;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.jdom2.Document;
+
+/**
+ *
+ * @author Gordon
+ */
+public class FakeDocumentFileWrapper extends DocumentFileWrapper{
+
+ private AtomicReference<Document> doc;
+
+ public FakeDocumentFileWrapper(AtomicReference<Document> doc){
+ super(null, null, null);
+ this.doc = doc;
+ }
+
+ @Override
+ public boolean exists(){
+ return doc.get() != null;
+ }
+
+ @Override
+ public Document readFile(){
+ return doc.get();
+ }
+
+ @Override
+ public void writeFile(Document doc){
+ this.doc.set(doc);
+ }
+
+ @Override
+ public void writeFile(String fileName, Document doc){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Document readFile(String fileName){
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString(){
+ return "mock";
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.