Skip to content
Browse files

Fixed a durability issue that would cause data corruption. Added impr…

…oved testing for that scenario. Added some tests for multithreaded DB access.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
1 parent 0d01975 commit d4eba56436e8bf6b6d8d6547b726003022b7f0f7 @gburgett committed Feb 15, 2013
View
3 .gitignore
@@ -2,4 +2,5 @@
/java/XFlat/nbproject/private/
/java/XFlat/build/
-/java/XFlat/dist/
+/java/XFlat/dist/
+/java/XFlat/integrationtests
View
7 java/XFlat/src/org/xflatdb/xflat/db/EngineBase.java
@@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -428,7 +429,9 @@ protected void setId(Element row, String id){
* @param tx
*/
public void commit(Transaction tx, TransactionOptions options){
-
+ if(tx.isCommitted() || tx.isReverted()){
+ throw new UnsupportedOperationException("Cannot commit an already finished transaction.");
+ }
}
/**
@@ -629,7 +632,7 @@ public boolean cleanup(){
//if there's no more row datas, or there is only one row data and it's value is "nothing", then return true.
return rowData.isEmpty() || (rowData.size() == 1 && rowData.values().iterator().next().data == null);
- }
+ }
}
protected class RowData{
View
35 java/XFlat/src/org/xflatdb/xflat/db/IdAccessor.java
@@ -81,12 +81,14 @@ private IdAccessor(Class<T> pojoType, PropertyDescriptor idProperty, Field idFie
try{
Object idPropOrField = getIdPropertyOrField(pojoType);
- if(idPropOrField instanceof PropertyDescriptor){
- idProp = (PropertyDescriptor)idPropOrField;
- }
- else{
- idField = (Field)idPropOrField;
- idField.setAccessible(true);
+ if(idPropOrField != null){
+ if(idPropOrField instanceof PropertyDescriptor){
+ idProp = (PropertyDescriptor)idPropOrField;
+ }
+ else{
+ idField = (Field)idPropOrField;
+ idField.setAccessible(true);
+ }
}
}
catch(IntrospectionException ex){
@@ -168,15 +170,18 @@ private static Object getIdPropertyOrField(Class<?> pojoType) throws Introspecti
List<Field> fields = new ArrayList<>();
- for(Field f : pojoType.getDeclaredFields()){
- if(Object.class.equals(f.getDeclaringClass()))
- continue;
-
- if((f.getModifiers() & Modifier.STATIC) == Modifier.STATIC ||
- (f.getModifiers() & Modifier.FINAL) == Modifier.FINAL)
- continue;
-
- fields.add(f);
+ while(!Object.class.equals(pojoType)){
+ for(Field f : pojoType.getDeclaredFields()){
+ if(Object.class.equals(f.getDeclaringClass()))
+ continue;
+
+ if((f.getModifiers() & Modifier.STATIC) == Modifier.STATIC ||
+ (f.getModifiers() & Modifier.FINAL) == Modifier.FINAL)
+ continue;
+
+ fields.add(f);
+ }
+ pojoType = pojoType.getSuperclass();
}
//try fields marked with ID attribute
View
6 java/XFlat/src/org/xflatdb/xflat/db/ShardedEngineBase.java
@@ -212,7 +212,7 @@ protected void updateTask(){
while(it.hasNext()){
TableMetadata table = it.next();
if(table.canSpinDown()){
- EngineBase spinDown = table.spinDown(false);
+ EngineBase spinDown = table.spinDown(false, false);
//don't remove any metadata. It's too dangerous with the way the concurrency is structured.
@@ -333,7 +333,7 @@ protected boolean spinDown(final SpinDownEventHandler completionEventHandler) {
synchronized(spinDownSyncRoot){
for(Map.Entry<Interval<T>, TableMetadata> m : this.openShards.entrySet()){
- EngineBase spinningDown = m.getValue().spinDown(true);
+ EngineBase spinningDown = m.getValue().spinDown(true, false);
this.spinningDownEngines.put(m.getKey(), spinningDown);
}
}
@@ -402,7 +402,7 @@ protected boolean forceSpinDown() {
synchronized(spinDownSyncRoot){
for(Map.Entry<Interval<T>, TableMetadata> m : this.openShards.entrySet()){
- EngineBase spinningDown = m.getValue().spinDown(true);
+ EngineBase spinningDown = m.getValue().spinDown(true, true);
this.spinningDownEngines.put(m.getKey(), spinningDown);
}
View
9 java/XFlat/src/org/xflatdb/xflat/db/TableMetadata.java
@@ -258,12 +258,13 @@ else if(state == EngineState.SpinningUp ||
/**
* Spins down the engine, leaving the metadata in a state where it will
* be required to spin up a new engine before providing it.
- * @param force Whether to force a spin down even if the engine has uncommitted
+ * @param ignoreUncommitted Whether to require a spin down even if the engine has uncommitted
* data, effectively automatically reverting it. Usually only set when the
* entire database is being shut down.
+ * @param force whether to use forceSpinDown instead of a natural spin down.
* @return The engine that was spun down.
*/
- public EngineBase spinDown(boolean force){
+ public EngineBase spinDown(boolean ignoreUncommitted, boolean force){
lock.writeLock().lock();
try{
EngineBase engine = this.engine.get();
@@ -280,7 +281,7 @@ public EngineBase spinDown(boolean force){
try{
engine.getTableLock();
- if(engine.hasUncomittedData() && !force){
+ if(engine.hasUncomittedData() && !ignoreUncommitted){
//can't spin it down, return the engine
return engine;
}
@@ -295,7 +296,7 @@ public EngineBase spinDown(boolean force){
log.trace(String.format("Spinning down table %s", this.name));
- if(engine.spinDown(new SpinDownEventHandler(){
+ if(!force && engine.spinDown(new SpinDownEventHandler(){
@Override
public void spinDownComplete(SpinDownEvent event) {
}
View
6 java/XFlat/src/org/xflatdb/xflat/db/XFlatDatabase.java
@@ -110,7 +110,7 @@ public TransactionManager getTransactionManager(){
return this.transactionManager;
}
- EngineTransactionManager getEngineTransactionManager(){
+ protected EngineTransactionManager getEngineTransactionManager(){
return this.transactionManager;
}
@@ -294,7 +294,7 @@ private void doShutdown(int timeout) throws TimeoutException{
Set<EngineBase> engines = new HashSet<>();
for(Map.Entry<String, TableMetadata> table : this.tables.entrySet()){
try{
- EngineBase e = table.getValue().spinDown(true);
+ EngineBase e = table.getValue().spinDown(true, false);
if(e != null){
if(e.getState() == EngineState.Running){
//don't care, force spin down
@@ -409,7 +409,7 @@ private void update(){
for(TableMetadata m : this.tables.values()){
if(m.canSpinDown()){
//spin down if no uncommitted data
- m.spinDown(false);
+ m.spinDown(false, false);
}
//don't ever remove TableMetadata. It's too dangerous with the way we do locking and isn't worth it.
}
View
181 java/XFlat/src/org/xflatdb/xflat/engine/CachedDocumentEngine.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -452,11 +453,12 @@ public int deleteAll(XPathQuery query) {
- private void updateTask(){
+ private void updateTask(boolean cleanAll){
Set<Long> remainingTransactions = new HashSet<>();
Set<Row> rowsToRemove = new HashSet<>();
+
synchronized(syncRoot){
if(this.currentlyCommitting.get() != -1){
if(this.getTransactionManager().isTransactionCommitted(this.currentlyCommitting.get()) == -1 &&
@@ -467,15 +469,22 @@ private void updateTask(){
}
}
-
- Iterator<Row> it = this.uncommittedRows.values().iterator();
+ //What are we cleaning? If cleanAll, then inspect the ENTIRE cache, not just uncommitted data.
+ Iterable<Row> toClean;
+ if(cleanAll)
+ toClean = this.cache.values();
+ else
+ toClean = this.uncommittedRows.values();
+
+ Iterator<Row> it = toClean.iterator();
while(it.hasNext()){
Row row = it.next();
synchronized(row){
if(row.cleanup()){
rowsToRemove.add(row);
//fully committed, we can remove it from uncommitted rows.
- it.remove();
+ if(!cleanAll)
+ it.remove();
}
else{
boolean isFullyCommitted = true;
@@ -487,7 +496,7 @@ private void updateTask(){
}
}
- if(isFullyCommitted){
+ if(!cleanAll && isFullyCommitted){
//fully committed, we can remove it from uncommitted rows.
it.remove();
}
@@ -530,6 +539,8 @@ private void updateTask(){
@Override
public void commit(Transaction tx, TransactionOptions options){
+ super.commit(tx, options);
+
synchronized(syncRoot){
if(!currentlyCommitting.compareAndSet(-1, tx.getTransactionId())){
//see if this transaction is completely finished committing, or if it reverted
@@ -574,18 +585,28 @@ public void commit(Transaction tx, TransactionOptions options){
//we must immediately dump the cache, we cannot say we are committed
//until the data is on disk.
lastModified.set(System.currentTimeMillis());
- dumpCacheNow();
+ dumpCacheNow(true);
currentlyCommitting.compareAndSet(tx.getTransactionId(), -1);
}
}
@Override
public void revert(long txId, boolean isRecovering){
+ super.revert(txId, isRecovering);
+
synchronized(syncRoot){
boolean mustDump = false;
- Iterator<Row> it = this.uncommittedRows.values().iterator();
+ Iterable<Row> toRevert;
+ if(isRecovering)
+ //need to revert over the entire cache.
+ toRevert = this.cache.values();
+ else
+ //need to revert only over the uncommitted rows.
+ toRevert = this.uncommittedRows.values();
+
+ Iterator<Row> it = toRevert.iterator();
while(it.hasNext()){
Row row = it.next();
synchronized(row){
@@ -602,7 +623,7 @@ public void revert(long txId, boolean isRecovering){
if(mustDump){
lastModified.set(System.currentTimeMillis());
- this.dumpCacheNow();
+ this.dumpCacheNow(true);
}
//else we can leave dumping the cache for the cleanup task.
@@ -633,37 +654,53 @@ protected boolean spinUp() {
if(row.getChildren().isEmpty()){
continue;
}
- Element data = row.getChildren().get(0).detach();
String id = getId(row);
- //default it to zero so that we know it's committed but if we don't get an actual
- //value for the commit then we have the lowest value.
- long txId = 0;
- long commitId = 0;
+
+ Row newRow = null;
- String a = row.getAttributeValue("tx", XFlatDatabase.xFlatNs);
- if(a != null && !"".equals(a)){
- try{
- txId = Long.parseLong(a);
- }catch(NumberFormatException ex){
- //just leave it as 0.
+ for(Element data : row.getChildren()){
+ //default it to zero so that we know it's committed but if we don't get an actual
+ //value for the commit then we have the lowest value.
+ long txId = 0;
+ long commitId = 0;
+
+ String a = data.getAttributeValue("tx", XFlatDatabase.xFlatNs);
+ if(a != null && !"".equals(a)){
+ try{
+ txId = Long.parseLong(a, 16);
+ }catch(NumberFormatException ex){
+ //just leave it as 0.
+ }
}
- }
- a = row.getAttributeValue("commit", XFlatDatabase.xFlatNs);
- if(a != null && !"".equals(a)){
- try{
- commitId = Long.parseLong(a);
- }catch(NumberFormatException ex){
- //just leave it as 0.
+ a = data.getAttributeValue("commit", XFlatDatabase.xFlatNs);
+ if(a != null && !"".equals(a)){
+ try{
+ commitId = Long.parseLong(a, 16);
+ }catch(NumberFormatException ex){
+ //just leave it as 0.
+ }
}
+
+ if("delete".equals(data.getName()) && XFlatDatabase.xFlatNs.equals(data.getNamespace())){
+ //it's a delete marker
+ data = null;
+ }
+ else{
+ data = data.detach();
+ }
+
+ RowData rData = new RowData(txId, data, id);
+ rData.commitId = commitId;
+
+ if(newRow == null)
+ newRow = new Row(id, rData);
+ else
+ newRow.rowData.put(txId, rData);
}
- RowData rData = new RowData(txId, data, id);
- rData.commitId = commitId;
-
- Row newRow = new Row(id, rData);
-
- this.cache.put(id, newRow);
+ if(newRow != null)
+ this.cache.put(id, newRow);
}
} catch (JDOMException | IOException ex) {
throw new XFlatException("Error building document cache", ex);
@@ -680,13 +717,18 @@ protected boolean spinUp() {
//schedule the update task
this.getExecutorService().scheduleWithFixedDelay(new Runnable(){
+ int runCount = 0;
+
@Override
public void run() {
if(state.get() == EngineState.SpinningDown || state.get() == EngineState.SpunDown){
throw new RuntimeException("task termination");
}
- updateTask();
+ runCount = (runCount + 1) % 100;
+
+ //every 100 iterations, clean the entire cache.
+ updateTask(runCount == 0);
}
}, 500, 500, TimeUnit.MILLISECONDS);
@@ -756,16 +798,19 @@ protected boolean spinDown(final SpinDownEventHandler completionEventHandler) {
if(log.isTraceEnabled())
log.trace(String.format("Table %s Spinning down", this.getTableName()));
+
+ //do the transactional data cleanup task, ensuring we clean the entire cache.
+ updateTask(true);
final AtomicReference<ScheduledFuture<?>> cacheDumpTask = new AtomicReference<>(null);
- if(this.cache != null && lastModified.get() >= lastDump.get()){
+ if(this.cache != null){
//schedule immediate dump
cacheDumpTask.set(this.getExecutorService().schedule(
new Runnable(){
@Override
public void run() {
try{
- dumpCacheNow();
+ dumpCacheNow(true);
}
catch(Exception ex){
log.warn("Unable to dump cached data", ex);
@@ -873,7 +918,7 @@ private void dumpCache(){
@Override
public void run() {
try{
- dumpCacheNow();
+ dumpCacheNow(false);
}
catch(XFlatException ex){
log.warn("Unable to dump cached data", ex);
@@ -897,41 +942,69 @@ public void run() {
}
private final Object dumpSyncRoot = new Object();
- private void dumpCacheNow(){
+ /**
+ * Dumps the cache immediately, on this thread.
+ * @param required true if a dump is absolutely required, false to allow this
+ * method to choose not to dump if it feels that a dump is unnecessary.
+ */
+ private void dumpCacheNow(boolean required){
synchronized(dumpSyncRoot){
- if(lastModified.get() < lastDump.get()){
+ if(!required && lastModified.get() < lastDump.get()){
//no need to dump
return;
}
long lastDump = System.currentTimeMillis();
- //take a 'snapshot' of the detached elements
+
Document doc = new Document();
Element root = new Element("table", XFlatDatabase.xFlatNs)
.setAttribute("name", this.getTableName(), XFlatDatabase.xFlatNs);
doc.setRootElement(root);
- //get a transaction ID so we are taking a snapshot of the committed data at this point in time.
- long snapshotId = getTransactionManager().transactionlessCommitId();
-
for(Row row : this.cache.values()){
synchronized(row){
- RowData rData = row.chooseMostRecentCommitted(snapshotId);
- if(rData == null || rData.data == null){
- //the data was deleted
- continue;
- }
-
+ Element rowEl = null;
- Element rowEl = new Element("row", XFlatDatabase.xFlatNs);
- setId(rowEl, row.rowId);
- rowEl.setAttribute("tx", Long.toString(rData.transactionId), XFlatDatabase.xFlatNs);
- rowEl.setAttribute("commit", Long.toString(rData.commitId), XFlatDatabase.xFlatNs);
+ int nonDeleteData = 0;
- rowEl.addContent(rData.data.clone());
+ //put ALL committed data to disk, even some that might otherwise
+ //be cleaned up, because we may be in the process of committing
+ //one of N engines and will need all previous values if we revert.
+ for(RowData rData : row.rowData.values()){
+ if(rData == null)
+ continue;
+
+ if(rData.commitId == -1)
+ //uncommitted data is not put to disk
+ continue;
+
+ if(rowEl == null){
+ rowEl = new Element("row", XFlatDatabase.xFlatNs);
+ setId(rowEl, row.rowId);
+ }
+
+ Element dataEl;
+ if(rData.data == null){
+ //the data was deleted - make sure we mark that on the row
+ dataEl = new Element("delete", XFlatDatabase.xFlatNs);
+ }
+ else{
+ dataEl = rData.data.clone();
+ nonDeleteData++;
+ }
+
+ dataEl.setAttribute("tx", Long.toHexString(rData.transactionId), XFlatDatabase.xFlatNs);
+ dataEl.setAttribute("commit", Long.toHexString(rData.commitId), XFlatDatabase.xFlatNs);
+
+ rowEl.addContent(dataEl);
+ }
- root.addContent(rowEl);
+ //doublecheck - only write out an element if there's actually
+ //any data to write. Delete marker elements don't count.
+ if(rowEl != null && nonDeleteData > 0){
+ root.addContent(rowEl);
+ }
}
}
View
8 java/XFlat/src/org/xflatdb/xflat/transaction/ThreadContextTransactionManager.java
@@ -818,8 +818,8 @@ public void removeTransactionListener(TransactionListener listener) {
@Override
public Element convert(TransactionJournalEntry source) throws ConversionException {
Element ret = new Element("entry");
- ret.setAttribute("txId", Long.toString(source.txId));
- ret.setAttribute("commit", Long.toString(source.commitId));
+ ret.setAttribute("txId", Long.toHexString(source.txId));
+ ret.setAttribute("commit", Long.toHexString(source.commitId));
for(String s : source.tableNames){
ret.addContent(new Element("table").setText(s));
@@ -839,11 +839,11 @@ public TransactionJournalEntry convert(Element source) throws ConversionExceptio
if(txId == null){
throw new ConversionException("txId attribute required");
}
- ret.txId = Long.parseLong(txId);
+ ret.txId = Long.parseLong(txId, 16);
String commitId = source.getAttributeValue("commit");
if(commitId != null){
- ret.commitId = Long.parseLong(commitId);
+ ret.commitId = Long.parseLong(commitId, 16);
}
for(Element e : source.getChildren("table")){
View
19 java/XFlat/test/org/xflatdb/xflat/db/DatabaseIntegrationTest.java
@@ -15,19 +15,10 @@
*/
package org.xflatdb.xflat.db;
-import org.xflatdb.xflat.db.TimestampIdGenerator;
-import org.xflatdb.xflat.db.XFlatDatabase;
-import org.xflatdb.xflat.db.BigIntIdGenerator;
-import org.xflatdb.xflat.TableConfig;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.LogFactory;
-import org.xflatdb.xflat.Cursor;
-import org.xflatdb.xflat.Table;
-import org.xflatdb.xflat.XFlatException;
-import org.xflatdb.xflat.query.XPathQuery;
-import org.xflatdb.xflat.util.DocumentFileWrapper;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -37,9 +28,15 @@
import org.jdom2.JDOMException;
import org.jdom2.xpath.XPathExpression;
import org.jdom2.xpath.XPathFactory;
+import static org.junit.Assert.*;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.*;
+import org.xflatdb.xflat.Cursor;
+import org.xflatdb.xflat.Table;
+import org.xflatdb.xflat.TableConfig;
+import org.xflatdb.xflat.XFlatException;
+import org.xflatdb.xflat.query.XPathQuery;
+import org.xflatdb.xflat.util.DocumentFileWrapper;
import test.Baz;
import test.Foo;
import test.Utils;
@@ -50,7 +47,7 @@
*/
public class DatabaseIntegrationTest {
- static File workspace = new File("DbIntegrationTests");
+ static File workspace = new File("integrationtests");
@BeforeClass
public static void setUpClass(){
View
147 java/XFlat/test/org/xflatdb/xflat/db/EngineTestsBase.java
@@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.xflatdb.xflat.Cursor;
import org.xflatdb.xflat.DuplicateKeyException;
@@ -54,12 +55,17 @@
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
+import org.jdom2.output.XMLOutputter;
+import org.jdom2.xpath.XPathFactory;
import org.junit.AfterClass;
import static org.junit.Assert.*;
import org.junit.BeforeClass;
import org.junit.Test;
import test.Utils;
import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.xflatdb.xflat.transaction.TransactionManager;
import org.xflatdb.xflat.transaction.TransactionScope;
/**
@@ -1130,7 +1136,11 @@ public void testDeleteRow_RowExists_Deletes() throws Exception {
spinDown(ctx);
Document doc = getFileContents(ctx);
+ System.out.println(dumpDoc(doc));
+
List<Element> children = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
+
+
assertEquals("Document should have one fewer element", 1, children.size());
assertThat("Document should have correct data", children,
@@ -1225,6 +1235,8 @@ public void testDeleteAll_MatchesMultiple_MultipleDeleted() throws Exception {
spinDown(ctx);
Document doc = getFileContents(ctx);
+ System.out.println(dumpDoc(doc));
+
List<Element> children = doc.getRootElement().getChildren("row", XFlatDatabase.xFlatNs);
assertEquals("Document should have fewer elements", 1, children.size());
@@ -1379,11 +1391,11 @@ public void testInsert_InTransaction_HasReadIsolation() throws Exception {
ctx.instance.insertRow("1", rowData);
//swap out the transaction
- ctx.transactionManager.setContextId(1L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(1L);
outsideTransaction = ctx.instance.readRow("1");
//swap the transaction back
- ctx.transactionManager.setContextId(0L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(0L);
insideTransaction = ctx.instance.readRow("1");
tx.commit();
@@ -1434,26 +1446,26 @@ public void testQuery_InTransaction_HasReadIsolation() throws Exception {
.setText("some text data");
//now that we've opened the transaction, switch contexts and insert
- ctx.transactionManager.setContextId(1L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(1L);
ctx.instance.insertRow("4", rowData);
//ACT
//switch back and query
- ctx.transactionManager.setContextId(0L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(0L);
try(Cursor<Element> cursor = ctx.instance.queryTable(query)){
Iterator<Element> it = cursor.iterator();
fromCursor.add(it.next());
//switch contexts and insert again
- ctx.transactionManager.setContextId(1L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(1L);
ctx.instance.insertRow("5", new Element("fifth")
.setAttribute("fooInt", "17")
.setText("fifth text data"));
- ctx.transactionManager.setContextId(0L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(0L);
while(it.hasNext()){
fromCursor.add(it.next());
}
@@ -1489,12 +1501,12 @@ public void testConflictingWrite_SnapshotIsolation_ThrowsWriteConflictException(
ctx.instance.insertRow("1", rowData);
//swap out the transaction
- ctx.transactionManager.setContextId(1L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(1L);
ctx.instance.insertRow("1", new Element("other").setText("other text data"));
//swap the transaction back
- ctx.transactionManager.setContextId(0L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(0L);
try{
tx.commit();
@@ -1521,13 +1533,13 @@ public void testConflictingWrite_SnapshotIsolation_TwoTransactions_ThrowsWriteCo
ctx.instance.insertRow("1", rowData);
//swap out the transaction
- ctx.transactionManager.setContextId(1L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(1L);
try(TransactionScope tx2 = ctx.transactionManager.openTransaction()){
//insert conflicting data
ctx.instance.insertRow("1", new Element("other").setText("other text data"));
//swap the transaction back
- ctx.transactionManager.setContextId(0L);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).setContextId(0L);
//should be OK
tx.commit();
@@ -1703,7 +1715,7 @@ public int deleteAll(XPathQuery query) {
try(TransactionScope tx = ctx.transactionManager.openTransaction()){
- ctx.transactionManager.bindEngineToCurrentTransaction(eng2);
+ ((FakeThreadContextTransactionManager)ctx.transactionManager).bindEngineToCurrentTransaction(eng2);
ctx.instance.insertRow("0", new Element("data").setText("some text data"));
ctx.instance.insertRow("1", new Element("second").setText("second text data"));
@@ -1719,6 +1731,107 @@ public int deleteAll(XPathQuery query) {
assertNull("Should have reverted data after partial commit", ctx.instance.readRow("0"));
assertNull("Should have reverted data after partial commit", ctx.instance.readRow("1"));
}
+
+ /**
+ * Test that the data persisted to disk is full enough that a new engine can revert
+ * a committed transaction. This tests durability of transactions.
+ * @throws Exception
+ */
+ @Test
+ public void testCommit_EngineShutDown_NewEngineCanRevertCommittedData() throws Exception {
+ System.out.println("testCommit_EngineShutDown_NewEngineCanRevertCommittedData");
+
+ final TestContext ctx = getContext();
+
+ EngineTransactionManager txManager = mock(EngineTransactionManager.class);
+ ctx.transactionManager = txManager;
+ ctx.instance.setTransactionManager(txManager);
+
+
+ final AtomicLong txIds = new AtomicLong(37);
+
+ when(txManager.transactionlessCommitId())
+ .then(new Answer<Long>(){
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ return txIds.incrementAndGet();
+ }
+ });
+
+ prepFileContents(ctx, null);
+ spinUp(ctx);
+
+ ctx.instance.insertRow("0", new Element("data").setText("some text data"));
+ ctx.instance.insertRow("1", new Element("second").setText("second text data"));
+
+ //now we open a transaction - should not need transactionless ID anymore
+ when(txManager.transactionlessCommitId())
+ .thenThrow(Exception.class);
+ Transaction tx = mock(Transaction.class);
+ //assign TX and commit IDs
+ long txId = txIds.incrementAndGet();
+ when(tx.getTransactionId())
+ .thenReturn(txId);
+ long commitId = txIds.incrementAndGet();
+ when(tx.getCommitId())
+ .thenReturn(commitId);
+
+ when(txManager.getTransaction())
+ .thenReturn(tx);
+ when(txManager.isTransactionCommitted(txId))
+ .thenReturn(-1L);
+ when(txManager.anyOpenTransactions())
+ .thenReturn(true);
+ when(txManager.getLowestOpenTransaction())
+ .thenReturn(txId);
+
+
+ //insert the transactional data
+ ctx.instance.insertRow("2", new Element("transactional3").setText("tx text 3"));
+ ctx.instance.update("1", XPathUpdate.set(XPathFactory.instance().compile("second"), "tx updated text"));
+ ctx.instance.deleteRow("0");
+
+ //ACT
+ when(txManager.isCommitInProgress(tx.getCommitId()))
+ .thenReturn(true);
+ ctx.instance.commit(tx, TransactionOptions.DEFAULT);
+ //pretend the process is failing, but let it do it's normal tasks (we're still in the process of committing anyways).
+ ctx.executorService.shutdown();
+ ctx.executorService.awaitTermination(5, TimeUnit.SECONDS);
+
+ ctx.instance.forceSpinDown();
+
+ System.out.println("partially committed data");
+ System.out.println(dumpDoc(getFileContents(ctx)));
+
+ //spin up a new engine instance and ask it to revert the old transaction
+ ctx.executorService = new ScheduledThreadPoolExecutor(2);
+ //ctx.executorService = mock(ScheduledExecutorService.class); //uncomment this for debugging
+ ctx.transactionManager = new FakeThreadContextTransactionManager(new FakeDocumentFileWrapper(ctx.transactionJournal));
+ ctx.instance = setupEngine(ctx);
+
+ spinUp(ctx);
+
+ //wait a sec cause it might take us a while to actually end up reverting when we spin up
+ Thread.sleep(1000);
+
+ ctx.instance.revert(tx.getTransactionId(), true);
+
+ //ASSERT
+ Element row = ctx.instance.readRow("0");
+ assertNotNull("Original data should exist", row);
+ assertThat("Should have original data", row, isNamed("data"));
+ assertThat("Should have original data", row, hasText("some text data"));
+
+ row = ctx.instance.readRow("1");
+ assertNotNull("Original data should exist", row);
+ assertThat("Should have original data", row, isNamed("second"));
+ assertThat("Should have original data", row, hasText("second text data"));
+
+ row = ctx.instance.readRow("2");
+ assertNull("Should not have kept transactionally inserted data", row);
+ }
+
//</editor-fold>
//</editor-fold>
@@ -1800,6 +1913,16 @@ public void describeTo(Description description) {
};
}
+ private static final XMLOutputter outputter = new XMLOutputter();
+ /**
+ * Dumps a document to a string for debugging purposes.
+ * @param doc
+ * @return The string representation of the document.
+ */
+ protected String dumpDoc(Document doc){
+ return outputter.outputString(doc);
+ }
+
protected class TestContext{
public TEngine instance;
@@ -1811,7 +1934,7 @@ public void describeTo(Description description) {
public AtomicReference<Document> transactionJournal = new AtomicReference<>(new Document().setRootElement(new Element("transactionJournal")));
- public FakeThreadContextTransactionManager transactionManager = new FakeThreadContextTransactionManager(new FakeDocumentFileWrapper(transactionJournal));
+ public EngineTransactionManager transactionManager = new FakeThreadContextTransactionManager(new FakeDocumentFileWrapper(transactionJournal));
public final Map<String, Object> additionalContext;
View
396 java/XFlat/test/org/xflatdb/xflat/db/MultithreadedDbIntegrationTests.java
@@ -0,0 +1,396 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.xflatdb.xflat.db;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import junit.framework.AssertionFailedError;
+import org.hamcrest.Matchers;
+import org.jdom2.Element;
+import org.jdom2.xpath.XPathExpression;
+import org.jdom2.xpath.XPathFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.xflatdb.xflat.Table;
+import test.Foo;
+import test.Utils;
+import static org.junit.Assert.*;
+import org.xflatdb.xflat.query.XPathQuery;
+import org.xflatdb.xflat.query.XPathUpdate;
+import org.xflatdb.xflat.transaction.Propagation;
+import org.xflatdb.xflat.transaction.TransactionOptions;
+import org.xflatdb.xflat.transaction.TransactionScope;
+
+/**
+ *
+ * @author Gordon
+ */
+public class MultithreadedDbIntegrationTests {
+
+ static File workspace = new File("integrationtests");
+
+ @BeforeClass
+ public static void setUpClass(){
+ if(workspace.exists()){
+ Utils.deleteDir(workspace);
+ }
+ }
+
+ private XFlatDatabase getDatabase(String testName){
+ File dbDir = new File(workspace, testName);
+ XFlatDatabase ret = new XFlatDatabase(dbDir);
+
+ return ret;
+ }
+
+ private List<Thread> run(Runnable r, int threads){
+ List<Thread> ret = new ArrayList<>();
+ for(int i = 0; i < threads; i++){
+ ret.add(new Thread(r));
+ }
+
+ for(int i = 0; i < threads; i++){
+ ret.get(i).start();
+ }
+ return ret;
+ }
+
+ private void spinWait(long nanos){
+ long start = System.nanoTime();
+ long diff = 0;
+ do{
+ diff = System.nanoTime() - start;
+ if((diff & 0xFFFF) == 0L) //approx. 65 uS
+ Thread.yield();
+
+ }while(diff - nanos < 0);
+ }
+
+ private static final Random seedRandom = new Random();
+ private ThreadLocal<Random> random = new ThreadLocal<Random>(){
+ @Override
+ protected Random initialValue(){
+ return new Random(seedRandom.nextLong());
+ }
+ };
+
+ @Test
+ public void HeavyRead_OneUpdate_AfterInsertAllGetNewValue() throws Exception {
+ System.out.println("HeavyRead_OneUpdate_AfterInsertAllGetNewValue");
+
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final XFlatDatabase db = getDatabase("HeavyRead_OneUpdate_AfterInsertAllGetNewValue");
+
+ db.getConversionService().addConverter(Foo.class, Element.class, new Foo.ToElementConverter());
+ db.getConversionService().addConverter(Element.class, Foo.class, new Foo.FromElementConverter());
+
+ db.Initialize();
+ try{
+
+ final Foo oldFoo = new Foo();
+ oldFoo.fooInt = 5;
+ oldFoo.setId("1");
+
+ final Foo newFoo = new Foo();
+ newFoo.fooInt = 6;
+ newFoo.setId("1");
+
+ final AtomicInteger counter = new AtomicInteger(0);
+
+ final List<Integer> allOldFooCounters = new ArrayList<>();
+ final List<Integer> allNewFooCounters = new ArrayList<>();
+
+ //set up readers
+ Runnable r = new Runnable(){
+ @Override
+ public void run() {
+ List<Integer> oldFooCounters = new ArrayList<>();
+ List<Integer> newFooCounters = new ArrayList<>();
+
+ Table<Foo> fooTable = db.getTable(Foo.class);
+
+ while(!finished.get()){
+
+ Integer count = counter.incrementAndGet();
+ Foo foo = fooTable.find("1");
+ if(oldFoo.equals(foo))
+ oldFooCounters.add(count);
+ else if(newFoo.equals(foo))
+ newFooCounters.add(count);
+ }
+
+ synchronized(allOldFooCounters){
+ allOldFooCounters.addAll(oldFooCounters);
+ }
+ synchronized(allNewFooCounters){
+ allNewFooCounters.addAll(newFooCounters);
+ }
+ }
+ };
+
+ Table<Foo> fooTable = db.getTable(Foo.class);
+ fooTable.insert(oldFoo);
+
+ Integer before;
+ Integer after;
+
+ List<Thread> th = run(r, 3);
+
+ Thread.sleep(10);
+
+ //act
+ before = counter.incrementAndGet();
+ fooTable.replace(newFoo);
+ after = counter.incrementAndGet();
+
+ finished.set(true);
+
+ for(Thread t : th){
+ t.join();
+ }
+
+ //ASSERT
+
+ //there may be instances between "before" and "after" where we find both,
+ //but all the instances of "OldFoo" should definitely be before "after"
+ //and all instances of "NewFoo" should definitely be after "before".
+
+ assertThat("All counters in oldFooCounters should be prior to 'after'",
+ allOldFooCounters, Matchers.everyItem(Matchers.lessThan(after)));
+
+ //assert all new foo counters are post-after
+ assertThat("All counters in newFooCounters should be post 'before'",
+ allNewFooCounters, Matchers.everyItem(Matchers.greaterThan(before)));
+
+ }finally{
+ finished.set(true);
+ db.shutdown();
+ }
+ }
+
+ @Test
+ public void HeavyWrite_OneReader_AllReadsInOrder() throws Exception {
+ System.out.println("HeavyWrite_OneReader_AllReadsInOrder");
+
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final XFlatDatabase db = getDatabase("HeavyWrite_OneReader_AllReadsInOrder");
+
+ db.getConversionService().addConverter(Foo.class, Element.class, new Foo.ToElementConverter());
+ db.getConversionService().addConverter(Element.class, Foo.class, new Foo.FromElementConverter());
+
+ db.Initialize();
+ try{
+ final AtomicInteger counter = new AtomicInteger(0);
+ final AtomicInteger idStarter = new AtomicInteger(0);
+
+ final AtomicInteger numInserts = new AtomicInteger(0);
+
+ final AtomicReference<Exception> lastException = new AtomicReference<>(null);
+
+ Runnable r = new Runnable(){
+ @Override
+ public void run() {
+ try{
+ String id = Integer.toString(idStarter.incrementAndGet());
+
+
+ Table<Foo> fooTable = db.getTable(Foo.class);
+
+ while(!finished.get()){
+
+ Foo foo = new Foo();
+ foo.fooInt = counter.incrementAndGet();
+ //put it in one of 3 ID slots
+ foo.setId(id);
+
+ boolean inserted = fooTable.upsert(foo);
+ if(inserted)
+ numInserts.incrementAndGet();
+ }
+
+ //do one more - this will be the final one
+ Foo foo = new Foo();
+ foo.fooInt = counter.incrementAndGet();
+ //put it in one of 4 ID slots
+ foo.setId(id);
+
+ boolean inserted = fooTable.upsert(foo);
+ if(inserted)
+ numInserts.incrementAndGet();
+
+ }catch(Exception ex){
+ System.err.println(ex.toString());
+ lastException.set(ex);
+ }
+ }
+ };
+
+ Table<Foo> fooTable = db.getTable(Foo.class);
+ List<Integer> foos = new ArrayList<>(4);
+ Integer after, before;
+
+ List<Thread> th = run(r, 3);
+
+ Thread.sleep(10);
+
+
+ before = counter.incrementAndGet();
+ finished.set(true);
+ for(Thread t : th){
+ t.join();
+ }
+
+
+ //ASSERT
+ if(lastException.get() != null){
+ fail(lastException.get().getMessage());
+ }
+
+ foos.add(fooTable.find("1").fooInt);
+ foos.add(fooTable.find("2").fooInt);
+ foos.add(fooTable.find("3").fooInt);
+ after = counter.incrementAndGet();
+
+ assertThat(foos, Matchers.everyItem(Matchers.lessThan(after)));
+ assertThat(foos, Matchers.everyItem(Matchers.greaterThan(before)));
+
+ assertEquals(3, numInserts.get());
+ }
+ finally{
+ finished.set(true);
+ db.shutdown();
+ }
+ }
+
+
+ @Test
+ public void testTransactionalWrites_InOwnThread_MaintainsIsolation() throws Exception {
+ System.out.println("testTransactionalWrites_InOwnThread_MaintainsIsolation");
+
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final XFlatDatabase db = getDatabase("HeavyWrite_OneReader_AllReadsInOrder");
+
+ db.getConversionService().addConverter(Foo.class, Element.class, new Foo.ToElementConverter());
+ db.getConversionService().addConverter(Element.class, Foo.class, new Foo.FromElementConverter());
+
+ db.Initialize();
+ try{
+ final AtomicInteger counter = new AtomicInteger(0);
+ final AtomicInteger idStarter = new AtomicInteger(0);
+ final AtomicReference<Throwable> lastException = new AtomicReference<>(null);
+
+ final XPathExpression<Object> fooInt = XPathFactory.instance().compile("foo/fooInt");
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ int start = idStarter.incrementAndGet();
+
+ try{
+
+ Table<Foo> fooTable = db.getTable(Foo.class);
+
+ Map<String, Foo> insertedFoos = new HashMap<>();
+
+ //should be isolated
+ try(TransactionScope tx = db.getTransactionManager().openTransaction(TransactionOptions.DEFAULT.withPropagation(Propagation.REQUIRES_NEW))){
+
+ int count = start;
+ while((++count % 100) != start){
+ //go around the loop till we get back to start
+
+ Foo foo = new Foo();
+ foo.fooInt = random.get().nextInt();
+ //drop it in the "count" ID. We are definitely
+ //stepping on the toes of another thread here,
+ //but hopefully not at the same time to avoid synclock.
+ foo.setId(Integer.toString(count));
+
+ //ought not to throw DuplicateKeyException
+ fooTable.insert(foo);
+
+ insertedFoos.put(foo.getId(), foo);
+ }
+
+ for(Map.Entry<String, Foo> foos : insertedFoos.entrySet()){
+ //we ought to be able to find the data in the DB
+
+ Foo foo = foos.getValue();
+
+ Foo inDb = fooTable.find(foos.getKey());
+ assertEquals("Unequal foos at ID " + foos.getKey(), foo, inDb);
+ }
+
+ int rand = random.get().nextInt(insertedFoos.size());
+ int cnt = 0;
+ Foo foo = null;
+ for(Foo f : insertedFoos.values()){
+ if(cnt++ == rand){
+ foo = f;
+ break;
+ }
+ }
+
+ int newFooInt = random.get().nextInt();
+ fooTable.update(XPathQuery.eq(fooInt, foo.fooInt), XPathUpdate.set(fooInt, newFooInt));
+ foo = fooTable.find(foo.getId());
+ assertEquals("Should update in TX", newFooInt, foo.fooInt);
+ }
+ }catch(Throwable ex){
+ synchronized(this){
+ System.err.println("Error in thread with start " + start);
+ System.err.println(ex.toString());
+ System.err.println(ex.getStackTrace());
+ }
+ lastException.set(ex);
+ }
+ }
+ };
+
+ //ACT
+ Table<Foo> fooTable = db.getTable(Foo.class);
+ List<Foo> inMainThread = new ArrayList<>();
+
+ List<Thread> threads = run(r, 3);
+
+
+ //should see nothing in DB while these transactions are running
+ inMainThread.add(fooTable.find("1"));
+ inMainThread.add(fooTable.find("2"));
+ inMainThread.add(fooTable.find("3"));
+ inMainThread.add(fooTable.find("4"));
+
+ Foo foo = new Foo();
+ foo.fooInt = 7;
+ fooTable.insert(foo);
+
+ Foo inDb = fooTable.find(foo.getId());
+
+ for(Thread t : threads)
+ t.join();
+
+ //ASSERT
+ if(lastException.get() != null){
+ throw new Exception("Failure while processing", lastException.get());
+ }
+
+ assertThat(inMainThread, Matchers.everyItem(Matchers.nullValue(Foo.class)));
+ assertEquals("Should have retrieved non-transactional data", foo, inDb);
+ assertEquals("Should have retrieved non-transactional data", foo, fooTable.find(foo.getId()));
+
+ }finally{
+ finished.set(true);
+ db.shutdown();
+ }
+ }
+}
View
6 java/XFlat/test/org/xflatdb/xflat/engine/CachedDocumentEngineTest.java
@@ -15,16 +15,14 @@
*/
package org.xflatdb.xflat.engine;
-import org.xflatdb.xflat.engine.CachedDocumentEngine;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
+import org.jdom2.Document;
+import org.jdom2.JDOMException;
import org.xflatdb.xflat.db.EngineBase;
import org.xflatdb.xflat.db.EngineTestsBase;
-import org.xflatdb.xflat.util.DocumentFileWrapper;
import org.xflatdb.xflat.util.FakeDocumentFileWrapper;
-import org.jdom2.Document;
-import org.jdom2.JDOMException;
/**
*
View
7 java/XFlat/test/org/xflatdb/xflat/engine/IdShardedEngineTest.java
@@ -51,6 +51,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.jdom2.output.XMLOutputter;
+import org.xflatdb.xflat.db.EngineTransactionManager;
/**
*
@@ -77,6 +78,11 @@ protected ScheduledExecutorService getExecutorService(){
}
@Override
+ protected EngineTransactionManager getEngineTransactionManager(){
+ return ctx.transactionManager;
+ }
+
+ @Override
public TransactionManager getTransactionManager(){
return ctx.transactionManager;
}
@@ -131,7 +137,6 @@ public void writeFile(String fileName, Document doc){
return ret;
}
});
- db.setTransactionManager(ctx.transactionManager);
ctx.additionalContext.put("db", db);
IntervalProvider provider = NumericIntervalProvider.forInteger(1, 100);

0 comments on commit d4eba56

Please sign in to comment.
Something went wrong with that request. Please try again.