-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-16005: [Java] Fix ArrayConsumer when using ArrowVectorIterator #12692
ARROW-16005: [Java] Fix ArrayConsumer when using ArrowVectorIterator #12692
Conversation
for (int i = 1; i <= consumers.length; i++) { | ||
ArrowType arrowType = config.getJdbcToArrowTypeConverter() | ||
.apply(new JdbcFieldInfo(resultSet.getMetaData(), i)); | ||
consumers[i - 1] = JdbcToArrowUtils.getConsumer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because ArrayConsumer
requires a FieldVector
to be passed, I've opted for lazily initialising the consumers after the first VectorSchemaRoot
is created.
https://github.com/apache/arrow/pull/12692/files#diff-f812c76a565e7c56500943f512b8498487209b15ed036d404d703854841df3d0R152 will update the vector in the consumer on subsequent iterations.
@@ -90,13 +97,12 @@ public void consume(ResultSet resultSet) throws SQLException, IOException { | |||
int count = 0; | |||
try (ResultSet rs = array.getResultSet()) { | |||
while (rs.next()) { | |||
ensureInnerVectorCapacity(innerVectorIndex + count + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't work out the value of this innerVectorIndex
? It didn't seem to ever be reset but nor did it seem to be used when consuming results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because of how a ListVector is laid out in memory. The list [[1, 2], [], [3, 4, 5]]
is represented as the child vector [1, 2, 3, 4, 5]
and the offsets [0, 2, 2, 5]
. ensureInnerVectorCapacity
is resizing the child vector, so when we call consume
for the last element, we want to ensure the child vector has enough capacity for the current elements, along with all the previous elements, and it looks like that's what innerVectorIndex
is tracking.
In other words when we call consume
for what will be [3, 4, 5]
we need to ensure the child vector has space for at least 3, 4, 5, … elements not 1, 2, 3… elements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not entirely convinced we can remove innerVectorIndex
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've reverted the change to innerVectorIndex
. Only thing I've done is reset it to 0 when we reset inner vector.
I've added unit tests for Arrays which covers the two issues when reusing VectorSchemaRoot
:
- NPE error from
ArrowVectorIterator
. ArrayConsumer
not reseting the delegate consumer.
public void resetValueVector(ListVector vector) { | ||
super.resetValueVector(vector); | ||
|
||
FieldVector childVector = vector.getDataVector(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When VectorSchemaRoot
is reused in ArrowVectorIterator
, we currently hit the issue that the currentIndex
here is reset to 0
but is never updated in the delegate consumer. As such, subsequent iterations will result in null
array values because the ListVector
(and data vector) is reset https://github.com/apache/arrow/pull/12692/files#diff-f812c76a565e7c56500943f512b8498487209b15ed036d404d703854841df3d0R150.
For example, if you have a batch size of 2 and a ResultSet
with 4 rows, the second iteration will be writing values into index 0 and 1 in the ListVector
but the offsets for those in the data vector will be pointing at null values (because it was reset) and the values written to the data vector will be at larger indexes.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing this!
Is there a unit test we can add to cover this?
return root; | ||
} | ||
|
||
private void ensureInitialized(VectorSchemaRoot root) throws SQLException { | ||
if (!initialized) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this isn't right if !config.isReuseVectorSchemaRoot()
because we'll have recreated the root. Since ensureInitialized
is only ever called when creating a new root, I don't think we need to guard this with initialized
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes good point!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's fine as its currently written. After creating a new VectorSchemaRoot
we'll call load
and this will call JdbcConsumer#resetValueVector
passing the vectors of the new VectorSchemaRoot
and obtaining a reference to them (https://github.com/apache/arrow/pull/12692/files#diff-f812c76a565e7c56500943f512b8498487209b15ed036d404d703854841df3d0R161). This will happen before we consume data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually sorry I see we have to recreate the delegate consumer as it references the child vector so it does make sense to initialize each time.
@@ -90,13 +97,12 @@ public void consume(ResultSet resultSet) throws SQLException, IOException { | |||
int count = 0; | |||
try (ResultSet rs = array.getResultSet()) { | |||
while (rs.next()) { | |||
ensureInnerVectorCapacity(innerVectorIndex + count + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because of how a ListVector is laid out in memory. The list [[1, 2], [], [3, 4, 5]]
is represented as the child vector [1, 2, 3, 4, 5]
and the offsets [0, 2, 2, 5]
. ensureInnerVectorCapacity
is resizing the child vector, so when we call consume
for the last element, we want to ensure the child vector has enough capacity for the current elements, along with all the previous elements, and it looks like that's what innerVectorIndex
is tracking.
In other words when we call consume
for what will be [3, 4, 5]
we need to ensure the child vector has space for at least 3, 4, 5, … elements not 1, 2, 3… elements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for plugging away at this. As mentioned it would be good to see a unit test to cover this too (I guess with both reusing/not reusing the VectorSchemaRoot)
@toddfarmer would you like to take a glance here as well?
@@ -90,13 +97,12 @@ public void consume(ResultSet resultSet) throws SQLException, IOException { | |||
int count = 0; | |||
try (ResultSet rs = array.getResultSet()) { | |||
while (rs.next()) { | |||
ensureInnerVectorCapacity(innerVectorIndex + count + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not entirely convinced we can remove innerVectorIndex
here?
I've updated the unit tests for |
@@ -204,6 +219,11 @@ public void setRowCount(int rowCount) { | |||
this.rowCount = rowCount; | |||
} | |||
|
|||
@Override | |||
public String toString() { | |||
return "Table{name='" + name + "', type='" + type + "'}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have added this for debugging purposes, helpful in the ParameterizedTest
to easily see which test file is failing. I updated the table names to match the YML file name.
"[101, 102, 103]", | ||
"[104, null, null]", | ||
"[107, 108, 109]", | ||
"[110]" | ||
}; | ||
String[] expectedArrayColValues = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this test case for arrays specifically to test the fix to ArrayConsumer
when resetting the delegate. Without the reset you end up reading null values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay.
Thanks for adding the tests and refactoring everything! I left some minor questions but I think this is ready.
} | ||
} | ||
return valueArr; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like this could be reused from Table.getListValues? Especially as this seems to also handle nulls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored so they can be reused
@@ -119,7 +134,10 @@ public void testVectorSchemaRootReuse() throws SQLException, IOException { | |||
assertNotNull(cur); | |||
|
|||
// verify the first column, with may contain nulls. | |||
assertEquals(expectedColValues[batchCount], cur.getVector(0).toString()); | |||
assertEquals(expectedIntColValues[batchCount], cur.getVector(0).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better way to assert equality than the string representation (perhaps with getObject as done elsewhere)? This is a little brittle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched to use the assertIntVectorValues
and assertListVectorValues
@lidavidm this should be good to merge |
Fixes https://issues.apache.org/jira/browse/ARROW-16005.