Permalink
Browse files

Improved thread-safety in a number of locations, fixed issue with tra…

…nsaction journaling and cleanup.

Signed-off-by: gburgett <gordon.burgett@gmail.com>
  • Loading branch information...
1 parent 40ce0b1 commit 7efb0f95e636616833fbb8c6cd7e954e02ce9ec8 @gburgett committed Feb 24, 2013
@@ -25,7 +25,6 @@
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.logging.LogFactory;
@@ -256,30 +255,44 @@ private boolean makeJaxbConverters(Class<?> target)
* @throws JAXBException If an error occurs when creating the marshaller or unmarshaller.
*/
public static <T> void addJAXBConverters(JAXBContext context, Class<T> baseClass, ConversionService registerTo) throws JAXBException{
- Converter<T, Element> marshaller = new JAXBMarshallingConverter(baseClass, context.createMarshaller());
- Converter<Element, T> unmarshaller = new JAXBUnmarshallingConverter<>(baseClass, context.createUnmarshaller());
+ Converter<T, Element> marshaller = new JAXBMarshallingConverter(baseClass, context);
+ Converter<Element, T> unmarshaller = new JAXBUnmarshallingConverter<>(baseClass, context);
registerTo.addConverter(baseClass, Element.class, marshaller);
registerTo.addConverter(Element.class, baseClass, unmarshaller);
}
private static class JAXBMarshallingConverter<T> implements Converter<T, Element>{
XMLInputFactory factory = XMLInputFactory.newFactory();
- Marshaller marshaller;
+
+ ThreadLocal<Marshaller> marshaller;
+ JAXBContext context;
Class<T> clazz;
- public JAXBMarshallingConverter(Class<T> clazz, Marshaller marshaller){
+ public JAXBMarshallingConverter(final Class<T> clazz, final JAXBContext context){
this.clazz = clazz;
- this.marshaller = marshaller;
+ this.marshaller = new ThreadLocal<>();
+ this.context = context;
}
@Override
public Element convert(T source) throws ConversionException {
+
+ Marshaller marshaller = this.marshaller.get();
+ if(marshaller == null){
+ try {
+ this.marshaller.set(marshaller = context.createMarshaller());
+ } catch (JAXBException ex) {
+ throw new ConversionException("Unable to create marshaller for class " + clazz, ex);
+ }
+ }
+
+
try{
Document doc;
JDOMStreamWriter out = new JDOMStreamWriter();
try{
- this.marshaller.marshal(source, out);
+ marshaller.marshal(source, out);
doc = out.getDocument();
}
@@ -298,17 +311,28 @@ public Element convert(T source) throws ConversionException {
private static class JAXBUnmarshallingConverter<T> implements Converter<Element, T>{
org.jdom2.output.XMLOutputter outputter = new XMLOutputter();
- Unmarshaller unmarshaller;
+ ThreadLocal<Unmarshaller> unmarshaller;
+ JAXBContext context;
Class<T> clazz;
- public JAXBUnmarshallingConverter(Class<T> clazz, Unmarshaller unmarshaller){
+ public JAXBUnmarshallingConverter(final Class<T> clazz, final JAXBContext context) throws JAXBException{
this.clazz = clazz;
- this.unmarshaller = unmarshaller;
+ this.unmarshaller = new ThreadLocal<>();
+ this.context = context;
}
@Override
public T convert(Element source) throws ConversionException {
+ Unmarshaller unmarshaller = this.unmarshaller.get();
+ if(unmarshaller == null){
+ try {
+ this.unmarshaller.set(unmarshaller = context.createUnmarshaller());
+ } catch (JAXBException ex) {
+ throw new ConversionException("Unable to create unmarshaller for class " + clazz);
+ }
+ }
+
Document doc = new Document();
doc.setRootElement(source.detach());
@@ -634,7 +634,7 @@ public boolean cleanup(){
//remove the ones we couldn't remove during the iteration
if(toRemove != null && toRemove.size() > 0){
for(RowData data : toRemove){
- rowData.remove(data.commitId);
+ rowData.remove(data.transactionId);
}
}
@@ -278,9 +278,14 @@ public void revert(long txId, boolean isRecovering){
return;
}
- //we will need to revert over all known shards in order to recover.
- for(Interval<T> interval : this.knownShards.keySet()){
- this.getEngine(interval).revert(txId, isRecovering);
+ this.getTableLock();
+ try{
+ //we will need to revert over all known shards in order to recover.
+ for(Interval<T> interval : this.knownShards.keySet()){
+ this.getEngine(interval).revert(txId, isRecovering);
+ }
+ }finally{
+ this.releaseTableLock();
}
}
@@ -296,14 +301,19 @@ protected boolean spinUp() {
else{
//need to scan the directory for existing known shards.
for(File f : directory.listFiles()){
- if(!f.getName().endsWith(".xml")){
+ if(!f.getName().endsWith(".xml") || f.getName().endsWith("config.xml")){
continue;
}
- String shardName = f.getName().substring(0, f.getName().length() - 4);
- Interval<T> i = config.getIntervalProvider().getInterval(shardName);
- if(i != null){
- knownShards.put(i, f);
+ try{
+ String shardName = f.getName().substring(0, f.getName().length() - 4);
+ Interval<T> i = config.getIntervalProvider().getInterval(shardName);
+ if(i != null){
+ knownShards.put(i, f);
+ }
+ }catch(Exception ex){
+ this.log.warn("Error identifying interval for file " + f.getName(), ex);
+ continue;
}
}
}
@@ -171,6 +171,22 @@ public EngineBase provideEngine(){
}
}
+ public void notifyRecoveryComplete(){
+ this.lock.writeLock().lock();
+ try{
+ EngineBase engine = this.engine.get();
+
+ if(engine.getState() == EngineState.SpunUp){
+ //need to give the engine the go-ahead
+ engine.beginOperations();
+ }
+
+ }
+ finally{
+ this.lock.writeLock().unlock();
+ }
+ }
+
private EngineBase makeNewEngine(File file){
//TODO: engines will in the future be configurable & based on a strategy
@@ -244,8 +260,13 @@ else if(state == EngineState.SpinningUp ||
}
//spinUp returns true if this thread successfully spun it up
- if(engine.spinUp())
- engine.beginOperations();
+ if(engine.spinUp()){
+ if(this.db.getState() == XFlatDatabase.DatabaseState.Running){
+ //spin-up could be called when initializing, in which case
+ //the engine needs to be ready to do recovery but not running yet.
+ engine.beginOperations();
+ }
+ }
return engine;
}
@@ -210,7 +210,7 @@ public XFlatDatabase(File directory, ScheduledExecutorService executorService){
* when {@link #shutdown() } is called.
*/
public void Initialize(){
- if(!this.state.compareAndSet(DatabaseState.Uninitialized, DatabaseState.Running)){
+ if(!this.state.compareAndSet(DatabaseState.Uninitialized, DatabaseState.Initializing)){
return;
}
@@ -234,8 +234,11 @@ public void Initialize(){
//recover transactional state if necessary
+
this.transactionManager.recover(this);
+ //done initializing
+ this.state.set(DatabaseState.Running);
}catch(Exception ex){
this.state.set(DatabaseState.Uninitialized);
throw new XFlatException("Initialization error", ex);
@@ -553,6 +556,11 @@ private void loadPojoConverter() throws ClassNotFoundException, InstantiationExc
* called.
*/
Uninitialized,
+ /**
+ * The state when the database is initializing, including potentially
+ * recovering from an unexpected shutdown.
+ */
+ Initializing,
/**
* The state of a database that is running and capable of responding
* to requests.
@@ -562,7 +570,9 @@ private void loadPojoConverter() throws ClassNotFoundException, InstantiationExc
* The state of a database that is either in the process of or has already
* shut down. Requests on this database will throw.
*/
- ShuttingDown
+ ShuttingDown,
+
+
}
}
Oops, something went wrong.

0 comments on commit 7efb0f9

Please sign in to comment.