Conversation
…ction is active.
WalkthroughAdds DB connection health checks before Slick writes, introduces UsingUtils for AutoCloseable management, adds retry/backoff for DB open, refactors PramenDb/RdbJdbc initialization into factory/using patterns, updates Slick utilities, and adapts tests to use the new factories and using pattern. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant PramenDb
participant JdbcUrlSelector
participant AlgorithmUtils
participant RdbJdbc
participant Database
Caller->>PramenDb: apply(jdbcConfig)
PramenDb->>JdbcUrlSelector: selectWorkingUrl(jdbcConfig)
PramenDb->>AlgorithmUtils: actionWithRetry(attempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS)
AlgorithmUtils->>JdbcUrlSelector: attempt open working URL
JdbcUrlSelector->>RdbJdbc: RdbJdbc.apply(jdbcConfig) (create Connection)
RdbJdbc->>Database: Database.forURL(...) (may fail)
alt success
Database-->>PramenDb: Database + profile
PramenDb->>RdbJdbc: UsingUtils.using(RdbJdbc.connection) { setupDatabase(connection) }
RdbJdbc-->>PramenDb: resource closed by UsingUtils
PramenDb-->>Caller: initialized PramenDb
else failure & retries left
Database-->>AlgorithmUtils: throw
AlgorithmUtils->>AlgorithmUtils: sleep random backoff ms
AlgorithmUtils->>JdbcUrlSelector: retry next URL/attempt
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala`:
- Around line 53-58: RdbJdbc currently wraps the shared PramenDb.jdbcConnection
and its close() both has a bug (uses if (connection.isClosed) ...) and will
prematurely close the shared connection when UsingUtils.using exits; fix by
either (1) in setupDatabase() open a fresh private Connection for RdbJdbc so
RdbJdbc manages/ closes its own connection, or (2) change RdbJdbc to accept a
“doNotClose” flag or a DoNotClose wrapper so close() does not close the passed
shared PramenDb.jdbcConnection, and also correct the logic in RdbJdbc.close() to
use if (!connection.isClosed) connection.close(); ensure PramenDb.close() still
closes the real shared connection exactly once.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala`:
- Line 28: The close logic in class RdbJdbc is inverted and only calls
connection.close() when the connection is already closed, causing leaks; update
the RdbJdbc.close implementation (the AutoCloseable close method) to close the
connection when it is open (e.g., call connection.close() if
!connection.isClosed() or simply try to close and catch exceptions) and ensure
any exceptions are handled/logged and the connection reference is not left open.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`:
- Around line 113-117: The backoff calculation in JdbcUrlSelectorImpl is wrong
and passes a negative bound to Random.nextInt causing an exception; update the
computation that sets backoffS (used in the retry block around
getNextUrl/currentUrl and retriesLeft) to use a positive range, e.g.
Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S + 1) + BACKOFF_MIN_S (or otherwise
ensure you compute max-min+1) before calling Thread.sleep, so the retry logic
and log (log.error(..., ex)) work correctly.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala`:
- Around line 92-101: The recursive retry call drops the backoff parameters and
misspells "seconds": ensure the recursive call to actionWithRetry forwards the
backoffMinMs and backoffMaxMs (so replace actionWithRetry(attemptsLeft,
log)(action) with a call that includes backoffMinMs and backoffMaxMs) and fix
the log message typo "seconnds" → "seconds"; reference the actionWithRetry
function, backoffMinMs, backoffMaxMs, log, getErrorMessage(ex) and attemptsLeft
when making the change.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala`:
- Around line 46-55: The code calls ensureDbConnected(db) twice in executeQuery
and executeAction; remove the first pre-try invocation and keep a single
ensureDbConnected(db) inside the try block to avoid redundant connection checks
and extra roundtrips. Update executeQuery and executeAction to call
ensureDbConnected(db) only once (inside their try), preserving existing error
wrapping and logging around
db.run(action).execute()/db.run(action).andThen(...).execute() and ensure no
other logic depends on the removed pre-try call.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala`:
- Around line 33-40: The using[T <: AutoCloseable,U](resource: => T)(action: T
=> U) helper currently allows openedResource to be null despite the scaladoc;
add an explicit null check immediately after assigning openedResource and throw
a clear NPE (or IllegalArgumentException) with a descriptive message if
openedResource == null to fail fast; update any subsequent logic that assumes
openedResource is non-null (e.g., the try/catch/action(openedResource) and
finally/close handling) to rely on that guard so no confusing NPE occurs later.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala`:
- Around line 31-33: The RdbJdbc.close() method has inverted logic and never
closes the JDBC connection; update the implementation in RdbJdbc.close() to
close the connection when it is not already closed (e.g., change the condition
from if (connection.isClosed) to if (!connection.isClosed)) or simply call
connection.close() unconditionally with proper null/safety checks and exception
handling so calls like rdb.close() in BookkeeperJdbcSuite actually release the
resource.
♻️ Duplicate comments (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala (1)
34-36: Consistent pattern, but sameclose()bug applies.Same resource leak concern as noted in
BookkeeperJdbcSuite.scala- theRdbJdbc.close()method has inverted logic and won't close the connection.pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala (1)
35-37: Consistent pattern, but sameclose()bug applies.Same resource leak concern as noted in other test files - the
RdbJdbc.close()method has inverted logic.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)
49-51: Consistent pattern, but sameclose()bug applies.Same resource leak concern as noted in other test files - the
RdbJdbc.close()method has inverted logic and won't close the connection.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala
Show resolved
Hide resolved
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
Outdated
Show resolved
Hide resolved
| def executeQuery[E, U](db: Database, query: Query[E, U, Seq]): Seq[U] = { | ||
| ensureDbConnected(db) | ||
|
|
||
| val action = query.result | ||
| val sql = action.statements.mkString("; ") | ||
|
|
||
| try { | ||
| val start = Instant.now | ||
| ensureDbConnected(db) | ||
| val result = db.run(action).execute() |
There was a problem hiding this comment.
Avoid duplicate connection checks per call.
ensureDbConnected(db) is invoked twice in both executeQuery (Line 47 + Line 54) and executeAction (Line 82 + Line 88). This adds an extra roundtrip per call without extra safety. Keep a single check inside the try block for consistent error wrapping.
🔧 Proposed fix
def executeQuery[E, U](db: Database, query: Query[E, U, Seq]): Seq[U] = {
- ensureDbConnected(db)
-
val action = query.result
val sql = action.statements.mkString("; ")
try {
val start = Instant.now
ensureDbConnected(db)
@@
def executeAction[R, E <: Effect](db: Database, action: SqlAction[R, NoStream, E]): R = {
- ensureDbConnected(db)
-
val sql = action.statements.mkString("; ")
try {
val start = Instant.now
ensureDbConnected(db)Also applies to: 81-89
🤖 Prompt for AI Agents
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala`
around lines 46 - 55, The code calls ensureDbConnected(db) twice in executeQuery
and executeAction; remove the first pre-try invocation and keep a single
ensureDbConnected(db) inside the try block to avoid redundant connection checks
and extra roundtrips. Update executeQuery and executeAction to call
ensureDbConnected(db) only once (inside their try), preserving existing error
wrapping and logging around
db.run(action).execute()/db.run(action).andThen(...).execute() and ensure no
other logic depends on the removed pre-try call.
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
Outdated
Show resolved
Hide resolved
Unit Test Coverage
Files
|
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala (1)
62-66: Statement resource leak ifexecutethrows.If
statement.execute(ddl)throws an exception, the statement is never closed, leading to a resource leak. Use try-finally to ensure cleanup.🐛 Proposed fix
override def executeDDL(ddl: String): Unit = { val statement = connection.createStatement() - statement.execute(ddl) - statement.close() + try { + statement.execute(ddl) + } finally { + statement.close() + } }
🤖 Fix all issues with AI agents
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala`:
- Around line 92-101: Normalize negative backoff inputs before computing and
sleeping: in actionWithRetry when using backoffMinMs and backoffMaxMs, coerce
them to non‑negative (e.g., max(0, backoffMinMs) and max(0, backoffMaxMs)), then
check that normalized backoffMax > backoffMin before calling Random.nextInt and
Thread.sleep; if not, skip the randomized sleep path and use the fallback
"Retrying..." branch so Thread.sleep never receives a negative argument and the
original exception isn't masked (references: actionWithRetry, backoffMinMs,
backoffMaxMs, getErrorMessage).
♻️ Duplicate comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala (1)
115-117: Off-by-one remains: maximum backoff (10s) is never used.The critical negative-bound bug from the previous review is now fixed. However,
Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S)returns[0, 8], sobackoffSranges[1, 9]—the defined max of 10 seconds is never selected. Add+ 1insidenextIntfor an inclusive upper bound if intended.- val backoffS = Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S) + BACKOFF_MIN_S + val backoffS = Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S + 1) + BACKOFF_MIN_Spramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)
80-102: Duplicate connection check inexecuteAction.
ensureDbConnected(db)is called twice—at line 81 (before try) and line 87 (inside try). This adds unnecessary overhead and is inconsistent with the other methods (executeQuery,executeCount,executeMaxString) which have only one call. Remove the duplicate and keep a single check either before the try (like other methods) or inside the try for consistent error handling.pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)
95-95: Fix typo in log message (“seconnds”).The message still contains “seconnds”, which looks unpolished in logs.
🧹 Nitpick comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala (1)
29-31: Consider making backoff constants private or moving to companion object.These constants are implementation details of the retry logic. Making them
privatewould better encapsulate the class internals.- val BACKOFF_MIN_S = 1 - val BACKOFF_MAX_S = 10 + private val BACKOFF_MIN_S = 1 + private val BACKOFF_MAX_S = 10pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala (1)
179-189: Consider using a shorter timeout for the validity check.
conn.isValid()is typically a quick ping operation (e.g.,SELECT 1). UsingFutureImplicits.executionTimeout(likely designed for query execution) may be excessive. A dedicated shorter timeout (e.g., 5-10 seconds) would fail faster on truly broken connections while still allowing for minor network latency.♻️ Suggested refinement
+ private val CONNECTION_VALIDITY_TIMEOUT_SECONDS = 10 + def ensureDbConnected(db: Database): Unit = { val check = SimpleDBIO { ctx => val conn = ctx.connection - if (!conn.isValid(FutureImplicits.executionTimeout.toSeconds.toInt)) + if (!conn.isValid(CONNECTION_VALIDITY_TIMEOUT_SECONDS)) throw new RuntimeException("Connection not valid") } AlgorithmUtils.actionWithRetry(PramenDb.DEFAULT_RETRIES, log, PramenDb.BACKOFF_MIN_MS, PramenDb.BACKOFF_MAX_MS) { db.run(check).execute() } }pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala (1)
28-28: Minor formatting nit: missing space before opening brace.-class RdbJdbc(connection: Connection) extends AutoCloseable with Rdb{ +class RdbJdbc(connection: Connection) extends AutoCloseable with Rdb {
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala
Outdated
Show resolved
Hide resolved
dffa9da to
b6a9a9c
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala`:
- Around line 52-55: The code calls RdbJdbc(jdbcConfig) then passes
rdb.connection into pramenDb.setupDatabase and immediately calls rdb.close(),
risking a double-close; change the pattern to match other tests by removing the
explicit rdb.close() here (or, if setupDatabase does not close the connection,
ensure a single centralized close in a finally/loan pattern). Specifically
update the block around RdbJdbc(jdbcConfig), rdb.executeDDL(...),
pramenDb.setupDatabase(rdb.connection) to avoid closing the same connection
twice and follow the established connection management used elsewhere.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala`:
- Around line 35-38: The test currently constructs RdbJdbc (rdb), runs
rdb.executeDDL and pramenDb.setupDatabase and then calls rdb.close(), which can
lead to a double-close if setupDatabase also closes the connection; change the
pattern to mirror other tests by ensuring the RdbJdbc instance is closed in a
single, safe place—e.g., wrap usage of RdbJdbc (RdbJdbc(jdbcConfig)) around a
try/finally or similar resource block and remove any extra closes inside
pramenDb.setupDatabase invocation so only the outer rdb.close() runs; reference
RdbJdbc, rdb.executeDDL, pramenDb.setupDatabase and rdb.close when updating the
code.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala`:
- Around line 49-52: The test currently creates RdbJdbc via RdbJdbc(jdbcConfig),
calls rdb.executeDDL and then pramenDb.setupDatabase(rdb.connection) and then
rdb.close(), risking a double-close if setupDatabase closes the passed
connection; change the flow to obtain the Connection once and ensure a single
close: either call pramenDb.setupDatabase with a connection created separately
and only close that connection in a finally block, or call
pramenDb.setupDatabase on the RdbJdbc instance (if such an overload exists) and
defer rdb.close() until after setup; ensure only one close is performed on the
underlying connection by adjusting the order and using try/finally around
rdb.close() or the Connection.close().
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala`:
- Around line 37-40: The test shows ambiguous ownership of the JDBC connection
because pramenDb.setupDatabase(rdb.connection) may close the connection and then
the test calls rdb.close() again; remove the redundant rdb.close() call (or
alternatively change setupDatabase to not close the passed connection) so
ownership is clear — update the test to only call rdb.executeDDL(...) and
pramenDb.setupDatabase(rdb.connection) without calling rdb.close(), referencing
RdbJdbc, rdb.executeDDL, pramenDb.setupDatabase, and RdbJdbc.close/isClosed to
guide the change.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala`:
- Around line 34-37: Remove the redundant rdb.close() call after
pramenDb.setupDatabase(rdb.connection); the setupDatabase method takes ownership
of the provided connection (it wraps it in new RdbJdbc(...) inside
UsingUtils.using and will close it), so delete the rdb.close() line and leave
the rest (keep val rdb = RdbJdbc(jdbcConfig), the DROP DDL and the setupDatabase
call) unchanged to avoid double-closing the connection.
♻️ Duplicate comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)
46-58: Clarify ownership of the passed JDBC connection.Line 52 wraps the provided Connection in
RdbJdbcinsideUsingUtils.using, which closes it. That can surprise callers and leads to a redundant close inPramenDb.apply. Consider leaving connection ownership to the caller (or explicitly documenting thatsetupDatabaseconsumes it). Please verify that no call site reuses the connection afterward.#!/bin/bash # List setupDatabase call sites to verify connection ownership expectations. rg -n "setupDatabase\(" --type=scala -C3pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala (1)
92-96: Fix typo in retry log message.Line 95 says “seconnds”.
✏️ Proposed fix
- log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconnds...") + log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...")
🧹 Nitpick comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala (1)
53-56: Ensure RdbJdbc closes even when setup fails.If
executeDDLorsetupDatabasethrows,rdb.close()won’t run. A simple try/finally avoids leaking the connection across tests.♻️ Suggested tweak
before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + val rdb = RdbJdbc(jdbcConfig) + try { + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) + } finally { + rdb.close() + } RdbExampleTable.IncrementalTable.initTable(getConnection) }
.../core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala`:
- Around line 50-53: The test reassigns pramenDb without closing the previous
instance, leaking the Slick connection pool; before creating a new
PramenDb(jdbcConfig) call close the existing pramenDb (call pramenDb.close()) if
it is non-null/defined so the previous Slick Database is shut down; update the
setup code surrounding UsingUtils.using(RdbJdbc(jdbcConfig)) { ... } and the
pramenDb = PramenDb(jdbcConfig) line to explicitly close the prior pramenDb
instance to prevent dangling connections.
♻️ Duplicate comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala (1)
115-117: Backoff range skipsBACKOFF_MAX_Sand can break if bounds converge.
Random.nextIntis exclusive of the upper bound, soBACKOFF_MAX_Sis never reached; if the bounds are ever equal, this will throw. Consider an inclusive range with a guard.🔧 Suggested fix
- val backoffS = Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S) + BACKOFF_MIN_S + val backoffS = + if (BACKOFF_MAX_S <= BACKOFF_MIN_S) BACKOFF_MIN_S + else Random.nextInt((BACKOFF_MAX_S - BACKOFF_MIN_S) + 1) + BACKOFF_MIN_S
🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala (1)
69-81: Consider usingUsingUtils.usingfor statement management ingetDbVersion().
executeDDLusesUsingUtils.usingfor safe statement closure, butgetDbVersion()manually manages the statement. If an exception occurs betweencreateStatement()andstatement.close()(e.g., duringrs.next()orrs.getInt(1)), the statement may leak.♻️ Suggested refactor for consistency
private def getDbVersion(): Int = { - val statement = connection.createStatement() - val dbVersion = try { - val rs = statement.executeQuery(s"SELECT version FROM $dbVersionTableName;") - rs.next() - rs.getInt(1) - } catch { - case NonFatal(_) => 0 - } - - statement.close() - dbVersion + UsingUtils.using(connection.createStatement()) { statement => + try { + val rs = statement.executeQuery(s"SELECT version FROM $dbVersionTableName;") + rs.next() + rs.getInt(1) + } catch { + case NonFatal(_) => 0 + } + } }pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)
194-200: Consider returning value fromactionWithRetryinstead of mutating external var.The current pattern uses a mutable
varthat's assigned inside the retry block. While functionally correct, a more idiomatic approach would be to haveactionWithRetryreturn the result directly.♻️ Optional: More idiomatic approach
- var database: JdbcBackend.DatabaseDef = null - AlgorithmUtils.actionWithRetry(numberOfAttempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS) { - database = jdbcConfig.user match { + val database = AlgorithmUtils.actionWithRetry(numberOfAttempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS) { + jdbcConfig.user match { case Some(user) => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) case None => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) } }This would require
actionWithRetryto return the result of the action block (if it doesn't already support this).
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala (1)
39-45: GuardpramenDb.close()against null in teardown.If
beforefails before initializingpramenDb,afterAllwill throw an NPE and can mask the real failure. A null-safe close keeps teardown resilient.Suggested fix
- override def afterAll(): Unit = { - pramenDb.close() - super.afterAll() - } + override def afterAll(): Unit = { + if (pramenDb != null) pramenDb.close() + super.afterAll() + }pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala (1)
45-47: GuardpramenDb.close()inafterAllto avoid NPE after setup failures.If
beforethrows,afterAllwill still run andpramenDbcan be null.🔧 Proposed change
override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() }
🤖 Fix all issues with AI agents
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala`:
- Around line 37-42: The DROP SCHEMA runs while the previous PramenDb may still
be open; change the order so you close the prior pramenDb before running the
DDL: call pramenDb.close() (and null it) if pramenDb != null before invoking
UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA
PUBLIC CASCADE;") }, ensuring you reference the existing pramenDb, RdbJdbc and
jdbcConfig symbols in the test setup.
🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala (1)
43-46: Consider adding null check for defensive cleanup.If the
beforeblock fails beforepramenDbis assigned (e.g., DDL execution throws),pramenDbremains null andafterAllwould throw an NPE.♻️ Suggested fix
override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() }pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala (1)
49-55: Close existingpramenDbbefore opening the temporary JDBC connection.
This avoids overlapping connections during cleanup and aligns better with the “single active connection” goal.🔧 Suggested reorder
before { - UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - } - if (pramenDb != null) pramenDb.close() + if (pramenDb != null) pramenDb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } pramenDb = PramenDb(jdbcConfig)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
Show resolved
Hide resolved
ed8b9ae to
3cd2352
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala`:
- Around line 151-162: You open a JDBC connection via getConnection before
calling openDb, so if openDb throws the connection is leaked; fix by moving the
openDb call and PramenDb construction inside UsingUtils.using(connection) so the
UsingUtils.using(connection) block wraps both openDb(jdbcConfig, url) and
pramenDb.setupDatabase(conn), ensuring the connection is always closed on
failure; specifically, call UsingUtils.using(connection) { conn => val
(database, profile) = openDb(jdbcConfig, url); val pramenDb = new
PramenDb(jdbcConfig, url, database, profile); pramenDb.setupDatabase(conn);
pramenDb } and return the pramenDb result from the using block instead of doing
openDb earlier.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala`:
- Around line 49-54: Add an override of afterAll to close the final pramenDb
instance and call the superclass teardown: implement override def afterAll():
Unit that checks if pramenDb != null and calls pramenDb.close(), then calls
super.afterAll(); this mirrors the cleanup in the before block and prevents the
pramenDb resource leak in the suite that uses BeforeAndAfterAll.
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
Outdated
Show resolved
Hide resolved
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala
Show resolved
Hide resolved
…koff time a little.
3cd2352 to
ba0d1e3
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala (1)
171-171: Typo in test description."partision" should be "partition".
📝 Suggested fix
- "throw an exception on connection errors when deleting metadata from a partision" in { + "throw an exception on connection errors when deleting metadata from a partition" in {
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala (1)
46-59: Minor: Double-close on the connection—safe but semantically unclear.Inside
setupDatabase,UsingUtils.using(new RdbJdbc(jdbcConnection))will callRdbJdbc.close()→connection.close(). Then the caller'sUsingUtils.using(connection)(line 158) also callsconnection.close(). Per JDBC spec, closing an already-closed connection is a no-op, so this is safe but the ownership semantics are confusing.Consider either:
- Having
setupDatabasenot close the connection (use a non-closing RdbJdbc wrapper), or- Documenting that
setupDatabasecloses the provided connection
Closes #699
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests
✏️ Tip: You can customize this high-level summary in your review settings.