From a2a7fca6d99a80b0e05f5976a4173e612f494f2a Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 12:25:57 +0530 Subject: [PATCH 01/10] WIP --- .../model/config/ConnectionPoolConfig.java | 4 +- .../postgres/PostgresCollection.java | 81 ++++++++++++++++--- .../postgres/PostgresQueryExecutor.java | 5 ++ 3 files changed, 77 insertions(+), 13 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java index e84f1462..028fdd48 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java @@ -14,10 +14,10 @@ @Accessors(fluent = true) @AllArgsConstructor(access = AccessLevel.PRIVATE) public class ConnectionPoolConfig { - @NonNull @Nonnegative @Builder.Default Integer maxConnections = 16; + @NonNull @Nonnegative @Builder.Default Integer maxConnections = 1600; // Time duration to wait for obtaining a connection from the pool - @NonNull @Builder.Default Duration connectionAccessTimeout = Duration.ofSeconds(10); + @NonNull @Builder.Default Duration connectionAccessTimeout = Duration.ofSeconds(100); // Time duration to wait for surrendering an idle connection back to the pool @NonNull @Builder.Default Duration connectionSurrenderTimeout = Duration.ofMinutes(5); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index bad07de5..2e96c4f0 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -842,24 +842,43 @@ protected long countWithParser( } } + // This overload obtains a pooled connection and passes it to the iterator + // The iterator is responsible for returning the connection to the pool when closed protected CloseableIterator queryWithParser( org.hypertrace.core.documentstore.query.Query query, org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { + Connection pooledConnection = null; try { - ResultSet resultSet = queryExecutor.execute(client.getConnection(), queryParser); + pooledConnection = client.getPooledConnection(); + // Set autocommit for read-only queries to avoid "idle in transaction" state + pooledConnection.setAutoCommit(true); + LOGGER.info("Executing query on connection: " + pooledConnection.toString()); + ResultSet resultSet = queryExecutor.execute(pooledConnection, queryParser); if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) { return !query.getSelections().isEmpty() - ? new PostgresResultIteratorWithMetaData(resultSet) - : new PostgresResultIterator(resultSet); + ? new PostgresResultIteratorWithMetaData(resultSet, pooledConnection) + : new PostgresResultIterator(resultSet, pooledConnection); } else { - return new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT); + return new PostgresResultIteratorWithBasicTypes( + resultSet, pooledConnection, DocumentType.FLAT); } } catch (Exception e) { + // If exception occurs before iterator is created, clean up connection immediately + if (pooledConnection != null) { + try { + pooledConnection.close(); + LOGGER.debug("Returned connection to pool after exception: {}", pooledConnection); + } catch (SQLException ex) { + LOGGER.error("Failed to return connection to pool after exception", ex); + } + } throw new UnsupportedOperationException(e); } } + // This overload accepts an externally-managed connection (e.g., for transactions) + // The connection is NOT passed to iterators since the caller manages its lifecycle protected CloseableIterator queryWithParser( Connection connection, org.hypertrace.core.documentstore.query.Query query, @@ -1292,6 +1311,12 @@ public PostgresResultIteratorWithBasicTypes(ResultSet resultSet, DocumentType do super(resultSet, documentType); } + // New constructors that accept connection + public PostgresResultIteratorWithBasicTypes( + ResultSet resultSet, Connection connection, DocumentType documentType) { + super(resultSet, connection, documentType); + } + @Override public Document next() { try { @@ -1439,6 +1464,7 @@ static class PostgresResultIterator implements CloseableIterator { protected final ObjectMapper MAPPER = new ObjectMapper(); protected ResultSet resultSet; + protected Connection connection; // Hold reference to connection for cleanup protected boolean cursorMovedForward = false; protected boolean hasNext = false; @@ -1446,20 +1472,34 @@ static class PostgresResultIterator implements CloseableIterator { protected DocumentType documentType; public PostgresResultIterator(ResultSet resultSet) { - this(resultSet, true); + this(resultSet, null, true, DocumentType.NESTED); } PostgresResultIterator(ResultSet resultSet, boolean removeDocumentId) { - this(resultSet, removeDocumentId, DocumentType.NESTED); + this(resultSet, null, removeDocumentId, DocumentType.NESTED); } public PostgresResultIterator(ResultSet resultSet, DocumentType documentType) { - this(resultSet, true, documentType); + this(resultSet, null, true, documentType); + } + + // New constructor that accepts connection + public PostgresResultIterator(ResultSet resultSet, Connection connection) { + this(resultSet, connection, true, DocumentType.NESTED); + } + + public PostgresResultIterator( + ResultSet resultSet, Connection connection, DocumentType documentType) { + this(resultSet, connection, true, documentType); } PostgresResultIterator( - ResultSet resultSet, boolean removeDocumentId, DocumentType documentType) { + ResultSet resultSet, + Connection connection, + boolean removeDocumentId, + DocumentType documentType) { this.resultSet = resultSet; + this.connection = connection; this.removeDocumentId = removeDocumentId; this.documentType = documentType; } @@ -1521,7 +1561,16 @@ protected void closeResultSet() { resultSet.close(); } } catch (SQLException ex) { - LOGGER.error("Unable to close connection", ex); + LOGGER.error("Unable to close resultSet", ex); + } + // Return pooled connection back to pool + if (connection != null) { + try { + connection.close(); // For pooled connections, close() returns them to the pool + LOGGER.debug("Returned connection to pool: {}", connection); + } catch (SQLException ex) { + LOGGER.error("Unable to close/return connection to pool", ex); + } } } @@ -1538,11 +1587,21 @@ protected boolean shouldRemoveDocumentId() { static class PostgresResultIteratorWithMetaData extends PostgresResultIterator { public PostgresResultIteratorWithMetaData(ResultSet resultSet) { - super(resultSet, true); + super(resultSet, null, true, DocumentType.NESTED); } PostgresResultIteratorWithMetaData(ResultSet resultSet, boolean removeDocumentId) { - super(resultSet, removeDocumentId); + super(resultSet, null, removeDocumentId, DocumentType.NESTED); + } + + // New constructor that accepts connection + public PostgresResultIteratorWithMetaData(ResultSet resultSet, Connection connection) { + super(resultSet, connection, true, DocumentType.NESTED); + } + + PostgresResultIteratorWithMetaData( + ResultSet resultSet, Connection connection, boolean removeDocumentId) { + super(resultSet, connection, removeDocumentId, DocumentType.NESTED); } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java index 507987b4..31400d12 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java @@ -25,6 +25,11 @@ static org.hypertrace.core.documentstore.query.Query transformAndLog( protected ResultSet execute(final Connection connection, PostgresQueryParser queryParser) throws SQLException { + // Artificial 10ms delay using pg_sleep + try (PreparedStatement sleepStatement = connection.prepareStatement("SELECT pg_sleep(0.05)")) { + sleepStatement.execute(); + } + final String sqlQuery = queryParser.parse(); final PreparedStatement preparedStatement = buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build(), connection); From 47a0094a44da5015907ed758d17b4492e173df6a Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 13:56:56 +0530 Subject: [PATCH 02/10] WIP --- .../core/documentstore/postgres/PostgresQueryExecutor.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java index 31400d12..507987b4 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresQueryExecutor.java @@ -25,11 +25,6 @@ static org.hypertrace.core.documentstore.query.Query transformAndLog( protected ResultSet execute(final Connection connection, PostgresQueryParser queryParser) throws SQLException { - // Artificial 10ms delay using pg_sleep - try (PreparedStatement sleepStatement = connection.prepareStatement("SELECT pg_sleep(0.05)")) { - sleepStatement.execute(); - } - final String sqlQuery = queryParser.parse(); final PreparedStatement preparedStatement = buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build(), connection); From 7818d29690a96be1ecad774362c345b3fe765d71 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 13:59:52 +0530 Subject: [PATCH 03/10] Remove inadvertent changes --- .../model/config/ConnectionPoolConfig.java | 4 +-- .../postgres/PostgresCollection.java | 30 +++++++------------ 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java index 028fdd48..e84f1462 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionPoolConfig.java @@ -14,10 +14,10 @@ @Accessors(fluent = true) @AllArgsConstructor(access = AccessLevel.PRIVATE) public class ConnectionPoolConfig { - @NonNull @Nonnegative @Builder.Default Integer maxConnections = 1600; + @NonNull @Nonnegative @Builder.Default Integer maxConnections = 16; // Time duration to wait for obtaining a connection from the pool - @NonNull @Builder.Default Duration connectionAccessTimeout = Duration.ofSeconds(100); + @NonNull @Builder.Default Duration connectionAccessTimeout = Duration.ofSeconds(10); // Time duration to wait for surrendering an idle connection back to the pool @NonNull @Builder.Default Duration connectionSurrenderTimeout = Duration.ofMinutes(5); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 2e96c4f0..c569a506 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -847,28 +847,26 @@ protected long countWithParser( protected CloseableIterator queryWithParser( org.hypertrace.core.documentstore.query.Query query, org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { - Connection pooledConnection = null; + Connection connection = null; try { - pooledConnection = client.getPooledConnection(); - // Set autocommit for read-only queries to avoid "idle in transaction" state - pooledConnection.setAutoCommit(true); - LOGGER.info("Executing query on connection: " + pooledConnection.toString()); - ResultSet resultSet = queryExecutor.execute(pooledConnection, queryParser); + connection = client.getPooledConnection(); + connection.setAutoCommit(true); + + ResultSet resultSet = queryExecutor.execute(connection, queryParser); if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) { return !query.getSelections().isEmpty() - ? new PostgresResultIteratorWithMetaData(resultSet, pooledConnection) - : new PostgresResultIterator(resultSet, pooledConnection); + ? new PostgresResultIteratorWithMetaData(resultSet, connection) + : new PostgresResultIterator(resultSet, connection); } else { - return new PostgresResultIteratorWithBasicTypes( - resultSet, pooledConnection, DocumentType.FLAT); + return new PostgresResultIteratorWithBasicTypes(resultSet, connection, DocumentType.FLAT); } } catch (Exception e) { // If exception occurs before iterator is created, clean up connection immediately - if (pooledConnection != null) { + if (connection != null) { try { - pooledConnection.close(); - LOGGER.debug("Returned connection to pool after exception: {}", pooledConnection); + connection.close(); + LOGGER.debug("Returned connection to pool after exception: {}", connection); } catch (SQLException ex) { LOGGER.error("Failed to return connection to pool after exception", ex); } @@ -1594,16 +1592,10 @@ public PostgresResultIteratorWithMetaData(ResultSet resultSet) { super(resultSet, null, removeDocumentId, DocumentType.NESTED); } - // New constructor that accepts connection public PostgresResultIteratorWithMetaData(ResultSet resultSet, Connection connection) { super(resultSet, connection, true, DocumentType.NESTED); } - PostgresResultIteratorWithMetaData( - ResultSet resultSet, Connection connection, boolean removeDocumentId) { - super(resultSet, connection, removeDocumentId, DocumentType.NESTED); - } - @Override protected Document prepareDocument() throws SQLException, IOException { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); From 9035ec28d1697a32c736f7ec33cd40261ccb238e Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 15:10:57 +0530 Subject: [PATCH 04/10] Fixed failing test cases --- .../postgres/PostgresCollection.java | 9 +++-- .../postgres/PostgresCollectionTest.java | 35 ++++++++++++------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index c569a506..33947258 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -829,9 +829,11 @@ protected long countWithParser( String subQuery = queryParser.parse(); String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery); try { + Connection connection = client.getPooledConnection(); + connection.setAutoCommit(true); PreparedStatement preparedStatement = queryExecutor.buildPreparedStatement( - sqlQuery, queryParser.getParamsBuilder().build(), client.getConnection()); + sqlQuery, queryParser.getParamsBuilder().build(), connection); ResultSet resultSet = preparedStatement.executeQuery(); resultSet.next(); return resultSet.getLong(1); @@ -862,9 +864,10 @@ protected CloseableIterator queryWithParser( return new PostgresResultIteratorWithBasicTypes(resultSet, connection, DocumentType.FLAT); } } catch (Exception e) { - // If exception occurs before iterator is created, clean up connection immediately if (connection != null) { try { + // Reset autoCommit to pool default (false) before returning connection + connection.setAutoCommit(false); connection.close(); LOGGER.debug("Returned connection to pool after exception: {}", connection); } catch (SQLException ex) { @@ -1564,6 +1567,8 @@ protected void closeResultSet() { // Return pooled connection back to pool if (connection != null) { try { + // Reset autoCommit to pool default (false) before returning connection + connection.setAutoCommit(false); connection.close(); // For pooled connections, close() returns them to the pool LOGGER.debug("Returned connection to pool: {}", connection); } catch (SQLException ex) { diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java index 7b1928e9..3e914ae6 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java @@ -70,6 +70,7 @@ class PostgresCollectionTest { @Mock private PostgresClient mockClient; @Mock private Connection mockConnection; + @Mock private Connection mockPooledConnection; @Mock private PreparedStatement mockSelectPreparedStatement; @Mock private PreparedStatement mockUpdatePreparedStatement; @Mock private ResultSet mockResultSet; @@ -496,7 +497,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException { COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true).thenReturn(false); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -551,8 +554,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException { assertFalse(oldDocument.hasNext()); // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); @@ -666,7 +670,9 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException { COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); when(mockResultSet.isClosed()).thenReturn(false, true); @@ -713,9 +719,11 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException { assertFalse(oldDocument.hasNext()); - // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + // Obtain 2 connections: One for update and one for selecting. SELECT obtains a pooled + // connection + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); @@ -759,7 +767,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException, + "document->'date' DESC NULLS LAST", COLLECTION_NAME); - when(mockClient.getConnection()).thenReturn(mockConnection); + when(mockClient.getPooledConnection()).thenReturn(mockConnection); when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class); @@ -769,7 +777,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException, postgresCollection.bulkUpdate( query, updates, UpdateOptions.builder().returnDocumentType(BEFORE_UPDATE).build())); - verify(mockClient, times(1)).getConnection(); + verify(mockClient, times(1)).getPooledConnection(); verify(mockConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); @@ -799,7 +807,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S COLLECTION_NAME); when(mockClient.getConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection); + when(mockPooledConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class); final String updateQuery = @@ -845,8 +855,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S query, updates, UpdateOptions.builder().returnDocumentType(AFTER_UPDATE).build())); // Obtain 2 connections: One for update and one for selecting - verify(mockClient, times(2)).getConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getPooledConnection(); + verify(mockClient, times(1)).getConnection(); + verify(mockPooledConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); From 8baf19a8253f2f12900ed34a81298daf683d2890 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 15:45:23 +0530 Subject: [PATCH 05/10] Fix connection leak in PostgresCollection#countWithParser --- .../documentstore/DocStoreQueryV1Test.java | 3 ++- .../postgres/PostgresCollection.java | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index effab741..54cb9c30 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -162,7 +162,8 @@ public static void init() throws IOException { Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config); System.out.println(mongoDatastore.listCollections()); - + // psql -U postgres -c "SELECT count(*) as conns, state FROM pg_stat_activity WHERE datname = + // '\''myTestFlat'\'' GROUP BY state;" postgres = new GenericContainer<>(DockerImageName.parse("postgres:13.1")) .withEnv("POSTGRES_PASSWORD", "postgres") diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 33947258..7efeff1f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -828,15 +828,18 @@ protected long countWithParser( org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { String subQuery = queryParser.parse(); String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery); - try { - Connection connection = client.getPooledConnection(); + try (Connection connection = client.getPooledConnection()) { connection.setAutoCommit(true); - PreparedStatement preparedStatement = - queryExecutor.buildPreparedStatement( - sqlQuery, queryParser.getParamsBuilder().build(), connection); - ResultSet resultSet = preparedStatement.executeQuery(); - resultSet.next(); - return resultSet.getLong(1); + try (PreparedStatement preparedStatement = + queryExecutor.buildPreparedStatement( + sqlQuery, queryParser.getParamsBuilder().build(), connection); + ResultSet resultSet = preparedStatement.executeQuery()) { + resultSet.next(); + long count = resultSet.getLong(1); + // Reset autoCommit before returning connection to pool + connection.setAutoCommit(false); + return count; + } } catch (SQLException e) { LOGGER.error( "SQLException querying documents. Original query: {}, sql query: {}", query, sqlQuery, e); From 0f9208b57f7e565f55598dbdf804895fc10b2c30 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 10 Nov 2025 15:46:04 +0530 Subject: [PATCH 06/10] Remove comment --- .../org/hypertrace/core/documentstore/DocStoreQueryV1Test.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index 54cb9c30..8a1c65ca 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -162,8 +162,6 @@ public static void init() throws IOException { Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config); System.out.println(mongoDatastore.listCollections()); - // psql -U postgres -c "SELECT count(*) as conns, state FROM pg_stat_activity WHERE datname = - // '\''myTestFlat'\'' GROUP BY state;" postgres = new GenericContainer<>(DockerImageName.parse("postgres:13.1")) .withEnv("POSTGRES_PASSWORD", "postgres") From d441bec31b2f536b01a3d1bfc408823965ac1124 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 12 Nov 2025 11:46:55 +0530 Subject: [PATCH 07/10] Create separate connection pools for transactions --- .../postgres/PostgresClient.java | 13 ++++++ .../postgres/PostgresCollection.java | 29 ++++--------- .../postgres/PostgresConnectionPool.java | 42 +++++++++++++++---- 3 files changed, 55 insertions(+), 29 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java index 7ce94ac2..fa6a12cf 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -33,6 +33,7 @@ public PostgresClient(final PostgresConnectionConfig config) { this.connectionPool = new PostgresConnectionPool(connectionConfig); } + // todo: Deprecate this method. All connections should be obtained from the connection pool. public synchronized Connection getConnection() { try { if (connection == null) { @@ -48,10 +49,22 @@ public synchronized Connection getConnection() { return connection; } + /** + * Get a pooled connection with autoCommit=true. Use for read queries that don't need manual + * transaction management. + */ public Connection getPooledConnection() throws SQLException { return connectionPool.getConnection(); } + /** + * Get a pooled connection with autoCommit=false. Use for operations that require manual + * transaction management (commit/rollback). + */ + public Connection getTransactionalConnection() throws SQLException { + return connectionPool.getTransactionalConnection(); + } + public Map getCustomParameters() { return connectionConfig.customParameters(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 7efeff1f..890317c9 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -492,7 +492,7 @@ public Optional update( throws IOException { updateValidator.validate(updates); - try (final Connection connection = client.getPooledConnection()) { + try (final Connection connection = client.getTransactionalConnection()) { org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser = new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser( tableIdentifier, query); @@ -828,18 +828,13 @@ protected long countWithParser( org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) { String subQuery = queryParser.parse(); String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery); - try (Connection connection = client.getPooledConnection()) { - connection.setAutoCommit(true); - try (PreparedStatement preparedStatement = - queryExecutor.buildPreparedStatement( - sqlQuery, queryParser.getParamsBuilder().build(), connection); - ResultSet resultSet = preparedStatement.executeQuery()) { - resultSet.next(); - long count = resultSet.getLong(1); - // Reset autoCommit before returning connection to pool - connection.setAutoCommit(false); - return count; - } + try (Connection connection = client.getPooledConnection(); + PreparedStatement preparedStatement = + queryExecutor.buildPreparedStatement( + sqlQuery, queryParser.getParamsBuilder().build(), connection); + ResultSet resultSet = preparedStatement.executeQuery()) { + resultSet.next(); + return resultSet.getLong(1); } catch (SQLException e) { LOGGER.error( "SQLException querying documents. Original query: {}, sql query: {}", query, sqlQuery, e); @@ -855,8 +850,6 @@ protected CloseableIterator queryWithParser( Connection connection = null; try { connection = client.getPooledConnection(); - connection.setAutoCommit(true); - ResultSet resultSet = queryExecutor.execute(connection, queryParser); if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) { @@ -869,8 +862,6 @@ protected CloseableIterator queryWithParser( } catch (Exception e) { if (connection != null) { try { - // Reset autoCommit to pool default (false) before returning connection - connection.setAutoCommit(false); connection.close(); LOGGER.debug("Returned connection to pool after exception: {}", connection); } catch (SQLException ex) { @@ -1570,9 +1561,7 @@ protected void closeResultSet() { // Return pooled connection back to pool if (connection != null) { try { - // Reset autoCommit to pool default (false) before returning connection - connection.setAutoCommit(false); - connection.close(); // For pooled connections, close() returns them to the pool + connection.close(); LOGGER.debug("Returned connection to pool: {}", connection); } catch (SQLException ex) { LOGGER.error("Unable to close/return connection to pool", ex); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java index 6e470ff8..d11ef2e3 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPool.java @@ -18,29 +18,52 @@ @Slf4j class PostgresConnectionPool { + private static final String VALIDATION_QUERY = "SELECT 1"; private static final Duration VALIDATION_QUERY_TIMEOUT = Duration.ofSeconds(5); - private final PoolingDataSource dataSource; + // data source for pools with auto-commits enabled + private final PoolingDataSource regularDataSource; + // data source for pools with auto-commits disabled. This is used for transactional operations + // that manage their own commit logic + private final PoolingDataSource transactionalDataSource; PostgresConnectionPool(final PostgresConnectionConfig config) { - this.dataSource = createPooledDataSource(config); + this.regularDataSource = createPooledDataSource(config, true); + this.transactionalDataSource = createPooledDataSource(config, false); } + /** + * Get a connection from the regular pool with autoCommit=true. Use for read-only queries that + * don't need manual transaction management. + */ public Connection getConnection() throws SQLException { - return dataSource.getConnection(); + return regularDataSource.getConnection(); + } + + /** + * Get a connection from the transactional pool with autoCommit=false. Use for operations that + * require manual transaction management (commit/rollback). + */ + public Connection getTransactionalConnection() throws SQLException { + return transactionalDataSource.getConnection(); } public void close() { try { - dataSource.close(); + regularDataSource.close(); + } catch (final SQLException e) { + log.warn("Unable to close regular Postgres connection pool", e); + } + try { + transactionalDataSource.close(); } catch (final SQLException e) { - log.warn("Unable to close Postgres connection pool", e); + log.warn("Unable to close transactional Postgres connection pool", e); } } private PoolingDataSource createPooledDataSource( - final PostgresConnectionConfig config) { + final PostgresConnectionConfig config, final boolean autoCommit) { final ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(config.toConnectionString(), config.buildProperties()); final PoolableConnectionFactory poolableConnectionFactory = @@ -50,7 +73,7 @@ private PoolingDataSource createPooledDataSource( final ConnectionPoolConfig poolConfig = config.connectionPoolConfig(); setPoolProperties(connectionPool, poolConfig); - setFactoryProperties(poolableConnectionFactory, connectionPool); + setFactoryProperties(poolableConnectionFactory, connectionPool, autoCommit); return new PoolingDataSource<>(connectionPool); } @@ -72,12 +95,13 @@ private void setPoolProperties( private void setFactoryProperties( PoolableConnectionFactory poolableConnectionFactory, - GenericObjectPool connectionPool) { + GenericObjectPool connectionPool, + boolean autoCommit) { poolableConnectionFactory.setPool(connectionPool); poolableConnectionFactory.setValidationQuery(VALIDATION_QUERY); poolableConnectionFactory.setValidationQueryTimeout((int) VALIDATION_QUERY_TIMEOUT.toSeconds()); poolableConnectionFactory.setDefaultReadOnly(false); - poolableConnectionFactory.setDefaultAutoCommit(false); + poolableConnectionFactory.setDefaultAutoCommit(autoCommit); poolableConnectionFactory.setDefaultTransactionIsolation(TRANSACTION_READ_COMMITTED); poolableConnectionFactory.setPoolStatements(false); } From 766ddc7dd3ff0a2d355d7c58a8aaa1aa5baf6698 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 12 Nov 2025 12:08:16 +0530 Subject: [PATCH 08/10] Fix failing test case --- .../postgres/PostgresCollectionTest.java | 80 +++++++++++-------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java index 3e914ae6..f4388538 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java @@ -64,6 +64,7 @@ @ExtendWith(MockitoExtension.class) class PostgresCollectionTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String COLLECTION_NAME = "test_collection"; private static final long currentTime = 1658956123L; @@ -71,6 +72,7 @@ class PostgresCollectionTest { @Mock private PostgresClient mockClient; @Mock private Connection mockConnection; @Mock private Connection mockPooledConnection; + @Mock private Connection mockTransactionalConnection; @Mock private PreparedStatement mockSelectPreparedStatement; @Mock private PreparedStatement mockUpdatePreparedStatement; @Mock private ResultSet mockResultSet; @@ -145,8 +147,9 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -183,7 +186,8 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -195,13 +199,13 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { assertEquals(document, oldDocument.get()); assertEquals(DocumentType.NESTED, document.getDocumentType()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -213,13 +217,13 @@ void testUpdateAtomicWithFilter() throws IOException, SQLException { verify(mockUpdatePreparedStatement).setObject(8, "2022-08-09T18:53:17Z"); verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -247,8 +251,9 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); @@ -283,7 +288,8 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -293,13 +299,13 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { assertFalse(oldDocument.isPresent()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -311,13 +317,13 @@ void testUpdateAtomicWithFilter_getNone() throws IOException, SQLException { verify(mockUpdatePreparedStatement).setObject(8, "2022-08-09T18:53:17Z"); verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -342,8 +348,9 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); when(mockResultSet.isClosed()).thenReturn(false, true); @@ -354,19 +361,19 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException assertTrue(oldDocument.isEmpty()); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); // Ensure the transaction is committed - verify(mockConnection, times(1)).commit(); + verify(mockTransactionalConnection, times(1)).commit(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test @@ -393,13 +400,6 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { + "LIMIT 1 " + "FOR UPDATE", COLLECTION_NAME); - when(mockClient.getPooledConnection()).thenReturn(mockConnection); - when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); - when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); - when(mockResultSet.next()).thenReturn(true); - when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); - mockResultSetMetadata(id); - final String updateQuery = String.format( "WITH concatenated AS " @@ -425,7 +425,17 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { + "FROM concatenated " + "WHERE \"%s\".id=concatenated.id", COLLECTION_NAME, COLLECTION_NAME, COLLECTION_NAME); - when(mockConnection.prepareStatement(updateQuery)).thenReturn(mockUpdatePreparedStatement); + + when(mockClient.getTransactionalConnection()).thenReturn(mockTransactionalConnection); + when(mockTransactionalConnection.prepareStatement(selectQuery)) + .thenReturn(mockSelectPreparedStatement); + when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); + mockResultSetMetadata(id); + + when(mockTransactionalConnection.prepareStatement(updateQuery)) + .thenReturn(mockUpdatePreparedStatement); when(mockClock.millis()).thenReturn(currentTime); @@ -437,13 +447,13 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { postgresCollection.update( query, updates, UpdateOptions.builder().returnDocumentType(BEFORE_UPDATE).build())); - verify(mockClient, times(1)).getPooledConnection(); - verify(mockConnection, times(1)).prepareStatement(selectQuery); + verify(mockClient, times(1)).getTransactionalConnection(); + verify(mockTransactionalConnection, times(1)).prepareStatement(selectQuery); verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap"); verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z"); verify(mockSelectPreparedStatement, times(1)).executeQuery(); - verify(mockConnection, times(1)).prepareStatement(updateQuery); + verify(mockTransactionalConnection, times(1)).prepareStatement(updateQuery); verify(mockUpdatePreparedStatement).setObject(1, "{lastUpdatedTime}"); verify(mockUpdatePreparedStatement).setObject(2, currentTime); @@ -456,13 +466,13 @@ void testUpdateAtomicWithFilter_throwsException() throws Exception { verify(mockUpdatePreparedStatement).setObject(9, id); // Ensure the transaction is rolled back - verify(mockConnection, times(1)).rollback(); + verify(mockTransactionalConnection, times(1)).rollback(); // Ensure the resources are closed verify(mockResultSet, times(1)).close(); verify(mockSelectPreparedStatement, times(1)).close(); verify(mockUpdatePreparedStatement, times(1)).close(); - verify(mockConnection, times(1)).close(); + verify(mockTransactionalConnection, times(1)).close(); } @Test From ddca169954f7e62e339e59309c16649f8e773486 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 12 Nov 2025 12:34:03 +0530 Subject: [PATCH 09/10] Added PostgresConnectionPoolIntegrationTest.java --- ...PostgresConnectionPoolIntegrationTest.java | 276 ++++++++++++++++++ 1 file changed, 276 insertions(+) create mode 100644 document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java new file mode 100644 index 00000000..63063086 --- /dev/null +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java @@ -0,0 +1,276 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.config.ConnectionCredentials; +import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig; +import org.hypertrace.core.documentstore.model.config.DatabaseType; +import org.hypertrace.core.documentstore.model.config.Endpoint; +import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class PostgresConnectionPoolIntegrationTest { + + private static GenericContainer postgres; + private static String host; + private static int port; + + @BeforeAll + public static void init() { + postgres = + new GenericContainer<>(DockerImageName.parse("postgres:13.1")) + .withEnv("POSTGRES_PASSWORD", "postgres") + .withEnv("POSTGRES_USER", "postgres") + .withExposedPorts(5432) + .waitingFor(Wait.forListeningPort()); + postgres.start(); + + host = postgres.getHost(); + port = postgres.getMappedPort(5432); + } + + @AfterAll + public static void shutdown() { + postgres.stop(); + } + + @Test + public void testGetConnection() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + try (final Connection connection = pool.getConnection()) { + assertNotNull(connection); + assertTrue(connection.getAutoCommit(), "Regular connection should have autoCommit=true"); + assertFalse(connection.isClosed()); + + // Verify the connection works by executing a simple query + try (final PreparedStatement stmt = connection.prepareStatement("SELECT 1"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + } + } + + pool.close(); + } + + @Test + public void testGetTransactionalConnection() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + try (final Connection connection = pool.getTransactionalConnection()) { + assertNotNull(connection); + assertFalse( + connection.getAutoCommit(), "Transactional connection should have autoCommit=false"); + assertFalse(connection.isClosed()); + + // Verify the connection works by executing a simple query + try (final PreparedStatement stmt = connection.prepareStatement("SELECT 2"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + } + + // Verify we can commit manually + connection.commit(); + } + + pool.close(); + } + + @Test + public void testBothPoolsIndependent() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get connections from both pools simultaneously + try (final Connection regularConn = pool.getConnection(); + final Connection transactionalConn = pool.getTransactionalConnection()) { + + assertNotNull(regularConn); + assertNotNull(transactionalConn); + + // Verify they have different autoCommit settings + assertTrue(regularConn.getAutoCommit()); + assertFalse(transactionalConn.getAutoCommit()); + + // Both should work independently + try (final PreparedStatement stmt1 = regularConn.prepareStatement("SELECT 'regular'"); + final ResultSet rs1 = stmt1.executeQuery()) { + assertTrue(rs1.next()); + assertEquals("regular", rs1.getString(1)); + } + + try (final PreparedStatement stmt2 = + transactionalConn.prepareStatement("SELECT 'transactional'"); + final ResultSet rs2 = stmt2.executeQuery()) { + assertTrue(rs2.next()); + assertEquals("transactional", rs2.getString(1)); + } + + transactionalConn.commit(); + } + + pool.close(); + } + + @Test + public void testConnectionPooling() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get and release connections multiple times + Connection conn1 = pool.getConnection(); + assertNotNull(conn1); + conn1.close(); + + Connection conn2 = pool.getConnection(); + assertNotNull(conn2); + conn2.close(); + + // Verify pooling is working by getting multiple connections from transactional pool + Connection tConn1 = pool.getTransactionalConnection(); + assertNotNull(tConn1); + tConn1.close(); + + Connection tConn2 = pool.getTransactionalConnection(); + assertNotNull(tConn2); + tConn2.close(); + + pool.close(); + } + + @Test + public void testTransactionalCommitAndRollback() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Create a test table + try (final Connection setupConn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + setupConn.prepareStatement( + "CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY, value TEXT)")) { + stmt.execute(); + } + setupConn.commit(); + } + + // Test commit + try (final Connection conn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (1, 'test')")) { + stmt.execute(); + } + conn.commit(); + + // Verify data was committed + try (final PreparedStatement stmt = + conn.prepareStatement("SELECT value FROM test_table WHERE id = 1"); + final ResultSet rs = stmt.executeQuery()) { + assertTrue(rs.next()); + assertEquals("test", rs.getString(1)); + } + } + + // Test rollback + try (final Connection conn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + conn.prepareStatement("INSERT INTO test_table (id, value) VALUES (2, 'rollback_me')")) { + stmt.execute(); + } + conn.rollback(); + + // Verify data was not committed + try (final PreparedStatement stmt = + conn.prepareStatement("SELECT value FROM test_table WHERE id = 2"); + final ResultSet rs = stmt.executeQuery()) { + assertFalse(rs.next(), "Data should have been rolled back"); + } + } + + // Cleanup + try (final Connection cleanupConn = pool.getTransactionalConnection()) { + try (final PreparedStatement stmt = + cleanupConn.prepareStatement("DROP TABLE IF EXISTS test_table")) { + stmt.execute(); + } + cleanupConn.commit(); + } + + pool.close(); + } + + @Test + public void testClose() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // Get connections to ensure pools are active + final Connection regularConnection = pool.getConnection(); + final Connection transactionalConnection = pool.getTransactionalConnection(); + + assertNotNull(regularConnection); + assertNotNull(transactionalConnection); + + // Close the connections back to the pool + regularConnection.close(); + transactionalConnection.close(); + + // Close the pool - should not throw + pool.close(); + + // After closing the pool, trying to get connections should fail with IllegalStateException + assertThrows(IllegalStateException.class, pool::getConnection); + assertThrows(IllegalStateException.class, pool::getTransactionalConnection); + } + + @Test + public void testCloseIdempotent() throws SQLException { + final PostgresConnectionConfig config = createTestConfig(); + final PostgresConnectionPool pool = new PostgresConnectionPool(config); + + // First close + pool.close(); + + // Second close should not throw + pool.close(); + } + + private static PostgresConnectionConfig createTestConfig() { + return (PostgresConnectionConfig) + ConnectionConfig.builder() + .type(DatabaseType.POSTGRES) + .addEndpoint(Endpoint.builder().host(host).port(port).build()) + .database("postgres") + .credentials( + ConnectionCredentials.builder().username("postgres").password("postgres").build()) + .connectionPoolConfig( + ConnectionPoolConfig.builder() + .maxConnections(5) + .connectionAccessTimeout(Duration.ofSeconds(10)) + .connectionSurrenderTimeout(Duration.ofSeconds(30)) + .build()) + .build(); + } +} From 750a6c0c2d8343713663a93ec97ac734d3295baf Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 12 Nov 2025 12:47:47 +0530 Subject: [PATCH 10/10] Spotless --- .../postgres/PostgresConnectionPoolIntegrationTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java index 63063086..8c427be8 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresConnectionPoolIntegrationTest.java @@ -11,8 +11,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; import org.hypertrace.core.documentstore.model.config.ConnectionCredentials; import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig; @@ -123,7 +121,7 @@ public void testBothPoolsIndependent() throws SQLException { } try (final PreparedStatement stmt2 = - transactionalConn.prepareStatement("SELECT 'transactional'"); + transactionalConn.prepareStatement("SELECT 'transactional'"); final ResultSet rs2 = stmt2.executeQuery()) { assertTrue(rs2.next()); assertEquals("transactional", rs2.getString(1)); @@ -186,7 +184,7 @@ public void testTransactionalCommitAndRollback() throws SQLException { // Verify data was committed try (final PreparedStatement stmt = - conn.prepareStatement("SELECT value FROM test_table WHERE id = 1"); + conn.prepareStatement("SELECT value FROM test_table WHERE id = 1"); final ResultSet rs = stmt.executeQuery()) { assertTrue(rs.next()); assertEquals("test", rs.getString(1)); @@ -203,7 +201,7 @@ public void testTransactionalCommitAndRollback() throws SQLException { // Verify data was not committed try (final PreparedStatement stmt = - conn.prepareStatement("SELECT value FROM test_table WHERE id = 2"); + conn.prepareStatement("SELECT value FROM test_table WHERE id = 2"); final ResultSet rs = stmt.executeQuery()) { assertFalse(rs.next(), "Data should have been rolled back"); }