diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index effab741..8a1c65ca 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -162,7 +162,6 @@ public static void init() throws IOException { Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config); System.out.println(mongoDatastore.listCollections()); - postgres = new GenericContainer<>(DockerImageName.parse("postgres:13.1")) .withEnv("POSTGRES_PASSWORD", "postgres") diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java new file mode 100644 index 00000000..8c427be8 --- /dev/null +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java @@ -0,0 +1,274 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.config.ConnectionCredentials; +import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig; +import org.hypertrace.core.documentstore.model.config.DatabaseType; +import org.hypertrace.core.documentstore.model.config.Endpoint; +import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class PostgresConnectionPoolIntegrationTest { + + private static GenericContainer postgres; + private static String host; + private static int port; + + @BeforeAll + public static void init() { + postgres = + new GenericContainer<>(DockerImageName.parse("postgres:13.1")) + .withEnv("POSTGRES_PASSWORD", "postgres") + .withEnv("POSTGRES_USER", "postgres") + .withExposedPorts(5432) + .waitingFor(Wait.forListeningPort()); + postgres.start(); + + host = postgres.getHost(); + port = postgres.getMappedPort(5432); + } + + @AfterAll + public static void shutdown() { + postgres.stop(); + } + + @Test + public void testGetConnection() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + try (final Connection connection = pool.getConnection()) { + assertNotNull(connection); + assertTrue(connection.getAutoCommit(), "Regular connection should have autoCommit=true"); + assertFalse(connection.isClosed()); + + // Verify the connection works by executing a simple query + try (final PreparedStatement stmt = connection.prepareStatement("SELECT 1"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + } + } + + pool.close(); + } + + @Test + public void testGetTransactionalConnection() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + try (final Connection connection = pool.getTransactionalConnection()) { + assertNotNull(connection); + assertFalse( + connection.getAutoCommit(), "Transactional connection should have autoCommit=false"); + assertFalse(connection.isClosed()); + + // Verify the connection works by executing a simple query + try (final PreparedStatement stmt = connection.prepareStatement("SELECT 2"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + } + + // Verify we can commit manually + connection.commit(); + } + + pool.close(); + } + + @Test + public void testBothPoolsIndependent() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get connections from both pools simultaneously + try (final Connection regularConn = pool.getConnection(); + final Connection transactionalConn = pool.getTransactionalConnection()) { + + assertNotNull(regularConn); + assertNotNull(transactionalConn); + + // Verify they have different autoCommit settings + assertTrue(regularConn.getAutoCommit()); + assertFalse(transactionalConn.getAutoCommit()); + + // Both should work independently + try (final PreparedStatement stmt1 = regularConn.prepareStatement("SELECT 'regular'"); + final ResultSet rs1 = stmt1.executeQuery()) { + assertTrue(rs1.next()); + assertEquals("regular", rs1.getString(1)); + } + + try (final PreparedStatement stmt2 = + transactionalConn.prepareStatement("SELECT 'transactional'"); + final ResultSet rs2 = stmt2.executeQuery()) { + assertTrue(rs2.next()); + assertEquals("transactional", rs2.getString(1)); + } + + transactionalConn.commit(); + } + + pool.close(); + } + + @Test + public void testConnectionPooling() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get and release connections multiple times + Connection conn1 = pool.getConnection(); + assertNotNull(conn1); + conn1.close(); + + Connection conn2 = pool.getConnection(); + assertNotNull(conn2); + conn2.close(); + + // Verify pooling is working by getting multiple connections from transactional pool + Connection tConn1 = pool.getTransactionalConnection(); + assertNotNull(tConn1); + tConn1.close(); + + Connection tConn2 = pool.getTransactionalConnection(); + assertNotNull(tConn2); + tConn2.close(); + + pool.close(); + } + + @Test + public void testTransactionalCommitAndRollback() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Create a test table + try (final Connection setupConn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + setupConn.prepareStatement( + "CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY, value TEXT)")) { + stmt.execute(); + } + setupConn.commit(); + } + + // Test commit + try (final Connection conn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (1, 'test')")) { + stmt.execute(); + } + conn.commit(); + + // Verify data was committed + try (final PreparedStatement stmt = + conn.prepareStatement("SELECT value FROM test_table WHERE id = 1"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals("test", rs.getString(1)); + } + } + + // Test rollback + try (final Connection conn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (2, 'rollback_me')")) { + stmt.execute(); + } + conn.rollback(); + + // Verify data was not committed + try (final PreparedStatement stmt = + conn.prepareStatement("SELECT value FROM test_table WHERE id = 2"); + final ResultSet rs = stmt.executeQuery()) { + assertFalse(rs.next(), "Data should have been rolled back"); + } + } + + // Cleanup + try (final Connection cleanupConn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + cleanupConn.prepareStatement("DROP TABLE IF EXISTS test_table")) { + stmt.execute(); + } + cleanupConn.commit(); + } + + pool.close(); + } + + @Test + public void testClose() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get connections to ensure pools are active + final Connection regularConnection = pool.getConnection(); + final Connection transactionalConnection = pool.getTransactionalConnection(); + + assertNotNull(regularConnection); + assertNotNull(transactionalConnection); + + // Close the connections back to the pool + regularConnection.close(); + transactionalConnection.close(); + + // Close the pool - should not throw + pool.close(); + + // After closing the pool, trying to get connections should fail with IllegalStateException + assertThrows(IllegalStateException.class, pool::getConnection); + assertThrows(IllegalStateException.class, pool::getTransactionalConnection); + } + + @Test + public void testCloseIdempotent() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // First close + pool.close(); + + // Second close should not throw + pool.close(); + } + + private static PostgresConnectionConfig createTestConfig() { + return (PostgresConnectionConfig) + ConnectionConfig.builder() + .type(DatabaseType.POSTGRES) + .addEndpoint(Endpoint.builder().host(host).port(port).build()) + .database("postgres") + .credentials( + ConnectionCredentials.builder().username("postgres").password("postgres").build()) + .connectionPoolConfig( + ConnectionPoolConfig.builder() + .maxConnections(5) + .connectionAccessTimeout(Duration.ofSeconds(10)) + .connectionSurrenderTimeout(Duration.ofSeconds(30)) + .build()) + .build(); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java index 7ce94ac2..fa6a12cf 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -33,6 +33,7 @@ public PostgresClient(final PostgresConnectionConfig config) { this.connectionPool = new PostgresConnectionPool(connectionConfig); } + // todo: Deprecate this method. All connections should be obtained from the connection pool. public synchronized Connection getConnection() { try { if (connection == null) { @@ -48,10 +49,22 @@ public synchronized Connection getConnection() { return connection; } + /** + * Get a pooled connection with autoCommit=true. Use for read queries that don't need manual + * transaction management. + */ public Connection getPooledConnection() throws SQLException { return connectionPool.getConnection(); } + /** + * Get a pooled connection with autoCommit=false. Use for operations that require manual + * transaction management (commit/rollback). + */ + public Connection getTransactionalConnection() throws SQLException { + return connectionPool.getTransactionalConnection(); + } + public Map getCustomParameters() { return connectionConfig.customParameters(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index bad07de5..890317c9 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -492,7 +492,7 @@ public Optional update( throws IOException { updateValidator.validate(updates); - try (final Connection connection = client.getPooledConnection()) { + try (final Connection connection = client.getTransactionalConnection()) { org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser = new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser( tableIdentifier, query); @@ -828,11 +828,11 @@ protected long countWithParser( org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { String subQuery = queryParser.parse(); String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery); - try { - PreparedStatement preparedStatement = - queryExecutor.buildPreparedStatement( - sqlQuery, queryParser.getParamsBuilder().build(), client.getConnection()); - ResultSet resultSet = preparedStatement.executeQuery(); + try (Connection connection = client.getPooledConnection(); + PreparedStatement preparedStatement = + queryExecutor.buildPreparedStatement( + sqlQuery, queryParser.getParamsBuilder().build(), connection); + ResultSet resultSet = preparedStatement.executeQuery()) { resultSet.next(); return resultSet.getLong(1); } catch (SQLException e) { @@ -842,24 +842,38 @@ protected long countWithParser( } } + // This overload obtains a pooled connection and passes it to the iterator + // The iterator is responsible for returning the connection to the pool when closed protected CloseableIterator queryWithParser( org.hypertrace.core.documentstore.query.Query query, org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { + Connection connection = null; try { - ResultSet resultSet = queryExecutor.execute(client.getConnection(), queryParser); + connection = client.getPooledConnection(); + ResultSet resultSet = queryExecutor.execute(connection, queryParser); if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) { return !query.getSelections().isEmpty() - ? new PostgresResultIteratorWithMetaData(resultSet) - : new PostgresResultIterator(resultSet); + ? new PostgresResultIteratorWithMetaData(resultSet, connection) + : new PostgresResultIterator(resultSet, connection); } else { - return new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT); + return new PostgresResultIteratorWithBasicTypes(resultSet, connection, DocumentType.FLAT); } } catch (Exception e) { + if (connection != null) { + try { + connection.close(); + LOGGER.debug("Returned connection to pool after exception: {}", connection); + } catch (SQLException ex) { + LOGGER.error("Failed to return connection to pool after exception", ex); + } + } throw new UnsupportedOperationException(e); } } + // This overload accepts an externally-managed connection (e.g., for transactions) + // The connection is NOT passed to iterators since the caller manages its lifecycle protected CloseableIterator queryWithParser( Connection connection, org.hypertrace.core.documentstore.query.Query query, @@ -1292,6 +1306,12 @@ public PostgresResultIteratorWithBasicTypes(ResultSet resultSet, DocumentType do super(resultSet, documentType); } + // New constructors that accept connection + public PostgresResultIteratorWithBasicTypes( + ResultSet resultSet, Connection connection, DocumentType documentType) { + super(resultSet, connection, documentType); + } + @Override public Document next() { try { @@ -1439,6 +1459,7 @@ static class PostgresResultIterator implements CloseableIterator { protected final ObjectMapper MAPPER = new ObjectMapper(); protected ResultSet resultSet; + protected Connection connection; // Hold reference to connection for cleanup protected boolean cursorMovedForward = false; protected boolean hasNext = false; @@ -1446,20 +1467,34 @@ static class PostgresResultIterator implements CloseableIterator { protected DocumentType documentType; public PostgresResultIterator(ResultSet resultSet) { - this(resultSet, true); + this(resultSet, null, true, DocumentType.NESTED); } PostgresResultIterator(ResultSet resultSet, boolean removeDocumentId) { - this(resultSet, removeDocumentId, DocumentType.NESTED); + this(resultSet, null, removeDocumentId, DocumentType.NESTED); } public PostgresResultIterator(ResultSet resultSet, DocumentType documentType) { - this(resultSet, true, documentType); + this(resultSet, null, true, documentType); + } + + // New constructor that accepts connection + public PostgresResultIterator(ResultSet resultSet, Connection connection) { + this(resultSet, connection, true, DocumentType.NESTED); + } + + public PostgresResultIterator( + ResultSet resultSet, Connection connection, DocumentType documentType) { + this(resultSet, connection, true, documentType); } PostgresResultIterator( - ResultSet resultSet, boolean removeDocumentId, DocumentType documentType) { + ResultSet resultSet, + Connection connection, + boolean removeDocumentId, + DocumentType documentType) { this.resultSet = resultSet; + this.connection = connection; this.removeDocumentId = removeDocumentId; this.documentType = documentType; } @@ -1521,7 +1556,16 @@ protected void closeResultSet() { resultSet.close(); } } catch (SQLException ex) { - LOGGER.error("Unable to close connection", ex); + LOGGER.error("Unable to close resultSet", ex); + } + // Return pooled connection back to pool + if (connection != null) { + try { + connection.close(); + LOGGER.debug("Returned connection to pool: {}", connection); + } catch (SQLException ex) { + LOGGER.error("Unable to close/return connection to pool", ex); + } } } @@ -1538,11 +1582,15 @@ protected boolean shouldRemoveDocumentId() { static class PostgresResultIteratorWithMetaData extends PostgresResultIterator { public PostgresResultIteratorWithMetaData(ResultSet resultSet) { - super(resultSet, true); + super(resultSet, null, true, DocumentType.NESTED); } PostgresResultIteratorWithMetaData(ResultSet resultSet, boolean removeDocumentId) { - super(resultSet, removeDocumentId); + super(resultSet, null, removeDocumentId, DocumentType.NESTED); + } + + public PostgresResultIteratorWithMetaData(ResultSet resultSet, Connection connection) { + super(resultSet, connection, true, DocumentType.NESTED); } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java index 6e470ff8..d11ef2e3 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java @@ -18,29 +18,52 @@ @Slf4j class PostgresConnectionPool { + private static final String VALIDATION_QUERY = "SELECT 1"; private static final Duration VALIDATION_QUERY_TIMEOUT = Duration.ofSeconds(5); - private final PoolingDataSource dataSource; + // data source for pools with auto-commits enabled + private final PoolingDataSource regularDataSource; + // data source for pools with auto-commits disabled. This is used for transactional operations + // that manage their own commit logic + private final PoolingDataSource transactionalDataSource; PostgresConnectionPool(final PostgresConnectionConfig config) { - this.dataSource = createPooledDataSource(config); + this.regularDataSource = createPooledDataSource(config, true); + this.transactionalDataSource = createPooledDataSource(config, false); } + /** + * Get a connection from the regular pool with autoCommit=true. Use for read-only queries that + * don't need manual transaction management. + */ public Connection getConnection() throws SQLException { - return dataSource.getConnection(); + return regularDataSource.getConnection(); + } + + /** + * Get a connection from the transactional pool with autoCommit=false. Use for operations that + * require manual transaction management (commit/rollback). + */ + public Connection getTransactionalConnection() throws SQLException { + return transactionalDataSource.getConnection(); } public void close() { try { - dataSource.close(); + regularDataSource.close(); + } catch (final SQLException e) { + log.warn("Unable to close regular Postgres connection pool", e); + } + try { + transactionalDataSource.close(); } catch (final SQLException e) { - log.warn("Unable to close Postgres connection pool", e); + log.warn("Unable to close transactional Postgres connection pool", e); } } private PoolingDataSource createPooledDataSource( - final PostgresConnectionConfig config) { + final PostgresConnectionConfig config, final boolean autoCommit) { final ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(config.toConnectionString(), config.buildProperties()); final PoolableConnectionFactory poolableConnectionFactory = @@ -50,7 +73,7 @@ private PoolingDataSource createPooledDataSource( final ConnectionPoolConfig poolConfig = config.connectionPoolConfig(); setPoolProperties(connectionPool, poolConfig); - setFactoryProperties(poolableConnectionFactory, connectionPool); + setFactoryProperties(poolableConnectionFactory, connectionPool, autoCommit); return new PoolingDataSource<>(connectionPool); } @@ -72,12 +95,13 @@ private void setPoolProperties( private void setFactoryProperties( PoolableConnectionFactory poolableConnectionFactory, - GenericObjectPool connectionPool) { + GenericObjectPool connectionPool, + boolean autoCommit) { poolableConnectionFactory.setPool(connectionPool); poolableConnectionFactory.setValidationQuery(VALIDATION_QUERY); poolableConnectionFactory.setValidationQueryTimeout((int) VALIDATION_QUERY_TIMEOUT.toSeconds()); poolableConnectionFactory.setDefaultReadOnly(false); - poolableConnectionFactory.setDefaultAutoCommit(false); + poolableConnectionFactory.setDefaultAutoCommit(autoCommit); poolableConnectionFactory.setDefaultTransactionIsolation(TRANSACTION_READ_COMMITTED); poolableConnectionFactory.setPoolStatements(false); } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java index 7b1928e9..f4388538 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java @@ -64,12 +64,15 @@ @ExtendWith(MockitoExtension.class) class PostgresCollectionTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String COLLECTION_NAME = "test_collection"; private static final long currentTime = 1658956123L; @Mock private PostgresClient mockClient; @Mock private Connection mockConnection; + @Mock private Connection mockPooledConnection; + @Mock private Connection mockTransactionalConnection; @Mock private PreparedStatement mockSelectPreparedStatement; @Mock private PreparedStatement mockUpdatePreparedStatement; @Mock private ResultSet mockResultSet; @@ -144,8 +147,9 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -182,7 +186,8 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -194,13 +199,13 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { assertEquals(document, oldDocument.get()); assertEquals(DocumentType.NESTED, document.getDocumentType()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -212,13 +217,13 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { verify(mockUpdatePreparedStatement).setObject(8, "2022-08-09T18:53:17Z"); verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -246,8 +251,9 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -282,7 +288,8 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -292,13 +299,13 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { assertFalse(oldDocument.isPresent()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -310,13 +317,13 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { verify(mockUpdatePreparedStatement).setObject(8, "2022-08-09T18:53:17Z"); verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -341,8 +348,9 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); when(mockResultSet.isClosed()).thenReturn(false, true); @@ -353,19 +361,19 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException assertTrue(oldDocument.isEmpty()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -392,13 +400,6 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); - when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); - when(mockResultSet.next()).thenReturn(true); - when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); - mockResultSetMetadata(id); - final String updateQuery = String.format( "WITH concatenated AS " @@ -424,7 +425,17 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); + when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); + mockResultSetMetadata(id); + + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -436,13 +447,13 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { postgresCollection.update( query, updates, UpdateOptions.builder().returnDocumentType(BEFORE_UPDATE).build())); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -455,13 +466,13 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is rolled back - verify(mockConnection, times(1)).rollback(); + verify(mockTransactionalConnection, times(1)).rollback(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -496,7 +507,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException { COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true).thenReturn(false); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -551,8 +564,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException { assertFalse(oldDocument.hasNext()); // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); @@ -666,7 +680,9 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException { COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); when(mockResultSet.isClosed()).thenReturn(false, true); @@ -713,9 +729,11 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException { assertFalse(oldDocument.hasNext()); - // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + // Obtain 2 connections: One for update and one for selecting. SELECT obtains a pooled + // connection + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); @@ -759,7 +777,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException, + "document->'date' DESC NULLS LAST", COLLECTION_NAME); - when(mockClient.getConnection()).thenReturn(mockConnection); + when(mockClient.getPooledConnection()).thenReturn(mockConnection); when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class); @@ -769,7 +787,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException, postgresCollection.bulkUpdate( query, updates, UpdateOptions.builder().returnDocumentType(BEFORE_UPDATE).build())); - verify(mockClient, times(1)).getConnection(); + verify(mockClient, times(1)).getPooledConnection(); verify(mockConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); @@ -799,7 +817,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class); final String updateQuery = @@ -845,8 +865,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S query, updates, UpdateOptions.builder().returnDocumentType(AFTER_UPDATE).build())); // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery();