Skip to content

Commit

Permalink
InMemory and MapDB connectors now implement AutoCloseable and close c…
Browse files Browse the repository at this point in the history
…onnections on JVM shutdown. A close() method is added in the Trainable interface, responsible for releasing algorithm's resources. Removed modifiesData() method from trainable, no algorithm should modify the Dataset internally (if it does it should copy the dataset). Update tests to close() the instances.
  • Loading branch information
datumbox committed Apr 25, 2015
1 parent 359e411 commit 44fb658
Show file tree
Hide file tree
Showing 39 changed files with 197 additions and 65 deletions.
3 changes: 0 additions & 3 deletions TODO.txt
@@ -1,9 +1,6 @@
CODE IMPROVEMENTS CODE IMPROVEMENTS
================= =================


- Do we need close() in models and dataset etc?
- Make sure the connector is closed automatically if not closed manually?

- Improve Serialization by setting the serialVersionUID in every serializable class? - Improve Serialization by setting the serialVersionUID in every serializable class?
- Create better Exceptions and Exception messages. - Create better Exceptions and Exception messages.
- Add multithreading support. - Add multithreading support.
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/datumbox/applications/nlp/CETR.java
Expand Up @@ -217,6 +217,7 @@ private void performClustering(Dataset dataset, int numberOfClusters) {
//Map<Integer, BaseMLclusterer.Cluster> clusters = instance.getClusters(); //Map<Integer, BaseMLclusterer.Cluster> clusters = instance.getClusters();


instance.erase(); //erase immediately the result instance.erase(); //erase immediately the result
instance = null;
} }


private List<Double> calculateTTRlist(List<String> rows) { private List<Double> calculateTTRlist(List<String> rows) {
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/datumbox/common/dataobjects/Dataset.java
Expand Up @@ -498,10 +498,7 @@ public void erase() {
dbc.dropBigMap("tmp_xColumnTypes", xDataTypes); dbc.dropBigMap("tmp_xColumnTypes", xDataTypes);
dbc.dropBigMap("tmp_recordList", recordList); dbc.dropBigMap("tmp_recordList", recordList);
dbc.dropDatabase(); dbc.dropDatabase();

dbc.close();
dbName = null;
dbc = null;
dbConf = null;


//Ensures that the Dataset can't be used after erase() is called. //Ensures that the Dataset can't be used after erase() is called.
yDataType = null; yDataType = null;
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/com/datumbox/common/objecttypes/Trainable.java
Expand Up @@ -48,16 +48,14 @@ public interface Trainable<MP extends Learnable, TP extends Parameterizable> {
* @param trainingParameters * @param trainingParameters
*/ */
public void fit(Dataset trainingData, TP trainingParameters); public void fit(Dataset trainingData, TP trainingParameters);

/**
* Returns whether the algorithm modifies the provided data.
*
* @return
*/
public boolean modifiesData();


/** /**
* Deletes the database of the algorithm. * Deletes the database of the algorithm.
*/ */
public void erase(); public void erase();

/**
* Closes all the resources of the algorithm.
*/
public void close();
} }
@@ -0,0 +1,70 @@
/**
* Copyright (C) 2013-2015 Vasilis Vryniotis <bbriniotis@datumbox.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datumbox.common.persistentstorage;

import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector;

/**
* Any class that inherits from the abstract AutoCloseConnector class can be used
* in a try-with-resources statement block. Moreover this class setups a shutdown
* hook which ensures that the Connector will automatically call close() before the
* JVM is terminated.
*
* @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/
public abstract class AutoCloseConnector implements DatabaseConnector, AutoCloseable {

private boolean isClosed = false;

/**
* Protected Constructor which is responsible for adding the Shutdown hook.
*/
protected AutoCloseConnector() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
AutoCloseConnector.this.close();
}
});
}

/**
* Checks if the connector is closed.
*
* @return
*/
@Override
public boolean isClosed() {
return isClosed;
}

/**
* Marks the connector as closed.
*/
@Override
public void close() {
isClosed = true;
}

/**
* Ensures the connection is not closed.
*/
protected void ensureNotClosed() {
if(isClosed) {
throw new RuntimeException("The connector is already closed");
}
}
}
Expand Up @@ -15,6 +15,7 @@
*/ */
package com.datumbox.common.persistentstorage.inmemory; package com.datumbox.common.persistentstorage.inmemory;


