Skip to content
Browse files

Sharding works!!!! Fixes #10

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
1 parent 1cc52e2 commit 88840d07e74affc96ae8008a37f03423da17a618 @gburgett committed Jan 18, 2013
View
12 java/XFlat/src/org/gburgett/xflat/ShardsetConfig.java
@@ -67,13 +67,21 @@ private ShardsetConfig(ShardsetConfig other){
* @return A new shardset config.
*/
public static <U> ShardsetConfig<U> create(XPathExpression<?> xpathProperty, Class<U> propertyClass, IntervalProvider<U> rangeProvider){
+ if(xpathProperty == null){
+ throw new IllegalArgumentException("xpathProperty cannot be null");
+ }
+ if(propertyClass == null){
+ throw new IllegalArgumentException("propertyClass cannot be null");
+ }
+ if(rangeProvider == null){
+ throw new IllegalArgumentException("rangeProvider cannot be null");
+ }
+
ShardsetConfig<U> ret = new ShardsetConfig<>();
ret.shardPropertySelector = xpathProperty;
ret.shardPropertyClass = propertyClass;
ret.intervalProvider = rangeProvider;
-
-
return ret;
}
}
View
8 java/XFlat/src/org/gburgett/xflat/db/ShardedEngineBase.java
@@ -17,6 +17,7 @@
import org.gburgett.xflat.Cursor;
import org.gburgett.xflat.EngineStateException;
import org.gburgett.xflat.ShardsetConfig;
+import org.gburgett.xflat.TableConfig;
import org.gburgett.xflat.XflatException;
import org.gburgett.xflat.convert.ConversionException;
import org.gburgett.xflat.query.Interval;
@@ -123,6 +124,8 @@ private EngineBase getEngine(Interval<T> interval){
this.knownShards.put(interval, file);
metadata = this.getMetadataFactory().makeTableMetadata(name, file);
+ metadata.config = TableConfig.defaultConfig; //not even really used for our purposes
+
TableMetadata weWereLate = openShards.putIfAbsent(interval, metadata);
if(weWereLate != null){
//another thread put the new metadata already
@@ -161,6 +164,8 @@ private EngineBase getEngine(Interval<T> interval){
}
protected void update(){
+
+
Iterator<TableMetadata> it = openShards.values().iterator();
while(it.hasNext()){
TableMetadata table = it.next();
@@ -269,6 +274,9 @@ public void run() {
if(state == EngineState.SpunDown || state == EngineState.Uninitialized){
it.remove();
}
+ else if(state == EngineState.Running){
+ spinningDown.spinDown(null);
+ }
}
//give it a few more ms just in case
}
View
2 java/XFlat/src/org/gburgett/xflat/engine/CachedDocumentEngine.java
@@ -359,7 +359,7 @@ public void run() {
}, 0, TimeUnit.MILLISECONDS));
}
- if(openCursors.isEmpty() && cacheDumpTask.get() == null || cacheDumpTask.get().isDone()){
+ if(openCursors.isEmpty() && (cacheDumpTask.get() == null || cacheDumpTask.get().isDone())){
this.state.set(EngineState.SpunDown);
if(log.isTraceEnabled())
View
18 java/XFlat/src/org/gburgett/xflat/engine/DefaultEngineFactory.java
@@ -6,18 +6,32 @@
import org.gburgett.xflat.TableConfig;
import java.io.File;
+import org.gburgett.xflat.XflatException;
import org.gburgett.xflat.db.EngineBase;
import org.gburgett.xflat.db.EngineFactory;
-import org.gburgett.xflat.engine.CachedDocumentEngine;
+import org.gburgett.xflat.query.XpathQuery;
+import org.gburgett.xflat.util.XPathExpressionEqualityMatcher;
+import org.hamcrest.Matcher;
+import org.jdom2.xpath.XPathExpression;
/**
- *
+ * The default engine factory, which chooses from among the engines available to
+ * the core of XFlat.
* @author Gordon
*/
public class DefaultEngineFactory implements EngineFactory {
+ private Matcher idPropertyMatcher = new XPathExpressionEqualityMatcher(XpathQuery.Id);
+
@Override
public EngineBase newEngine(File file, String tableName, TableConfig config) {
+ if(config.getShardsetConfig() != null){
+ if(idPropertyMatcher.matches(config.getShardsetConfig().getShardPropertySelector())){
+ return new IdShardedEngine(file, tableName, config.getShardsetConfig());
+ }
+ throw new XflatException("Tables sharded on other values than Id are not supported");
+ }
+
return new CachedDocumentEngine(file, tableName);
}
View
2 java/XFlat/src/org/gburgett/xflat/engine/IdShardedEngine.java
@@ -122,7 +122,7 @@ public Element act(Engine engine) {
//we need a cursor that will cross multiple shards.
Cursor<Element> ret = new CrossShardQueryCursor(query, shardIntervals);
//remember it so we don't spin down while it's open
- this.crossShardQueries.put(ret, null);
+ this.crossShardQueries.put(ret, "");
return ret;
}
View
33 java/XFlat/src/org/gburgett/xflat/query/Interval.java
@@ -134,6 +134,28 @@ public boolean contains(T value, Comparator<T> comparator) {
return ret;
}
+ static <U> int compareEndBegin(Interval<U> val1, Interval<U> val2, Comparator<U> itemComparer){
+ if(val1.end == null){
+ //val2 begin cannot be +∞, so val1 end > val2 begin
+ return 1;
+ }
+
+ if(val2.begin == null){
+ //val1 end cannot be -∞, so val1 end > val2 begin
+ return 1;
+ }
+
+ int ret = itemComparer.compare(val1.end, val2.begin);
+ if(ret == 0){
+ if(val1.endInclusive && val2.beginInclusive){
+ return 0;
+ }
+ //else val1 is lower than val2
+ return -1;
+ }
+ return ret;
+ }
+
private static int compareExclusivity(boolean val1Inclusive, boolean val2Inclusive){
if(val1Inclusive){
//if val2 is exclusive of beginning, val 1 is less
@@ -144,6 +166,7 @@ private static int compareExclusivity(boolean val1Inclusive, boolean val2Inclusi
}
+
/**
* Returns true iff the given interval intersects this interval, according
* to the given comparator.
@@ -155,25 +178,19 @@ public boolean intersects(Interval<T> other, Comparator<T> comparator){
int compare = compareBegin(this, other, comparator);
if(compare <= 0){
- compare = comparator.compare(this.end, other.begin);
+ compare = compareEndBegin(this, other, comparator);
if(compare < 0){
return false;
}
- else if(compare == 0){
- return this.endInclusive && other.beginInclusive;
- }
else{
return true;
}
}
else {
- compare = comparator.compare(other.end, this.begin);
+ compare = compareEndBegin(other, this, comparator);
if(compare < 0){
return false;
}
- else if(compare == 0){
- return this.beginInclusive && other.endInclusive;
- }
else{
return true;
}
View
55 java/XFlat/src/org/gburgett/xflat/util/XPathExpressionEqualityMatcher.java
@@ -39,7 +39,7 @@ public XPathExpressionEqualityMatcher(XPathExpression<U> toMatch) {
this.toMatch = toMatch;
}
- private List<String> tokenizeExpression(XPathExpression<U> expression){
+ private static List<String> tokenizeExpression(XPathExpression<?> expression){
String exp = expression.getExpression();
List<String> ret = new ArrayList<>();
@@ -66,36 +66,61 @@ public XPathExpressionEqualityMatcher(XPathExpression<U> toMatch) {
@Override
protected boolean matchesSafely(XPathExpression<U> item) {
- if (toMatch == null) {
- return item == null;
+ if (item == null) {
+ return toMatch == null;
+ }
+ if(toMatch == null){
+ return false;
}
if(myExpTokens == null){
myExpTokens = tokenizeExpression(toMatch);
}
- List<String> itemTokens = tokenizeExpression(item);
+
+ return equals(toMatch, myExpTokens, item, tokenizeExpression(item));
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ if (toMatch == null) {
+ description.appendText("null XPath expression");
+ return;
+ }
+ description.appendText("XPath expression equal to ").appendText(toMatch.getExpression());
+ }
+
+ private static boolean equals(XPathExpression<?> left, List<String> leftSideTokens, XPathExpression<?> right, List<String> rightSideTokens){
+
- if(itemTokens.size() != myExpTokens.size()){
+ if(rightSideTokens.size() != leftSideTokens.size()){
return false;
}
- for(int i = 0; i < myExpTokens.size(); i++){
- if(!myExpTokens.get(i).equals(itemTokens.get(i))){
+ for(int i = 0; i < leftSideTokens.size(); i++){
+ if(!leftSideTokens.get(i).equals(rightSideTokens.get(i))){
return false;
}
}
return true;
}
-
- @Override
- public void describeTo(Description description) {
- if (toMatch == null) {
- description.appendText("null XPath expression");
- return;
+
+ /**
+ * Compares two XPath expressions for equality by tokenizing their expressions
+ * and expanding namespace prefixes.
+ * @param left
+ * @param right
+ * @return true iff the two expressions are equal.
+ */
+ public static boolean equals(XPathExpression<?> left, XPathExpression<?> right){
+ if (left == null) {
+ return right == null;
}
- description.appendText("XPath expression equal to ").appendText(toMatch.getExpression());
+ if(right == null){
+ return false;
+ }
+
+ return equals(left, tokenizeExpression(left), right, tokenizeExpression(right));
}
-
}
View
3 java/XFlat/test/org/gburgett/xflat/db/EngineTestsBase.java
@@ -1256,10 +1256,11 @@ public void describeTo(Description description) {
public long id;
- public final Map<String, Object> additionalContext = new HashMap<>();
+ public final Map<String, Object> additionalContext;
public TestContext(){
this.id = Thread.currentThread().getId();
+ additionalContext = new HashMap<>();
}
}
}
View
137 java/XFlat/test/org/gburgett/xflat/engine/IdShardedEngineIntegrationTests.java
@@ -0,0 +1,137 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.gburgett.xflat.engine;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.gburgett.xflat.ShardsetConfig;
+import org.gburgett.xflat.Table;
+import org.gburgett.xflat.TableConfig;
+import org.gburgett.xflat.db.IntegerIdGenerator;
+import org.gburgett.xflat.db.XFlatDatabase;
+import org.gburgett.xflat.query.NumericIntervalProvider;
+import org.gburgett.xflat.query.XpathQuery;
+import org.jdom2.Element;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import test.Foo;
+import test.Utils;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author Gordon
+ */
+public class IdShardedEngineIntegrationTests {
+ static File workspace = new File(new File("DbIntegrationTests"), "IdShardedEngineIntegrationTests");
+
+ static String tbl = "table";
+
+ @BeforeClass
+ public static void setUpClass(){
+ if(workspace.exists()){
+ Utils.deleteDir(workspace);
+ }
+ }
+
+ private XFlatDatabase getDatabase(String testName){
+ File dbDir = new File(workspace, testName);
+ XFlatDatabase ret = new XFlatDatabase(dbDir);
+
+ ret.configureTable(tbl, TableConfig.defaultConfig
+ .setIdGenerator(IntegerIdGenerator.class)
+ .sharded(ShardsetConfig.create(XpathQuery.Id, Integer.class, NumericIntervalProvider.forInteger(2, 100))));
+
+ ret.getConversionService().addConverter(Foo.class, Element.class, new Foo.ToElementConverter());
+ ret.getConversionService().addConverter(Element.class, Foo.class, new Foo.FromElementConverter());
+
+ return ret;
+ }
+
+
+ @Test
+ public void testInsertRetrieve_SingleShard_OneFileCreated() throws Exception {
+ String testName = "testInsertRetrieve_SingleShard_OneFileCreated";
+ System.out.println(testName);
+
+ XFlatDatabase db = getDatabase(testName);
+
+ db.Initialize();
+
+ Table<Foo> table = db.getTable(Foo.class, this.tbl);
+
+ Foo foo = new Foo();
+ foo.fooInt = 1;
+
+ table.insert(foo);
+
+ Foo foo2 = table.find(foo.getId());
+
+ assertEquals("should retrieve same data", foo, foo2);
+
+ db.shutdown();
+
+ File shardDir = new File(new File(workspace, testName), this.tbl + ".xml");
+ assertTrue("shard directory should exist", shardDir.exists());
+ assertTrue("shard directory should be a directory", shardDir.isDirectory());
+
+ File[] shards = shardDir.listFiles();
+ assertEquals("should be one shard", 1, shards.length);
+ assertTrue("Should be named after the range of data", new File(shardDir, "-98.xml").exists());
+ }
+
+ @Test
+ public void testInsertRetrieve_MultipleShards_MultipleFilesCreated() throws Exception {
+ String testName = "testInsertRetrieve_MultipleShards_MultipleFilesCreated";
+ System.out.println(testName);
+
+ XFlatDatabase db = getDatabase(testName);
+
+ db.Initialize();
+
+ Table<Foo> table = db.getTable(Foo.class, this.tbl);
+
+ Foo foo = new Foo();
+ foo.fooInt = 1;
+ table.insert(foo);
+
+ foo = new Foo();
+ foo.fooInt = 2;
+ table.insert(foo);
+
+ foo = new Foo();
+ foo.fooInt = 3;
+ table.insert(foo);
+
+ List<Foo> fooList = table.findAll(XpathQuery.gt(XpathQuery.Id, 1));
+
+ //can't trust ordering of a query
+ Collections.sort(fooList, new Comparator<Foo>(){
+ @Override
+ public int compare(Foo o1, Foo o2) {
+ return o1.getId().compareTo(o2.getId());
+ }
+ });
+
+ assertEquals("Should get 2 foos", 2, fooList.size());
+ assertEquals("should retrieve same data", 2, fooList.get(0).fooInt);
+ assertEquals("should retrieve same data", 3, fooList.get(1).fooInt);
+
+
+ db.shutdown();
+
+ File shardDir = new File(new File(workspace, testName), this.tbl + ".xml");
+ assertTrue("shard directory should exist", shardDir.exists());
+ assertTrue("shard directory should be a directory", shardDir.isDirectory());
+
+ File[] shards = shardDir.listFiles();
+ assertEquals("should be two shards", 2, shards.length);
+ assertTrue("Should be named after the range of data", new File(shardDir, "-98.xml").exists());
+ assertTrue("Should be named after the range of data", new File(shardDir, "2.xml").exists());
+ }
+
+}
View
70 java/XFlat/test/org/gburgett/xflat/engine/IdShardedEngineTest.java
@@ -11,6 +11,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.gburgett.xflat.ShardsetConfig;
import org.gburgett.xflat.TableConfig;
import org.gburgett.xflat.convert.ConversionService;
@@ -30,18 +32,27 @@
import org.jdom2.JDOMException;
import org.jdom2.xpath.XPathExpression;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.jdom2.output.XMLOutputter;
/**
*
* @author gordon
*/
public class IdShardedEngineTest extends ShardedEngineTestsBase<IdShardedEngine> {
+ Log log = LogFactory.getLog(getClass());
+
String name = "IdShardedEngineTest";
+
+ XMLOutputter outputter = new XMLOutputter();
+
@Override
protected IdShardedEngine createInstance(TestContext ctx) {
- final Map<File, Document> docs = new ConcurrentHashMap<>();
+ final Map<String, Document> docs = new ConcurrentHashMap<>();
ctx.additionalContext.put("docs", docs);
XFlatDatabase db = new XFlatDatabase(workspace, executorService);
@@ -62,12 +73,20 @@ public EngineBase newEngine(final File file, String tableName, TableConfig confi
DocumentFileWrapper wrapper = new DocumentFileWrapper(file){
@Override
public Document readFile(){
- return docs.get(file);
+ return docs.get(file.getName());
}
@Override
public void writeFile(Document doc){
- docs.put(file, doc);
+ log.debug("writing file " + file.getName());
+ if(log.isTraceEnabled())
+ log.trace(outputter.outputString(doc));
+ docs.put(file.getName(), doc);
+ }
+
+ @Override
+ public boolean exists(){
+ return docs.containsKey(file.getName());
}
@Override
@@ -91,15 +110,32 @@ public void writeFile(String fileName, Document doc){
ctx.additionalContext.put("rangeProvider", provider);
ShardsetConfig cfg = ShardsetConfig.create(XpathQuery.Id, Integer.class, provider);
- File file = new File(ctx.workspace, name);
+ File file = spy(new File(ctx.workspace, name));
+ when(file.exists()).thenReturn(true);
+ when(file.isDirectory()).thenReturn(true);
+ when(file.listFiles())
+ .then(new Answer<File[]>(){
+
+ @Override
+ public File[] answer(InvocationOnMock invocation) throws Throwable {
+ File[] ret = new File[docs.size()];
+ int i = 0;
+ for(String name : docs.keySet()){
+ ret[i++] = new File(name);
+ }
+ return ret;
+ }
+ });
+
+
IdShardedEngine ret = new IdShardedEngine(file, name, cfg);
setMetadataFactory(ret, new TableMetadataFactory(db, file));
return ret;
}
@Override
protected void prepFileContents(TestContext ctx, Document contents) throws IOException {
- Map<File, Document> docs = (Map<File, Document>)ctx.additionalContext.get("docs");
+ Map<String, Document> docs = (Map<String, Document>)ctx.additionalContext.get("docs");
IntervalProvider<Integer> provider = (IntervalProvider<Integer>)ctx.additionalContext.get("rangeProvider");
if(contents == null){
@@ -127,40 +163,48 @@ protected void prepFileContents(TestContext ctx, Document contents) throws IOExc
//put the sharded documents in the docs collection
for(Map.Entry<Interval<Integer>, Document> doc : files.entrySet()){
File f = new File(provider.getName(doc.getKey()) + ".xml");
- docs.put(f, doc.getValue());
+ docs.put(f.getName(), doc.getValue());
}
}
@Override
protected Document getFileContents(TestContext ctx) throws IOException, JDOMException {
- Map<File, Document> docs = (Map<File, Document>)ctx.additionalContext.get("docs");
+ log.debug("getting file contents");
+
+ Map<String, Document> docs = (Map<String, Document>)ctx.additionalContext.get("docs");
IntervalProvider<Integer> provider = (IntervalProvider<Integer>)ctx.additionalContext.get("rangeProvider");
Document ret = new Document();
ret.setRootElement(new Element("db", XFlatDatabase.xFlatNs));
SortedMap<Integer, Document> sortedDocs = new TreeMap<>();
//will be spread across multiple documents, sort them by ID
- for(Map.Entry<File, Document> doc : docs.entrySet()){
+ for(Map.Entry<String, Document> doc : docs.entrySet()){
+ if(log.isTraceEnabled())
+ log.trace(outputter.outputString(doc.getValue()));
+
Integer i = Integer.parseInt(getShardNameFromFile(doc.getKey()));
sortedDocs.put(i, doc.getValue());
}
//each range is now in order, add all the rows from each document
for(Document d : sortedDocs.values()){
for(Element e : d.getRootElement().getChildren("row", XFlatDatabase.xFlatNs)){
- ret.getRootElement().addContent(e.detach());
+ ret.getRootElement().addContent(e.clone());
}
}
+ if(log.isTraceEnabled())
+ log.trace(outputter.outputString(ret));
+
return ret;
}
- private String getShardNameFromFile(File file){
- if(!file.getName().endsWith(".xml"))
- throw new RuntimeException("invalid file name " + file.getName());
+ private String getShardNameFromFile(String file){
+ if(!file.endsWith(".xml"))
+ throw new RuntimeException("invalid file name " + file);
- return file.getName().substring(0, file.getName().length() - 4);
+ return file.substring(0, file.length() - 4);
}
}

0 comments on commit 88840d0

Please sign in to comment.
Something went wrong with that request. Please try again.