Skip to content

Commit

Permalink
Got a lot of coding done for transactions. Still need to work out the…
Browse files Browse the repository at this point in the history
… tests.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information
gburgett committed Jan 20, 2013
1 parent 88840d0 commit 3bcc009
Show file tree
Hide file tree
Showing 23 changed files with 955 additions and 94 deletions.
4 changes: 3 additions & 1 deletion java/XFlat/src/org/gburgett/xflat/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@
* @author gordon
*/
public interface Cursor<T> extends Iterable<T>, AutoCloseable {


@Override
void close() throws XflatException;
}
4 changes: 2 additions & 2 deletions java/XFlat/src/org/gburgett/xflat/DatabaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ public DatabaseConfig setDefaultTableConfig(TableConfig tableConfig){
/**
* The default configuration used by the Database.
*/
public static DatabaseConfig defaultConfig = new DatabaseConfig()
public static DatabaseConfig Default = new DatabaseConfig()
.setThreadCount(4)
.setPojoConverterClass("org.gburgett.xflat.convert.converters.JAXBPojoConverter")
.setDefaultTableConfig(TableConfig.defaultConfig)
.setDefaultTableConfig(TableConfig.Default)
.setIdGeneratorStrategy(Arrays.asList(
UuidIdGenerator.class,
TimestampIdGenerator.class,
Expand Down
2 changes: 1 addition & 1 deletion java/XFlat/src/org/gburgett/xflat/TableConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public TableConfig sharded(ShardsetConfig<?> config){
* The default configuration used by the database when no configuration
* is specified.
*/
public static TableConfig defaultConfig = new TableConfig()
public static TableConfig Default = new TableConfig()
.setIdGenerator(null)
.setInactivityShutdownMs(3000);

Expand Down
2 changes: 1 addition & 1 deletion java/XFlat/src/org/gburgett/xflat/db/ConvertingTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public Iterator<T> iterator() {
}

@Override
public void close() throws Exception {
public void close() throws XflatException {
this.rowCursor.close();
}
}
Expand Down
179 changes: 171 additions & 8 deletions java/XFlat/src/org/gburgett/xflat/db/EngineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
*/
package org.gburgett.xflat.db;

import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.gburgett.xflat.convert.ConversionService;
import org.gburgett.xflat.transaction.Transaction;
import org.gburgett.xflat.transaction.TransactionManager;
import org.jdom2.Attribute;
import org.jdom2.Element;

/**
Expand Down Expand Up @@ -123,6 +131,7 @@ public SpinDownEvent(Engine source){

//</editor-fold>

//<editor-fold desc="dependencies">
private ScheduledExecutorService executorService;
protected ScheduledExecutorService getExecutorService(){
return executorService;
Expand All @@ -141,6 +150,21 @@ protected void setConversionService(ConversionService conversionService) {
this.conversionService = conversionService;
}

private TransactionManager transactionManager;
/**
* Gets the transactionManager.
*/
public TransactionManager getTransactionManager(){
return this.transactionManager;
}
/**
* Sets the transactionManager.
*/
public void setTransactionManager(TransactionManager transactionManager){
this.transactionManager = transactionManager;
}

//</editor-fold>


/**
Expand Down Expand Up @@ -178,16 +202,155 @@ protected void setId(Element row, String id){
row.setAttribute("id", id, XFlatDatabase.xFlatNs);
}



/**
* Checks whether this engine has any transactional updates in an uncommitted
* or unreverted state.
* If so, returns true.
* @return true if this engine has uncommitted transactional data, false otherwise.
*/
protected abstract boolean hasUncomittedData();

/**
* Wraps some element data in a row element with the given ID
* @param data The data to wrap
* @param id The ID to use for the row
* @return The data wrapped in a row element
* Represents one row in the database. The row contains a set of
* {@link RowData} which represents the committed and uncommitted data in
* the row. The row data is mapped by its transaction ID.
* <p/>
* The Row should always be locked before any reading or modification of
* the data.
*/
protected Element wrapInRow(Element data, String id){
Element row = new Element("row", XFlatDatabase.xFlatNs).setContent(data);
setId(row, id);
protected class Row{
/**
* The ID of this row.
*/
public final String rowId;

/**
* A SortedMap of the committed and uncommitted data in the row.
* Always lock the row before accessing this data.
*/
public final SortedMap<Long, RowData> rowData = new TreeMap<>();

public Row(String id){
this.rowId = id;
}

public Row(String id, RowData data){
this.rowId = id;
this.rowData.put(data.transactionId, data);
}

/**
* Chooses the most recent committed RowData that was committed before the given transaction.
* If the transaction is null, this will choose the most recent committed
* RowData globally.
* <p/>
* ALWAYS invoke this while synchronized on the Row.
* @param currentTransaction The current transaction, or null.
* @return The most recent committed RowData in this row, committed before the transaction.
*/
public RowData chooseMostRecentCommitted(Transaction currentTransaction){
if(currentTransaction == null){
return chooseMostRecentCommitted(null, Long.MAX_VALUE);
}

return chooseMostRecentCommitted(currentTransaction, currentTransaction.getTransactionId());
}

/**
* Chooses the most recent committed RowData that was committed before the given transaction ID.
* This prevents dirty reads in a non-transactional context by having a synchronizing transaction ID
* which can be obtained from {@link TransactionManager#transactionlessCommitId() }
* <p/>
* ALWAYS invoke this while synchronized on the Row.
* @param snapshotId The Transaction ID representing the time at which a snapshot of the data should be obtained.
* @return The most recent committed RowData in this row, committed before the given snapshot.
*/
public RowData chooseMostRecentCommitted(Long snapshotId){
return chooseMostRecentCommitted(null, snapshotId);
}

private RowData chooseMostRecentCommitted(Transaction currentTransaction, long currentTxId){

RowData ret = null;
long retCommitId = -1;

Iterator<RowData> it = rowData.values().iterator();
while(it.hasNext()){
RowData data = it.next();

//if we're in a transaction, see if this row is the version for this transaction.
//if the transaction is reverted we don't want that, we want the most recent
//committed version
if(currentTransaction != null && !currentTransaction.isReverted()){

if(data.transactionId > -1 && currentTxId == data.transactionId){
//this row data is the data in the current transaction
return data;
}
}

if(data.commitId == -1){
//uncommitted row data - doublecheck with the transaction manager

data.commitId = transactionManager.isTransactionCommitted(data.transactionId);
}

if(data.commitId > -1){
//this row data has been committed
if(currentTxId > data.commitId){
//the current transaction is null or began after the transaction was committed

if(retCommitId < data.commitId){
//the last valid version we saw was before this version.

ret = data;
retCommitId = data.commitId;
}
}
}
else{
//check if reverted
if(transactionManager.isTransactionReverted(data.transactionId)){
//remove it from the row
it.remove();
}
}
}

return ret;
}


}

protected class RowData{
/**
* A snapshot of the data in the row, possibly uncommitted.
*/
public Element data = null;

/**
* The ID of the transaction that created this data snapshot
*/
public long transactionId = -1;

return row;
/**
* The ID of the transaction commit that caused this row data to become
* committed. If the data is uncommitted, this is -1.
*/
public long commitId = -1;

public RowData(long txId){
this.transactionId = txId;
}

public RowData(long txId, Element data){
this.data = data;
this.transactionId = txId;
}
}


}
24 changes: 22 additions & 2 deletions java/XFlat/src/org/gburgett/xflat/db/ShardedEngineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private EngineBase getEngine(Interval<T> interval){
this.knownShards.put(interval, file);

metadata = this.getMetadataFactory().makeTableMetadata(name, file);
metadata.config = TableConfig.defaultConfig; //not even really used for our purposes
metadata.config = TableConfig.Default; //not even really used for our purposes

TableMetadata weWereLate = openShards.putIfAbsent(interval, metadata);
if(weWereLate != null){
Expand Down Expand Up @@ -169,7 +169,7 @@ protected void update(){
Iterator<TableMetadata> it = openShards.values().iterator();
while(it.hasNext()){
TableMetadata table = it.next();
if(table.getLastActivity() + 3000 < System.currentTimeMillis()){
if(table.canSpinDown()){
//remove right now - if between the check and the remove we got some activity
//then oh well, we can spin up a new instance.
it.remove();
Expand All @@ -184,6 +184,26 @@ protected void update(){
}
}
}

@Override
protected boolean hasUncomittedData() {
EngineState state = this.state.get();
if(state == EngineState.SpinningDown){
for(EngineBase e : this.spinningDownEngines.values()){
if(e.hasUncomittedData()){
return true;
}
}
}
else if(state == EngineState.Running){
for(TableMetadata table : this.openShards.values()){
if(table.hasUncommittedData()){
return true;
}
}
}
return false;
}

@Override
protected boolean spinUp() {
Expand Down
35 changes: 29 additions & 6 deletions java/XFlat/src/org/gburgett/xflat/db/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ public String getName(){
TableConfig config;

long lastActivity = System.currentTimeMillis();
public long getLastActivity(){
return lastActivity;


public boolean canSpinDown(){
EngineBase engine = this.engine.get();
return lastActivity + 3000 < System.currentTimeMillis() && engine == null || !engine.hasUncomittedData();
}

public boolean hasUncommittedData(){
EngineBase engine = this.engine.get();
return engine != null && engine.hasUncomittedData();
}

EngineState getEngineState(){
Expand Down Expand Up @@ -182,13 +190,29 @@ else if(state == EngineState.SpinningUp ||

public EngineBase spinDown(){
synchronized(this){
final EngineBase engine = this.engine.getAndSet(null);
EngineBase engine = this.engine.getAndSet(null);
EngineState state;
if(engine == null ||
(state = engine.getState()) == EngineState.SpinningDown ||
state == EngineState.SpunDown)
state == EngineState.SpunDown){
//another thread already spinning it down
return engine;

}
else{
if(engine.hasUncomittedData()){
//whoops! see if we can put it back quick
engine = this.engine.getAndSet(engine);
//continue like we were spinning this one down.

if(engine == null ||
(state = engine.getState()) == EngineState.SpinningDown ||
state == EngineState.SpunDown){

return engine;
}
}
}

Log l = LogFactory.getLog(getClass());
if(l.isTraceEnabled())
Expand All @@ -197,8 +221,7 @@ public EngineBase spinDown(){

if(engine.spinDown(new SpinDownEventHandler(){
@Override
public void spinDownComplete(SpinDownEvent event) {
engine.forceSpinDown();
public void spinDownComplete(SpinDownEvent event) {
}
}))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private TableMetadata makeNewTableMetadata(String name, File engineFile, TableCo

TableMetadata ret = new TableMetadata(name, db, engineFile);

config = config == null ? TableConfig.defaultConfig : config;
config = config == null ? TableConfig.Default : config;
ret.config = config;

//make ID Generator
Expand Down
2 changes: 1 addition & 1 deletion java/XFlat/src/org/gburgett/xflat/db/XFlatDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void run() {
//the engine cache
private ConcurrentHashMap<String, TableMetadata> tables = new ConcurrentHashMap<>();

private DatabaseConfig config = DatabaseConfig.defaultConfig;
private DatabaseConfig config = DatabaseConfig.Default;
public void setConfig(DatabaseConfig config){
if(this.state.get() != DatabaseState.Uninitialized){
throw new XflatException("Cannot configure database after initialization");
Expand Down
Loading

0 comments on commit 3bcc009

Please sign in to comment.