Skip to content

#626 Add JDBC Native support for binary and array types#628

Merged
yruslan merged 6 commits into
mainfrom
feature/626-add-jdbc-native-bin-and-array
Aug 27, 2025
Merged

#626 Add JDBC Native support for binary and array types#628
yruslan merged 6 commits into
mainfrom
feature/626-add-jdbc-native-bin-and-array

Conversation

@yruslan
Copy link
Copy Markdown
Collaborator

@yruslan yruslan commented Aug 27, 2025

Closes #626

Summary by CodeRabbit

  • New Features

    • Native support for JDBC array columns (strings, numbers, booleans, decimals, dates, timestamps, binary) with null-safe handling.
    • Improved schema inference and extraction for binary/BLOB and multi-bit BIT fields.
    • Array handling auto-enabled for supported JDBC drivers (e.g., PostgreSQL, HSQLDB).
  • Tests

    • Added integration tests, sample table fixture, and config to validate end-to-end native JDBC ingestion of array fields.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Aug 27, 2025

Walkthrough

Adds driver-gated ARRAY support to JDBC native ingestion: introduces DRIVERS_SUPPORT_ARRAYS and threads an arraysSupported flag into ResultSetToRowIterator; extends schema inference and value extraction for ARRAY and binary/BIT types; adds tests, config, and sample table for array columns; updates tests for the new constructor.

Changes

Cohort / File(s) Summary
Core JDBC utils
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala
Adds DRIVERS_SUPPORT_ARRAYS set and computes arraysSupported from driver; passes it into ResultSetToRowIterator for native DataFrame/RDD construction.
Result set iteration (arrays + binary handling)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
Adds arraysSupported: Boolean constructor param; implements ARRAY element-type detection, ARRAY value extraction/unwrapping, binary/BIT size handling, nullSafeMap helper, ARRAY element kind constants, and updates schema/type mapping logic.
Integration test config
pramen/core/src/test/resources/test/config/integration_ingestion_native_array.conf
New config to run native JDBC ingestion tests against an array-containing table (HSQLDB) and write to a parquet metastore table.
Integration test: arrays
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala
New integration suite that creates the Arrays table, runs the native ingestion pipeline, and asserts resulting Parquet JSON for multiple array types (including nulls).
RDB sample table (arrays fixture)
pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala
Adds Arrays test fixture with DDL and inserts covering string, bool, numeric, decimal, date, timestamp, and binary arrays.
Unit tests update for constructor
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala
Updates ResultSetToRowIterator instantiations to include the new arraysSupported = true parameter across multiple test cases.
Minor test rename (flag variable)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeTableLongSuite.scala
Renames useJdbsNativeuseJdbcNative in method signature and config construction.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App as AppRunner
  participant JNU as JdbcNativeUtils
  participant DB as JDBC Driver
  participant RIt as ResultSetToRowIterator
  participant SP as Spark

  App->>JNU: getJdbcNativeDataFrame(conn, query, driver, ...)
  JNU->>JNU: arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(driver)
  JNU->>DB: executeQuery(query)
  DB-->>JNU: ResultSet
  JNU->>RIt: new ResultSetToRowIterator(rs, sanitizeDateTime, incorrectDecimalsAsString, arraysSupported)
  loop per row
    RIt->>RIt: map JDBC types (incl. ARRAY/BLOB/BIT>1) using arraysSupported
    RIt-->>JNU: Row
  end
  JNU-->>SP: RDD[Row] -> DataFrame
  SP-->>App: DataFrame
  note right of RIt: arraysSupported gates ARRAY handling
Loading
sequenceDiagram
  autonumber
  participant RIt as ResultSetToRowIterator
  participant RS as ResultSet
  participant SQLA as java.sql.Array

  RIt->>RS: getObject(col)
  alt ARRAY column and arraysSupported == true
    RS-->>RIt: java.sql.Array
    RIt->>SQLA: getArray()
    SQLA-->>RIt: Object[] elems
    RIt->>RIt: nullSafeMap(elems) -> coerce each element to Spark type
    RIt-->>RIt: Spark ArrayType value
  else ARRAY unsupported or non-ARRAY
    RIt->>RIt: existing handling (BLOB/VARBINARY/BIT/NUMERIC/DATE/TIMESTAMP/etc.)
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Assessment against linked issues