import com.datumbox.common.persistentstorage.AutoCloseConnector;
import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector; import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
Expand All @@ -37,7 +38,7 @@
* *
* @author Vasilis Vryniotis <bbriniotis@datumbox.com> * @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/ */
public class InMemoryConnector implements DatabaseConnector { public class InMemoryConnector extends AutoCloseConnector {


private final InMemoryConfiguration dbConf; private final InMemoryConfiguration dbConf;
private final String database; private final String database;
Expand All @@ -50,6 +51,7 @@ public class InMemoryConnector implements DatabaseConnector {
* @param dbConf * @param dbConf
*/ */
protected InMemoryConnector(String database, InMemoryConfiguration dbConf) { protected InMemoryConnector(String database, InMemoryConfiguration dbConf) {
super();
this.dbConf = dbConf; this.dbConf = dbConf;
this.database = database; this.database = database;
} }
Expand All @@ -64,6 +66,7 @@ protected InMemoryConnector(String database, InMemoryConfiguration dbConf) {
*/ */
@Override @Override
public <T extends Serializable> void save(String name, T serializableObject) { public <T extends Serializable> void save(String name, T serializableObject) {
ensureNotClosed();
try { try {
Files.write(getDefaultPath(), DeepCopy.serialize(serializableObject)); Files.write(getDefaultPath(), DeepCopy.serialize(serializableObject));
} }
Expand All @@ -83,6 +86,7 @@ public <T extends Serializable> void save(String name, T serializableObject) {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends Serializable> T load(String name, Class<T> klass) { public <T extends Serializable> T load(String name, Class<T> klass) {
ensureNotClosed();
try { try {
//read the stored serialized object //read the stored serialized object
T serializableObject = (T)DeepCopy.deserialize(Files.readAllBytes(getDefaultPath())); T serializableObject = (T)DeepCopy.deserialize(Files.readAllBytes(getDefaultPath()));
Expand All @@ -101,7 +105,10 @@ public <T extends Serializable> T load(String name, Class<T> klass) {
*/ */
@Override @Override
public void close() { public void close() {
//nothing to do if(isClosed()){
return;
}
super.close();
} }


/** /**
Expand All @@ -111,6 +118,7 @@ public void close() {
*/ */
@Override @Override
public boolean existsDatabase() { public boolean existsDatabase() {
ensureNotClosed();
return Files.exists(getDefaultPath()); return Files.exists(getDefaultPath());
} }


Expand All @@ -119,6 +127,7 @@ public boolean existsDatabase() {
*/ */
@Override @Override
public void dropDatabase() { public void dropDatabase() {
ensureNotClosed();
if(!existsDatabase()) { if(!existsDatabase()) {
return; return;
} }
Expand All @@ -143,6 +152,7 @@ public void dropDatabase() {
*/ */
@Override @Override
public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) { public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) {
ensureNotClosed();
return new HashMap<>(); return new HashMap<>();
} }


Expand All @@ -155,6 +165,7 @@ public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) {
*/ */
@Override @Override
public <T extends Map> void dropBigMap(String name, T map) { public <T extends Map> void dropBigMap(String name, T map) {
ensureNotClosed();
map.clear(); map.clear();
} }


Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package com.datumbox.common.persistentstorage.interfaces; package com.datumbox.common.persistentstorage.interfaces;


import java.io.Closeable;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;


Expand All @@ -27,7 +26,7 @@
* *
* @author Vasilis Vryniotis <bbriniotis@datumbox.com> * @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/ */
public interface DatabaseConnector extends Closeable { public interface DatabaseConnector {


/** /**
* This method is responsible for storing serializable objects in the * This method is responsible for storing serializable objects in the
Expand All @@ -52,9 +51,15 @@ public interface DatabaseConnector extends Closeable {
/** /**
* Closes the connection and clean ups the resources. * Closes the connection and clean ups the resources.
*/ */
@Override
public void close(); public void close();


/**
* Checks if the connector is closed.
*
* @return
*/
public boolean isClosed();

/** /**
* Checks if a particular database exists. * Checks if a particular database exists.
* *
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/ */
package com.datumbox.common.persistentstorage.mapdb; package com.datumbox.common.persistentstorage.mapdb;


import com.datumbox.common.persistentstorage.interfaces.DatabaseConnector; import com.datumbox.common.persistentstorage.AutoCloseConnector;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.FileSystems; import java.nio.file.FileSystems;
Expand All @@ -39,7 +39,7 @@
* *
* @author Vasilis Vryniotis <bbriniotis@datumbox.com> * @author Vasilis Vryniotis <bbriniotis@datumbox.com>
*/ */
public class MapDBConnector implements DatabaseConnector { public class MapDBConnector extends AutoCloseConnector {


/** /**
* Enum class which stores the Database Type used for every collection. * Enum class which stores the Database Type used for every collection.
Expand All @@ -66,6 +66,7 @@ private enum DatabaseType {
* @param dbConf * @param dbConf
*/ */
protected MapDBConnector(String database, MapDBConfiguration dbConf) { protected MapDBConnector(String database, MapDBConfiguration dbConf) {
super();
this.dbConf = dbConf; this.dbConf = dbConf;
this.database = database; this.database = database;
} }
Expand All @@ -80,6 +81,7 @@ protected MapDBConnector(String database, MapDBConfiguration dbConf) {
*/ */
@Override @Override
public <T extends Serializable> void save(String name, T serializableObject) { public <T extends Serializable> void save(String name, T serializableObject) {
ensureNotClosed();
openDB(DatabaseType.DEFAULT_DB); openDB(DatabaseType.DEFAULT_DB);
DB db = dbRegistry.get(DatabaseType.DEFAULT_DB); DB db = dbRegistry.get(DatabaseType.DEFAULT_DB);
Atomic.Var<T> knowledgeBaseVar = db.getAtomicVar(name); Atomic.Var<T> knowledgeBaseVar = db.getAtomicVar(name);
Expand All @@ -99,6 +101,7 @@ public <T extends Serializable> void save(String name, T serializableObject) {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends Serializable> T load(String name, Class<T> klass) { public <T extends Serializable> T load(String name, Class<T> klass) {
ensureNotClosed();
openDB(DatabaseType.DEFAULT_DB); openDB(DatabaseType.DEFAULT_DB);
DB db = dbRegistry.get(DatabaseType.DEFAULT_DB); DB db = dbRegistry.get(DatabaseType.DEFAULT_DB);
Atomic.Var<T> atomicVar = db.getAtomicVar(name); Atomic.Var<T> atomicVar = db.getAtomicVar(name);
Expand All @@ -110,12 +113,11 @@ public <T extends Serializable> T load(String name, Class<T> klass) {
*/ */
@Override @Override
public void close() { public void close() {
//close all dbs stored in dbRegistry if(isClosed()){
for(DB db : dbRegistry.values()) { return;
if(isOpenDB(db)) {
db.close();
}
} }
super.close();
closeAllDBs();
} }


/** /**
Expand All @@ -125,6 +127,7 @@ public void close() {
*/ */
@Override @Override
public boolean existsDatabase() { public boolean existsDatabase() {
ensureNotClosed();
if(Files.exists(getDefaultPath())) { if(Files.exists(getDefaultPath())) {
return true; return true;
} }
Expand All @@ -143,13 +146,14 @@ public boolean existsDatabase() {
*/ */
@Override @Override
public void dropDatabase() { public void dropDatabase() {
ensureNotClosed();
if(!existsDatabase()) { if(!existsDatabase()) {
return; return;
} }


try { closeAllDBs();
close();


try {
dbRegistry.clear(); dbRegistry.clear();
Files.deleteIfExists(getDefaultPath()); Files.deleteIfExists(getDefaultPath());
Files.deleteIfExists(Paths.get(getDefaultPath().toString()+".p")); Files.deleteIfExists(Paths.get(getDefaultPath().toString()+".p"));
Expand All @@ -172,6 +176,7 @@ public void dropDatabase() {
*/ */
@Override @Override
public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) { public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) {
ensureNotClosed();
validateName(name, isTemporary); validateName(name, isTemporary);


DatabaseType dbType = isTemporary?DatabaseType.TEMP_DB:DatabaseType.DEFAULT_DB; DatabaseType dbType = isTemporary?DatabaseType.TEMP_DB:DatabaseType.DEFAULT_DB;
Expand All @@ -191,6 +196,7 @@ public <K,V> Map<K,V> getBigMap(String name, boolean isTemporary) {
*/ */
@Override @Override
public <T extends Map> void dropBigMap(String name, T map) { public <T extends Map> void dropBigMap(String name, T map) {
ensureNotClosed();
boolean isTemporary = existsInDB(dbRegistry.get(DatabaseType.TEMP_DB), name); boolean isTemporary = existsInDB(dbRegistry.get(DatabaseType.TEMP_DB), name);


DatabaseType dbType = isTemporary?DatabaseType.TEMP_DB:DatabaseType.DEFAULT_DB; DatabaseType dbType = isTemporary?DatabaseType.TEMP_DB:DatabaseType.DEFAULT_DB;
Expand All @@ -203,6 +209,15 @@ public <T extends Map> void dropBigMap(String name, T map) {


//private methods of connector class //private methods of connector class


private void closeAllDBs() {
//close all dbs stored in dbRegistry
for(DB db : dbRegistry.values()) {
if(isOpenDB(db)) {
db.close();
}
}
}

private boolean isOpenDB(DB db) { private boolean isOpenDB(DB db) {
return !(db == null || db.isClosed()); return !(db == null || db.isClosed());
} }
Expand Down
Expand Up @@ -82,6 +82,11 @@ public void erase() {
knowledgeBase.erase(); knowledgeBase.erase();
} }


@Override
public void close() {
knowledgeBase.close();
}

@Override @Override
public MP getModelParameters() { public MP getModelParameters() {
return knowledgeBase.getModelParameters(); return knowledgeBase.getModelParameters();
Expand All @@ -100,12 +105,6 @@ public void fit(Dataset trainingData, TP trainingParameters) {
knowledgeBase.save(); knowledgeBase.save();
} }


@Override
public boolean modifiesData() {
//If the algorithm modifies the data it should override this method
return false;
}

protected void initializeTrainingConfiguration(TP trainingParameters) { protected void initializeTrainingConfiguration(TP trainingParameters) {
//reset knowledge base //reset knowledge base
knowledgeBase.reinitialize(); knowledgeBase.reinitialize();
Expand Down

0 comments on commit 44fb658

Please sign in to comment.