From b11dcb6578b33b20e43a3e1c3d89b9bd26cc2b92 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Wed, 20 May 2026 19:27:40 +0800 Subject: [PATCH] [FLINK-36561][table] Fix ResultSet.wasNull() not reflecting per-column null values in JDBC driver --- .../flink/table/jdbc/FlinkResultSet.java | 32 ++++++---- .../flink/table/jdbc/FlinkResultSetTest.java | 62 +++++++++++++++++++ 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java index 4735ce4669af1..52b043062c28a 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java @@ -168,8 +168,9 @@ public String getString(int columnIndex) throws SQLException { checkValidColumn(columnIndex); StringData stringData = currentRow.getString(columnIndex - 1); + wasNull = stringData == null; try { - return stringData == null ? null : stringData.toString(); + return wasNull ? null : stringData.toString(); } catch (Exception e) { throw new SQLDataException(e); } @@ -180,8 +181,9 @@ public boolean getBoolean(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return !currentRow.isNullAt(columnIndex - 1) && currentRow.getBoolean(columnIndex - 1); + return !wasNull && currentRow.getBoolean(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -192,8 +194,9 @@ public byte getByte(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getByte(columnIndex - 1); + return wasNull ? 0 : currentRow.getByte(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -204,8 +207,9 @@ public short getShort(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getShort(columnIndex - 1); + return wasNull ? 0 : currentRow.getShort(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -216,8 +220,9 @@ public int getInt(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getInt(columnIndex - 1); + return wasNull ? 0 : currentRow.getInt(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -228,9 +233,9 @@ public long getLong(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); - + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0L : currentRow.getLong(columnIndex - 1); + return wasNull ? 0L : currentRow.getLong(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -241,8 +246,9 @@ public float getFloat(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getFloat(columnIndex - 1); + return wasNull ? 0 : currentRow.getFloat(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -253,8 +259,9 @@ public double getDouble(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) ? 0 : currentRow.getDouble(columnIndex - 1); + return wasNull ? 0 : currentRow.getDouble(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -270,8 +277,9 @@ public byte[] getBytes(int columnIndex) throws SQLException { checkClosed(); checkValidRow(); checkValidColumn(columnIndex); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.getBinary(columnIndex - 1); + return wasNull ? null : currentRow.getBinary(columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -377,6 +385,7 @@ public Object getObject(int columnIndex) throws SQLException { checkValidColumn(columnIndex); try { Object object = fieldGetterList.get(columnIndex - 1).getFieldOrNull(currentRow); + wasNull = object == null; DataType dataType = dataTypeList.get(columnIndex - 1); return convertToJavaObject(object, dataType.getLogicalType()); } catch (Exception e) { @@ -486,8 +495,9 @@ public BigDecimal getBigDecimal(int columnIndex) throws SQLException { dataType.getLogicalType().getClass().getSimpleName())); } DecimalType decimalType = (DecimalType) dataType.getLogicalType(); + wasNull = currentRow.isNullAt(columnIndex - 1); try { - return currentRow.isNullAt(columnIndex - 1) + return wasNull ? null : currentRow .getDecimal( diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java index a1dafcaa9c191..9423761803834 100644 --- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java @@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -111,6 +112,67 @@ public void testResultSetPrimitiveData() throws Exception { } } + @Test + public void testWasNullReflectsPerColumnNullness() throws Exception { + Map map = new HashMap<>(); + Map valueMap = new HashMap<>(); + valueMap.put(1, 1L); + map.put(StringData.fromString("1"), new GenericMapData(valueMap)); + RowData mixedRow = + GenericRowData.of( + true, + null, + (short) 2, + null, + 4L, + null, + 6.0d, + null, + StringData.fromString("v"), + null, + new GenericMapData(map)); + CloseableIterator data = + CloseableIterator.adapterForIterator( + Collections.singletonList(mixedRow).iterator()); + try (ResultSet resultSet = + new FlinkResultSet( + new TestingStatement(), + new StatementResult( + SCHEMA, data, true, ResultKind.SUCCESS, JobID.generate()))) { + assertTrue(resultSet.next()); + + assertTrue(resultSet.getBoolean(1)); + assertFalse(resultSet.wasNull()); + resultSet.getByte(2); + assertTrue(resultSet.wasNull()); + assertEquals((short) 2, resultSet.getShort(3)); + assertFalse(resultSet.wasNull()); + resultSet.getInt(4); + assertTrue(resultSet.wasNull()); + assertEquals(4L, resultSet.getLong(5)); + assertFalse(resultSet.wasNull()); + resultSet.getFloat(6); + assertTrue(resultSet.wasNull()); + assertEquals(6.0d, resultSet.getDouble(7)); + assertFalse(resultSet.wasNull()); + assertNull(resultSet.getBigDecimal(8)); + assertTrue(resultSet.wasNull()); + assertEquals("v", resultSet.getString(9)); + assertFalse(resultSet.wasNull()); + assertNull(resultSet.getBytes(10)); + assertTrue(resultSet.wasNull()); + assertNotNull(resultSet.getObject(11)); + assertFalse(resultSet.wasNull()); + + assertNull(resultSet.getString(2)); + assertTrue(resultSet.wasNull()); + assertNull(resultSet.getObject(4)); + assertTrue(resultSet.wasNull()); + assertEquals(4L, resultSet.getLong(5)); + assertFalse(resultSet.wasNull()); + } + } + @Test public void testStringResultSetNullData() throws Exception { CloseableIterator data =