Permalink
Browse files

Implemented cursor operations in sharded engine, still need to work o…

…ut the bugs in the tests.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
1 parent f076ed2 commit 1cc52e2ed58dfb541cd669984e68af4bc3c1c159 @gburgett committed Jan 18, 2013
@@ -5,6 +5,7 @@
package org.gburgett.xflat.db;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -27,12 +28,12 @@
*/
public abstract class ShardedEngineBase<T> extends EngineBase {
protected ConcurrentMap<Interval<T>, TableMetadata> openShards = new ConcurrentHashMap<>();
+ protected ConcurrentMap<Interval<T>, File> knownShards = new ConcurrentHashMap<>();
+
//the engines that are spinning down while this engine spins down
private Map<Interval<T>, EngineBase> spinningDownEngines = new HashMap<>();
- private WeakHashMap<Cursor<Interval<T>>, String> openTableCursors = new WeakHashMap<>();
-
private final Object spinDownSyncRoot = new Object();
protected File directory;
@@ -76,11 +77,16 @@ public ShardedEngineBase(File file, String tableName, ShardsetConfig<T> config){
protected Interval<T> getInterval(Object value){
T converted;
- try {
- converted = this.getConversionService().convert(value, this.config.getShardPropertyClass());
- } catch (ConversionException ex) {
- throw new XflatException("Data cannot be sharded: sharding expression " + config.getShardPropertySelector().getExpression() +
- " selected non-convertible value " + value, ex);
+ if(value == null || !this.config.getShardPropertyClass().isAssignableFrom(value.getClass())){
+ try {
+ converted = this.getConversionService().convert(value, this.config.getShardPropertyClass());
+ } catch (ConversionException ex) {
+ throw new XflatException("Data cannot be sharded: sharding expression " + config.getShardPropertySelector().getExpression() +
+ " selected non-convertible value " + value, ex);
+ }
+ }
+ else{
+ converted = (T)value;
}
Interval<T> ret;
@@ -100,6 +106,7 @@ public ShardedEngineBase(File file, String tableName, ShardsetConfig<T> config){
}
private EngineBase getEngine(Interval<T> interval){
+
TableMetadata metadata = openShards.get(interval);
if(metadata == null){
//definitely ensure we aren't spinning down before we start up a new engine
@@ -112,7 +119,10 @@ private EngineBase getEngine(Interval<T> interval){
//build the new metadata element so we can use it to provide engines
String name = this.config.getIntervalProvider().getName(interval);
- metadata = this.getMetadataFactory().makeTableMetadata(name, new File(directory, name + ".xml"));
+ File file = new File(directory, name + ".xml");
+ this.knownShards.put(interval, file);
+
+ metadata = this.getMetadataFactory().makeTableMetadata(name, file);
TableMetadata weWereLate = openShards.putIfAbsent(interval, metadata);
if(weWereLate != null){
//another thread put the new metadata already
@@ -150,7 +160,6 @@ private EngineBase getEngine(Interval<T> interval){
}
}
-
protected void update(){
Iterator<TableMetadata> it = openShards.values().iterator();
while(it.hasNext()){
@@ -170,8 +179,7 @@ protected void update(){
}
}
}
-
-
+
@Override
protected boolean spinUp() {
if(!this.state.compareAndSet(EngineState.Uninitialized, EngineState.SpinningUp)){
@@ -181,6 +189,20 @@ protected boolean spinUp() {
if(!directory.exists()){
directory.mkdirs();
}
+ else{
+ //need to scan the directory for existing known shards.
+ for(File f : directory.listFiles()){
+ if(!f.getName().endsWith(".xml")){
+ continue;
+ }
+
+ String shardName = f.getName().substring(0, f.getName().length() - 4);
+ Interval<T> i = config.getIntervalProvider().getInterval(shardName);
+ if(i != null){
+ knownShards.put(i, f);
+ }
+ }
+ }
//we'll spin up tables as we need them.
this.getExecutorService().scheduleWithFixedDelay(new Runnable(){
@@ -10,7 +10,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -26,10 +25,10 @@
import org.gburgett.xflat.EngineStateException;
import org.gburgett.xflat.KeyNotFoundException;
import org.gburgett.xflat.XflatException;
-import org.gburgett.xflat.db.XFlatDatabase;
import org.gburgett.xflat.db.Engine;
import org.gburgett.xflat.db.EngineBase;
import org.gburgett.xflat.db.EngineState;
+import org.gburgett.xflat.db.XFlatDatabase;
import org.gburgett.xflat.query.XpathQuery;
import org.gburgett.xflat.query.XpathUpdate;
import org.gburgett.xflat.util.DocumentFileWrapper;
@@ -46,8 +45,6 @@
private final AtomicBoolean operationsReady = new AtomicBoolean(false);
- private final Object syncRoot = new Object();
-
private ConcurrentMap<String, Element> cache = null;
private DocumentFileWrapper file;
@@ -100,6 +97,7 @@ public Element readRow(String id) {
public Cursor<Element> queryTable(XpathQuery query) {
query.setConversionService(this.getConversionService());
TableCursor ret = new TableCursor(this.cache.values(), query);
+
this.openCursors.put(ret, "");
setLastActivity(System.currentTimeMillis());
@@ -329,7 +327,7 @@ private void ensureReady(){
}
}
- private WeakHashMap<Cursor<Element>, String> openCursors = new WeakHashMap<>();
+ private ConcurrentMap<Cursor<Element>, String> openCursors = new ConcurrentHashMap<>();
@Override
protected boolean spinDown(final SpinDownEventHandler completionEventHandler) {
@@ -423,10 +421,6 @@ private boolean isSpinningDown(){
this.state.get() == EngineState.SpinningDown;
}
-
-
-
-
private AtomicReference<Future<?>> scheduledDump = new AtomicReference<>(null);
private AtomicLong lastDump = new AtomicLong(0);
private AtomicLong lastModified = new AtomicLong(System.currentTimeMillis());
Oops, something went wrong.

0 comments on commit 1cc52e2

Please sign in to comment.