diff --git a/mongodb/README.md b/mongodb/README.md index 63d0158aa6..8f10c65795 100644 --- a/mongodb/README.md +++ b/mongodb/README.md @@ -40,3 +40,5 @@ See the next section for the list of configuration parameters for MongoDB. ### `mongodb.database` (default: `ycsb`) ### `mongodb.writeConcern` (default `safe`) + +### `mongodb.maxconnections` (default `10`) diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index 7c8df19af4..3082e4c9ae 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -11,10 +11,11 @@ import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Map; import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; import com.mongodb.BasicDBObject; import com.mongodb.DBAddress; @@ -22,102 +23,133 @@ import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; +import com.mongodb.MongoOptions; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; - +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; -import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.StringByteIterator; /** * MongoDB client for YCSB framework. - * + * * Properties to set: - * - * mongodb.url=mongodb://localhost:27017 - * mongodb.database=ycsb + * + * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb * mongodb.writeConcern=normal - * + * * @author ypai - * */ public class MongoDbClient extends DB { - private Mongo mongo; - private WriteConcern writeConcern; - private String database; + /** Used to include a field in a response. */ + protected static final Integer INCLUDE = Integer.valueOf(1); + + /** A singleton Mongo instance. */ + private static Mongo mongo; + + /** The default write concern for the test. */ + private static WriteConcern writeConcern; + + /** The database to access. */ + private static String database; + + /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ + private static final AtomicInteger initCount = new AtomicInteger(0); - @Override /** * Initialize any state for this DB. * Called once per DB instance; there is one DB instance per client thread. */ + @Override public void init() throws DBException { - // initialize MongoDb driver - Properties props = getProperties(); - String url = props.getProperty("mongodb.url", "mongodb://localhost:27017"); - database = props.getProperty("mongodb.database", "ycsb"); - String writeConcernType = props.getProperty("mongodb.writeConcern", "safe").toLowerCase(); - - if ("none".equals(writeConcernType)) { - writeConcern = WriteConcern.NONE; - } else if ("safe".equals(writeConcernType)) { - writeConcern = WriteConcern.SAFE; - } else if ("normal".equals(writeConcernType)) { - writeConcern = WriteConcern.NORMAL; - } else if ("fsync_safe".equals(writeConcernType)) { - writeConcern = WriteConcern.FSYNC_SAFE; - } else if ("replicas_safe".equals(writeConcernType)) { - writeConcern = WriteConcern.REPLICAS_SAFE; - } else { - System.err.println("ERROR: Invalid writeConcern: '" + writeConcernType + "'. " + - "Must be [ none | safe | normal | fsync_safe | replicas_safe ]"); - System.exit(1); - } + initCount.incrementAndGet(); + synchronized (INCLUDE) { + if (mongo != null) { + return; + } - try { - // strip out prefix since Java driver doesn't currently support - // standard connection format URL yet - // http://www.mongodb.org/display/DOCS/Connections - if (url.startsWith("mongodb://")) { - url = url.substring(10); + // initialize MongoDb driver + Properties props = getProperties(); + String url = props.getProperty("mongodb.url", + "mongodb://localhost:27017"); + database = props.getProperty("mongodb.database", "ycsb"); + String writeConcernType = props.getProperty("mongodb.writeConcern", + "safe").toLowerCase(); + final String maxConnections = props.getProperty( + "mongodb.maxconnections", "10"); + + if ("none".equals(writeConcernType)) { + writeConcern = WriteConcern.NONE; + } + else if ("safe".equals(writeConcernType)) { + writeConcern = WriteConcern.SAFE; + } + else if ("normal".equals(writeConcernType)) { + writeConcern = WriteConcern.NORMAL; + } + else if ("fsync_safe".equals(writeConcernType)) { + writeConcern = WriteConcern.FSYNC_SAFE; + } + else if ("replicas_safe".equals(writeConcernType)) { + writeConcern = WriteConcern.REPLICAS_SAFE; + } + else { + System.err + .println("ERROR: Invalid writeConcern: '" + + writeConcernType + + "'. " + + "Must be [ none | safe | normal | fsync_safe | replicas_safe ]"); + System.exit(1); } - // need to append db to url. - url += "/"+database; - System.out.println("new database url = "+url); - mongo = new Mongo(new DBAddress(url)); - System.out.println("mongo connection created with "+url); - } catch (Exception e1) { - System.err.println( - "Could not initialize MongoDB connection pool for Loader: " - + e1.toString()); - e1.printStackTrace(); - return; - } + try { + // strip out prefix since Java driver doesn't currently support + // standard connection format URL yet + // http://www.mongodb.org/display/DOCS/Connections + if (url.startsWith("mongodb://")) { + url = url.substring(10); + } + // need to append db to url. + url += "/" + database; + System.out.println("new database url = " + url); + MongoOptions options = new MongoOptions(); + options.connectionsPerHost = Integer.parseInt(maxConnections); + mongo = new Mongo(new DBAddress(url), options); + + System.out.println("mongo connection created with " + url); + } + catch (Exception e1) { + System.err + .println("Could not initialize MongoDB connection pool for Loader: " + + e1.toString()); + e1.printStackTrace(); + return; + } + } } - + + /** + * Cleanup any state for this DB. + * Called once per DB instance; there is one DB instance per client thread. + */ @Override - /** - * Cleanup any state for this DB. - * Called once per DB instance; there is one DB instance per client thread. - */ - public void cleanup() throws DBException - { - try { - mongo.close(); - } catch (Exception e1) { - System.err.println( - "Could not close MongoDB connection pool: " - + e1.toString()); - e1.printStackTrace(); - return; + public void cleanup() throws DBException { + if (initCount.decrementAndGet() <= 0) { + try { + mongo.close(); + } + catch (Exception e1) { + System.err.println("Could not close MongoDB connection pool: " + + e1.toString()); + e1.printStackTrace(); + return; + } } - } + } - @Override /** * Delete a record from the database. * @@ -125,8 +157,9 @@ public void cleanup() throws DBException * @param key The record key of the record to delete. * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. */ + @Override public int delete(String table, String key) { - com.mongodb.DB db=null; + com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); @@ -134,20 +167,18 @@ public int delete(String table, String key) { DBObject q = new BasicDBObject().append("_id", key); WriteResult res = collection.remove(q, writeConcern); return res.getN() == 1 ? 0 : 1; - } catch (Exception e) { + } + catch (Exception e) { System.err.println(e.toString()); return 1; } - finally - { - if (db!=null) - { + finally { + if (db != null) { db.requestDone(); } } } - @Override /** * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * record key. @@ -157,7 +188,9 @@ public int delete(String table, String key) { * @param values A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. */ - public int insert(String table, String key, HashMap values) { + @Override + public int insert(String table, String key, + HashMap values) { com.mongodb.DB db = null; try { db = mongo.getDB(database); @@ -166,24 +199,23 @@ public int insert(String table, String key, HashMap values DBCollection collection = db.getCollection(table); DBObject r = new BasicDBObject().append("_id", key); - for(String k: values.keySet()) { - r.put(k, values.get(k).toArray()); - } - WriteResult res = collection.insert(r,writeConcern); + for (String k : values.keySet()) { + r.put(k, values.get(k).toArray()); + } + WriteResult res = collection.insert(r, writeConcern); return res.getError() == null ? 0 : 1; - } catch (Exception e) { + } + catch (Exception e) { e.printStackTrace(); return 1; - } finally { - if (db!=null) - { + } + finally { + if (db != null) { db.requestDone(); } } } - @Override - @SuppressWarnings("unchecked") /** * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. * @@ -193,6 +225,8 @@ public int insert(String table, String key, HashMap values * @param result A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ + @Override + @SuppressWarnings("unchecked") public int read(String table, String key, Set fields, HashMap result) { com.mongodb.DB db = null; @@ -204,16 +238,16 @@ public int read(String table, String key, Set fields, DBCollection collection = db.getCollection(table); DBObject q = new BasicDBObject().append("_id", key); DBObject fieldsToReturn = new BasicDBObject(); - boolean returnAllFields = fields == null; DBObject queryResult = null; - if (!returnAllFields) { + if (fields != null) { Iterator iter = fields.iterator(); while (iter.hasNext()) { - fieldsToReturn.put(iter.next(), 1); + fieldsToReturn.put(iter.next(), INCLUDE); } queryResult = collection.findOne(q, fieldsToReturn); - } else { + } + else { queryResult = collection.findOne(q); } @@ -221,19 +255,18 @@ public int read(String table, String key, Set fields, result.putAll(queryResult.toMap()); } return queryResult != null ? 0 : 1; - } catch (Exception e) { + } + catch (Exception e) { System.err.println(e.toString()); return 1; - } finally { - if (db!=null) - { + } + finally { + if (db != null) { db.requestDone(); } } } - - @Override /** * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * record key, overwriting any existing values with the same field name. @@ -243,7 +276,9 @@ public int read(String table, String key, Set fields, * @param values A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. */ - public int update(String table, String key, HashMap values) { + @Override + public int update(String table, String key, + HashMap values) { com.mongodb.DB db = null; try { db = mongo.getDB(database); @@ -264,19 +299,18 @@ public int update(String table, String key, HashMap values WriteResult res = collection.update(q, u, false, false, writeConcern); return res.getN() == 1 ? 0 : 1; - } catch (Exception e) { + } + catch (Exception e) { System.err.println(e.toString()); return 1; - } finally { - if (db!=null) - { + } + finally { + if (db != null) { db.requestDone(); } } } - @Override - @SuppressWarnings("unchecked") /** * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap. * @@ -287,9 +321,10 @@ public int update(String table, String key, HashMap values * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. */ + @Override public int scan(String table, String startkey, int recordcount, Set fields, Vector> result) { - com.mongodb.DB db=null; + com.mongodb.DB db = null; try { db = mongo.getDB(database); db.requestStart(); @@ -299,23 +334,44 @@ public int scan(String table, String startkey, int recordcount, DBObject q = new BasicDBObject().append("_id", scanRange); DBCursor cursor = collection.find(q).limit(recordcount); while (cursor.hasNext()) { - //toMap() returns a Map, but result.add() expects a Map. Hence, the suppress warnings. - result.add(StringByteIterator.getByteIteratorMap((Map)cursor.next().toMap())); + // toMap() returns a Map, but result.add() expects a + // Map. Hence, the suppress warnings. + HashMap resultMap = new HashMap(); + + DBObject obj = cursor.next(); + fillMap(resultMap, obj); + + result.add(resultMap); } return 0; - } catch (Exception e) { + } + catch (Exception e) { System.err.println(e.toString()); return 1; } - finally - { - if (db!=null) - { + finally { + if (db != null) { db.requestDone(); } } } -} + /** + * TODO - Finish + * + * @param resultMap + * @param obj + */ + @SuppressWarnings("unchecked") + protected void fillMap(HashMap resultMap, DBObject obj) { + Map objMap = obj.toMap(); + for (Map.Entry entry : objMap.entrySet()) { + if (entry.getValue() instanceof byte[]) { + resultMap.put(entry.getKey(), new ByteArrayByteIterator( + (byte[]) entry.getValue())); + } + } + } +} diff --git a/pom.xml b/pom.xml index 59abef40dd..27e1b436ff 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 7.1.0.CR1 2.1.1 1.0 - 2.8.0 + 2.9.0 1.0.1 2.0.0 0.81