#757 Add support for dynamically loaded JDBC drivers and ensure connections are reused in JDBC Native#760
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (6)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (2)
WalkthroughAdd dynamic JDBC driver loading and selector-driven connection management; refactor JDBC utilities/readers to accept a driver-aware JdbcUrlSelector, introduce AutoCloseable contracts for readers/iterators, ensure readers are closed, expand Delta AnalysisException checks, and bump project versions to 1.14.0-SNAPSHOT. ChangesJDBC Driver Support and Resource Lifecycle
Test Updates and Version Alignment
Estimated code review effort 🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala`:
- Line 24: The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.
In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Line 57: The AnalysisException pattern in BookkeeperDeltaTable (line with case
ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.
In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 57-60: The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`:
- Line 63: The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scala`:
- Around line 71-83: The loadDriver method in JdbcUrlSelector currently creates
a URLClassLoader and returns only the Driver, leaking the loader; modify
loadDriver to return/encapsulate both the Driver and its URLClassLoader (e.g., a
small Closeable wrapper class like DriverWithClassLoader), update callers to
store that wrapper, and ensure JdbcUrlSelectorImpl.close() calls loader.close()
(via wrapper.close()) in addition to closing the JDBC Connection so the jar
classloader is properly released.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala`:
- Around line 120-125: The cached JDBC connection is replaced without closing
the old one; before assigning connection = newConnection in the branch that
checks connection validity, attempt to close the existing connection in a
best-effort block: if connection != null then try connection.close() (guarding
with !connection.isClosed if desired) and ignore/log any SQLException, then
proceed to assign the new handle from getNewConnection(...) and register it with
ThreadClosableRegistry.registerCloseable(connection); perform the close inside a
try/catch so failures don't prevent creating/assigning the new connection.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`:
- Around line 177-184: getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.
- Around line 121-130: The current for-comprehension scopes the selector-owned
Connection into the Using/cleanup logic (via "connection <- conn"), causing the
cached connection to be closed; change withResultSet so it does NOT put the
selector-owned Connection into the Using scope: obtain the connection reference
directly from jdbcUrlSelector.getConnection (e.g. assign the returned conn to a
plain val) and only wrap/using the Statement and ResultSet (created via
connection.createStatement and executeQuery) so that Statement/ResultSet are
closed but the selector retains ownership and lifetime of the Connection; keep
references to executeQuery and action unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6e38d49c-e309-4210-82c7-f85aae4726a9
📒 Files selected for processing (32)
pramen/api/pom.xmlpramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scalapramen/core/pom.xmlpramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scalapramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scalapramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelector.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scalapramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scalapramen/core/src/main/scala/za/co/absa/pramen/core/source/JdbcSource.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/QueryExecutorJdbc.scalapramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scalapramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scalapramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scalapramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaTableSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/JdbcUrlSelectorImplSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcNativeSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcSparkUtilsSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/QueryExecutorJdbcSuite.scalapramen/extras/pom.xmlpramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scalapramen/pom.xml
| import java.time.LocalDate | ||
|
|
||
| trait TableReader { | ||
| trait TableReader extends AutoCloseable { |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
repo_root="."
target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"
echo "== File: $target =="
sed -n '1,120p' "$target" | nl -ba
echo
echo "== Find implementations/uses of TableReader in repo =="
rg -n --hidden --no-ignore-vcs "extends\s+TableReader\b|with\s+TableReader\b|TableReader\s*(" pramen -S || true
echo
echo "== Find any existing close() methods on TableReader implementers =="
rg -n --hidden --no-ignore-vcs "\bclass\b|\btrait\b|object\s" pramen/api/src/main/scala/za/co/absa/pramen/api -S || trueRepository: AbsaOSS/pramen
Length of output: 197
🏁 Script executed:
#!/bin/bash
set -euo pipefail
target="pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala"
echo "== Confirm path exists =="
ls -la "$target" || true
echo
echo "== Show TableReader.scala (first 120 lines) =="
if [ -f "$target" ]; then
awk '{printf "%6d\t%s\n", NR, $0}' "$target" | sed -n '1,120p'
else
echo "File not found: $target"
fi
echo
echo "== Locate any other TableReader.scala files (in case path differs) =="
fd -t f -a TableReader.scala . || true
echo
echo "== Find repo usages/implementations of TableReader =="
rg -n --hidden --no-ignore-vcs "\bextends\s+TableReader\b|\bwith\s+TableReader\b|\bTableReader\s*\[" pramen -S || true
echo
echo "== Find any close() implementations in API module (TableReader related) =="
rg -n --hidden --no-ignore-vcs "\bdef\s+close\s*\(" pramen/api/src/main/scala -S || true
echo
echo "== Find close() signatures on TableReader itself =="
rg -n --hidden --no-ignore-vcs "trait\s+TableReader\b|AutoCloseable|override\s+def\s+close\s*\(" "$target" pramen/api/src/main/scala -S || trueRepository: AbsaOSS/pramen
Length of output: 3837
🏁 Script executed:
#!/bin/bash
set -euo pipefail
files=(
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderSpy.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/ReaderStub.scala
pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/ReaderSpy.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
)
echo "== close() occurrences in TableReader implementations =="
for f in "${files[@]}"; do
echo
echo "--- $f ---"
if [ -f "$f" ]; then
rg -n "\bdef\s+close\s*\(" "$f" || true
else
echo "MISSING: $f"
fi
done
echo
echo "== Show TableReaderSpark / JdbcBase definitions around class/trait header + close if present =="
for f in \
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala \
pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
do
echo
echo "--- $f (context) ---"
sed -n '1,120p' "$f" | awk '{printf "%4d\t%s\n", NR, $0}' | sed -n '1,120p'
doneRepository: AbsaOSS/pramen
Length of output: 12719
Adding AutoCloseable to TableReader forces an abstract close() contract downstream (binary/source break)
za.co.absa.pramen.api.TableReader now extends AutoCloseable (line 24) but doesn’t define a default close(), so external implementations that don’t already implement def close(): Unit will fail to compile (and can hit AbstractMethodError at runtime when close() is invoked). Provide a default no-op override def close(): Unit = () in the trait and let resource-owning readers override it.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/TableReader.scala` at line
24, The TableReader trait was changed to extend AutoCloseable but doesn't
provide a default close implementation, breaking downstream implementations; add
a default no-op override def close(): Unit = () in the TableReader trait so
existing external implementations remain binary/source compatible and
resource-owning readers can still override close() as needed, referencing the
TableReader trait and its close() method for where to add this default.
| spark.table(recordsFullTableName).as[DataChunk] | ||
| } catch { | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") => | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Database does not exist" (catalog issue)
While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
at line 57, The AnalysisException pattern in BookkeeperDeltaTable (line with
case ex: AnalysisException) is too broad and can match "table/database does not
exist" errors; narrow the match to only column/field resolution failures (e.g.,
check for "cannot resolve" or a message pattern like "column .* does not exist")
or, better, proactively verify the table exists before calling migrateModel()
using Spark's catalog (spark.catalog.tableExists) to avoid attempting
SaveMode.Append on a missing table; update the match in the exception handler
and/or add a table existence check in init()/migrateModel() so migrateModel()
only runs when the target table is present.
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => | ||
| // Spark 2 and 3 | ||
| migrateModel() | ||
| spark.table(recordsFullTableName).as[DataChunk] |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Database does not exist" (catalog issue)
While init() creates the records table on construction, external deletion or catalog issues could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent table, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the table exists before attempting migration using spark.catalog.tableExists().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`
around lines 57 - 60, The current catch in BookkeeperDeltaTable matches
AnalysisException messages containing "does not exist", which is too broad and
can catch missing-table/database errors and lead migrateModel() to attempt
SaveMode.Append on a non-existent table; update the handling in the code that
calls migrateModel()/spark.table(recordsFullTableName) to either (a) narrow the
pattern to column-resolution errors (e.g. match "cannot resolve" or messages
that mention "column ... does not exist") or (b) check
spark.catalog.tableExists(recordsFullTableName) before calling migrateModel(),
and only call migrateModel() when the table exists but the schema indicates
missing columns (use the BookkeeperDeltaTable init/migrateModel() flow
accordingly so you don’t mask genuine missing-table/database errors).
| load() | ||
| } catch { | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") => | ||
| case ex: AnalysisException if ex.getMessage().contains("cannot resolve") || ex.getMessage().contains("does not exist") => |
There was a problem hiding this comment.
Overly broad error pattern matching.
The pattern "does not exist" is generic and may match errors beyond schema validation failures, such as:
- "Table does not exist" (missing table entirely)
- "Path does not exist" (filesystem issue)
While init() creates the records path on construction, external deletion or race conditions could still trigger these non-schema errors. If that occurs, migrateModel() will fail with SaveMode.Append on a non-existent path, masking the original error with a confusing secondary error.
Consider a more specific pattern that targets column/field resolution failures:
case ex: AnalysisException if ex.getMessage().contains("cannot resolve") ||
ex.getMessage().toLowerCase.matches(".*column.*does not exist.*") =>Alternatively, verify the path exists before attempting migration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`
at line 63, The catch in BookkeeperDeltaPath currently matches AnalysisException
messages containing the generic phrase "does not exist", which can accidentally
swallow unrelated errors (e.g., missing table or path) before migrateModel()
runs; update the match in the exception handler in BookkeeperDeltaPath (the
AnalysisException branch used around migrateModel/init logic) to only target
schema/column resolution failures (e.g., check ex.getMessage contains "cannot
resolve" or a case-insensitive regex like ".*column.*does not exist.*") or add
an explicit path existence check (call the path existence check used in init()
before performing migrateModel()) so that non-schema errors are not
misclassified and are propagated instead of being masked.
| private[core] def getResultSet(jdbcConfig: JdbcConfig, | ||
| url: String, | ||
| query: String, | ||
| jdbcDriverJarPath: Option[String]): ResultSet = { | ||
| val driverOpt = jdbcDriverJarPath.map(path => JdbcUrlSelector.loadDriver(path, jdbcConfig.driver)) | ||
| val connection = getJdbcConnection(jdbcConfig, url, driverOpt) | ||
|
|
||
| getResultSet(connection, jdbcConfig, query) |
There was a problem hiding this comment.
Keep Connection and Statement ownership with the returned result set.
The overload at Lines 177-184 opens a fresh JDBC Connection, and the core helper at Lines 187-213 creates a Statement, but both return only a ResultSet. The native iterator/count paths close the ResultSet only, so executor reads leak the connection and all callers leak the statement/open cursor. This needs a closeable wrapper or iterator that owns the full (Connection, Statement, ResultSet) stack.
Also applies to: 187-213
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala`
around lines 177 - 184, getResultSet(jdbcConfig: JdbcConfig, url: String, query:
String, jdbcDriverJarPath: Option[String]) and its overload
getResultSet(connection, jdbcConfig, query) currently return only a ResultSet
which leaks the owning Statement and Connection; change them to return a
closeable resource object (e.g., JdbcResultSetWrapper or similar) that holds the
Connection, Statement and ResultSet and implements AutoCloseable/Closable so its
close() closes rs, stmt, and conn in the correct order; update both getResultSet
overloads to construct and return this wrapper (the first to open the Connection
and pass it into the second), and update all callers to use the new wrapper with
try-with-resources / try-finally so the whole (Connection, Statement, ResultSet)
stack is always closed.
Unit Test Coverage
Files
|
…connections in ResultSetToRowIterator.
Closes #757
Summary by CodeRabbit
New Features
Improvements
Chores