Skip to content

Commit

Permalink
Ensure queries respect fetch size (#299)
Browse files Browse the repository at this point in the history
When querying the database with a fetch size set, some DB engines ignore that setting and get all results into memory, which might crash the client code due to out of memory, or at least cause long GC pauses.

This commit makes PostgreSQL and CockroachDB respect the fetch size, by wrapping the queries in a transaction if needed, so that a server side cursor is kept while the results are being fetched.

In the case of MySQL, server side cursors can be enabled so that it respects the fetch size, but due to bug https://bugs.mysql.com/bug.php?id=106465 this would make the driver ignore the query timeouts.

Co-authored-by: José Fidalgo <jose.fidalgo@feedzai.com>
  • Loading branch information
jmf-tls and José Fidalgo committed Feb 17, 2022
1 parent 9f8bcf4 commit fa3d23e
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public ResultColumn createResultColumn(String name, Object value) {
protected QueryExceptionHandler getQueryExceptionHandler() {
return PG_QUERY_EXCEPTION_HANDLER;
}

@Override
protected boolean needsWrapInTransaction(final int fetchSize) {
return fetchSize > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand Down Expand Up @@ -56,20 +57,31 @@ public abstract class ResultIterator implements AutoCloseable {
/**
* The list of columns names for the given query.
*/
private final List<String> columnNames;
private final List<String> columnNames = new ArrayList<>();

/**
* Signals if the result set is already close.
* Signals if the result set is already closed.
*/
private boolean closed = false;
/**
* Signals if statement in place is closeable. E.g. Prepared statements must no be closed.
* Signals if statement in place is closeable. E.g. Prepared statements must not be closed.
*/
private boolean statementCloseable;
private final boolean statementCloseable;

/**
* The number of rows processed by the iterator so far.
*/
private int currentRowCount;
private int currentRowCount = 0;

/**
* Signals whether the iterator query was wrapped in a transaction before being sent to the database.
*/
private final boolean isWrappedInTransaction;

/**
* Flag to keep the previous state of the {@link Connection#getAutoCommit() conection autocommit}.
*/
private final boolean previousAutocommit;

/**
* Creates a new instance of {@link ResultIterator} for regular statements (on-demand query).
Expand All @@ -82,14 +94,21 @@ public abstract class ResultIterator implements AutoCloseable {
* @param isPreparedStatement True if the given statement is a {@link PreparedStatement}.
* @throws DatabaseEngineException If a database access error occurs.
*/
private ResultIterator(Statement statement, String sql, boolean isPreparedStatement) throws DatabaseEngineException {
private ResultIterator(final Statement statement, final String sql, final boolean isPreparedStatement) throws DatabaseEngineException {
this.statement = statement;
this.columnNames = new ArrayList<>();
this.currentRowCount = 0;

// Process column names.
try {
long start = System.currentTimeMillis();

final Connection connection = statement.getConnection();
this.previousAutocommit = connection.getAutoCommit();
this.isWrappedInTransaction = needsWrapInTransaction(statement.getFetchSize());

if (isWrappedInTransaction) {
connection.setAutoCommit(false);
}

if (isPreparedStatement) {
this.resultSet = ((PreparedStatement) statement).executeQuery();
this.statementCloseable = false;
Expand Down Expand Up @@ -136,8 +155,7 @@ public ResultIterator(PreparedStatement statement) throws DatabaseEngineExceptio
* Retrieves the number of rows processed by the iterator so far. If the iteration
* hasn't started, this method returns 0.
*
* @return The number of rows processed by the iterator so far or 0 if the
* iteration hasn't started.
* @return The number of rows processed by the iterator so far or 0 if the iteration hasn't started.
*/
public int getCurrentRowCount() {
return this.currentRowCount;
Expand All @@ -147,13 +165,10 @@ public int getCurrentRowCount() {
* Retrieves the next row in the result set.
* <p>
* This method also closes the result set upon the last call on the result set.
* </p>
* <p>
* If the statement in place is not a {@link PreparedStatement} it also closes the statement.
* </p>
* <p>
* If an exception is thrown the calling thread is responsible for repeating the action in place.
* </p>
*
* @return The result row.
* @throws DatabaseEngineException If a database access error occurs.
Expand Down Expand Up @@ -250,11 +265,11 @@ public List<String> getColumnNames() {
/**
* Attempts to cancel the current query. This relies on the JDBC driver supporting
* {@link Statement#cancel()}, which is not guaranteed on all drivers.
*
* <p>
* A possible use case for this method is to implement a timeout; If that's the case, see also
* {@link com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine#iterator(String, int, int)} for
* an alternative way to accomplish this.
*
* <p>
* This method is expected to be invoked from a thread distinct of the one that is reading
* from the result set.
*
Expand All @@ -278,22 +293,34 @@ public boolean cancel() {
@Override
public void close() {
// Check for previous closed.
if (!closed) {
if (closed) {
return;
}

if (statement != null && isWrappedInTransaction) {
try {
if (resultSet != null) {
resultSet.close();
}
statement.getConnection().setAutoCommit(previousAutocommit);
} catch (final Exception e) {
logger.warn("Could not reset autocommit.", e);
}
}

if (resultSet != null) {
try {
resultSet.close();
} catch (final Exception e) {
logger.warn("Could not close result set.", e);
}
if (statementCloseable && statement != null) {
try {
statement.close();
} catch (final Exception e) {
logger.warn("Could not close statement.", e);
}
}

if (statementCloseable && statement != null) {
try {
statement.close();
} catch (final Exception e) {
logger.warn("Could not close statement.", e);
}
}

// Assume closed even if it fails.
closed = true;
}
Expand Down Expand Up @@ -333,4 +360,17 @@ private DatabaseEngineException closeAndHandleException(final Exception exceptio
protected QueryExceptionHandler getQueryExceptionHandler() {
return DEFAULT_QUERY_EXCEPTION_HANDLER;
}

/**
* Indicates whether this iterator needs to run inside a transaction.
*
* PostgreSQL (and also CockroachDB) need this in order to keep a cursor on the server, otherwise the fetchsize
* setting is ignored and all results are fetched from the database into memory at once.
*
* @param fetchSize The fetch size for result sets obtained in this iterator.
* @return Whether this iterator needs to run inside a transaction.
*/
protected boolean needsWrapInTransaction(final int fetchSize) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.feedzai.commons.sql.abstraction.ddl.DbEntity;
import com.feedzai.commons.sql.abstraction.ddl.DbEntityType;
import com.feedzai.commons.sql.abstraction.dml.result.ResultColumn;
import com.feedzai.commons.sql.abstraction.dml.result.ResultIterator;
import com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.ConnectionResetException;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineDriver;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException;
import com.feedzai.commons.sql.abstraction.engine.DatabaseFactory;
import com.feedzai.commons.sql.abstraction.engine.DatabaseFactoryException;
Expand Down Expand Up @@ -61,6 +63,8 @@
import static com.feedzai.commons.sql.abstraction.engine.impl.abs.AbstractEngineSchemaTest.Ieee754Support.UNSUPPORTED;
import static com.feedzai.commons.sql.abstraction.util.StringUtils.quotize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -188,6 +192,57 @@ public void udfTimesTwoTest() throws Exception {
}
}

/**
* Tests that the different DB engines respect the fetch size set in a query.
*
* @throws Exception If any error occurs, thus failing the test.
*/
@Test
public void testFetchSize() throws Exception {
final DatabaseEngineDriver engineDriver = DatabaseEngineDriver.fromEngine(properties.getProperty(ENGINE));

final TestRouter testRouter = new TestRouter(engineDriver.defaultPort());
testRouter.init();

final Properties testProps = TestRouter.getPatchedDbProperties(properties, testRouter.getDbPort(), testRouter.getLocalPort());
testProps.setProperty(PdbProperties.SOCKET_TIMEOUT, "2");
testProps.setProperty(PdbProperties.CHECK_CONNECTION_TIMEOUT, "2");

try (final DatabaseEngine engine = DatabaseFactory.getConnection(testProps)) {
final DbEntity entity = createSpecialValuesEntity();
engine.addEntity(entity);
final String entityName = entity.getName();

for (int i = 0; i < 10; i++) {
engine.persist(entityName, entry().set(ID_COL, i).build());
}

// besides the connection timeouts, also set 2 seconds timeout for the query; fetch size = 3
try (ResultIterator iterator = engine.iterator(select(all()).from(table(entityName)), 3, 2)) {
// if the first 3 rows aren't fetched immediately on the query, they will be fetched next
iterator.next();

// break the connection to the DB
testRouter.breakConnections();

// get next result, which should already have been fetched; connection shouldn't be needed and this shouldn't fail
assertThatCode(iterator::next)
.doesNotThrowAnyException();

// same as previous; ensures that the 3rd result is already fetched, and retries don't prevent from getting it
assertThatCode(iterator::next)
.doesNotThrowAnyException();

// 4th row should be fetched now, but since the connection is broken it should fail
// (unless the driver already fetched all results, which should fail the test)
assertThatThrownBy(iterator::next)
.isInstanceOf(DatabaseEngineException.class);
}
} finally {
testRouter.close();
}
}

/**
* After changing the oracle double data type from DOUBLE PRECISION to BINARY_DOUBLE the special
* value 'NaN' should be inserted into the database without any error.
Expand Down
Loading

0 comments on commit fa3d23e

Please sign in to comment.