Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Almost got Transactions finished, a few more tests to fix, then I nee…

…d to write some.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
commit f3a3613e2890602f039b0669ed07d5b8dbb9ee2f 1 parent 3bcc009
@gburgett authored
View
12 java/XFlat/src/org/gburgett/xflat/Database.java
@@ -5,6 +5,7 @@
package org.gburgett.xflat;
import org.gburgett.xflat.Table;
+import org.gburgett.xflat.transaction.TransactionManager;
/**
* An interface for a Database managing one or more Tables.
@@ -14,7 +15,7 @@
/**
* Gets a table that converts the data to the persistent class.
- *
+ * <p/>
* The table will be named with the class' {@link Class#getSimpleName() simple name}.
* @param <T> The generic class of the persistentType
* @param persistentClass The persistent class, which the Database
@@ -25,7 +26,7 @@
/**
* Gets the named table that converts the data to the given persistentClass.
- *
+ * <p/>
* Multiple different classes of data can be stored in the same named Table provided
* certain conditions are met.
* @param <T> The generic class of the persistentType
@@ -35,4 +36,11 @@
* @return A table for manipulating rows of the persistent class.
*/
public <T> Table<T> getTable(Class<T> persistentClass, String name);
+
+ /**
+ * Gets the database's {@link TransactionManager}. The TransactionManager
+ * allows opening transactions in the database.
+ * @return The database's TransactionManager.
+ */
+ public TransactionManager getTransactionManager();
}
View
306 java/XFlat/src/org/gburgett/xflat/db/EngineBase.java
@@ -4,20 +4,23 @@
*/
package org.gburgett.xflat.db;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.gburgett.xflat.EngineStateException;
+import org.gburgett.xflat.XflatException;
import org.gburgett.xflat.convert.ConversionService;
+import org.gburgett.xflat.db.EngineBase.RowData;
import org.gburgett.xflat.transaction.Transaction;
import org.gburgett.xflat.transaction.TransactionManager;
-import org.jdom2.Attribute;
import org.jdom2.Element;
/**
@@ -150,23 +153,181 @@ protected void setConversionService(ConversionService conversionService) {
this.conversionService = conversionService;
}
- private TransactionManager transactionManager;
+ private EngineTransactionManager transactionManager;
/**
* Gets the transactionManager.
*/
- public TransactionManager getTransactionManager(){
+ protected EngineTransactionManager getTransactionManager(){
return this.transactionManager;
}
/**
* Sets the transactionManager.
*/
- public void setTransactionManager(TransactionManager transactionManager){
+ protected void setTransactionManager(EngineTransactionManager transactionManager){
this.transactionManager = transactionManager;
}
//</editor-fold>
+ private final AtomicLong tableLock = new AtomicLong(-1);
+ private int tableLockCount = 0;
+ private final Object tableLockSync = new Object();
+
+ private final AtomicInteger writesInProgress = new AtomicInteger(0);
+
+ /**
+ * Called before every write to ensure we are ready to write. <br/>
+ * This method also checks if there is a current table lock, and increments
+ * the {@link #writesInProgress} counter.
+ * <p/>
+ * If the engine is spinning down then we throw because engines are read-only
+ * when spinning down.
+ */
+ protected void ensureWriteReady(){
+ //check if there is a write lock on the table
+ long tblLock = tableLock.get();
+ if(tblLock != -1 && tblLock != Thread.currentThread().getId()){
+ synchronized(tableLockSync){
+ tblLock = tableLock.get();
+ while(tblLock != -1 && tblLock != Thread.currentThread().getId()){
+ try {
+ tableLockSync.wait();
+ } catch (InterruptedException ex) {
+ }
+
+ tblLock = tableLock.get();
+ }
+ }
+ }
+
+ //check the engine state
+ EngineState state = this.state.get();
+ if(state == EngineState.SpunDown ||
+ state == EngineState.SpinningDown){
+ throw new EngineStateException("Write operations not supported on an engine that is spinning down", state);
+ }
+
+ //we're about to write, so the engine must be bound to the current transaction
+ this.transactionManager.bindEngineToCurrentTransaction(this);
+
+ //increment the number of writes in progress
+ int inprog = this.writesInProgress.incrementAndGet();
+ if(inprog < 1){
+ //dunno how we got here, try to correct
+ this.writesInProgress.compareAndSet(inprog, 1);
+ log.info(String.format("Writes in progress was less than 1: %d", inprog));
+ }
+ }
+
+ /**
+ * Called inside a finally block within every write operation -
+ * this is a synchronizing measure for write locks
+ */
+ protected void writeComplete(){
+ //decrement the number of writes in progress
+ int inprog = this.writesInProgress.decrementAndGet();
+ if(inprog < 0){
+ this.writesInProgress.compareAndSet(inprog, 0);
+ log.info(String.format("Writes in progress was less than 1: %d", inprog));
+ }
+ }
+
+ /**
+ * Obtains a write lock on the table for this thread.
+ * <p/>
+ * New write operations will block until the lock is released with {@link #releaseTableLock() }.
+ * This method will wait after obtaining the lock until all in-progress write operations
+ * have terminated.
+ * <p/>
+ * Since I don't exactly trust this to never throw an exception, it would of
+ * course be good practice to always use the following pattern:
+ * <pre>
+ * try{
+ * engine.getTableLock();
+ *
+ * //do stuff
+ * }
+ * finally{
+ * engine.releaseTableLock();
+ * }
+ * </pre>
+ */
+ protected void getTableLock(){
+ long thread = Thread.currentThread().getId();
+
+ if(this.tableLock.get() == thread){
+ this.tableLockCount++;
+ return;
+ }
+
+ synchronized(tableLockSync){
+ while(!this.tableLock.compareAndSet(-1, thread)){
+ if(this.tableLock.get() == thread){
+ this.tableLockCount++;
+ return;
+ }
+
+ try {
+ //wait until we can obtain the lock for this thread.
+ tableLockSync.wait();
+ } catch (InterruptedException ex) {
+ }
+ }
+ this.tableLockCount++;
+
+ //spin wait on writes in progress - this should only decrement while we have a write lock
+ long start = System.currentTimeMillis();
+ long nanos = System.nanoTime();
+ while(this.writesInProgress.get() > 0){
+
+ //if we've been waiting longer than 500ms something is amiss
+ if(System.currentTimeMillis() - start > 500){
+ //release the lock before throwing
+ this.tableLock.compareAndSet(thread, -1);
+ this.tableLockCount--;
+ throw new XflatException(String.format("Cannot obtain table lock - %d long running writes in progress", this.writesInProgress.get()));
+ }
+
+ //if we've been spin-waiting longer than 500ns then sleep the thread
+ if(System.nanoTime() - nanos > 500){
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Releases a write lock on the table that was obtained by this thread.
+ * If the current thread did not own the lock then this method does nothing.
+ * <p/>
+ * ALWAYS call this in a finally block after calling {@link #getTableLock() }
+ */
+ protected void releaseTableLock(){
+ if(this.tableLock.get() != Thread.currentThread().getId()){
+ return;
+ }
+
+ synchronized(tableLockSync){
+ if(this.tableLock.get() != Thread.currentThread().getId()){
+ return;
+ }
+
+ if(--this.tableLockCount == 0){
+ //last reentrant release encountered
+ if(this.tableLock.compareAndSet(Thread.currentThread().getId(), -1)){
+ //notify of lock released
+ tableLockSync.notifyAll();
+ }
+ }
+ }
+ }
+
+
/**
* Saves metadata to the given element. Metadata is things like indexes
* and other configuration.
@@ -202,8 +363,7 @@ protected void setId(Element row, String id){
row.setAttribute("id", id, XFlatDatabase.xFlatNs);
}
-
-
+
/**
* Checks whether this engine has any transactional updates in an uncommitted
* or unreverted state.
@@ -248,31 +408,15 @@ public Row(String id, RowData data){
* <p/>
* ALWAYS invoke this while synchronized on the Row.
* @param currentTransaction The current transaction, or null.
+ * @param transactionId The transaction ID to use if the current transaction is null
* @return The most recent committed RowData in this row, committed before the transaction.
*/
- public RowData chooseMostRecentCommitted(Transaction currentTransaction){
- if(currentTransaction == null){
- return chooseMostRecentCommitted(null, Long.MAX_VALUE);
+ public RowData chooseMostRecentCommitted(Transaction currentTransaction, long transactionId){
+ if(currentTransaction != null){
+ //override the given transaction ID just in case
+ transactionId = currentTransaction.getTransactionId();
}
- return chooseMostRecentCommitted(currentTransaction, currentTransaction.getTransactionId());
- }
-
- /**
- * Chooses the most recent committed RowData that was committed before the given transaction ID.
- * This prevents dirty reads in a non-transactional context by having a synchronizing transaction ID
- * which can be obtained from {@link TransactionManager#transactionlessCommitId() }
- * <p/>
- * ALWAYS invoke this while synchronized on the Row.
- * @param snapshotId The Transaction ID representing the time at which a snapshot of the data should be obtained.
- * @return The most recent committed RowData in this row, committed before the given snapshot.
- */
- public RowData chooseMostRecentCommitted(Long snapshotId){
- return chooseMostRecentCommitted(null, snapshotId);
- }
-
- private RowData chooseMostRecentCommitted(Transaction currentTransaction, long currentTxId){
-
RowData ret = null;
long retCommitId = -1;
@@ -285,8 +429,8 @@ private RowData chooseMostRecentCommitted(Transaction currentTransaction, long c
//committed version
if(currentTransaction != null && !currentTransaction.isReverted()){
- if(data.transactionId > -1 && currentTxId == data.transactionId){
- //this row data is the data in the current transaction
+ if(data.transactionId > -1 && transactionId == data.transactionId){
+ //this row data is in the current transaction
return data;
}
}
@@ -299,7 +443,7 @@ private RowData chooseMostRecentCommitted(Transaction currentTransaction, long c
if(data.commitId > -1){
//this row data has been committed
- if(currentTxId > data.commitId){
+ if(transactionId > data.commitId){
//the current transaction is null or began after the transaction was committed
if(retCommitId < data.commitId){
@@ -321,8 +465,89 @@ private RowData chooseMostRecentCommitted(Transaction currentTransaction, long c
return ret;
}
-
+ /**
+ * Chooses the most recent committed RowData that was committed before the given transaction ID.
+ * This prevents dirty reads in a non-transactional context by having a synchronizing transaction ID
+ * which can be obtained from {@link TransactionManager#transactionlessCommitId() }
+ * <p/>
+ * ALWAYS invoke this while synchronized on the Row.
+ * @param snapshotId The Transaction ID representing the time at which a snapshot of the data should be obtained.
+ * @return The most recent committed RowData in this row, committed before the given snapshot.
+ */
+ public RowData chooseMostRecentCommitted(Long snapshotId){
+ return chooseMostRecentCommitted(null, snapshotId);
+ }
+
+ /**
+ * Cleans up the transactional data in this row.
+ * Returns true if this row can then be removed because it contains no data.
+ * @param (optional) A set of transaction IDs that is added to when it is discovered
+ * that a transaction has been newly committed (and the associated RowData's commit ID
+ * is updated).
+ * @return true if this row has no RowData or its only RowData is "nothing".
+ */
+ public boolean cleanup(){
+
+ RowData mostRecent = null;
+ long lowest = transactionManager.getLowestOpenTransaction();
+
+ Set<RowData> toRemove = null;
+
+ Iterator<RowData> it = rowData.values().iterator();
+ while(it.hasNext()){
+ RowData data = it.next();
+
+ if(data.commitId == -1){
+ data.commitId = transactionManager.isTransactionCommitted(data.transactionId);
+ if(data.commitId == -1){
+ //the data is uncommitted
+
+ if(transactionManager.isTransactionReverted(data.transactionId)){
+ //don't need this anymore
+ it.remove();
+ }
+ continue;
+ }
+ }
+
+ //the data is committed
+
+ if(mostRecent == null){
+ mostRecent = data;
+ }
+ else{
+ if(data.commitId <= mostRecent.commitId){
+ //the most recent data is newer
+ if(mostRecent.commitId < lowest){
+ //there is no open transaction that would see this data instead of mostRecent
+ it.remove();
+ }
+ }
+ else{
+ //the data is newer
+ if(data.commitId < lowest){
+ //there is no open transaction that would see mostRecent instead of this data
+ if(toRemove == null){
+ toRemove = new HashSet<>();
+ }
+ toRemove.add(mostRecent);
+ mostRecent = data;
+ }
+ }
+ }
+ }
+
+ //remove the ones we couldn't remove during the iteration
+ if(toRemove != null && toRemove.size() > 0){
+ for(RowData data : toRemove){
+ rowData.remove(data.commitId);
+ }
+ }
+
+ //if there's no more row datas, or there is only one row data and it's value is "nothing", then return true.
+ return rowData.isEmpty() || (rowData.size() == 1 && rowData.values().iterator().next().data == null);
+ }
}
protected class RowData{
@@ -332,6 +557,11 @@ private RowData chooseMostRecentCommitted(Transaction currentTransaction, long c
public Element data = null;
/**
+ * A "db:row" element that wraps the data. This is useful for queries.
+ */
+ public Element rowElement = null;
+
+ /**
* The ID of the transaction that created this data snapshot
*/
public long transactionId = -1;
@@ -346,10 +576,16 @@ public RowData(long txId){
this.transactionId = txId;
}
- public RowData(long txId, Element data){
- this.data = data;
+ public RowData(long txId, Element data, String id){
+ if(data != null){
+ this.data = data;
+ this.rowElement = new Element("row", XFlatDatabase.xFlatNs)
+ .setAttribute("id", id, XFlatDatabase.xFlatNs)
+ .setContent(data);
+ }
this.transactionId = txId;
}
+
}
View
125 java/XFlat/src/org/gburgett/xflat/db/EngineTransactionManager.java
@@ -0,0 +1,125 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.db;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import org.gburgett.xflat.transaction.TransactionManager;
+
+/**
+ *
+ * @author Gordon
+ */
+public abstract class EngineTransactionManager implements TransactionManager {
+
+ /**
+ * Gets a new commit ID for a transactionless write operation.
+ * All transactionless writes can be thought of as transactions that are
+ * automatically committed. This allows us to provide isolation between
+ * transactions and transactionless writes.
+ * @return
+ */
+ public abstract long transactionlessCommitId();
+
+ /**
+ * Gets the ID of the earliest open transaction.
+ * @return The ID of the earliest open transaction.
+ */
+ public abstract long getLowestOpenTransaction();
+
+ /**
+ * Called by an engine in order to bind itself to a transaction. This means
+ * that the engine has transactional data for this transaction, so the
+ * transaction manager will not forget about the transaction so long as
+ * engine remains bound to it.
+ * <p/>
+ * If the engine is already bound to the transaction, this method does nothing.
+ * @param engine The engine to bind to a transaction.
+ */
+ public abstract void bindEngineToCurrentTransaction(EngineBase engine);
+
+ /**
+ * Called by an engine in order to unbind itself from a transaction. This means
+ * that the engine no longer has transactional data for this transaction; either
+ * the data in the transaction has been fully committed or fully cleaned. In either
+ * case the engine will no longer need to ask the transaction manager about
+ * the status of the transaction.
+ * @param engine The engine to unbind from a transaction.
+ * @param transactionId The ID of the transaction to unbind.
+ */
+ public abstract void unbindEngineFromTransaction(EngineBase engine, Long transactionId);
+
+ /**
+ * Unbinds the engine from all its bound transactions except the given collection.
+ * @see #unbindEngineFromTransaction(org.gburgett.xflat.db.EngineBase, java.lang.Long)
+ * @param engine
+ * @param transactionIds
+ */
+ public abstract void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transactionIds);
+
+ /**
+ * Checks to see if the given transaction ID has been committed. If so,
+ * returns the transaction's commit ID. Otherwise returns -1.
+ * <p/>
+ * This method is only valid so long as at least one engine is bound
+ * to the transaction. If no engines are bound to the transaction,
+ * this may return erroneous data.
+ * @param transactionId The ID of the transaction to check.
+ * @return the transaction's commit ID if committed, -1 otherwise.
+ */
+ public abstract long isTransactionCommitted(long transactionId);
+
+ /**
+ * Checks to see if the given transaction ID has been reverted. If so,
+ * returns true, otherwise false.
+ * </p>
+ * This method is only valid so long as at least one engine is bound
+ * to the transaction. If no engines are bound to the transaction,
+ * this may return erroneous data.
+ * @param transactionId The ID of the transaction to check.
+ * @return true if the transaction is reverted, false otherwise.
+ */
+ public abstract boolean isTransactionReverted(long transactionId);
+
+
+ private AtomicLong lastId = new AtomicLong();
+
+ /**
+ * Generates a new Transaction ID. The ID is composed of the lower
+ * 48 bits of {@link System#currentTimeMillis() } plus a 16-bit uniquifier.
+ * unfortunately this means we have a y6k problem :P I'll let my descendants
+ * deal with it. (seriously, I calculated it and we will run out of IDs on
+ * 10/17/6429 at around 3am).
+ * @return A new ID for a transaction.
+ */
+ protected long generateNewId(){
+ long id;
+ long last;
+ do{
+ //bitshifting current time millis still gets us at least to the year 10,000 before it overflows.
+ id = System.currentTimeMillis() << 16;
+ last = lastId.get();
+ if((last & 0xFFFFFFFFFFFF0000l) == (id & 0xFFFFFFFFFFFF0000l)){
+ //the last ID was at the same millisecond as our new ID, need to use uniquifier.
+ int u = (int)(last & 0xFFFFl) + 1;
+ if(u > 0xFFFF){
+ try {
+ //we can't roll over, need to slow down rate of transaction generation.
+ Thread.sleep(1);
+ } catch (InterruptedException ex) {
+ //don't care
+ }
+ //try again, hopefully currentTimeMillis rolled over.
+ continue;
+ }
+
+ id = id | u;
+ }
+ }while(!lastId.compareAndSet(last, id));
+
+ return id;
+ }
+
+}
View
129 java/XFlat/src/org/gburgett/xflat/db/ShardedEngineBase.java
@@ -154,27 +154,33 @@ private EngineBase getEngine(Interval<T> interval){
throw new XflatException("Attempt to read or write to an engine in an uninitialized state");
}
+ //all operations are writes for the purposes of the sharded engine.
+ //NOT TRUE! Need to fix this!
+ ensureWriteReady();
try{
- return action.act(getEngine(range));
- }
- catch(EngineStateException ex){
- //try one more time with a potentially new engine, if we still fail then let it go
- return action.act(getEngine(range));
+
+ try{
+ return action.act(getEngine(range));
+ }
+ catch(EngineStateException ex){
+ //try one more time with a potentially new engine, if we still fail then let it go
+ return action.act(getEngine(range));
+ }
+
+ }finally{
+ writeComplete();
}
}
protected void update(){
-
-
Iterator<TableMetadata> it = openShards.values().iterator();
while(it.hasNext()){
TableMetadata table = it.next();
if(table.canSpinDown()){
- //remove right now - if between the check and the remove we got some activity
- //then oh well, we can spin up a new instance.
- it.remove();
+ EngineBase spinDown = table.spinDown(false);
+
+ //don't remove any metadata. It's too dangerous with the way the concurrency is structured.
- table.spinDown();
try {
this.getMetadataFactory().saveTableMetadata(table);
} catch (IOException ex) {
@@ -183,6 +189,7 @@ protected void update(){
}
}
}
+
}
@Override
@@ -197,7 +204,8 @@ protected boolean hasUncomittedData() {
}
else if(state == EngineState.Running){
for(TableMetadata table : this.openShards.values()){
- if(table.hasUncommittedData()){
+ EngineBase e = table.getEngine();
+ if(e != null && e.hasUncomittedData()){
return true;
}
}
@@ -255,58 +263,67 @@ protected boolean beginOperations() {
@Override
protected boolean spinDown(final SpinDownEventHandler completionEventHandler) {
- if(!this.state.compareAndSet(EngineState.Running, EngineState.SpinningDown)){
- //we're in the wrong state.
- return false;
- }
-
- synchronized(spinDownSyncRoot){
- for(Map.Entry<Interval<T>, TableMetadata> m : this.openShards.entrySet()){
- EngineBase spinningDown = m.getValue().spinDown();
- this.spinningDownEngines.put(m.getKey(), spinningDown);
+ try{
+ this.getTableLock();
+
+ if(!this.state.compareAndSet(EngineState.Running, EngineState.SpinningDown)){
+ //we're in the wrong state.
+ return false;
}
- }
-
- Runnable spinDownMonitor = new Runnable(){
- @Override
- public void run() {
- if(getState() != EngineState.SpinningDown){
- throw new RuntimeException("task complete");
+
+ synchronized(spinDownSyncRoot){
+ for(Map.Entry<Interval<T>, TableMetadata> m : this.openShards.entrySet()){
+ EngineBase spinningDown = m.getValue().spinDown(true);
+ this.spinningDownEngines.put(m.getKey(), spinningDown);
}
-
- synchronized(spinDownSyncRoot){
- if(isSpunDown()){
- if(state.compareAndSet(EngineState.SpinningDown, EngineState.SpunDown)){
- completionEventHandler.spinDownComplete(new SpinDownEvent(ShardedEngineBase.this));
- }
- else{
- //somehow we weren't in the spinning down state
- forceSpinDown();
- }
+ }
+
+ Runnable spinDownMonitor = new Runnable(){
+ @Override
+ public void run() {
+ if(getState() != EngineState.SpinningDown){
throw new RuntimeException("task complete");
}
-
-
- Iterator<EngineBase> it = spinningDownEngines.values().iterator();
- while(it.hasNext()){
- EngineBase spinningDown = it.next();
- EngineState state = spinningDown.getState();
- if(state == EngineState.SpunDown || state == EngineState.Uninitialized){
- it.remove();
+
+ synchronized(spinDownSyncRoot){
+ if(isSpunDown()){
+ if(state.compareAndSet(EngineState.SpinningDown, EngineState.SpunDown)){
+ if(completionEventHandler != null)
+ completionEventHandler.spinDownComplete(new SpinDownEvent(ShardedEngineBase.this));
+ }
+ else{
+ //somehow we weren't in the spinning down state
+ forceSpinDown();
+ }
+ throw new RuntimeException("task complete");
}
- else if(state == EngineState.Running){
- spinningDown.spinDown(null);
+
+
+ Iterator<EngineBase> it = spinningDownEngines.values().iterator();
+ while(it.hasNext()){
+ EngineBase spinningDown = it.next();
+ EngineState state = spinningDown.getState();
+ if(state == EngineState.SpunDown || state == EngineState.Uninitialized){
+ it.remove();
+ }
+ else if(state == EngineState.Running){
+ spinningDown.spinDown(null);
+ }
}
+ //give it a few more ms just in case
}
- //give it a few more ms just in case
}
- }
- };
-
- this.getExecutorService().scheduleWithFixedDelay(spinDownMonitor, 5, 10, TimeUnit.MILLISECONDS);
-
+ };
+
+ this.getExecutorService().scheduleWithFixedDelay(spinDownMonitor, 5, 10, TimeUnit.MILLISECONDS);
+
+
+ return true;
- return true;
+ }
+ finally{
+ this.releaseTableLock();
+ }
}
/**
@@ -325,7 +342,7 @@ protected boolean forceSpinDown() {
synchronized(spinDownSyncRoot){
for(Map.Entry<Interval<T>, TableMetadata> m : this.openShards.entrySet()){
- EngineBase spinningDown = m.getValue().spinDown();
+ EngineBase spinningDown = m.getValue().spinDown(true);
this.spinningDownEngines.put(m.getKey(), spinningDown);
}
View
35 java/XFlat/src/org/gburgett/xflat/db/TableMetadata.java
@@ -54,9 +54,8 @@ public boolean canSpinDown(){
return lastActivity + 3000 < System.currentTimeMillis() && engine == null || !engine.hasUncomittedData();
}
- public boolean hasUncommittedData(){
- EngineBase engine = this.engine.get();
- return engine != null && engine.hasUncomittedData();
+ public EngineBase getEngine(){
+ return this.engine.get();
}
EngineState getEngineState(){
@@ -130,6 +129,7 @@ private EngineBase makeNewEngine(File file){
ret.setConversionService(db.getConversionService());
ret.setExecutorService(db.getExecutorService());
+ ret.setTransactionManager(db.getEngineTransactionManager());
if(ret instanceof ShardedEngineBase){
//give it a metadata factory centered in its own file. If it uses this,
@@ -188,9 +188,10 @@ else if(state == EngineState.SpinningUp ||
return engine;
}
- public EngineBase spinDown(){
+ public EngineBase spinDown(boolean force){
synchronized(this){
- EngineBase engine = this.engine.getAndSet(null);
+ EngineBase engine = this.engine.get();
+
EngineState state;
if(engine == null ||
(state = engine.getState()) == EngineState.SpinningDown ||
@@ -199,19 +200,19 @@ public EngineBase spinDown(){
return engine;
}
- else{
- if(engine.hasUncomittedData()){
- //whoops! see if we can put it back quick
- engine = this.engine.getAndSet(engine);
- //continue like we were spinning this one down.
-
- if(engine == null ||
- (state = engine.getState()) == EngineState.SpinningDown ||
- state == EngineState.SpunDown){
-
- return engine;
- }
+
+ try{
+ engine.getTableLock();
+
+ if(engine.hasUncomittedData() && !force){
+ //can't spin it down, return the engine
+ return engine;
}
+
+ this.engine.compareAndSet(engine, null);
+ }finally{
+ //table lock no longer needed
+ engine.releaseTableLock();
}
Log l = LogFactory.getLog(getClass());
View
43 java/XFlat/src/org/gburgett/xflat/db/XFlatDatabase.java
@@ -21,8 +21,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.gburgett.xflat.Database;
@@ -34,10 +32,10 @@
import org.gburgett.xflat.convert.PojoConverter;
import org.gburgett.xflat.convert.converters.JDOMConverters;
import org.gburgett.xflat.convert.converters.StringConverters;
-import org.gburgett.xflat.util.DocumentFileWrapper;
+import org.gburgett.xflat.transaction.ThreadContextTransactionManager;
+import org.gburgett.xflat.transaction.TransactionManager;
import org.jdom2.Document;
import org.jdom2.Element;
-import org.jdom2.JDOMException;
import org.jdom2.Namespace;
/**
@@ -91,6 +89,23 @@ void setMetadataFactory(TableMetadataFactory factory){
this.metadataFactory = factory;
}
+ private EngineTransactionManager transactionManager;
+ /**
+ * Gets the transactionManager.
+ */
+ @Override
+ public TransactionManager getTransactionManager(){
+ return this.transactionManager;
+ }
+
+ EngineTransactionManager getEngineTransactionManager(){
+ return this.transactionManager;
+ }
+
+ public void setTransactionManager(EngineTransactionManager transactionManager){
+ this.transactionManager = transactionManager;
+ }
+
//</editor-fold>
private File directory;
@@ -187,6 +202,10 @@ public void Initialize(){
if(this.executorService == null)
this.executorService = new ScheduledThreadPoolExecutor(this.config.getThreadCount());
+ if(this.transactionManager == null){
+ this.transactionManager = new ThreadContextTransactionManager();
+ }
+
this.InitializeScheduledTasks();
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
@@ -238,9 +257,14 @@ public void shutdown(int timeout) throws TimeoutException{
Set<EngineBase> engines = new HashSet<>();
for(Map.Entry<String, TableMetadata> table : this.tables.entrySet()){
try{
- EngineBase e = table.getValue().spinDown();
- if(e != null)
+ EngineBase e = table.getValue().spinDown(true);
+ if(e != null){
+ if(e.getState() == EngineState.Running){
+ //don't care, force spin down
+ e.spinDown(null);
+ }
engines.add(e);
+ }
}catch(Exception ex){
//eat
}
@@ -343,13 +367,14 @@ public void run() {
* on the DB.
*/
private void update(){
- //TODO: hot-swap engines here
//check on inactivity shutdown
for(TableMetadata m : this.tables.values()){
- if(m.lastActivity + m.config.getInactivityShutdownMs() < System.currentTimeMillis()){
- m.spinDown();
+ if(m.canSpinDown()){
+ //spin down if no uncommitted data
+ m.spinDown(false);
}
+ //don't ever remove TableMetadata. It's too dangerous with the way we do locking and isn't worth it.
}
}
View
747 java/XFlat/src/org/gburgett/xflat/engine/CachedDocumentEngine.java
@@ -6,10 +6,12 @@
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -22,7 +24,6 @@
import java.util.concurrent.atomic.AtomicReference;
import org.gburgett.xflat.Cursor;
import org.gburgett.xflat.DuplicateKeyException;
-import org.gburgett.xflat.EngineStateException;
import org.gburgett.xflat.KeyNotFoundException;
import org.gburgett.xflat.XflatException;
import org.gburgett.xflat.db.Engine;
@@ -34,7 +35,6 @@
import org.gburgett.xflat.transaction.Transaction;
import org.gburgett.xflat.util.DocumentFileWrapper;
import org.hamcrest.Matcher;
-import org.jdom2.Attribute;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
@@ -45,6 +45,7 @@
*/
public class CachedDocumentEngine extends EngineBase implements Engine {
+ //TODO: can we replace this by taking a table lock on spin-up?
private final AtomicBoolean operationsReady = new AtomicBoolean(false);
private ConcurrentMap<String, Row> cache = null;
@@ -66,10 +67,6 @@ public CachedDocumentEngine(DocumentFileWrapper file, String tableName){
this.file = file;
}
- private void setTxId(Element data, long txId){
- data.setAttribute("tx", Long.toString(txId), XFlatDatabase.xFlatNs);
- }
-
private long getTxId(Transaction tx){
return tx != null ?
tx.getTransactionId() :
@@ -80,36 +77,39 @@ private long getTxId(Transaction tx){
//<editor-fold desc="interface methods">
@Override
public void insertRow(String id, Element data) throws DuplicateKeyException {
- ensureReady();
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
-
- RowData rData = new RowData(txId, data);
- if(tx == null){
- //transactionless means auto-commit
- rData.commitId = txId;
- }
-
- Row row = new Row(id, rData);
- row = this.cache.putIfAbsent(id, row);
- if(row != null){
- synchronized(row){
- //see if all the data was from after this transaction
- RowData chosen = row.chooseMostRecentCommitted(tx);
- if(chosen == null || chosen.data == null){
- //we're good to insert our transactional data
- row.rowData.put(txId, rData);
- this.uncommittedRows.put(id, row);
- }
- else{
- throw new DuplicateKeyException(id);
+ ensureWriteReady();
+ try{
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+ RowData rData = new RowData(txId, data, id);
+ if(tx == null){
+ //transactionless means auto-commit
+ rData.commitId = txId;
+ }
+
+ Row row = new Row(id, rData);
+ row = this.cache.putIfAbsent(id, row);
+ if(row != null){
+ synchronized(row){
+ //see if all the data was from after this transaction
+ RowData chosen = row.chooseMostRecentCommitted(tx, txId);
+ if(chosen == null || chosen.data == null){
+ //we're good to insert our transactional data
+ row.rowData.put(txId, rData);
+ this.uncommittedRows.put(id, row);
+ }
+ else{
+ throw new DuplicateKeyException(id);
+ }
}
}
- }
- setLastActivity(System.currentTimeMillis());
- dumpCache();
+ setLastActivity(System.currentTimeMillis());
+ dumpCache();
+ }finally{
+ writeComplete();
+ }
}
@Override
@@ -124,7 +124,8 @@ public Element readRow(String id) {
//lock the row
synchronized(row){
Transaction tx = this.getTransactionManager().getTransaction();
- RowData ret = row.chooseMostRecentCommitted(tx);
+ //we want either the most recent for this transaction or, if null, the most recent globally.
+ RowData ret = row.chooseMostRecentCommitted(tx, Long.MAX_VALUE);
if(ret == null || ret.data == null){
return null;
@@ -149,251 +150,347 @@ public Element readRow(String id) {
@Override
public void replaceRow(String id, Element data) throws KeyNotFoundException {
- ensureReady();
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
+ ensureWriteReady();
+ try{
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
-
- Row row = this.cache.get(id);
- if(row == null){
- throw new KeyNotFoundException(id);
- }
-
- synchronized(row){
- RowData toReplace = row.chooseMostRecentCommitted(tx);
- if(toReplace == null || toReplace.data == null){
+
+ Row row = this.cache.get(id);
+ if(row == null){
throw new KeyNotFoundException(id);
}
-
- RowData newData = new RowData(txId, data);
- if(tx == null){
- //transactionless means auto-commit
- newData.commitId = txId;
+
+ synchronized(row){
+ RowData toReplace = row.chooseMostRecentCommitted(tx, txId);
+ if(toReplace == null || toReplace.data == null){
+ throw new KeyNotFoundException(id);
+ }
+
+ RowData newData = new RowData(txId, data, id);
+ if(tx == null){
+ //transactionless means auto-commit
+ newData.commitId = txId;
+ }
+ row.rowData.put(txId, newData);
}
- row.rowData.put(txId, newData);
- }
+
+ setLastActivity(System.currentTimeMillis());
+ dumpCache();
- setLastActivity(System.currentTimeMillis());
- dumpCache();
+ }finally{
+ writeComplete();
+ }
}
@Override
public boolean update(String id, XpathUpdate update) throws KeyNotFoundException {
- ensureReady();
-
- Row row = this.cache.get(id);
- if(row == null){
- throw new KeyNotFoundException(id);
- }
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
+ ensureWriteReady();
+ try{
+ Row row = this.cache.get(id);
+ if(row == null){
+ throw new KeyNotFoundException(id);
+ }
- update.setConversionService(this.getConversionService());
-
- boolean ret;
- try {
- //lock the row
- synchronized(row){
- RowData data = row.chooseMostRecentCommitted(tx);
- if(data == null || data.data == null){
- throw new KeyNotFoundException(id);
- }
- else{
- //apply to a copy, store the copy as a transactional state.
- RowData newData = new RowData(txId, data.data.clone());
- if(tx == null){
- //transactionless means auto-commit
- newData.commitId = txId;
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+ update.setConversionService(this.getConversionService());
+
+ boolean ret;
+ try {
+ //lock the row
+ synchronized(row){
+ RowData data = row.chooseMostRecentCommitted(tx, txId);
+ if(data == null || data.data == null){
+ throw new KeyNotFoundException(id);
}
-
- int updates = update.apply(newData.data);
- ret = updates > 0;
- if(ret){
- //no need to put a new version if no data was modified
- row.rowData.put(txId, newData);
+ else{
+ //apply to a copy, store the copy as a transactional state.
+ RowData newData = new RowData(txId, data.data.clone(), row.rowId);
+ if(tx == null){
+ //transactionless means auto-commit
+ newData.commitId = txId;
+ }
+
+ int updates = update.apply(newData.rowElement);
+ ret = updates > 0;
+ if(ret){
+ //no need to put a new version if no data was modified
+ row.rowData.put(txId, newData);
+ this.uncommittedRows.put(id, row);
+ }
}
}
+ } catch (JDOMException ex) {
+ if(log.isDebugEnabled())
+ log.debug("Exception while applying update " + update.toString(), ex);
+
+ ret = false;
}
- } catch (JDOMException ex) {
- if(log.isDebugEnabled())
- log.debug("Exception while applying update " + update.toString(), ex);
-
- ret = false;
+
+ setLastActivity(System.currentTimeMillis());
+
+ if(ret)
+ dumpCache();
+
+ return ret;
+ }finally{
+ writeComplete();
}
-
- setLastActivity(System.currentTimeMillis());
-
- if(ret)
- dumpCache();
-
- return ret;
}
@Override
public int update(XpathQuery query, XpathUpdate update) {
- ensureReady();
-
- query.setConversionService(this.getConversionService());
- update.setConversionService(this.getConversionService());
-
- Matcher<Element> rowMatcher = query.getRowMatcher();
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
+ ensureWriteReady();
+ try{
+
+ query.setConversionService(this.getConversionService());
+ update.setConversionService(this.getConversionService());
-
- int rowsUpdated = 0;
-
- for(Row row : this.cache.values()){
- synchronized(row){
- RowData rData = row.chooseMostRecentCommitted(tx);
- if(rData == null || rData.data == null){
- continue;
- }
-
- if(!rowMatcher.matches(rData.data))
- continue;
-
- try {
- //apply to a copy, store the copy as a transactional state.
- RowData newData = new RowData(txId, rData.data.clone());
- if(tx == null){
- //transactionless means auto-commit
- newData.commitId = txId;
+ Matcher<Element> rowMatcher = query.getRowMatcher();
+
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+
+ int rowsUpdated = 0;
+
+ for(Row row : this.cache.values()){
+ synchronized(row){
+ RowData rData = row.chooseMostRecentCommitted(tx, txId);
+ if(rData == null || rData.data == null){
+ continue;
}
-
- int updates = update.apply(newData.data);
-
- if(updates > 0){
- //no need to put a new version if no data was modified
- row.rowData.put(txId, newData);
+
+ if(!rowMatcher.matches(rData.rowElement))
+ continue;
+
+ try {
+ //apply to a copy, store the copy as a transactional state.
+ RowData newData = new RowData(txId, rData.data.clone(), row.rowId);
+ if(tx == null){
+ //transactionless means auto-commit
+ newData.commitId = txId;
+ }
+
+ int updates = update.apply(newData.rowElement);
+
+ if(updates > 0){
+ //no need to put a new version if no data was modified
+ row.rowData.put(txId, newData);
+ if(newData.commitId == -1)
+ this.uncommittedRows.put(row.rowId, row);
+ }
+
+ rowsUpdated = updates > 0 ? rowsUpdated + 1 : rowsUpdated;
+ }
+ catch (JDOMException ex) {
+ if(log.isDebugEnabled())
+ log.debug("Exception while applying update " + update.toString(), ex);
}
-
- rowsUpdated = updates > 0 ? rowsUpdated + 1 : rowsUpdated;
- }
- catch (JDOMException ex) {
- if(log.isDebugEnabled())
- log.debug("Exception while applying update " + update.toString(), ex);
}
}
+
+ setLastActivity(System.currentTimeMillis());
+
+ if(rowsUpdated > 0){
+ dumpCache();
+ }
+
+ return rowsUpdated;
+ }finally{
+ writeComplete();
}
-
- setLastActivity(System.currentTimeMillis());
-
- if(rowsUpdated > 0){
- dumpCache();
- }
-
- return rowsUpdated;
}
@Override
public boolean upsertRow(String id, Element data) {
- ensureReady();
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
-
- RowData newData = new RowData(txId, data);
- if(tx == null){
- //transactionless means auto-commit
- newData.commitId = txId;
- }
-
- Row newRow = new Row(id, newData);
-
- Row existed = this.cache.putIfAbsent(id, newRow); //takes care of the insert
- synchronized(existed){
- //takes care of the "or update"
- existed.rowData.put(txId, newData);
+ ensureWriteReady();
+ try{
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+ RowData newData = new RowData(txId, data, id);
+ if(tx == null){
+ //transactionless means auto-commit
+ newData.commitId = txId;
+ }
+
+ Row newRow = new Row(id, newData);
+
+ boolean didInsert = false;
+ synchronized(newRow){
+ Row existingRow = this.cache.putIfAbsent(id, newRow); //takes care of the insert
+ if(existingRow != null){
+ synchronized(existingRow){
+ //we inserted if the most recent committed was null or had null data
+ RowData mostRecent = existingRow.chooseMostRecentCommitted(tx, txId);
+ didInsert = mostRecent == null || mostRecent.data == null;
+
+ //takes care of the "or update"
+ existingRow.rowData.put(txId, newData);
+ this.uncommittedRows.put(id, existingRow);
+ }
+ }
+ else{
+ didInsert = true;
+ this.uncommittedRows.put(id, newRow);
+ }
+ }
+
+ setLastActivity(System.currentTimeMillis());
+ dumpCache();
+
+ return didInsert; //if none existed, then we inserted
+
+ }finally{
+ writeComplete();
}
-
- setLastActivity(System.currentTimeMillis());
- dumpCache();
-
- return existed == null; //if none existed, then we inserted
}
@Override
public void deleteRow(String id) throws KeyNotFoundException {
- ensureReady();
-
- Row toRemove = this.cache.get(id);
-
- if(toRemove == null){
- throw new KeyNotFoundException(id);
- }
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
-
- RowData newData = new RowData(txId, null);
- if(tx == null){
- newData.commitId = txId;
- }
+ ensureWriteReady();
+ try{
-
- synchronized(toRemove){
- RowData rData = toRemove.chooseMostRecentCommitted(tx);
- if(rData == null || rData.data == null){
+ Row row = this.cache.get(id);
+
+ if(row == null){
throw new KeyNotFoundException(id);
}
-
- //a RowData that is null means it was deleted.
- toRemove.rowData.put(txId, newData);
+
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+ RowData newData = new RowData(txId, null, id);
+ if(tx == null){
+ newData.commitId = txId;
+ }
+
+
+ synchronized(row){
+ RowData rData = row.chooseMostRecentCommitted(tx, txId);
+ if(rData == null || rData.data == null){
+ throw new KeyNotFoundException(id);
+ }
+
+ //a RowData that is null means it was deleted.
+ row.rowData.put(txId, newData);
+ this.uncommittedRows.put(row.rowId, row);
+ }
+
+ setLastActivity(System.currentTimeMillis());
+ dumpCache();
+ }finally{
+ writeComplete();
}
-
- setLastActivity(System.currentTimeMillis());
- dumpCache();
}
@Override
public int deleteAll(XpathQuery query) {
- ensureReady();
-
- query.setConversionService(this.getConversionService());
-
- Transaction tx = this.getTransactionManager().getTransaction();
- long txId = getTxId(tx);
-
- Matcher<Element> rowMatcher = query.getRowMatcher();
- Iterator<Map.Entry<String, Row>> it = this.cache.entrySet().iterator();
+ ensureWriteReady();
+ try{
+
+ query.setConversionService(this.getConversionService());
+
+ Transaction tx = this.getTransactionManager().getTransaction();
+ long txId = getTxId(tx);
+
+ Matcher<Element> rowMatcher = query.getRowMatcher();
+ Iterator<Map.Entry<String, Row>> it = this.cache.entrySet().iterator();
+
+ int numRemoved = 0;
+
+ while(it.hasNext()){
+ Map.Entry<String, Row> entry = it.next();
+
+ Row row = entry.getValue();
+ synchronized(row){
+ RowData rData = row.chooseMostRecentCommitted(tx, txId);
+ if(rData == null || rData.data == null){
+ continue;
+ }
+
+ if(rowMatcher.matches(rData.rowElement)){
+ RowData newData = new RowData(txId, null, row.rowId);
+ if(tx == null){
+ newData.commitId = txId;
+ }
+ row.rowData.put(txId, newData);
+ this.uncommittedRows.put(row.rowId, row);
+
+ numRemoved++;
+ }
+ }
+ }
+
+ setLastActivity(System.currentTimeMillis());
+
+ if(numRemoved > 0)
+ dumpCache();
+
+ return numRemoved;
+ }finally{
+ writeComplete();
+ }
+ }
+
+ //</editor-fold>
+
+ private void update(){
- int numRemoved = 0;
+ Set<Row> rowsToRemove = new HashSet<>();
+ Set<Long> remainingTransactions = new HashSet<>();
+ Iterator<Row> it = this.uncommittedRows.values().iterator();
while(it.hasNext()){
- Map.Entry<String, Row> entry = it.next();
-
- Row row = entry.getValue();
+ Row row = it.next();
synchronized(row){
- RowData rData = row.chooseMostRecentCommitted(tx);
- if(rData == null || rData.data == null){
- continue;
- }
+ it.remove();
- if(rowMatcher.matches(rData.data)){
- RowData newData = new RowData(txId, null);
- if(tx == null){
- newData.commitId = txId;
+ if(row.cleanup()){
+ rowsToRemove.add(row);
+ }
+ else{
+ //remember the remaining transactions
+ for(RowData data : row.rowData.values()){
+ if(data.commitId == -1){
+ remainingTransactions.add(data.transactionId);
+ }
}
- row.rowData.put(txId, newData);
-
- numRemoved++;
}
}
}
- setLastActivity(System.currentTimeMillis());
-
- if(numRemoved > 0)
- dumpCache();
+ if(rowsToRemove.size() > 0){
+ //we have to lock the table in order to actually remove any rows.
+ try{
+ this.getTableLock();
+
+ for(Row row : rowsToRemove){
+ //doublecheck - do another cleanup, don't want to be sloppy here.
+ if(row.cleanup()){
+ this.cache.remove(row.rowId);
+ }
+ else{
+ //remember the remaining transactions
+ for(RowData data : row.rowData.values()){
+ if(data.commitId == -1){
+ remainingTransactions.add(data.transactionId);
+ }
+ }
+ }
+ }
+ }
+ finally{
+ this.releaseTableLock();
+ }
+ }
- return numRemoved;
+ //unbind the engine from all transactions except the remaining transactions
+ this.getTransactionManager().unbindEngineExceptFrom(this, remainingTransactions);
}
-
- //</editor-fold>
@Override
protected boolean spinUp() {
@@ -411,23 +508,25 @@ protected boolean spinUp() {
List<Element> rowList = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
for(int i = rowList.size() - 1; i >= 0; i--){
- Element row = rowList.get(i).detach();
+ Element row = rowList.get(i);
if(row.getChildren().isEmpty()){
continue;
}
- Element data = row.getChildren().get(0);
+ Element data = row.getChildren().get(0).detach();
String id = getId(row);
- long txId = -1;
- long commitId = -1;
+ //default it to zero so that we know it's committed but if we don't get an actual
+ //value for the commit then we have the lowest value.
+ long txId = 0;
+ long commitId = 0;
String a = row.getAttributeValue("tx", XFlatDatabase.xFlatNs);
if(a != null && !"".equals(a)){
try{
txId = Long.parseLong(a);
}catch(NumberFormatException ex){
- //just leave it as -1.
+ //just leave it as 0.
}
}
a = row.getAttributeValue("commit", XFlatDatabase.xFlatNs);
@@ -435,11 +534,11 @@ protected boolean spinUp() {
try{
commitId = Long.parseLong(a);
}catch(NumberFormatException ex){
- //just leave it as -1.
+ //just leave it as 0.
}
}
- RowData rData = new RowData(txId, data);
+ RowData rData = new RowData(txId, data, id);
rData.commitId = commitId;
Row newRow = new Row(id, rData);
@@ -459,6 +558,18 @@ protected boolean spinUp() {
}
}
+ //schedule the update task
+ this.getExecutorService().scheduleWithFixedDelay(new Runnable(){
+ @Override
+ public void run() {
+ if(state.get() == EngineState.SpinningDown || state.get() == EngineState.SpunDown){
+ throw new RuntimeException("task termination");
+ }
+
+ update();
+ }
+ }, 500, 500, TimeUnit.MILLISECONDS);
+
return true;
}
@@ -479,112 +590,116 @@ protected boolean beginOperations() {
}
/**
- * Called before every write to ensure we are ready to write.
- * If the engine is spinning down then we throw because engines are read-only
- * when spinning down.
+ * Overrides ensureWriteReady to additionally check if the
+ * engine has fully finished spinning up
*/
- private void ensureReady(){
- EngineState state = this.state.get();
- if(state == EngineState.SpunDown ||
- state == EngineState.SpinningDown){
- throw new EngineStateException("Write operations not supported on an engine that is spinning down", state);
- }
-
- if(operationsReady.get() && state == EngineState.Running){
- return;
- }
+ @Override
+ protected void ensureWriteReady(){
+ super.ensureWriteReady();
- synchronized(operationsReady){
- while(!operationsReady.get() && this.state.get() != EngineState.Running){
- try {
- operationsReady.wait();
- } catch (InterruptedException ex) {
- if(operationsReady.get()){
- //oh ok we're all good to go
- return;
+ //check if we're not yet running, if so wait until we are running
+ if(!operationsReady.get() || state.get() != EngineState.Running){
+ synchronized(operationsReady){
+ while(!operationsReady.get() && this.state.get() != EngineState.Running){
+ try {
+ operationsReady.wait();
+ } catch (InterruptedException ex) {
+ if(operationsReady.get()){
+ //oh ok we're all good to go
+ return;
+ }
+ throw new XflatException("Interrupted while waiting for engine to be ready");
}
- throw new XflatException("Interrupted while waiting for engine to be ready");
}
}
}
}
+
private ConcurrentMap<Cursor<Element>, String> openCursors = new ConcurrentHashMap<>();
@Override
protected boolean spinDown(final SpinDownEventHandler completionEventHandler) {
- //not much to do since everything's in the cache, just dump the cache
- //and set read-only mode.
- if(!this.state.compareAndSet(EngineState.Running, EngineState.SpinningDown)){
- //we're in the wrong state.
- return false;
- }
-
- if(log.isTraceEnabled())
- log.trace("Spinning down");
+ try{
+ this.getTableLock();
- final AtomicReference<ScheduledFuture<?>> cacheDumpTask = new AtomicReference<>(null);
- if(this.cache != null && lastModified.get() >= lastDump.get()){
- //schedule immediate dump
- cacheDumpTask.set(this.getExecutorService().schedule(
- new Runnable(){
+ //not much to do since everything's in the cache, just dump the cache
+ //and set read-only mode.
+ if(!this.state.compareAndSet(EngineState.Running, EngineState.SpinningDown)){
+ //we're in the wrong state.
+ return false;
+ }
+
+ if(log.isTraceEnabled())
+ log.trace("Spinning down");
+
+
+ final AtomicReference<ScheduledFuture<?>> cacheDumpTask = new AtomicReference<>(null);
+ if(this.cache != null && lastModified.get() >= lastDump.get()){
+ //schedule immediate dump
+ cacheDumpTask.set(this.getExecutorService().schedule(
+ new Runnable(){
+ @Override
+ public void run() {
+ try{
+ dumpCacheNow();
+ }
+ catch(Exception ex){
+ log.warn("Unable to dump cached data", ex);
+ }
+ }
+ }, 0, TimeUnit.MILLISECONDS));
+ }
+
+ if(openCursors.isEmpty() && (cacheDumpTask.get() == null || cacheDumpTask.get().isDone())){
+ this.state.set(EngineState.SpunDown);
+
+ if(log.isTraceEnabled())
+ log.trace("Spin down complete (immediate)");
+
+ if(completionEventHandler != null)
+ completionEventHandler.spinDownComplete(new SpinDownEvent(CachedDocumentEngine.this));
+
+ //we're ok to finish our spin down now
+ return forceSpinDown();
+
+ }
+
+ Runnable spinDownTask = new Runnable(){
@Override
public void run() {
- try{
- dumpCacheNow();
+ if(!openCursors.isEmpty())
+ return;
+
+ if(cacheDumpTask.get() != null && !cacheDumpTask.get().isDone()){
+ return;
}
- catch(Exception ex){
- log.warn("Unable to dump cached data", ex);
+
+ if(!state.compareAndSet(EngineState.SpinningDown, EngineState.SpunDown)){
+ throw new RuntimeException("cancel task - in wrong state");
}
- }
- }, 0, TimeUnit.MILLISECONDS));
- }
-
- if(openCursors.isEmpty() && (cacheDumpTask.get() == null || cacheDumpTask.get().isDone())){
- this.state.set(EngineState.SpunDown);
-
- if(log.isTraceEnabled())
- log.trace("Spin down complete (immediate)");
-
- if(completionEventHandler != null)
- completionEventHandler.spinDownComplete(new SpinDownEvent(CachedDocumentEngine.this));
- //we're ok to finish our spin down now
- return forceSpinDown();
-
- }
+ if(log.isTraceEnabled())
+ log.trace(String.format("Spin down complete (task)"));
- Runnable spinDownTask = new Runnable(){
- @Override
- public void run() {
- if(!openCursors.isEmpty())
- return;
+ if(completionEventHandler != null)
+ completionEventHandler.spinDownComplete(new SpinDownEvent(CachedDocumentEngine.this));
+ //we're ok to finish our spin down now
+ forceSpinDown();
- if(cacheDumpTask.get() != null && !cacheDumpTask.get().isDone()){
- return;
- }
+ throw new RuntimeException("Scheduled Task Complete");
- if(!state.compareAndSet(EngineState.SpinningDown, EngineState.SpunDown)){
- throw new RuntimeException("cancel task - in wrong state");
}
+ };
+ this.getExecutorService().scheduleWithFixedDelay(
+ spinDownTask, 5, 10, TimeUnit.MILLISECONDS);
- if(log.isTraceEnabled())
- log.trace(String.format("Spin down complete (task)"));
-
- if(completionEventHandler != null)
- completionEventHandler.spinDownComplete(new SpinDownEvent(CachedDocumentEngine.this));
- //we're ok to finish our spin down now
- forceSpinDown();
-
- throw new RuntimeException("Scheduled Task Complete");
-
- }
- };
- this.getExecutorService().scheduleWithFixedDelay(
- spinDownTask, 5, 10, TimeUnit.MILLISECONDS);
-
- return true;
+ return true;
+ }
+ finally{
+ this.releaseTableLock();
+ }
}
@Override
@@ -773,12 +888,12 @@ private void peekNext(){
while(toIterate.hasNext()){
Row next = toIterate.next();
synchronized(next){
- RowData rData = next.chooseMostRecentCommitted(tx);
+ RowData rData = next.chooseMostRecentCommitted(tx, txId);
if(rData == null || rData.data == null){
continue;
}
- if(rowMatcher.matches(rData.data)){
+ if(rowMatcher.matches(rData.rowElement)){
//found a matching row
peekCount++;
this.peek = rData.data.clone();
View
167 java/XFlat/src/org/gburgett/xflat/engine/IdShardedEngine.java
@@ -55,6 +55,13 @@ protected boolean isSpunDown(){
return super.isSpunDown() && crossShardQueries.isEmpty();
}
+ /**
+ * Gets a list of shard intervals over which the query should be executed.
+ * This is obtained by dissecting the query according to ID and then
+ * looking for known shards that intersect the dissected query.
+ * @param query
+ * @return
+ */
private List<Interval<T>> getExecutionPlan(XpathQuery query){
final IntervalProvider<T> provider = config.getIntervalProvider();
@@ -74,14 +81,18 @@ protected boolean isSpunDown(){
@Override
public void insertRow(final String id, final Element data) throws DuplicateKeyException {
-
- doWithEngine(getInterval(id), new EngineAction(){
- @Override
- public Object act(Engine engine) {
- engine.insertRow(id, data);
- return null;
- }
- });
+ ensureWriteReady();
+ try{
+ doWithEngine(getInterval(id), new EngineAction(){
+ @Override
+ public Object act(Engine engine) {
+ engine.insertRow(id, data);
+ return null;
+ }
+ });
+ }finally{
+ writeComplete();
+ }
}
@Override
@@ -95,13 +106,9 @@ public Element act(Engine engine) {
}
@Override
- public Cursor<Element> queryTable(XpathQuery query) {
+ public Cursor<Element> queryTable(final XpathQuery query) {
query.setConversionService(this.getConversionService());
- return internalQuery(query);
- }
-
- private Cursor<Element> internalQuery(final XpathQuery query){
List<Interval<T>> shardIntervals = getExecutionPlan(query);
//no known shards intersect the query
@@ -128,82 +135,112 @@ public Element act(Engine engine) {
@Override
public void replaceRow(final String id, final Element data) throws KeyNotFoundException {
- doWithEngine(getInterval(id), new EngineAction(){
- @Override
- public Object act(Engine engine) {
- engine.replaceRow(id, data);
- return null;
- }
- });
+ ensureWriteReady();
+ try{
+ doWithEngine(getInterval(id), new EngineAction(){
+ @Override
+ public Object act(Engine engine) {
+ engine.replaceRow(id, data);
+ return null;
+ }
+ });
+ }finally{
+ writeComplete();
+ }
}
@Override
public boolean update(final String id, final XpathUpdate update) throws KeyNotFoundException {
- return doWithEngine(getInterval(id), new EngineAction<Boolean>(){
- @Override
- public Boolean act(Engine engine) {
- return engine.update(id, update);
- }
- });
+ ensureWriteReady();
+ try{
+ return doWithEngine(getInterval(id), new EngineAction<Boolean>(){
+ @Override
+ public Boolean act(Engine engine) {
+ return engine.update(id, update);
+ }
+ });
+ }finally{
+ writeComplete();
+ }
}
@Override
public int update(final XpathQuery query, final XpathUpdate update) {
- query.setConversionService(this.getConversionService());
- update.setConversionService(this.getConversionService());
-
- EngineAction<Integer> action = new EngineAction<Integer>(){
- @Override
- public Integer act(Engine engine) {
- return engine.update(query, update);
+ ensureWriteReady();
+ try{
+ query.setConversionService(this.getConversionService());
+ update.setConversionService(this.getConversionService());
+
+ EngineAction<Integer> action = new EngineAction<Integer>(){
+ @Override
+ public Integer act(Engine engine) {
+ return engine.update(query, update);
+ }
+ };
+
+ int updated = 0;
+ for(Interval<T> shardInterval : this.getExecutionPlan(query)){
+ updated += doWithEngine(shardInterval, action);
}
- };
-
- int updated = 0;
- for(Interval<T> shardInterval : this.getExecutionPlan(query)){
- updated += doWithEngine(shardInterval, action);
+
+ return updated;
+ }finally{
+ writeComplete();
}
-
- return updated;
}
@Override
public boolean upsertRow(final String id, final Element data) {
- return doWithEngine(getInterval(id), new EngineAction<Boolean>(){
- @Override
- public Boolean act(Engine engine) {
- return engine.upsertRow(id, data);
- }
- });
+ ensureWriteReady();
+ try{
+ return doWithEngine(getInterval(id), new EngineAction<Boolean>(){
+ @Override
+ public Boolean act(Engine engine) {
+ return engine.upsertRow(id, data);
+ }
+ });
+ }finally{
+ writeComplete();
+ }
}
@Override
public void deleteRow(final String id) throws KeyNotFoundException {
- doWithEngine(getInterval(id), new EngineAction(){
- @Override
- public Object act(Engine engine) {
- engine.deleteRow(id);
- return null;
- }
- });
+ ensureWriteReady();
+ try{
+ doWithEngine(getInterval(id), new EngineAction(){
+ @Override
+ public Object act(Engine engine) {
+ engine.deleteRow(id);
+ return null;
+ }
+ });
+ }finally{
+ writeComplete();
+ }
}
@Override
public int deleteAll(final XpathQuery query) {
- query.setConversionService(this.getConversionService());
- EngineAction<Integer> action = new EngineAction<Integer>(){
- @Override
- public Integer act(Engine engine) {
- return engine.deleteAll(query);
+ ensureWriteReady();
+ try{
+ query.setConversionService(this.getConversionService());
+ EngineAction<Integer> action = new EngineAction<Integer>(){
+ @Override
+ public Integer act(Engine engine) {
+ return engine.deleteAll(query);
+ }
+ };
+
+ int count = 0;
+ for(Interval<T> shard : getExecutionPlan(query)){
+ count += doWithEngine(shard, action);
}
- };
-
- int count = 0;
- for(Interval<T> shard : getExecutionPlan(query)){
- count += doWithEngine(shard, action);
+
+ return count;
+ }finally{
+ writeComplete();
}
-
- return count;
}
View
158 java/XFlat/src/org/gburgett/xflat/transaction/ThreadContextTransactionManager.java
@@ -4,56 +4,24 @@
*/
package org.gburgett.xflat.transaction;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.jdom2.Element;
+import java.util.concurrent.atomic.AtomicReference;
+import org.gburgett.xflat.db.EngineBase;
+import org.gburgett.xflat.db.EngineTransactionManager;
/**
*
* @author Gordon
*/
-public class ThreadContextTransactionManager implements TransactionManager {
+public class ThreadContextTransactionManager extends EngineTransactionManager {
- AtomicLong lastId = new AtomicLong();
-
- /**
- * Generates a new Transaction ID. The ID is composed of the lower
- * 48 bits of {@link System#currentTimeMillis() } plus a 16-bit uniquifier.
- * unfortunately this means we have a y10k problem :P I'll let my descendants
- * deal with it.
- * @return A new ID for a transaction.
- */
- protected long generateNewId(){
- long id;
- long last;
- do{
- //bitshifting current time millis still gets us at least to the year 10,000 before it overflows.
- id = System.currentTimeMillis() << 16;
- last = lastId.get();
- if((last & 0xFFFFFFFFFFFF0000l) == (id & 0xFFFFFFFFFFFF0000l)){
- //the last ID was at the same millisecond as our new ID, need to use uniquifier.
- int u = (int)(last & 0xFFFFl) + 1;
- if(u > 0xFFFF){
- try {
- //we can't roll over, need to slow down rate of transaction generation.
- Thread.sleep(1);
- } catch (InterruptedException ex) {
- //don't care
- }
- //try again, hopefully currentTimeMillis rolled over.
- continue;
- }
-
- id = id | u;
- }
- }while(!lastId.compareAndSet(last, id));
-
- return id;
- }
-
private Map<Thread, ThreadedTransaction> currentTransactions = new ConcurrentHashMap<>();
private Map<Long, ThreadedTransaction> committedTransactions = new ConcurrentHashMap<>();
@@ -108,13 +76,97 @@ public boolean isTransactionReverted(long transactionId) {
return true;
}
+
@Override
public long transactionlessCommitId() {
return generateNewId();
}
-
+
+ @Override
+ public long getLowestOpenTransaction() {
+ long lowest = Long.MAX_VALUE;
+ for(Transaction tx : currentTransactions.values()){
+ if(tx.getTransactionId() < lowest){
+ lowest = tx.getTransactionId();
+ }
+ }
+
+ return lowest;
+ }
+
+ @Override
+ public void bindEngineToCurrentTransaction(EngineBase engine) {
+
+ ThreadedTransaction tx = currentTransactions.get(Thread.currentThread());
+ if(tx == null){
+ return;
+ }
+
+ tx.boundEngines.add(engine);
+ }
+
+ @Override
+ public void unbindEngineFromTransaction(EngineBase engine, Long transactionId) {
+ ThreadedTransaction tx = null;
+ for(ThreadedTransaction t : this.currentTransactions.values()){
+ if(t.getTransactionId() == transactionId){
+ tx = t;
+ break;
+ }
+ }
+
+ if(tx == null){
+ tx = this.committedTransactions.get(transactionId);
+ }
+
+ if(tx == null){
+ //the transaction was reverted, don't bother unbinding.
+ return;
+ }
+ tx.boundEngines.remove(engine);
+
+ if(tx.boundEngines.isEmpty()){
+ //remove it from the committed transactions if it is empty.
+ this.committedTransactions.remove(tx.getTransactionId());
+ }
+ }
+
+ @Override
+ public void unbindEngineExceptFrom(EngineBase engine, Collection<Long> transactionIds) {
+ for(ThreadedTransaction tx : this.currentTransactions.values()){
+ if(transactionIds.contains(tx.getTransactionId())){
+ continue;
+ }
+
+ //try to remove its binding
+ tx.boundEngines.remove(engine);
+ }
+
+ Iterator<ThreadedTransaction> it = this.committedTransactions.values().iterator();
+ while(it.hasNext()){
+ ThreadedTransaction tx = it.next();
+ if(transactionIds.contains(tx.getTransactionId())){
+ continue;
+ }
+
+ //try to remove its binding
+ tx.boundEngines.remove(engine);
+
+ if(tx.boundEngines.isEmpty()){
+ //remove it from the committed transactions if it is empty.
+ it.remove();
+ }
+ }
+ }
+
+
+ /**
+ * A Transaction that is meant to exist within the context of one thread.
+ * There should be no cross-thread transactional data access, only cross-thread
+ * state querying.
+ */
protected class ThreadedTransaction implements Transaction{
private TransactionOptions options;
@@ -124,6 +176,10 @@ public long transactionlessCommitId() {
private final long id;
+ private AtomicReference<Set<TransactionListener>> listeners = new AtomicReference<>(null);
+
+ final Set<EngineBase> boundEngines = new ConcurrentSkipListSet<>();
+
private long commitId = -1;
@Override
public long getCommitId(){
@@ -198,6 +254,28 @@ public boolean isCommitted() {
public boolean isReverted() {
return isCompleted.get() && commitId > -1;
}
+
+ @Override
+ public void putTransactionListener(TransactionListener listener) {
+ Set<TransactionListener> l = this.listeners.get();
+ if(l == null){
+ l = new ConcurrentSkipListSet<>();
+ if(!this.listeners.compareAndSet(null, l)){
+ l = this.listeners.get();
+ }