Fix an issue with JDBC scans #47

Closed
wants to merge 2 commits into from

5 participants

@sudiptodas

When issuing scans, a JDBC driver can either use a cursor to stream the results as they are consumed or fetch the entire result set. The Postgres JDBC driver does the latter, so YCSB runs out of heap space in workloads with scans on large tables. I added a new parameter to specify a limit on the number of rows fetched. This is a hint to the driver, not a binding contract, but it seems to work with Postgres. I felt this is more portable than using, say, a LIMIT clause, whose syntax varies from one DBMS to another.

The added parameter is scan.fetch.limit; it defaults to 10 rows and can be overridden with the -p option on the YCSB command line.
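As a quick illustration of how the property is meant to be used, here is a minimal sketch, not code from the patch; the connection URL, table, and column names are placeholders:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class ScanFetchLimitSketch {
  public static void main(String[] args) throws SQLException {
    // scan.fetch.limit would normally arrive via YCSB's -p flag; default is 10.
    Properties props = new Properties();
    props.setProperty("scan.fetch.limit", "100");
    int scanFetchLimit = Integer.parseInt(props.getProperty("scan.fetch.limit", "10"));

    Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/ycsb", "ycsb", "ycsb");
    PreparedStatement scan =
        conn.prepareStatement("SELECT * FROM usertable WHERE YCSB_KEY >= ?");

    // A hint to the driver, not a hard limit on the rows returned: it suggests
    // how many rows to fetch per round trip. As noted later in this thread,
    // the Postgres driver only honors it when autocommit is disabled.
    scan.setFetchSize(scanFetchLimit);

    scan.setString(1, "user0");
    try (ResultSet rs = scan.executeQuery()) {
      while (rs.next()) {
        // consume rows incrementally instead of materializing the whole set
      }
    }
    conn.close();
  }
}
```

On the YCSB command line, overriding the default would look something like -p scan.fetch.limit=100.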

Hope this is helpful.

Sudipto

@m1ch1
Collaborator

Hi Sudipto,

Thanks for the patch! We've had many check-ins recently, and it looks like your branch is a bit out of date. Could you pull all the changes from the master branch?

Thanks!
--Michi

@sudiptodas

Hi Michi,

I'll look into it.

Sudipto

@sudiptodas

Hi Michi,

Let me know if the above merge is what you wanted.

Sudipto

@m1ch1
Collaborator

Hi Sudipto,

I still see changes that I think shouldn't be included in this patch:

sudiptodas@aaa247c

Maybe it's easier if you can send me the diff for your JDBC client change. Sorry I'm still a git newbie :)

Thanks!
--Michi

@pmq pmq referenced this pull request from a commit in pmq/YCSB
@pmq pmq gh-47 Fix an issue with JDBC scans
JDBC scans did not instruct the driver to use a specific fetchsize,
resulting in most cases in an OOM; add support for setting fetchsize.

Also add support for disabling autocommit on the JDBC connections,
mostly because Postgres for example doesn't support setting fetchsize
without disabling autocommit.
e44e677
@pmq

Hi Michi, here's a patch for this issue. I don't know if I should reopen a pull request or if this is sufficient.

Sudipto's patch probably won't work: for this feature to work correctly on Postgres you need, as documented in my patch, to disable auto-commit; otherwise the underlying cursor won't be present any more.
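To make the auto-commit point concrete, here is a small sketch (not code from either patch; connection details, table, and column names are illustrative) of the combination Postgres needs before setFetchSize results in cursor-based streaming:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class CursorScanSketch {
  public static void main(String[] args) throws SQLException {
    Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/ycsb", "ycsb", "ycsb");

    // With autocommit on (the JDBC default), the Postgres driver ignores the
    // fetch size and materializes the entire result set in memory. Turning
    // autocommit off lets it back the ResultSet with a server-side cursor.
    conn.setAutoCommit(false);

    PreparedStatement scan =
        conn.prepareStatement("SELECT * FROM usertable WHERE YCSB_KEY >= ?");
    scan.setFetchSize(100); // rows per round trip, not a cap on total rows
    scan.setString(1, "user0");

    try (ResultSet rs = scan.executeQuery()) {
      while (rs.next()) {
        // rows now arrive in batches instead of all at once
      }
    }
    conn.commit(); // the cursor only lives for the duration of the transaction
    conn.close();
  }
}
```

This is why the referenced patch adds support for disabling autocommit alongside the fetch-size setting.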

@m1ch1
Collaborator

Thanks Pierre, I'll review the patch some time this week.

--Michi

@m1ch1 m1ch1 referenced this pull request from a commit
@pmq pmq gh-47 Fix an issue with JDBC scans
JDBC scans did not instruct the driver to use a specific fetchsize,
resulting in most cases in an OOM; add support for setting fetchsize.

Also add support for disabling autocommit on the JDBC connections,
mostly because Postgres for example doesn't support setting fetchsize
without disabling autocommit.
7b564cc
@joey joey referenced this pull request from a commit in joey/YCSB
@pmq pmq gh-47 Fix an issue with JDBC scans
JDBC scans did not instruct the driver to use a specific fetchsize,
resulting in most cases in an OOM; add support for setting fetchsize.

Also add support for disabling autocommit on the JDBC connections,
mostly because Postgres for example doesn't support setting fetchsize
without disabling autocommit.
2a5c4a3
@wolfgangihloff wolfgangihloff referenced this pull request from a commit
@pmq pmq gh-47 Fix an issue with JDBC scans
JDBC scans did not instruct the driver to use a specific fetchsize,
resulting in most cases in an OOM; add support for setting fetchsize.

Also add support for disabling autocommit on the JDBC connections,
mostly because Postgres for example doesn't support setting fetchsize
without disabling autocommit.
11e736c
@busbey
Collaborator

This got fixed back in commit 7b564cc.

@busbey busbey closed this
412 db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClient.java
@@ -28,17 +28,21 @@
import java.util.concurrent.ConcurrentMap;
/**
- * A class that wraps a JDBC compliant database to allow it to be interfaced with YCSB.
- * This class extends {@link DB} and implements the database interface used by YCSB client.
+ * A class that wraps a JDBC compliant database to allow it to be interfaced
+ * with YCSB. This class extends {@link DB} and implements the database
+ * interface used by YCSB client.
*
- * <br> Each client will have its own instance of this class. This client is
- * not thread safe.
+ * <br>
+ * Each client will have its own instance of this class. This client is not
+ * thread safe.
*
- * <br> This interface expects a schema <key> <field1> <field2> <field3> ...
- * All attributes are of type VARCHAR. All accesses are through the primary key. Therefore,
- * only one index on the primary key is needed.
+ * <br>
+ * This interface expects a schema <key> <field1> <field2> <field3> ... All
+ * attributes are of type VARCHAR. All accesses are through the primary key.
+ * Therefore, only one index on the primary key is needed.
*
- * <p> The following options must be passed when using this database client.
+ * <p>
+ * The following options must be passed when using this database client.
*
* <ul>
* <li><b>db.driver</b> The JDBC driver class to use.</li>
@@ -46,35 +50,32 @@
* <li><b>db.user</b> User name for the connection.</li>
* <li><b>db.passwd</b> Password for the connection.</li>
* </ul>
- *
+ *
* @author sudipto
- *
+ *
*/
public class JdbcDBClient extends DB implements JdbcDBClientConstants {
-
+
private ArrayList<Connection> conns;
private boolean initialized = false;
private Properties props;
private static final String DEFAULT_PROP = "";
private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
-
+ private int scanFetchLimit;
+
/**
* The statement type for the prepared statements.
*/
private static class StatementType {
-
+
enum Type {
- INSERT(1),
- DELETE(2),
- READ(3),
- UPDATE(4),
- SCAN(5),
- ;
+ INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5), ;
int internalType;
+
private Type(int type) {
internalType = type;
}
-
+
int getHashCode() {
final int prime = 31;
int result = 1;
@@ -82,12 +83,12 @@ int getHashCode() {
return result;
}
}
-
+
Type type;
int shardIndex;
int numFields;
String tableName;
-
+
StatementType(Type type, String tableName, int numFields, int _shardIndex) {
this.type = type;
this.tableName = tableName;
@@ -130,138 +131,163 @@ public boolean equals(Object obj) {
}
}
- /**
- * For the given key, returns what shard contains data for this key
- *
- * @param key Data key to do operation on
- * @return Shard index
- */
- private int getShardIndexByKey(String key) {
- int ret = Math.abs(key.hashCode()) % conns.size();
- //System.out.println(conns.size() + ": Shard instance for "+ key + " (hash " + key.hashCode()+ " ) " + " is " + ret);
- return ret;
- }
+ /**
+ * For the given key, returns what shard contains data for this key
+ *
+ * @param key
+ * Data key to do operation on
+ * @return Shard index
+ */
+ private int getShardIndexByKey(String key) {
+ int ret = Math.abs(key.hashCode()) % conns.size();
+ // System.out.println(conns.size() + ": Shard instance for "+ key +
+ // " (hash " + key.hashCode()+ " ) " + " is " + ret);
+ return ret;
+ }
- /**
- * For the given key, returns Connection object that holds connection
- * to the shard that contains this key
- *
- * @param key Data key to get information for
- * @return Connection object
- */
- private Connection getShardConnectionByKey(String key) {
- return conns.get(getShardIndexByKey(key));
- }
+ /**
+ * For the given key, returns Connection object that holds connection to the
+ * shard that contains this key
+ *
+ * @param key
+ * Data key to get information for
+ * @return Connection object
+ */
+ private Connection getShardConnectionByKey(String key) {
+ return conns.get(getShardIndexByKey(key));
+ }
- private void cleanupAllConnections() throws SQLException {
- for(Connection conn: conns) {
- conn.close();
- }
+ private void cleanupAllConnections() throws SQLException {
+ for (Connection conn : conns) {
+ conn.close();
}
-
+ }
+
/**
- * Initialize the database connection and set it up for sending requests to the database.
- * This must be called once per client.
- * @throws
+ * Initialize the database connection and set it up for sending requests to
+ * the database. This must be called once per client.
+ *
+ * @throws
*/
@Override
- public void init() throws DBException {
- if (initialized) {
- System.err.println("Client connection already initialized.");
- return;
- }
- props = getProperties();
- String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
- String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
- String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
- String driver = props.getProperty(DRIVER_CLASS);
+ public void init() throws DBException {
+ if (initialized) {
+ System.err.println("Client connection already initialized.");
+ return;
+ }
+ props = getProperties();
+ String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
+ String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
+ String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
+ String driver = props.getProperty(DRIVER_CLASS);
+
+ String fetchLimit = props.getProperty(SCAN_FETCH_LIMIT);
+ if (fetchLimit == null) fetchLimit = String.valueOf(SCAN_FETCH_LIMIT_DEFAULT);
+ try {
+ scanFetchLimit = Integer.parseInt(fetchLimit);
+ } catch (NumberFormatException e) {
+ System.err.println("Invalid value for scan fetch limit passed: " + fetchLimit);
+ scanFetchLimit = SCAN_FETCH_LIMIT_DEFAULT;
+ }
- try {
- if (driver != null) {
- Class.forName(driver);
- }
- int shardCount = 0;
- conns = new ArrayList<Connection>(3);
- for (String url: urls.split(",")) {
- System.out.println("Adding shard node URL: " + url);
- Connection conn = DriverManager.getConnection(url, user, passwd);
- // Since there is no explicit commit method in the DB interface, all
- // operations should auto commit.
- conn.setAutoCommit(true);
- shardCount++;
- conns.add(conn);
- }
+ try {
+ if (driver != null) {
+ Class.forName(driver);
+ }
+ int shardCount = 0;
+ conns = new ArrayList<Connection>(3);
+ for (String url : urls.split(",")) {
+ System.out.println("Adding shard node URL: " + url);
+ Connection conn = DriverManager.getConnection(url, user, passwd);
+ // Since there is no explicit commit method in the DB interface, all
+ // operations should auto commit.
+ conn.setAutoCommit(true);
+ shardCount++;
+ conns.add(conn);
+ }
- System.out.println("Using " + shardCount + " shards");
+ System.out.println("Using " + shardCount + " shards");
- cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
- } catch (ClassNotFoundException e) {
- System.err.println("Error in initializing the JDBS driver: " + e);
- throw new DBException(e);
- } catch (SQLException e) {
- System.err.println("Error in database operation: " + e);
+ cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
+ } catch (ClassNotFoundException e) {
+ System.err.println("Error in initializing the JDBS driver: " + e);
+ throw new DBException(e);
+ } catch (SQLException e) {
+ System.err.println("Error in database operation: " + e);
throw new DBException(e);
} catch (NumberFormatException e) {
System.err.println("Invalid value for fieldcount property. " + e);
throw new DBException(e);
}
- initialized = true;
- }
-
+ initialized = true;
+ }
+
@Override
- public void cleanup() throws DBException {
- try {
+ public void cleanup() throws DBException {
+ try {
cleanupAllConnections();
} catch (SQLException e) {
System.err.println("Error in closing the connection. " + e);
throw new DBException(e);
}
- }
-
- private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
- throws SQLException {
- StringBuilder insert = new StringBuilder("INSERT INTO ");
- insert.append(insertType.tableName);
- insert.append(" VALUES(?");
+ }
+
+ private PreparedStatement createAndCacheInsertStatement(
+ StatementType insertType, String key) throws SQLException {
+ StringBuilder insert = new StringBuilder("INSERT INTO ");
+ insert.append(insertType.tableName);
+ insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(");");
- PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
- if (stmt == null) return insertStatement;
- else return stmt;
- }
-
- private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
- throws SQLException {
+ PreparedStatement insertStatement = getShardConnectionByKey(key)
+ .prepareStatement(insert.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(insertType,
+ insertStatement);
+ if (stmt == null)
+ return insertStatement;
+ else
+ return stmt;
+ }
+
+ private PreparedStatement createAndCacheReadStatement(StatementType readType,
+ String key) throws SQLException {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?;");
- PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
- if (stmt == null) return readStatement;
- else return stmt;
+ PreparedStatement readStatement = getShardConnectionByKey(key)
+ .prepareStatement(read.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(readType,
+ readStatement);
+ if (stmt == null)
+ return readStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
- throws SQLException {
+
+ private PreparedStatement createAndCacheDeleteStatement(
+ StatementType deleteType, String key) throws SQLException {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?;");
- PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
- if (stmt == null) return deleteStatement;
- else return stmt;
+ PreparedStatement deleteStatement = getShardConnectionByKey(key)
+ .prepareStatement(delete.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType,
+ deleteStatement);
+ if (stmt == null)
+ return deleteStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
- throws SQLException {
+
+ private PreparedStatement createAndCacheUpdateStatement(
+ StatementType updateType, String key) throws SQLException {
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
@@ -269,42 +295,56 @@ private PreparedStatement createAndCacheUpdateStatement(StatementType updateType
update.append(COLUMN_PREFIX);
update.append(i);
update.append("=?");
- if (i < updateType.numFields) update.append(", ");
+ if (i < updateType.numFields)
+ update.append(", ");
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?;");
- PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
- if (stmt == null) return insertStatement;
- else return stmt;
+ PreparedStatement insertStatement = getShardConnectionByKey(key)
+ .prepareStatement(update.toString());
+ PreparedStatement stmt = cachedStatements.putIfAbsent(updateType,
+ insertStatement);
+ if (stmt == null)
+ return insertStatement;
+ else
+ return stmt;
}
-
- private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
- throws SQLException {
- StringBuilder select = new StringBuilder("SELECT * FROM ");
+
+ private PreparedStatement createAndCacheScanStatement(StatementType scanType,
+ String key) throws SQLException {
+ StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ");
select.append("?;");
- PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
- PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
- if (stmt == null) return scanStatement;
- else return stmt;
+ PreparedStatement scanStatement = getShardConnectionByKey(key)
+ .prepareStatement(select.toString());
+ // Set a limit on the number of rows fetched. This prevents the JDBC driver
+ // from fetching the entire result set. Some implementations, such as the
+ // Postgres driver, fetch the entire result set instead of streaming the results.
+ scanStatement.setFetchSize(scanFetchLimit);
+ PreparedStatement stmt = cachedStatements.putIfAbsent(scanType,
+ scanStatement);
+ if (stmt == null)
+ return scanStatement;
+ else
+ return stmt;
}
- @Override
- public int read(String tableName, String key, Set<String> fields,
- HashMap<String, ByteIterator> result) {
- if (tableName == null) {
+ @Override
+ public int read(String tableName, String key, Set<String> fields,
+ HashMap<String, ByteIterator> result) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.READ,
+ tableName, 1, getShardIndexByKey(key));
PreparedStatement readStatement = cachedStatements.get(type);
if (readStatement == null) {
readStatement = createAndCacheReadStatement(type, key);
@@ -324,22 +364,24 @@ public int read(String tableName, String key, Set<String> fields,
resultSet.close();
return SUCCESS;
} catch (SQLException e) {
- System.err.println("Error in processing read of table " + tableName + ": "+e);
+ System.err.println("Error in processing read of table " + tableName
+ + ": " + e);
return -2;
}
- }
+ }
- @Override
- public int scan(String tableName, String startKey, int recordcount,
- Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
- if (tableName == null) {
+ @Override
+ public int scan(String tableName, String startKey, int recordcount,
+ Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+ if (tableName == null) {
return -1;
}
if (startKey == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, getShardIndexByKey(startKey));
+ StatementType type = new StatementType(StatementType.Type.SCAN,
+ tableName, 1, getShardIndexByKey(startKey));
PreparedStatement scanStatement = cachedStatements.get(type);
if (scanStatement == null) {
scanStatement = createAndCacheScanStatement(type, startKey);
@@ -362,11 +404,12 @@ public int scan(String tableName, String startKey, int recordcount,
System.err.println("Error in processing scan of table: " + tableName + e);
return -2;
}
- }
+ }
- @Override
- public int update(String tableName, String key, HashMap<String, ByteIterator> values) {
- if (tableName == null) {
+ @Override
+ public int update(String tableName, String key,
+ HashMap<String, ByteIterator> values) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
@@ -374,7 +417,8 @@ public int update(String tableName, String key, HashMap<String, ByteIterator> va
}
try {
int numFields = values.size();
- StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.UPDATE,
+ tableName, numFields, getShardIndexByKey(key));
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
updateStatement = createAndCacheUpdateStatement(type, key);
@@ -385,29 +429,34 @@ public int update(String tableName, String key, HashMap<String, ByteIterator> va
}
updateStatement.setString(index, key);
int result = updateStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing update to table: " + tableName + e);
+ System.err.println("Error in processing update to table: " + tableName
+ + e);
return -1;
}
- }
+ }
- @Override
- public int insert(String tableName, String key, HashMap<String, ByteIterator> values) {
- if (tableName == null) {
- return -1;
- }
- if (key == null) {
- return -1;
- }
- try {
- int numFields = values.size();
- StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, getShardIndexByKey(key));
- PreparedStatement insertStatement = cachedStatements.get(type);
- if (insertStatement == null) {
- insertStatement = createAndCacheInsertStatement(type, key);
- }
+ @Override
+ public int insert(String tableName, String key,
+ HashMap<String, ByteIterator> values) {
+ if (tableName == null) {
+ return -1;
+ }
+ if (key == null) {
+ return -1;
+ }
+ try {
+ int numFields = values.size();
+ StatementType type = new StatementType(StatementType.Type.INSERT,
+ tableName, numFields, getShardIndexByKey(key));
+ PreparedStatement insertStatement = cachedStatements.get(type);
+ if (insertStatement == null) {
+ insertStatement = createAndCacheInsertStatement(type, key);
+ }
insertStatement.setString(1, key);
int index = 2;
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
@@ -415,35 +464,42 @@ public int insert(String tableName, String key, HashMap<String, ByteIterator> va
insertStatement.setString(index++, field);
}
int result = insertStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing insert to table: " + tableName + e);
+ System.err.println("Error in processing insert to table: " + tableName
+ + e);
return -1;
}
- }
+ }
- @Override
- public int delete(String tableName, String key) {
- if (tableName == null) {
+ @Override
+ public int delete(String tableName, String key) {
+ if (tableName == null) {
return -1;
}
if (key == null) {
return -1;
}
try {
- StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, getShardIndexByKey(key));
+ StatementType type = new StatementType(StatementType.Type.DELETE,
+ tableName, 1, getShardIndexByKey(key));
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
deleteStatement = createAndCacheDeleteStatement(type, key);
}
deleteStatement.setString(1, key);
int result = deleteStatement.executeUpdate();
- if (result == 1) return SUCCESS;
- else return 1;
+ if (result == 1)
+ return SUCCESS;
+ else
+ return 1;
} catch (SQLException e) {
- System.err.println("Error in processing delete to table: " + tableName + e);
+ System.err.println("Error in processing delete to table: " + tableName
+ + e);
return -1;
}
- }
+ }
}
9 db/jdbc/src/com/yahoo/ycsb/db/JdbcDBClientConstants.java
@@ -42,6 +42,15 @@
/** Default number of fields in a record. */
public static final String FIELD_COUNT_PROPERTY_DEFAULT="10";
+ /**
+ * The name of the property that specifies the maximum number of entries
+ * prefetched when a scan is executed.
+ */
+ public static final String SCAN_FETCH_LIMIT = "scan.fetch.limit";
+
+ /** Default limit for scan fetch. */
+ public static final int SCAN_FETCH_LIMIT_DEFAULT = 10;
+
/** Representing a NULL value. */
public static final String NULL_VALUE = "NULL";