Objective Addressed Explanation
Add support for PostgreSQL type array of string in JDBC Native (#626)

Poem

A rabbit taps on keys with glee,
Arrays now hop from JDBC!
Elements mapped, nulls kept kind,
Rows compiled, each type aligned.
Hops and carrots in tidy rows—🥕🐰

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/626-add-jdbc-native-bin-and-array

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary or {PR Summary} to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (1)

212-240: Compile error and duplicate cases in setupColumnTypes; REAL mapping also wrong.

  • size is not in scope (case BIT if size > 1) → won’t compile.
  • Duplicate case BIT | BOOLEAN branch.
  • REAL should map to FLOAT path, not Decimal.
   private[core] def setupColumnTypes(): Unit = {
     for (i <- 1 to columnCount) {
       val dataType = rs.getMetaData.getColumnType(i)
+      val size = rs.getMetaData.getPrecision(i)
@@
-        case BIT if size > 1 => BLOB
-        case BIT | BOOLEAN   => BOOLEAN
+        case BIT if size > 1 => BLOB
+        case BIT | BOOLEAN   => BOOLEAN
         case BLOB            => BLOB
         case VARBINARY       => BLOB
         case LONGVARBINARY   => BLOB
-        case BIT | BOOLEAN   => BOOLEAN
         case TINYINT         => TINYINT
         case SMALLINT        => SMALLINT
         case INTEGER         => INTEGER
         case BIGINT          => BIGINT
-        case FLOAT           => FLOAT
-        case DOUBLE          => DOUBLE
-        case REAL            => getDecimalDataType(i)
+        case FLOAT           => FLOAT
+        case DOUBLE          => DOUBLE
+        case REAL            => FLOAT
         case NUMERIC         => getDecimalDataType(i)
         case DATE            => DATE
         case TIMESTAMP       => TIMESTAMP
         case ARRAY           => getArrayJdbcDataType(i)
         case _               => VARCHAR
🧹 Nitpick comments (10)
pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala (2)

123-136: DDL for arrays looks good; consider adding an empty-array edge case.

Table definition covers all major element types for HSQLDB/Postgres. To harden tests, add at least one row with empty arrays (e.g., ARRAY[] for each column) to cover zero-length handling vs. nulls.

Apply this patch to add a 4th insert with empty arrays:

@@
     val inserts: Seq[String] = Seq(
@@
       s"""INSERT INTO $tableName (
@@
         |);""".stripMargin,
+      s"""INSERT INTO $tableName (
+        |    str_array, bool_array, short_array, int_array, long_array,
+        |    dec_array, date_array, ts_array, bin_array
+        |)
+        |VALUES (
+        |    ARRAY[], ARRAY[], ARRAY[], ARRAY[], ARRAY[],
+        |    ARRAY[], ARRAY[], ARRAY[], ARRAY[]
+        |);""".stripMargin
     )

137-141: Minor grammar in column comment.

“This a flags array” → “This is a flags array”.

-      s"COMMENT ON COLUMN $tableName.bool_array IS 'This a flags array'"
+      s"COMMENT ON COLUMN $tableName.bool_array IS 'This is a flags array'"
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (1)

187-198: nullSafeMap is fine; consider widening result type for future flexibility.

Current T => T forces casts for cases like toString. A more general version T => U would avoid unsafe casts. Low priority.

-  private[core] def nullSafeMap[T](arr: Array[T])(f: T => T): Array[T] = {
+  private[core] def nullSafeMap[T, U >: Null](arr: Array[T])(f: T => U): Array[U] = {
@@
-      arr.map { v =>
+      arr.map { v =>
         if (v == null)
           null
         else
           f(v)
-      }.asInstanceOf[Array[T]]
+      }
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala (1)

220-220: Test updates to new constructor LGTM; consider adding array-focused unit tests.

Passing arraysSupported = true keeps existing expectations. Please add focused unit tests for:

  • Schema inference of ARRAY types (incl. VARBINARY[], DECIMAL[] with invalid precision fallback).
  • Data extraction null/empty arrays, and numeric up/down-casts (e.g., SHORT[]).

I can scaffold a small suite mocking ResultSet/SQLArray for each ARRAY_* branch.

Also applies to: 229-229, 241-241, 261-261, 269-269, 277-277, 305-305, 313-313, 331-331, 345-345, 365-365, 373-373, 391-391, 404-404

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (1)

49-49: Driver allowlist for arrays: good start; consider H2 if needed.

If H2 is in your test matrix later, it supports arrays and could be added. Optional.

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (5)

49-71: Verify ID expectations; identity columns usually start at 1.

DDL uses “GENERATED BY DEFAULT AS IDENTITY”, which commonly starts at 1. The expected JSON assumes IDs 0,1,2. If DB starts at 1, either update expected or exclude ID from comparison.

Options:

  • Preferred: Drop ID from equality and focus on array columns.
  • Or normalize dynamically: orderedDf.withColumn("ID0", col("ID") - min("ID").over()).drop("ID").

57-58: Timestamp rendering assumes +02:00 year-round.

Both winter and summer timestamps show +02:00; that implies a no-DST zone (Africa/Johannesburg). Ensure the pinned zone matches this, or adjust expectations.

Also applies to: 68-69


47-72: Avoid giant inline expected JSON; move to a resource for readability.

Keeps test concise and simplifies future updates.

-      val expectedData =
-        """[ {
-          |  "ID" : 0,
-          |  "STR_ARRAY" : [ "String1", "String2" ],
-          |  "BOOL_ARRAY" : [ true, false, true ],
-          |  "SHORT_ARRAY" : [ 10, 20, 30 ],
-          |  "INT_ARRAY" : [ 100, 200, 300 ],
-          |  "LONG_ARRAY" : [ 10000000000, 20000000000 ],
-          |  "DEC_ARRAY" : [ 123.45, 678.9 ],
-          |  "DATE_ARRAY" : [ "2025-01-01", "2025-12-31" ],
-          |  "TS_ARRAY" : [ "2025-01-01T10:00:00.000+02:00", "2025-06-01T12:30:00.000+02:00" ],
-          |  "BIN_ARRAY" : [ "3q2+7w==", "AQID" ]
-          |}, {
-          |  "ID" : 1,
-          |  "STR_ARRAY" : [ "String3", null ],
-          |  "BOOL_ARRAY" : [ true, null, true ],
-          |  "SHORT_ARRAY" : [ 50, null ],
-          |  "INT_ARRAY" : [ null, 200, 300 ],
-          |  "LONG_ARRAY" : [ 21234540000, null ],
-          |  "DEC_ARRAY" : [ null, 678.9 ],
-          |  "DATE_ARRAY" : [ "2025-01-01", null ],
-          |  "TS_ARRAY" : [ null, "2025-06-01T12:30:00.000+02:00" ],
-          |  "BIN_ARRAY" : [ "AavNIw==", null ]
-          |}, {
-          |  "ID" : 2
-          |} ]""".stripMargin
+      val expectedData = ResourceUtils.getResourceString("/test/data/expected_native_arrays.json")

91-103: Typo: useJdbsNative → useJdbcNative (clarity).

Minor naming fix; no behavior change.

-  def getConfig(basePath: String, useJdbsNative: Boolean = true): Config = {
+  def getConfig(basePath: String, useJdbcNative: Boolean = true): Config = {
...
-         |jdbc.native=$useJdbsNative
+         |jdbc.native=$useJdbcNative

45-47: Clarify the test name to reflect intent.

Make it obvious this validates array ingestion via native JDBC.

-    "support input.table setting" in {
+    "ingests array-typed columns via JDBC native" in {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4ec65a5 and f21849f.

📒 Files selected for processing (6)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (2 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (6 hunks)
  • pramen/core/src/test/resources/test/config/integration_ingestion_native_array.conf (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/JdbcNativeUtilsSuite.scala (12 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (4)
pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala (4)
  • RdbExampleTable (53-199)
  • Arrays (117-182)
  • initTable (27-39)
  • dropTable (41-50)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ResourceUtils.scala (2)
  • ResourceUtils (21-24)
  • getResourceString (22-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala (2)
  • SparkUtils (38-622)
  • convertDataFrameToPrettyJSON (58-68)
pramen/extras/src/test/scala/za/co/absa/pramen/extras/fixtures/TextComparisonFixture.scala (1)
  • compareTextVertical (32-36)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala (2)
  • JdbcSparkUtils (30-376)
  • addMetadataFromJdbc (47-81)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (2)
  • getSchema (55-64)
  • close (66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (5)

28-28: Constructor change LGTM.

arraysSupported plumbs through cleanly and keeps behavior gated by driver support.


257-294: Array type inference: good coverage; minor note on TIME→Timestamp mapping.

Mapping time/time with tz to TimestampType is acceptable given Spark types. No action.


295-317: Mapping to ARRAY_ codes LGTM.*

Covers all supported element types and falls back to VARCHAR when unsupported.


319-325: SQLArray unwrapping LGTM; handle null correctly.

No issues.


358-370: ARRAY_ constants LGTM.*

Clear separation from JDBC Types values.

pramen/core/src/test/resources/test/config/integration_ingestion_native_array.conf (2)

46-50: Driver choice aligns with allowlist.

org.hsqldb.jdbc.JDBCDriver is in DRIVERS_SUPPORT_ARRAYS; config matches test fixture.


70-73: input.table casing vs. created table name.

Hibernate/HSQLDB are case-insensitive for unquoted identifiers, so "arrays" will resolve to ARRAYS. Just a note; no change required.

If any DB in CI is case-sensitive on identifiers, quote table name in DDL and config for consistency.

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1)

34-37: Remove redundant timezone override in JdbcNativeArrayLongSuite

The JdbcNativeArrayLongSuite already mixes in SparkTestBase, which configures the session time zone to Africa/Johannesburg via

.config("spark.sql.session.timeZone", "Africa/Johannesburg")

in its builder (see SparkTestBase.scala:41) . As a result, timestamps in JSON will consistently render with +02:00 without any additional setup in individual suites.

  • No changes needed in beforeAll() of JdbcNativeArrayLongSuite.scala.
  • The optional diff is redundant and can be discarded.

Comment on lines 86 to 110
private[core] def getStructField(columnIndex: Int): StructField = {
val columnName = rs.getMetaData.getColumnName(columnIndex)
val dataType = rs.getMetaData.getColumnType(columnIndex)
val metadata = rs.getMetaData
val columnName = metadata.getColumnName(columnIndex)
val dataType = metadata.getColumnType(columnIndex)
val size = metadata.getPrecision(columnIndex)

dataType match {
case BIT | BOOLEAN => StructField(columnName, BooleanType)
case TINYINT => StructField(columnName, ByteType)
case SMALLINT => StructField(columnName, ShortType)
case INTEGER => StructField(columnName, IntegerType)
case BIGINT => StructField(columnName, LongType)
case FLOAT => StructField(columnName, FloatType)
case DOUBLE => StructField(columnName, DoubleType)
case REAL => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case NUMERIC => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case DATE => StructField(columnName, DateType)
case TIMESTAMP => StructField(columnName, TimestampType)
case _ => StructField(columnName, StringType)
case BIT if size > 1 => StructField(columnName, BinaryType)
case BIT | BOOLEAN => StructField(columnName, BooleanType)
case BLOB => StructField(columnName, BinaryType)
case VARBINARY => StructField(columnName, BinaryType)
case LONGVARBINARY => StructField(columnName, BinaryType)
case TINYINT => StructField(columnName, ByteType)
case SMALLINT => StructField(columnName, ShortType)
case INTEGER => StructField(columnName, IntegerType)
case BIGINT => StructField(columnName, LongType)
case FLOAT => StructField(columnName, FloatType)
case DOUBLE => StructField(columnName, DoubleType)
case REAL => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case NUMERIC => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case DATE => StructField(columnName, DateType)
case TIMESTAMP => StructField(columnName, TimestampType)
case ARRAY => StructField(columnName, getArrayDataType(columnIndex))
case _ => StructField(columnName, StringType)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Wrong mapping for JDBC REAL; should be FloatType.

REAL is single-precision. Mapping it to Decimal via getDecimalSparkSchema is incorrect and changes semantics.

-      case REAL            => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
+      case REAL            => StructField(columnName, FloatType)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private[core] def getStructField(columnIndex: Int): StructField = {
val columnName = rs.getMetaData.getColumnName(columnIndex)
val dataType = rs.getMetaData.getColumnType(columnIndex)
val metadata = rs.getMetaData
val columnName = metadata.getColumnName(columnIndex)
val dataType = metadata.getColumnType(columnIndex)
val size = metadata.getPrecision(columnIndex)
dataType match {
case BIT | BOOLEAN => StructField(columnName, BooleanType)
case TINYINT => StructField(columnName, ByteType)
case SMALLINT => StructField(columnName, ShortType)
case INTEGER => StructField(columnName, IntegerType)
case BIGINT => StructField(columnName, LongType)
case FLOAT => StructField(columnName, FloatType)
case DOUBLE => StructField(columnName, DoubleType)
case REAL => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case NUMERIC => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case DATE => StructField(columnName, DateType)
case TIMESTAMP => StructField(columnName, TimestampType)
case _ => StructField(columnName, StringType)
case BIT if size > 1 => StructField(columnName, BinaryType)
case BIT | BOOLEAN => StructField(columnName, BooleanType)
case BLOB => StructField(columnName, BinaryType)
case VARBINARY => StructField(columnName, BinaryType)
case LONGVARBINARY => StructField(columnName, BinaryType)
case TINYINT => StructField(columnName, ByteType)
case SMALLINT => StructField(columnName, ShortType)
case INTEGER => StructField(columnName, IntegerType)
case BIGINT => StructField(columnName, LongType)
case FLOAT => StructField(columnName, FloatType)
case DOUBLE => StructField(columnName, DoubleType)
case REAL => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case NUMERIC => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case DATE => StructField(columnName, DateType)
case TIMESTAMP => StructField(columnName, TimestampType)
case ARRAY => StructField(columnName, getArrayDataType(columnIndex))
case _ => StructField(columnName, StringType)
}
private[core] def getStructField(columnIndex: Int): StructField = {
val metadata = rs.getMetaData
val columnName = metadata.getColumnName(columnIndex)
val dataType = metadata.getColumnType(columnIndex)
val size = metadata.getPrecision(columnIndex)
dataType match {
case BIT if size > 1 => StructField(columnName, BinaryType)
case BIT | BOOLEAN => StructField(columnName, BooleanType)
case BLOB => StructField(columnName, BinaryType)
case VARBINARY => StructField(columnName, BinaryType)
case LONGVARBINARY => StructField(columnName, BinaryType)
case TINYINT => StructField(columnName, ByteType)
case SMALLINT => StructField(columnName, ShortType)
case INTEGER => StructField(columnName, IntegerType)
case BIGINT => StructField(columnName, LongType)
case FLOAT => StructField(columnName, FloatType)
case DOUBLE => StructField(columnName, DoubleType)
case REAL => StructField(columnName, FloatType)
case NUMERIC => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
case DATE => StructField(columnName, DateType)
case TIMESTAMP => StructField(columnName, TimestampType)
case ARRAY => StructField(columnName, getArrayDataType(columnIndex))
case _ => StructField(columnName, StringType)
}
}
🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
around lines 86 to 110, the JDBC REAL type is incorrectly mapped to a Decimal
via getDecimalSparkSchema; change the mapping so REAL is mapped to FloatType
(single-precision) like FLOAT, replacing the current case that calls
getDecimalSparkSchema with StructField(columnName, FloatType) to preserve
correct semantics.

Comment on lines 121 to +185
dataType match {
case BIT | BOOLEAN =>
case BIT if size > 1 =>
val v = rs.getBytes(columnIndex)
if (rs.wasNull()) null else v
case BLOB | VARBINARY | LONGVARBINARY =>
val v = rs.getBytes(columnIndex)
if (rs.wasNull()) null else v
case BIT | BOOLEAN =>
val v = rs.getBoolean(columnIndex)
if (rs.wasNull()) null else v
case TINYINT =>
case TINYINT =>
val v = rs.getByte(columnIndex)
if (rs.wasNull()) null else v
case SMALLINT =>
case SMALLINT =>
val v = rs.getShort(columnIndex)
if (rs.wasNull()) null else v
case INTEGER =>
case INTEGER =>
val v = rs.getInt(columnIndex)
if (rs.wasNull()) null else v
case BIGINT =>
case BIGINT =>
val v = rs.getLong(columnIndex)
if (rs.wasNull()) null else v
case FLOAT =>
case FLOAT =>
val v = rs.getFloat(columnIndex)
if (rs.wasNull()) null else v
case DOUBLE =>
case DOUBLE =>
val v = rs.getDouble(columnIndex)
if (rs.wasNull()) null else v
case REAL => rs.getBigDecimal(columnIndex)
case NUMERIC => rs.getBigDecimal(columnIndex)
case DATE => sanitizeDate(rs.getDate(columnIndex))
case TIMESTAMP => sanitizeTimestamp(rs.getTimestamp(columnIndex))
case _ => rs.getString(columnIndex)
case REAL =>
rs.getBigDecimal(columnIndex)
case NUMERIC =>
rs.getBigDecimal(columnIndex)
case DATE =>
sanitizeDate(rs.getDate(columnIndex))
case TIMESTAMP =>
sanitizeTimestamp(rs.getTimestamp(columnIndex))
case ARRAY_STRING =>
nullSafeMap(getJdbcArray(columnIndex))(_.toString)
case ARRAY_BINARY =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[Array[Byte]])
case ARRAY_BOOL =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.lang.Boolean])
case ARRAY_SHORT =>
nullSafeMap(getJdbcArray(columnIndex)) {
case v: java.lang.Integer => new java.lang.Short(v.toShort)
case v: java.lang.Short => v
}
case ARRAY_INT =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.lang.Integer])
case ARRAY_LONG =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.lang.Long])
case ARRAY_FLOAT =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.lang.Float])
case ARRAY_DOUBLE =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.lang.Double])
case ARRAY_DECIMAL =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[java.math.BigDecimal])
case ARRAY_DATE =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[Date])
case ARRAY_TIMESTAMP =>
nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[Timestamp])
case _ =>
rs.getString(columnIndex)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Compile error: ‘size’ is undefined in getCell; and REAL read path inconsistent.

  • size is not in scope here; pattern case BIT if size > 1 won’t compile.
  • REAL should read Float, not BigDecimal.
-    val dataType = columnIndexToTypeMap(columnIndex)
+    val dataType = columnIndexToTypeMap(columnIndex)
+    val size = rs.getMetaData.getPrecision(columnIndex)
@@
-      case REAL                             =>
-        rs.getBigDecimal(columnIndex)
+      case REAL                             =>
+        val v = rs.getFloat(columnIndex)
+        if (rs.wasNull()) null else v

Optionally, you can drop the BIT/size guard here and rely on setupColumnTypes remapping BIT(size>1)→BLOB. If you keep it, the added size fixes compilation.

Comment on lines 90 to 101
// Executing the query
val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
println(arraysSupported)
val rs = getResultSet(jdbcConfig, url, query)
val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString)
val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString, arraysSupported)
val schema = JdbcSparkUtils.addMetadataFromJdbc(driverIterator.getSchema, rs.getMetaData)

driverIterator.close()

val rdd = spark.sparkContext.parallelize(Seq(query)).flatMap(q => {
new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString)
new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString, arraysSupported)
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove println; use logger.

Leaking to stdout in library code is undesirable.

-    val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
-    println(arraysSupported)
+    val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
+    log.debug(s"Arrays supported by driver '${jdbcConfig.driver}': $arraysSupported")
@@
-      new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString, arraysSupported)
+      new ResultSetToRowIterator(
+        getResultSet(jdbcConfig, url, q),
+        jdbcConfig.sanitizeDateTime,
+        jdbcConfig.incorrectDecimalsAsString,
+        arraysSupported
+      )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Executing the query
val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
println(arraysSupported)
val rs = getResultSet(jdbcConfig, url, query)
val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString)
val driverIterator = new ResultSetToRowIterator(rs, jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString, arraysSupported)
val schema = JdbcSparkUtils.addMetadataFromJdbc(driverIterator.getSchema, rs.getMetaData)
driverIterator.close()
val rdd = spark.sparkContext.parallelize(Seq(query)).flatMap(q => {
new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString)
new ResultSetToRowIterator(getResultSet(jdbcConfig, url, q), jdbcConfig.sanitizeDateTime, jdbcConfig.incorrectDecimalsAsString, arraysSupported)
})
// Executing the query
val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
log.debug(s"Arrays supported by driver '${jdbcConfig.driver}': $arraysSupported")
val rs = getResultSet(jdbcConfig, url, query)
val driverIterator = new ResultSetToRowIterator(
rs,
jdbcConfig.sanitizeDateTime,
jdbcConfig.incorrectDecimalsAsString,
arraysSupported
)
val schema = JdbcSparkUtils.addMetadataFromJdbc(driverIterator.getSchema, rs.getMetaData)
driverIterator.close()
val rdd = spark.sparkContext.parallelize(Seq(query)).flatMap(q => {
new ResultSetToRowIterator(
getResultSet(jdbcConfig, url, q),
jdbcConfig.sanitizeDateTime,
jdbcConfig.incorrectDecimalsAsString,
arraysSupported
)
})
🤖 Prompt for AI Agents
In pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala
around lines 90 to 101, remove the println(arraysSupported) call and replace it
with the project's logger (e.g. logger.debug or logger.trace) to avoid writing
to stdout; if a logger is not already defined in the file, add an SLF4J logger
instance (private val logger = org.slf4j.LoggerFactory.getLogger(getClass)) and
log the arraysSupported value via logger.debug(s"arraysSupported =
$arraysSupported") so the information goes to the configured logging framework
instead of stdout.

import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.runner.AppRunner
import za.co.absa.pramen.core.samples.RdbExampleTable
import za.co.absa.pramen.core.utils.{ResourceUtils, SparkUtils}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect ResourceUtils import (compilation blocker).

ResourceUtils lives under za.co.absa.pramen.extras.utils, not core.utils.

-import za.co.absa.pramen.core.utils.{ResourceUtils, SparkUtils}
+import za.co.absa.pramen.extras.utils.ResourceUtils
+import za.co.absa.pramen.core.utils.SparkUtils
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import za.co.absa.pramen.core.utils.{ResourceUtils, SparkUtils}
import za.co.absa.pramen.extras.utils.ResourceUtils
import za.co.absa.pramen.core.utils.SparkUtils
🤖 Prompt for AI Agents
In
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala
around line 27, the import for ResourceUtils is incorrect (it currently imports
za.co.absa.pramen.core.utils), causing a compilation error; change the import to
the correct package za.co.absa.pramen.extras.utils.ResourceUtils (keep
SparkUtils import as-is) and run a compile to verify the test file resolves
successfully.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Aug 27, 2025

Unit Test Coverage

Overall Project 84.73% -0.75% 🍏
Files changed 59.88%

Module Coverage
pramen:core Jacoco Report 85.58% -0.82%
Files
Module File Coverage
pramen:core Jacoco Report ResultSetToRowIterator.scala 78.84% -37.87%
JdbcNativeUtils.scala 70.96% 🍏

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (1)

1-381: Critical: REAL columns are mapped to DoubleType and there are no tests for time/timetz arrays

Please address the following before merging:

  • Tests for PostgreSQL time[] and timetz[] types are missing
    • No occurrences of time[] or timetz[] in the test suite, so array deserialization for these types isn’t exercised.
    • Add end-to-end tests that read time[] and timetz[] columns from JDBC into Spark rows, verifying no ClassCastException and correct TimestampType values.

  • REAL type mapping is incorrect
    • In getStructField, you currently map REAL to DoubleType (should be FloatType).
    • In getCell, you use rs.getDouble for REAL (should be rs.getFloat).
    • In setupColumnTypes, you map REAL to DOUBLE (should be FLOAT).
    • Update all three locations so that REAL ⇒ FloatType/rs.getFloat/FLOAT.

Please add the missing tests and correct the REAL mappings.

♻️ Duplicate comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (3)

102-105: Map JDBC REAL to FloatType (single precision), not DoubleType.

JDBC REAL is single-precision; using Double changes semantics and may surprise downstream consumers.

-      case REAL            => StructField(columnName, DoubleType)
+      case REAL            => StructField(columnName, FloatType)

146-148: Read REAL with getFloat to match schema.

Keep value extraction consistent with FloatType.

-      case REAL                             =>
-        val v = rs.getDouble(columnIndex)
+      case REAL                             =>
+        val v = rs.getFloat(columnIndex)
         if (rs.wasNull()) null else v

226-229: setupColumnTypes: classify REAL as FLOAT, not DOUBLE.

Keeps the inferred type map consistent with schema and read-path.

-        case REAL            => DOUBLE
+        case REAL            => FLOAT
🧹 Nitpick comments (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (2)

49-49: Limit visibility of driver allowlist; make it less brittle.

This list is an internal implementation detail. Narrow its visibility and add an explicit type annotation.

-  final val DRIVERS_SUPPORT_ARRAYS = Set("org.postgresql.Driver", "org.hsqldb.jdbc.JDBCDriver")
+  private[core] val DRIVERS_SUPPORT_ARRAYS: Set[String] =
+    Set("org.postgresql.Driver", "org.hsqldb.jdbc.JDBCDriver")

91-91: Add debug log for the arraysSupported decision.

Helps troubleshoot driver-specific behavior without stdout noise.

-    val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
+    val arraysSupported = DRIVERS_SUPPORT_ARRAYS.contains(jdbcConfig.driver)
+    log.debug(s"Arrays supported by driver '${jdbcConfig.driver}': $arraysSupported")
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (4)

317-323: Null-safety in getJdbcArray.

Guard against drivers that return null without setting wasNull() reliably.

   private[core] def getJdbcArray(columnIndex: Int): Array[Object] = {
     val v = rs.getArray(columnIndex)
-    if (rs.wasNull()) null else {
-      v.getArray.asInstanceOf[Array[Object]]
-    }
+    if (v == null || rs.wasNull()) null
+    else v.getArray.asInstanceOf[Array[Object]]
   }

105-106: Nit: reuse local metadata variable.

Avoid repeated rs.getMetaData calls; minor clarity/perf improvement.

-      case NUMERIC         => StructField(columnName, getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
+      case NUMERIC         => StructField(columnName, getDecimalSparkSchema(metadata.getPrecision(columnIndex), metadata.getScale(columnIndex)))

283-284: Nit: reuse local metadata variable here as well.

-      case "numeric" | "decimal"                           => Option(getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
+      case "numeric" | "decimal"                           => Option(getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))

If metadata is also available in scope here, consider:

-      case "numeric" | "decimal"                           => Option(getDecimalSparkSchema(rs.getMetaData.getPrecision(columnIndex), rs.getMetaData.getScale(columnIndex)))
+      case "numeric" | "decimal"                           => Option(getDecimalSparkSchema(metadata.getPrecision(columnIndex), metadata.getScale(columnIndex)))

255-291: Optional: avoid early return in getArrayDataType.

Idiomatic Scala: structure the guard as an if/else and drop return.

-  private[core] def getArrayDataType(columnIndex: Int): DataType = {
-    if (!arraysSupported)
-      return StringType
-
-    val metadata = rs.getMetaData
-    ...
-    columnTypeOpt match {
-      case Some(dt) => ArrayType(dt)
-      case None     => StringType // Make the driver stringify the array.
-    }
-  }
+  private[core] def getArrayDataType(columnIndex: Int): DataType = {
+    if (!arraysSupported) {
+      StringType
+    } else {
+      val metadata = rs.getMetaData
+      ...
+      columnTypeOpt match {
+        case Some(dt) => ArrayType(dt)
+        case None     => StringType // Make the driver stringify the array.
+      }
+    }
+  }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f21849f and 87aac7d.

📒 Files selected for processing (3)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (2 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (6 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala
🧰 Additional context used
🧬 Code graph analysis (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcSparkUtils.scala (2)
  • JdbcSparkUtils (30-376)
  • addMetadataFromJdbc (47-81)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (2)
  • getSchema (55-64)
  • close (66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/JdbcNativeUtils.scala (1)

93-100: Good: arraysSupported is threaded consistently to both iterators.

Constructor usage aligns with the new signature and keeps schema/value paths in sync.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1)

27-27: Fix incorrect ResourceUtils import (compilation blocker).

ResourceUtils is in extras.utils, not core.utils.

-import za.co.absa.pramen.core.utils.{ResourceUtils, SparkUtils}
+import za.co.absa.pramen.extras.utils.ResourceUtils
+import za.co.absa.pramen.core.utils.SparkUtils
🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (2)

95-97: Stabilize JSON comparison: order columns deterministically.

Row order is now stable; also fix column order to avoid flakiness if schema projection changes.

-        val actualData = SparkUtils.convertDataFrameToPrettyJSON(resultDf.orderBy("id"))
+        val orderedDf = resultDf
+          .select("ID","STR_ARRAY","BOOL_ARRAY","SHORT_ARRAY","INT_ARRAY","LONG_ARRAY","DEC_ARRAY","DATE_ARRAY","TS_ARRAY","BIN_ARRAY")
+          .orderBy("ID")
+        val actualData = SparkUtils.convertDataFrameToPrettyJSON(orderedDf)

91-94: Avoid double action: cache before count + collect.

Minor perf tweak in tests: count() + collect() triggers two jobs.

-        val resultDf = spark.read.parquet(table1Path.toString)
+        val resultDf = spark.read.parquet(table1Path.toString).cache()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 87aac7d and 87dc1ac.

📒 Files selected for processing (3)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeTableLongSuite.scala (3 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeTableLongSuite.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (1)
  • getConfig (102-121)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (5)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (12)
  • core (70-84)
  • core (86-111)
  • core (113-183)
  • core (185-196)
  • core (198-208)
  • core (210-238)
  • core (240-253)
  • core (255-291)
  • core (293-315)
  • core (317-322)
  • core (324-337)
  • core (339-352)
pramen/core/src/test/scala/za/co/absa/pramen/core/samples/RdbExampleTable.scala (2)
  • RdbExampleTable (53-207)
  • Arrays (117-190)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ResourceUtils.scala (2)
  • ResourceUtils (21-24)
  • getResourceString (22-24)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala (2)
  • SparkUtils (38-622)
  • convertDataFrameToPrettyJSON (58-68)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeTableLongSuite.scala (1)
  • getConfig (79-98)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeTableLongSuite.scala (2)

67-67: Good toggle to validate non-native path.

Passing useJdbcNative = false exercises the legacy JDBC path alongside the native one. Looks correct.


79-90: Config builder rename and wiring look correct.

Signature and property use (jdbc.native=$useJdbcNative) are consistent with the other suite and the pipeline.

pramen/core/src/test/scala/za/co/absa/pramen/core/integration/JdbcNativeArrayLongSuite.scala (3)

85-90: Pipeline run assertions look good.

Exit code check is sufficient here; downstream content is validated via JSON compare.


102-120: Config wiring LGTM.

Resource load + overrides and jdbc.native flag threading are consistent with the other suite.


47-84: No action needed: test suite already pins Spark timezone to Africa/Johannesburg (UTC+2)
Both pramen/core/src/test/scala/za/co/absa/pramen/core/base/SparkTestBase.scala and its extras counterpart configure

.config("spark.sql.session.timeZone", "Africa/Johannesburg")

ensuring all timestamps are rendered with a +02:00 offset consistently across CI runs.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (3)

276-281: Good fix: time/timetz arrays mapped to StringType.

Prevents ClassCastException when drivers return java.sql.Time for array elements.


104-105: REAL is single-precision; map to FloatType and read via getFloat.

REAL should be FloatType throughout; mixing DoubleType changes semantics and may break downstream schemas.

--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala
@@
-      case REAL            => StructField(columnName, DoubleType)
+      case REAL            => StructField(columnName, FloatType)
@@
-      case REAL                             =>
-        val v = rs.getDouble(columnIndex)
-        if (rs.wasNull()) null else v
+      case REAL                             =>
+        val v = rs.getFloat(columnIndex)
+        if (rs.wasNull()) null else v
@@
-        case REAL            => DOUBLE
+        case REAL            => FLOAT

Also applies to: 146-149, 227-229


274-281: Correct array element mapping for “real”.

Map “real” to FloatType (keep “float8” as DoubleType) to align with JDBC/PG types.

-      case "float8" | "real"                               => Some(DoubleType)
+      case "float8"                                        => Some(DoubleType)
+      case "real"                                          => Some(FloatType)
🧹 Nitpick comments (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (4)

176-179: Apply date/time sanitization to array elements for consistency.

Scalar DATE/TIMESTAMP are sanitized; arrays should be too.

-      case ARRAY_DATE                       =>
-        nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[Date])
+      case ARRAY_DATE                       =>
+        nullSafeMap(getJdbcArray(columnIndex))(v => sanitizeDate(v.asInstanceOf[Date]))
@@
-      case ARRAY_TIMESTAMP                  =>
-        nullSafeMap(getJdbcArray(columnIndex))(_.asInstanceOf[Timestamp])
+      case ARRAY_TIMESTAMP                  =>
+        nullSafeMap(getJdbcArray(columnIndex))(v => sanitizeTimestamp(v.asInstanceOf[Timestamp]))

316-321: Make getJdbcArray robust to primitive arrays returned by some drivers.

Some JDBC drivers may return primitive arrays (e.g., int[]) that aren’t Array[Object]; safely box them.

   private[core] def getJdbcArray(columnIndex: Int): Array[Object] = {
-    val v = rs.getArray(columnIndex)
-    if (rs.wasNull()) null else {
-      v.getArray.asInstanceOf[Array[Object]]
-    }
+    val v = rs.getArray(columnIndex)
+    if (rs.wasNull()) null
+    else {
+      val raw = v.getArray
+      raw match {
+        case o: Array[Object] => o
+        case a: Array[_] =>
+          val comp = a.getClass.getComponentType
+          if (!comp.isPrimitive) a.asInstanceOf[Array[Object]]
+          else {
+            val len = java.lang.reflect.Array.getLength(a)
+            val boxed = new Array[Object](len)
+            var i = 0
+            while (i < len) {
+              boxed(i) = java.lang.reflect.Array.get(a, i).asInstanceOf[Object]
+              i += 1
+            }
+            boxed
+          }
+      }
+    }
   }

185-195: Generalize nullSafeMap to avoid casts and allow type-changing transforms.

Current T => T plus asInstanceOf works but hides type issues. Consider A=>B with ClassTag.

-  private[core] def nullSafeMap[T](arr: Array[T])(f: T => T): Array[T] = {
+  private[core] def nullSafeMap[A, B >: Null : scala.reflect.ClassTag](arr: Array[A])(f: A => B): Array[B] = {
     if (arr == null) {
       null
     } else {
-      arr.map { v =>
-        if (v == null)
-          null
-        else
-          f(v)
-      }.asInstanceOf[Array[T]]
+      arr.map { v =>
+        if (v == null) null.asInstanceOf[B] else f(v)
+      }
     }
   }

86-110: Minor: reuse metadata locals when computing decimal schema.

You already have metadata; pass size/scale locals instead of calling rs.getMetaData again. Micro-optimization/readability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 87dc1ac and 082fec5.

📒 Files selected for processing (1)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Coverage on Scala 2.12.18
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
🔇 Additional comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/impl/ResultSetToRowIterator.scala (1)

217-221: Confirm BIT(n>1) read-path via getBytes across target drivers.

BIT(n>1) is inferred as BLOB/BinaryType and read with getBytes; some drivers return textual bit strings. Please confirm for PG/HSQLDB.

Also applies to: 122-125, 93-99

@yruslan yruslan merged commit 34f1849 into main Aug 27, 2025
9 checks passed
@yruslan yruslan deleted the feature/626-add-jdbc-native-bin-and-array branch August 27, 2025 13:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for PostgreSQL type array of primitive types in JDBC Native

1 participant