Search before asking
Fluss version
0.9.0 (latest release)
Please describe the bug 🐞
In `org.apache.fluss.flink.sink.FlinkComplexTypeITCase#testComplexTypesInLogTable`, changing `tEnv.executeSql("select * from complex_log_test").collect()` to `tEnv.executeSql("select * from complex_log_test limit 10").collect()` triggers the following exception:
```
Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:306)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.getByte(ArrowBuf.java:508)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:826)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseFixedWidthVector.isNull(BaseFixedWidthVector.java:815)
	at org.apache.fluss.row.arrow.vectors.ArrowIntColumnVector.isNullAt(ArrowIntColumnVector.java:43)
	at org.apache.fluss.row.columnar.ColumnarArray.isNullAt(ColumnarArray.java:55)
	at org.apache.fluss.row.InternalArray.lambda$createElementGetter$530a4d25$1(InternalArray.java:157)
	at org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.lambda$createInternalConverter$500d942e$1(FlussRowToFlinkRowConverter.java:164)
	at org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.lambda$wrapIntoNullableInternalConverter$266ab4eb$1(FlussRowToFlinkRowConverter.java:94)
	at org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.toFlinkRowData(FlussRowToFlinkRowConverter.java:75)
	at org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.toFlinkRowData(FlussRowToFlinkRowConverter.java:67)
	at org.apache.fluss.flink.utils.PushdownUtils.limitScan(PushdownUtils.java:325)
	... 89 more
```
Root Cause
`collectRows(batchScanner)` collects all `InternalRow` objects into a list first, then converts them to Flink rows in a separate loop. For the Arrow format, these `InternalRow` objects are `ColumnarRow` instances backed by Arrow vectors:
- Simple types (int, string): `FieldGetter.getFieldOrNull()` copies the value — safe.
- Complex types (array, map, row): `getFieldOrNull()` returns a lazy view (`ColumnarArray`, `ColumnarMap`) that still references the underlying Arrow `FieldVector` buffers.

When `pollBatch()` transitions to the next batch, `VectorSchemaRoot.clear()` deallocates all Arrow buffers (capacity → 0). All previously collected `ColumnarArray` references then point to empty buffers, causing `IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))`.
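The failure mode above can be reproduced in miniature without Arrow or Fluss. The sketch below is a hedged analogy, not project code: a plain `int[]` stands in for the Arrow buffers, the hypothetical `Batch.clear()` mimics `VectorSchemaRoot.clear()`, and the lambda returned by `lazyView` plays the role of `ColumnarArray` reading through to the live buffers at access time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

public class LazyViewPitfall {

    // Hypothetical stand-in for an Arrow-backed batch: a reusable buffer.
    static class Batch {
        int[] buffer = {1, 2, 3};

        // Mimics VectorSchemaRoot.clear(): capacity drops to 0.
        void clear() { buffer = new int[0]; }

        // Lazy view: reads the live buffer at access time, like a
        // ColumnarArray holding references to Arrow FieldVector buffers.
        IntSupplier lazyView(int index) { return () -> buffer[index]; }
    }

    static String demo() {
        Batch batch = new Batch();
        List<IntSupplier> collected = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            collected.add(batch.lazyView(i)); // views collected, not values
        }

        batch.clear(); // the batch transition frees the buffers

        try {
            collected.get(0).getAsInt(); // stale view reads an empty buffer
            return "ok";
        } catch (ArrayIndexOutOfBoundsException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ArrayIndexOutOfBoundsException
    }
}
```

The key property is that the collected objects hold no data of their own; they only know how to read the buffers, so any deallocation between collection and read surfaces as an out-of-bounds access.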
Solution
Convert each row to Flink format immediately during iteration (before the batch transition clears the Arrow buffers). `FlussRowToFlinkRowConverter.toFlinkRowData()` deep-copies all values (including array elements) into Flink's `GenericRowData`/`GenericArrayData`, so the resulting Flink objects are fully materialized and independent of the Arrow buffers.
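Continuing the same simplified analogy (again hypothetical stand-ins, not Fluss code), the fix is to materialize each value inside the iteration loop, the way `toFlinkRowData()` deep-copies rows, so that clearing the batch afterwards cannot invalidate anything already collected:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerConversion {

    // Hypothetical stand-in for an Arrow-backed batch (not Fluss code).
    static class Batch {
        int[] buffer = {10, 20, 30};
        void clear() { buffer = new int[0]; } // mimics VectorSchemaRoot.clear()
        int read(int index) { return buffer[index]; }
    }

    // Convert each element to an independent value while the batch is still
    // live, mirroring calling toFlinkRowData() inside the iteration loop.
    static List<Integer> collectEagerly(Batch batch) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            rows.add(batch.read(i)); // boxed copy, no reference to the buffer
        }
        batch.clear(); // a later batch transition cannot touch the copies
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(collectEagerly(new Batch())); // prints [10, 20, 30]
    }
}
```

The trade-off is an extra copy per row, but for a bounded `limit` scan that cost is small compared to returning views into buffers whose lifetime the caller does not control.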
Are you willing to submit a PR?