... All
+ * attributes are of type VARCHAR. All accesses are through the primary key.
+ * Therefore, only one index on the primary key is needed.
*
- * The following options must be passed when using this database client.
+ *
+ * The following options must be passed when using this database client.
*
*
* - db.driver The JDBC driver class to use.
@@ -46,35 +50,32 @@
* - db.user User name for the connection.
* - db.passwd Password for the connection.
*
- *
+ *
* @author sudipto
- *
+ *
*/
public class JdbcDBClient extends DB implements JdbcDBClientConstants {
-
+
private ArrayList conns;
private boolean initialized = false;
private Properties props;
private static final String DEFAULT_PROP = "";
private ConcurrentMap cachedStatements;
-
+ private int scanFetchLimit;
+
/**
* The statement type for the prepared statements.
*/
private static class StatementType {
-
+
enum Type {
- INSERT(1),
- DELETE(2),
- READ(3),
- UPDATE(4),
- SCAN(5),
- ;
+ INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5), ;
int internalType;
+
private Type(int type) {
internalType = type;
}
-
+
int getHashCode() {
final int prime = 31;
int result = 1;
@@ -82,12 +83,12 @@ int getHashCode() {
return result;
}
}
-
+
Type type;
int shardIndex;
int numFields;
String tableName;
-
+
StatementType(Type type, String tableName, int numFields, int _shardIndex) {
this.type = type;
this.tableName = tableName;
@@ -130,138 +131,163 @@ public boolean equals(Object obj) {
}
}
- /**
- * For the given key, returns what shard contains data for this key
- *
- * @param key Data key to do operation on
- * @return Shard index
- */
- private int getShardIndexByKey(String key) {
- int ret = Math.abs(key.hashCode()) % conns.size();
- //System.out.println(conns.size() + ": Shard instance for "+ key + " (hash " + key.hashCode()+ " ) " + " is " + ret);
- return ret;
- }
+ /**
+ * For the given key, returns what shard contains data for this key
+ *
+ * @param key
+ * Data key to do operation on
+ * @return Shard index
+ */
+ private int getShardIndexByKey(String key) {
+ int ret = Math.abs(key.hashCode()) % conns.size();
+ // System.out.println(conns.size() + ": Shard instance for "+ key +
+ // " (hash " + key.hashCode()+ " ) " + " is " + ret);
+ return ret;
+ }
- /**
- * For the given key, returns Connection object that holds connection
- * to the shard that contains this key
- *
- * @param key Data key to get information for
- * @return Connection object
- */
- private Connection getShardConnectionByKey(String key) {
- return conns.get(getShardIndexByKey(key));
- }
+ /**
+ * For the given key, returns Connection object that holds connection to the
+ * shard that contains this key
+ *
+ * @param key
+ * Data key to get information for
+ * @return Connection object
+ */
+ private Connection getShardConnectionByKey(String key) {
+ return conns.get(getShardIndexByKey(key));
+ }
- private void cleanupAllConnections() throws SQLException {
- for(Connection conn: conns) {
- conn.close();
- }
+ private void cleanupAllConnections() throws SQLException {
+ for (Connection conn : conns) {
+ conn.close();
}
-
+ }
+
/**
- * Initialize the database connection and set it up for sending requests to the database.
- * This must be called once per client.
- * @throws
+ * Initialize the database connection and set it up for sending requests to
+ * the database. This must be called once per client.
+ *
+ * @throws
*/
@Override
- public void init() throws DBException {
- if (initialized) {
- System.err.println("Client connection already initialized.");
- return;
- }
- props = getProperties();
- String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
- String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
- String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
- String driver = props.getProperty(DRIVER_CLASS);
+ public void init() throws DBException {
+ if (initialized) {
+ System.err.println("Client connection already initialized.");
+ return;
+ }
+ props = getProperties();
+ String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
+ String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
+ String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
+ String driver = props.getProperty(DRIVER_CLASS);
+
+ String fetchLimit = props.getProperty(SCAN_FETCH_LIMIT);
+ if (fetchLimit == null) fetchLimit = String.valueOf(SCAN_FETCH_LIMIT_DEFAULT);
+ try {
+ scanFetchLimit = Integer.parseInt(fetchLimit);
+ } catch (NumberFormatException e) {
+ System.err.println("Invalid value for scan fetch limit passed: " + fetchLimit);
+ scanFetchLimit = SCAN_FETCH_LIMIT_DEFAULT;
+ }
- try {
- if (driver != null) {
- Class.forName(driver);
- }
- int shardCount = 0;
- conns = new ArrayList(3);
- for (String url: urls.split(",")) {
- System.out.println("Adding shard node URL: " + url);
- Connection conn = DriverManager.getConnection(url, user, passwd);
- // Since there is no explicit commit method in the DB interface, all
- // operations should auto commit.
- conn.setAutoCommit(true);
- shardCount++;
- conns.add(conn);
- }
+ try {
+ if (driver != null) {
+ Class.forName(driver);
+ }
+ int shardCount = 0;
+ conns = new ArrayList(3);
+ for (String url : urls.split(",")) {
+ System.out.println("Adding shard node URL: " + url);
+ Connection conn = DriverManager.getConnection(url, user, passwd);
+ // Since there is no explicit commit method in the DB interface, all
+ // operations should auto commit.
+ conn.setAutoCommit(true);
+ shardCount++;
+ conns.add(conn);
+ }
- System.out.println("Using " + shardCount + " shards");
+ System.out.println("Using " + shardCount + " shards");
- cachedStatements = new ConcurrentHashMap();
- } catch (ClassNotFoundException e) {
- System.err.println("Error in initializing the JDBS driver: " + e);
- throw new DBException(e);
- } catch (SQLException e) {
- System.err.println("Error in database operation: " + e);
+ cachedStatements = new ConcurrentHashMap();
+ } catch (ClassNotFoundException e) {
+ System.err.println("Error in initializing the JDBS driver: " + e);
+ throw new DBException(e);
+ } catch (SQLException e) {
+ System.err.println("Error in database operation: " + e);
throw new DBException(e);
} catch (NumberFormatException e) {
System.err.println("Invalid value for fieldcount property. " + e);
throw new DBException(e);
}
- initialized = true;
- }
-
+ initialized = true;
+ }
+
@Override
- public void cleanup() throws DBException {
- try {
+ public void cleanup() throws DBException {
+ try {
cleanupAllConnections();
} catch (SQLException e) {
System.err.println("Error in closing the connection. " + e);
throw new DBException(e);
}
- }
-
- private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
- throws SQLException {
- StringBuilder insert = new StringBuilder("INSERT INTO ");
- insert.append(insertType.tableName);
- insert.append(" VALUES(?");
+ }
+
+ private PreparedStatement createAndCacheInsertStatement(
+ StatementType insertType, String key) throws SQLException {
+ StringBuilder insert = new StringBuilder("INSERT INTO ");
+ insert.append(insertType.tableName);
+ insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(");");
- PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
- if (stmt == null) return insertStatement;
- else return stmt;
- }
-
- private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
- throws SQLException {
+ PreparedStatement insertStatement = getShardConnectionByKey(key)
+ .prepareStatement(insert.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(insertType,
+ insertStatement);
+ if (stmt == null)
+ return insertStatement;
+ else
+ return stmt;
+ }
+
+ private PreparedStatement createAndCacheReadStatement(StatementType readType,
+ String key) throws SQLException {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?;");
- PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
- if (stmt == null) return readStatement;
- else return stmt;
+ PreparedStatement readStatement = getShardConnectionByKey(key)
+ .prepareStatement(read.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(readType,
+ readStatement);
+ if (stmt == null)
+ return readStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
- throws SQLException {
+
+ private PreparedStatement createAndCacheDeleteStatement(
+ StatementType deleteType, String key) throws SQLException {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?;");
- PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
- if (stmt == null) return deleteStatement;
- else return stmt;
+ PreparedStatement deleteStatement = getShardConnectionByKey(key)
+ .prepareStatement(delete.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType,
+ deleteStatement);
+ if (stmt == null)
+ return deleteStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
- throws SQLException {
+
+ private PreparedStatement createAndCacheUpdateStatement(
+ StatementType updateType, String key) throws SQLException {
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
@@ -269,42 +295,56 @@ private PreparedStatement createAndCacheUpdateStatement(StatementType updateType
update.append(COLUMN_PREFIX);
update.append(i);
update.append("=?");
- if (i < updateType.numFields) update.append(", ");
+ if (i < updateType.numFields)
+ update.append(", ");
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?;");
- PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
- if (stmt == null) return insertStatement;
- else return stmt;
+ PreparedStatement insertStatement = getShardConnectionByKey(key)
+ .prepareStatement(update.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(updateType,
+ insertStatement);
+ if (stmt == null)
+ return insertStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
- throws SQLException {
- StringBuilder select = new StringBuilder("SELECT * FROM ");
+
+ private PreparedStatement createAndCacheScanStatement(StatementType scanType,
+ String key) throws SQLException {
+ StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ");
select.append("?;");
- PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
- if (stmt == null) return scanStatement;
- else return stmt;
+ PreparedStatement scanStatement = getShardConnectionByKey(key)
+ .prepareStatement(select.toString());
+ // Set a limit on the number of rows fetched. This prevents the JDBC implementation
+ // to fetch the entire result set. Some implementations, such as that of postgres
+ // fetches the entire result set instead of streaming through the results.
+ scanStatement.setFetchSize(scanFetchLimit);
+ PreparedStatement stmt = cachedStatements.putIfAbsent(scanType,
+ scanStatement);
+ if (stmt == null)
+ return scanStatement;
+ else
+ return stmt;
}
- @Override
- public int read(String tableName, String key, Set fields,
- HashMap result) {
- if (tableName == null) {
+ @Override
+ public int read(String tableName, String key, Set fields,
+ HashMap result) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.READ,
+ tableName, 1, getShardIndexByKey(key));
PreparedStatement readStatement = cachedStatements.get(type);
if (readStatement == null) {
readStatement = createAndCacheReadStatement(type, key);
@@ -324,22 +364,24 @@ public int read(String tableName, String key, Set fields,
resultSet.close();
return SUCCESS;
} catch (SQLException e) {
- System.err.println("Error in processing read of table " + tableName + ": "+e);
+ System.err.println("Error in processing read of table " + tableName
+ + ": " + e);
return -2;
}
- }
+ }
- @Override
- public int scan(String tableName, String startKey, int recordcount,
- Set fields, Vector> result) {
- if (tableName == null) {
+ @Override
+ public int scan(String tableName, String startKey, int recordcount,
+ Set fields, Vector> result) {
+ if (tableName == null) {
return -1;
}
if (startKey == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, getShardIndexByKey(startKey));
+ StatementType type = new StatementType(StatementType.Type.SCAN,
+ tableName, 1, getShardIndexByKey(startKey));
PreparedStatement scanStatement = cachedStatements.get(type);
if (scanStatement == null) {
scanStatement = createAndCacheScanStatement(type, startKey);
@@ -362,11 +404,12 @@ public int scan(String tableName, String startKey, int recordcount,
System.err.println("Error in processing scan of table: " + tableName + e);
return -2;
}
- }
+ }
- @Override
- public int update(String tableName, String key, HashMap values) {
- if (tableName == null) {
+ @Override
+ public int update(String tableName, String key,
+ HashMap values) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
@@ -374,7 +417,8 @@ public int update(String tableName, String key, HashMap va
}
try {
int numFields = values.size();
- StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.UPDATE,
+ tableName, numFields, getShardIndexByKey(key));
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
updateStatement = createAndCacheUpdateStatement(type, key);
@@ -385,29 +429,34 @@ public int update(String tableName, String key, HashMap va
}
updateStatement.setString(index, key);
int result = updateStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing update to table: " + tableName + e);
+ System.err.println("Error in processing update to table: " + tableName
+ + e);
return -1;
}
- }
+ }
- @Override
- public int insert(String tableName, String key, HashMap values) {
- if (tableName == null) {
- return -1;
- }
- if (key == null) {
- return -1;
- }
- try {
- int numFields = values.size();
- StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, getShardIndexByKey(key));
- PreparedStatement insertStatement = cachedStatements.get(type);
- if (insertStatement == null) {
- insertStatement = createAndCacheInsertStatement(type, key);
- }
+ @Override
+ public int insert(String tableName, String key,
+ HashMap values) {
+ if (tableName == null) {
+ return -1;
+ }
+ if (key == null) {
+ return -1;
+ }
+ try {
+ int numFields = values.size();
+ StatementType type = new StatementType(StatementType.Type.INSERT,
+ tableName, numFields, getShardIndexByKey(key));
+ PreparedStatement insertStatement = cachedStatements.get(type);
+ if (insertStatement == null) {
+ insertStatement = createAndCacheInsertStatement(type, key);
+ }
insertStatement.setString(1, key);
int index = 2;
for (Map.Entry entry : values.entrySet()) {
@@ -415,35 +464,42 @@ public int insert(String tableName, String key, HashMap va
insertStatement.setString(index++, field);
}
int result = insertStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing insert to table: " + tableName + e);
+ System.err.println("Error in processing insert to table: " + tableName
+ + e);
return -1;
}
- }
+ }
- @Override
- public int delete(String tableName, String key) {
- if (tableName == null) {
+ @Override
+ public int delete(String tableName, String key) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.DELETE,
+ tableName, 1, getShardIndexByKey(key));
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
deleteStatement = createAndCacheDeleteStatement(type, key);
}
deleteStatement.setString(1, key);
int result = deleteStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing delete to table: " + tableName + e);
+ System.err.println("Error in processing delete to table: " + tableName
+ + e);
return -1;
}
- }
+ }
}
diff --git a/db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClientConstants.java b/db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClientConstants.java
index 79e0525e2a..1ab04e67be 100644
--- a/db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClientConstants.java
+++ b/db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClientConstants.java
@@ -42,6 +42,15 @@ public interface JdbcDBClientConstants {
/** Default number of fields in a record. */
public static final String FIELD_COUNT_PROPERTY_DEFAULT="10";
+ /**
+ * The name of the property that specify the maximum number of entries
+ * prefetched when a scan is executed.
+ */
+ public static final String SCAN_FETCH_LIMIT = "scan.fetch.limit";
+
+ /** Default limit for scan fetch. */
+ public static final int SCAN_FETCH_LIMIT_DEFAULT = 10;
+
/** Representing a NULL value. */
public static final String NULL_VALUE = "NULL";