From 5a6ef179d017761a25f5990b44264fe597b97ad8 Mon Sep 17 00:00:00 2001 From: John-W-Lewis Date: Thu, 14 May 2026 12:20:37 +0100 Subject: [PATCH 1/2] Add VectorOps utility for type-independent vector copy operations Provides shareCopy (shared memory), transferCopy (move ownership), and deepCopy (independent clone) for both FieldVector and VectorSchemaRoot, implemented purely via getFieldBuffers/loadFieldBuffers without depending on TransferPair. --- .../apache/arrow/vector/util/VectorOps.java | 300 ++++++++++++++++ .../arrow/vector/util/TestVectorOps.java | 324 ++++++++++++++++++ 2 files changed, 624 insertions(+) create mode 100644 vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java create mode 100644 vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java new file mode 100644 index 000000000..d3b931951 --- /dev/null +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; + + +/** + * Generic utility operations for creating new vectors and roots from existing ones, without + * requiring per-type implementations. These operations work by manipulating whole buffers via {@link + * FieldVector#getFieldBuffers()} and {@link FieldVector#loadFieldBuffers}, rather than + * type-specific internal logic. + * + *

Three modes of creation are provided: + * + *

+ */ +public final class VectorOps { + + private VectorOps() {} + + /** + * Create a new vector sharing the same underlying memory as the source. Reference counts are + * incremented so that the memory is only released when all sharing vectors have been closed. The + * source vector is not modified. + * + *

Uses each source vector's own allocator for the target. + * + * @param source the vector to share from + * @param the vector type + * @return a new vector sharing the same underlying memory + */ + @SuppressWarnings("unchecked") + public static V shareCopy(V source) { + return (V) shareCopy(source, source.getAllocator()); + } + + /** + * Create a new vector sharing the same underlying memory as the source, associated with the given + * allocator. Reference counts are incremented so that the memory is only released when all sharing + * vectors have been closed. The source vector is not modified. + * + * @param source the vector to share from + * @param allocator the allocator for the new vector + * @param the vector type + * @return a new vector sharing the same underlying memory + */ + @SuppressWarnings("unchecked") + public static V shareCopy(V source, BufferAllocator allocator) { + FieldVector target = source.getField().createVector(allocator); + shareCopyInto(source, target); + return (V) target; + } + + /** + * Create a new VectorSchemaRoot sharing the same underlying memory as the source. Reference + * counts are incremented so that the memory is only released when all sharing roots have been + * closed. The source root is not modified. + * + *

Uses each source vector's own allocator for its corresponding target vector. + * + * @param source the root to share from + * @return a new root sharing the same underlying memory + */ + public static VectorSchemaRoot shareCopy(VectorSchemaRoot source) { + List sharedVectors = + source.getFieldVectors().stream() + .map(v -> shareCopy(v)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(sharedVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + /** + * Create a new VectorSchemaRoot sharing the same underlying memory as the source, with all + * vectors associated with the given allocator. + * + * @param source the root to share from + * @param allocator the allocator for all vectors in the new root + * @return a new root sharing the same underlying memory + */ + public static VectorSchemaRoot shareCopy(VectorSchemaRoot source, BufferAllocator allocator) { + List sharedVectors = + source.getFieldVectors().stream() + .map(v -> shareCopy(v, allocator)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(sharedVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + /** + * Create a new vector by transferring buffer ownership from the source. The source is left with + * empty buffers and can be reused via {@code allocateNew()}. + * + *

Uses each source vector's own allocator for the target. + * + * @param source the vector to transfer from + * @param the vector type + * @return a new vector owning the transferred buffers + */ + @SuppressWarnings("unchecked") + public static V transferCopy(V source) { + return (V) transferCopy(source, source.getAllocator()); + } + + /** + * Create a new vector by transferring buffer ownership from the source, associated with the given + * allocator. The source is left with empty buffers and can be reused via {@code allocateNew()}. + * + * @param source the vector to transfer from + * @param allocator the allocator for the new vector + * @param the vector type + * @return a new vector owning the transferred buffers + */ + @SuppressWarnings("unchecked") + public static V transferCopy(V source, BufferAllocator allocator) { + FieldVector target = source.getField().createVector(allocator); + transferCopyInto(source, target); + return (V) target; + } + + /** + * Create a new VectorSchemaRoot by transferring buffer ownership from the source. The source + * root's vectors are left with empty buffers and can be reused via {@code allocateNew()}. + * + *

Uses each source vector's own allocator for its corresponding target vector. + * + * @param source the root to transfer from + * @return a new root owning the transferred buffers + */ + public static VectorSchemaRoot transferCopy(VectorSchemaRoot source) { + List transferredVectors = + source.getFieldVectors().stream() + .map(v -> transferCopy(v)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(transferredVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + /** + * Create a new VectorSchemaRoot by transferring buffer ownership from the source, with all + * vectors associated with the given allocator. + * + * @param source the root to transfer from + * @param allocator the allocator for all vectors in the new root + * @return a new root owning the transferred buffers + */ + public static VectorSchemaRoot transferCopy(VectorSchemaRoot source, BufferAllocator allocator) { + List transferredVectors = + source.getFieldVectors().stream() + .map(v -> transferCopy(v, allocator)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(transferredVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + /** + * Create a new vector with an independent deep copy of the source data. Both source and result + * are fully independent after this operation. + * + *

Uses each source vector's own allocator for the target. + * + * @param source the vector to copy + * @param the vector type + * @return a new vector with independent copies of all buffers + */ + @SuppressWarnings("unchecked") + public static V deepCopy(V source) { + return (V) deepCopy(source, source.getAllocator()); + } + + /** + * Create a new vector with an independent deep copy of the source data, associated with the given + * allocator. Both source and result are fully independent after this operation. + * + * @param source the vector to copy + * @param allocator the allocator for the new vector + * @param the vector type + * @return a new vector with independent copies of all buffers + */ + @SuppressWarnings("unchecked") + public static V deepCopy(V source, BufferAllocator allocator) { + FieldVector target = source.getField().createVector(allocator); + deepCopyInto(source, target); + return (V) target; + } + + /** + * Create a new VectorSchemaRoot with an independent deep copy of the source data. Both source and + * result are fully independent after this operation. + * + *

Uses each source vector's own allocator for its corresponding target vector. + * + * @param source the root to copy + * @return a new root with independent copies of all data + */ + public static VectorSchemaRoot deepCopy(VectorSchemaRoot source) { + List copiedVectors = + source.getFieldVectors().stream() + .map(v -> deepCopy(v)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(copiedVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + /** + * Create a new VectorSchemaRoot with an independent deep copy of the source data, with all + * vectors associated with the given allocator. + * + * @param source the root to copy + * @param allocator the allocator for all vectors in the new root + * @return a new root with independent copies of all data + */ + public static VectorSchemaRoot deepCopy(VectorSchemaRoot source, BufferAllocator allocator) { + List copiedVectors = + source.getFieldVectors().stream() + .map(v -> deepCopy(v, allocator)) + .collect(Collectors.toList()); + VectorSchemaRoot result = new VectorSchemaRoot(copiedVectors); + result.setRowCount(source.getRowCount()); + return result; + } + + private static void shareCopyInto(FieldVector source, FieldVector target) { + List sourceBuffers = source.getFieldBuffers(); + ArrowFieldNode node = + new ArrowFieldNode(source.getValueCount(), source.getNullCount()); + target.loadFieldBuffers(node, sourceBuffers); + + List sourceChildren = source.getChildrenFromFields(); + List targetChildren = target.getChildrenFromFields(); + for (int i = 0; i < sourceChildren.size(); i++) { + shareCopyInto(sourceChildren.get(i), targetChildren.get(i)); + } + } + + private static void transferCopyInto(FieldVector source, FieldVector target) { + shareCopyInto(source, target); + source.clear(); + } + + private static void deepCopyInto(FieldVector source, FieldVector target) { + List sourceBuffers = source.getFieldBuffers(); + BufferAllocator targetAllocator = target.getAllocator(); + List copiedBuffers = new ArrayList<>(sourceBuffers.size()); + for (ArrowBuf buf : sourceBuffers) { + long size = buf.readableBytes(); + ArrowBuf copy = targetAllocator.buffer(size); + copy.setBytes(0, buf, 0, size); + copy.writerIndex(size); + copiedBuffers.add(copy); + } + ArrowFieldNode node = + new ArrowFieldNode(source.getValueCount(), source.getNullCount()); + target.loadFieldBuffers(node, copiedBuffers); + for (ArrowBuf buf : copiedBuffers) { + buf.close(); + } + + List sourceChildren = source.getChildrenFromFields(); + List targetChildren = target.getChildrenFromFields(); + for (int i = 0; i < sourceChildren.size(); i++) { + deepCopyInto(sourceChildren.get(i), targetChildren.get(i)); + } + } +} diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java new file mode 100644 index 000000000..1dff8ef87 --- /dev/null +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestVectorOps { + + private BufferAllocator allocator; + + @BeforeEach + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @AfterEach + public void tearDown() { + allocator.close(); + } + + @Test + public void testShareCopyVector() { + try (IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(4); + source.set(0, 10); + source.set(1, 20); + source.setNull(2); + source.set(3, 40); + source.setValueCount(4); + + try (IntVector shared = VectorOps.shareCopy(source)) { + assertEquals(4, shared.getValueCount()); + assertEquals(10, shared.get(0)); + assertEquals(20, shared.get(1)); + assertTrue(shared.isNull(2)); + assertEquals(40, shared.get(3)); + + // Source is unmodified + assertEquals(4, source.getValueCount()); + assertEquals(10, source.get(0)); + } + // Source still usable after shared copy is closed + assertEquals(10, source.get(0)); + } + } + + @Test + public void testShareCopyVectorWithAllocator() { + try (BufferAllocator childAlloc = allocator.newChildAllocator("child", 0, Long.MAX_VALUE); + IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(2); + source.set(0, 100); + source.set(1, 200); + source.setValueCount(2); + + try (IntVector shared = VectorOps.shareCopy(source, childAlloc)) { + assertEquals(childAlloc, shared.getAllocator()); + assertEquals(2, shared.getValueCount()); + assertEquals(100, shared.get(0)); + assertEquals(200, shared.get(1)); + } + } + } + + @Test + public void testShareCopyVectorSchemaRoot() { + try (IntVector intVec = new IntVector("ints", allocator); + VarCharVector strVec = new VarCharVector("strings", allocator)) { + intVec.allocateNew(3); + intVec.set(0, 1); + intVec.set(1, 2); + intVec.set(2, 3); + intVec.setValueCount(3); + + strVec.allocateNew(3); + strVec.set(0, "hello".getBytes()); + strVec.set(1, "world".getBytes()); + strVec.set(2, "!".getBytes()); + strVec.setValueCount(3); + + VectorSchemaRoot source = + new VectorSchemaRoot(Arrays.asList((FieldVector) intVec, (FieldVector) strVec)); + source.setRowCount(3); + + try (VectorSchemaRoot shared = VectorOps.shareCopy(source)) { + assertEquals(3, shared.getRowCount()); + assertEquals(2, shared.getFieldVectors().size()); + + IntVector sharedInts = (IntVector) shared.getVector("ints"); + assertEquals(1, sharedInts.get(0)); + assertEquals(2, sharedInts.get(1)); + assertEquals(3, sharedInts.get(2)); + + VarCharVector sharedStrs = (VarCharVector) shared.getVector("strings"); + assertEquals("hello", new String(sharedStrs.get(0))); + assertEquals("world", new String(sharedStrs.get(1))); + assertEquals("!", new String(sharedStrs.get(2))); + + // Source still readable + assertEquals(1, intVec.get(0)); + } + source.close(); + } + } + + @Test + public void testTransferCopyVector() { + try (IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(3); + source.set(0, 100); + source.set(1, 200); + source.set(2, 300); + source.setValueCount(3); + + try (IntVector transferred = VectorOps.transferCopy(source)) { + assertEquals(3, transferred.getValueCount()); + assertEquals(100, transferred.get(0)); + assertEquals(200, transferred.get(1)); + assertEquals(300, transferred.get(2)); + + // Source is emptied + assertEquals(0, source.getValueCount()); + } + } + } + + @Test + public void testTransferCopyVectorWithAllocator() { + try (BufferAllocator childAlloc = allocator.newChildAllocator("child", 0, Long.MAX_VALUE); + IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(2); + source.set(0, 42); + source.set(1, 99); + source.setValueCount(2); + + try (IntVector transferred = VectorOps.transferCopy(source, childAlloc)) { + assertEquals(childAlloc, transferred.getAllocator()); + assertEquals(2, transferred.getValueCount()); + assertEquals(42, transferred.get(0)); + assertEquals(99, transferred.get(1)); + } + } + } + + @Test + public void testTransferCopyVectorSchemaRoot() { + IntVector intVec = new IntVector("ints", allocator); + intVec.allocateNew(2); + intVec.set(0, 7); + intVec.set(1, 8); + intVec.setValueCount(2); + + VectorSchemaRoot source = + new VectorSchemaRoot(Arrays.asList((FieldVector) intVec)); + source.setRowCount(2); + + try (VectorSchemaRoot transferred = VectorOps.transferCopy(source)) { + assertEquals(2, transferred.getRowCount()); + IntVector transferredInts = (IntVector) transferred.getVector("ints"); + assertEquals(7, transferredInts.get(0)); + assertEquals(8, transferredInts.get(1)); + + // Source vectors are emptied + assertEquals(0, intVec.getValueCount()); + } + source.close(); + } + + @Test + public void testDeepCopyVector() { + try (IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(3); + source.set(0, 11); + source.set(1, 22); + source.set(2, 33); + source.setValueCount(3); + + try (IntVector copied = VectorOps.deepCopy(source)) { + assertEquals(3, copied.getValueCount()); + assertEquals(11, copied.get(0)); + assertEquals(22, copied.get(1)); + assertEquals(33, copied.get(2)); + + // Source is unmodified + assertEquals(3, source.getValueCount()); + assertEquals(11, source.get(0)); + } + // Source still usable after copy is closed + assertEquals(11, source.get(0)); + } + } + + @Test + public void testDeepCopyVectorIndependence() { + try (IntVector source = new IntVector("ints", allocator); + IntVector copied = VectorOps.deepCopy(source)) { + source.allocateNew(2); + source.set(0, 50); + source.set(1, 60); + source.setValueCount(2); + + // Modifying source after deep copy doesn't affect the copy + // (copy was made when source was empty, so it should be empty) + assertEquals(0, copied.getValueCount()); + } + } + + @Test + public void testDeepCopyVectorWithAllocator() { + try (BufferAllocator childAlloc = allocator.newChildAllocator("child", 0, Long.MAX_VALUE); + IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(2); + source.set(0, 77); + source.set(1, 88); + source.setValueCount(2); + + try (IntVector copied = VectorOps.deepCopy(source, childAlloc)) { + assertEquals(childAlloc, copied.getAllocator()); + assertEquals(2, copied.getValueCount()); + assertEquals(77, copied.get(0)); + assertEquals(88, copied.get(1)); + } + } + } + + @Test + public void testDeepCopyVectorSchemaRoot() { + try (IntVector intVec = new IntVector("ints", allocator); + VarCharVector strVec = new VarCharVector("strings", allocator)) { + intVec.allocateNew(2); + intVec.set(0, 5); + intVec.set(1, 6); + intVec.setValueCount(2); + + strVec.allocateNew(2); + strVec.set(0, "foo".getBytes()); + strVec.set(1, "bar".getBytes()); + strVec.setValueCount(2); + + VectorSchemaRoot source = + new VectorSchemaRoot(Arrays.asList((FieldVector) intVec, (FieldVector) strVec)); + source.setRowCount(2); + + try (VectorSchemaRoot copied = VectorOps.deepCopy(source)) { + assertEquals(2, copied.getRowCount()); + IntVector copiedInts = (IntVector) copied.getVector("ints"); + assertEquals(5, copiedInts.get(0)); + assertEquals(6, copiedInts.get(1)); + + VarCharVector copiedStrs = (VarCharVector) copied.getVector("strings"); + assertEquals("foo", new String(copiedStrs.get(0))); + assertEquals("bar", new String(copiedStrs.get(1))); + + // Source still intact + assertEquals(5, intVec.get(0)); + } + source.close(); + } + } + + @Test + public void testShareCopyVarCharVector() { + try (VarCharVector source = new VarCharVector("strs", allocator)) { + source.allocateNew(3); + source.set(0, "alpha".getBytes()); + source.set(1, "beta".getBytes()); + source.set(2, "gamma".getBytes()); + source.setValueCount(3); + + try (VarCharVector shared = VectorOps.shareCopy(source)) { + assertEquals(3, shared.getValueCount()); + assertEquals("alpha", new String(shared.get(0))); + assertEquals("beta", new String(shared.get(1))); + assertEquals("gamma", new String(shared.get(2))); + } + // Source still valid + assertEquals("alpha", new String(source.get(0))); + } + } + + @Test + public void testShareCopySourceClosedFirst() { + IntVector shared; + try (IntVector source = new IntVector("ints", allocator)) { + source.allocateNew(2); + source.set(0, 999); + source.set(1, 888); + source.setValueCount(2); + + shared = VectorOps.shareCopy(source); + } + // Source is now closed; shared copy should still be readable + assertEquals(2, shared.getValueCount()); + assertEquals(999, shared.get(0)); + assertEquals(888, shared.get(1)); + shared.close(); + } +} From c4f564624ad722e1528173e36e0a6ecbb9aa54dc Mon Sep 17 00:00:00 2001 From: John-W-Lewis Date: Mon, 18 May 2026 10:30:13 +0100 Subject: [PATCH 2/2] Fix addVector/removeVector using VectorOps.shareCopy for safe shared semantics Use the new VectorOps.shareCopy to fix the unsafe shared-reference bug in addVector/removeVector while preserving the original intended semantics: both source and result roots remain usable with the same data, and memory is only released when all sharing roots are closed. Also applies Spotless formatting fixes. Co-authored-by: Cursor --- .../apache/arrow/vector/VectorSchemaRoot.java | 27 +++--- .../apache/arrow/vector/util/VectorOps.java | 27 ++---- .../arrow/vector/TestVectorSchemaRoot.java | 85 +++++++++++++++++-- .../arrow/vector/util/TestVectorOps.java | 3 +- 4 files changed, 106 insertions(+), 36 deletions(-) diff --git a/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java index 4c1fbf761..6c9f93e5d 100644 --- a/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java +++ b/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java @@ -35,6 +35,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.TransferPair; +import org.apache.arrow.vector.util.VectorOps; /** * Holder for a set of vectors to be loaded/unloaded. A VectorSchemaRoot is a container that can @@ -193,6 +194,10 @@ public FieldVector getVector(int index) { /** * Add vector to the record batch, producing a new VectorSchemaRoot. * + *

The returned root shares the underlying memory of this root's vectors and the added vector + * via {@link VectorOps#shareCopy}. Both this root and the returned root remain usable; the + * underlying memory is only released when all sharing roots have been closed. + * * @param index field index * @param vector vector to be added. * @return out VectorSchemaRoot with vector added @@ -201,16 +206,14 @@ public VectorSchemaRoot addVector(int index, FieldVector vector) { Preconditions.checkNotNull(vector); Preconditions.checkArgument(index >= 0 && index <= fieldVectors.size()); List newVectors = new ArrayList<>(); - if (index == fieldVectors.size()) { - newVectors.addAll(fieldVectors); - newVectors.add(vector); - } else { - for (int i = 0; i < fieldVectors.size(); i++) { - if (i == index) { - newVectors.add(vector); - } - newVectors.add(fieldVectors.get(i)); + for (int i = 0; i < fieldVectors.size(); i++) { + if (i == index) { + newVectors.add(VectorOps.shareCopy(vector)); } + newVectors.add(VectorOps.shareCopy(fieldVectors.get(i))); + } + if (index == fieldVectors.size()) { + newVectors.add(VectorOps.shareCopy(vector)); } return new VectorSchemaRoot(newVectors); } @@ -218,6 +221,10 @@ public VectorSchemaRoot addVector(int index, FieldVector vector) { /** * Remove vector from the record batch, producing a new VectorSchemaRoot. * + *

The returned root shares the underlying memory of this root's retained vectors via {@link + * VectorOps#shareCopy}. Both this root and the returned root remain usable; the underlying memory + * is only released when all sharing roots have been closed. + * * @param index field index * @return out VectorSchemaRoot with vector removed */ @@ -226,7 +233,7 @@ public VectorSchemaRoot removeVector(int index) { List newVectors = new ArrayList<>(); for (int i = 0; i < fieldVectors.size(); i++) { if (i != index) { - newVectors.add(fieldVectors.get(i)); + newVectors.add(VectorOps.shareCopy(fieldVectors.get(i))); } } return new VectorSchemaRoot(newVectors); diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java index d3b931951..c1c289fb0 100644 --- a/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java +++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorOps.java @@ -25,11 +25,10 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; - /** * Generic utility operations for creating new vectors and roots from existing ones, without - * requiring per-type implementations. These operations work by manipulating whole buffers via {@link - * FieldVector#getFieldBuffers()} and {@link FieldVector#loadFieldBuffers}, rather than + * requiring per-type implementations. These operations work by manipulating whole buffers via + * {@link FieldVector#getFieldBuffers()} and {@link FieldVector#loadFieldBuffers}, rather than * type-specific internal logic. * *

Three modes of creation are provided: @@ -66,8 +65,8 @@ public static V shareCopy(V source) { /** * Create a new vector sharing the same underlying memory as the source, associated with the given - * allocator. Reference counts are incremented so that the memory is only released when all sharing - * vectors have been closed. The source vector is not modified. + * allocator. Reference counts are incremented so that the memory is only released when all + * sharing vectors have been closed. The source vector is not modified. * * @param source the vector to share from * @param allocator the allocator for the new vector @@ -93,9 +92,7 @@ public static V shareCopy(V source, BufferAllocator allo */ public static VectorSchemaRoot shareCopy(VectorSchemaRoot source) { List sharedVectors = - source.getFieldVectors().stream() - .map(v -> shareCopy(v)) - .collect(Collectors.toList()); + source.getFieldVectors().stream().map(v -> shareCopy(v)).collect(Collectors.toList()); VectorSchemaRoot result = new VectorSchemaRoot(sharedVectors); result.setRowCount(source.getRowCount()); return result; @@ -161,9 +158,7 @@ public static V transferCopy(V source, BufferAllocator a */ public static VectorSchemaRoot transferCopy(VectorSchemaRoot source) { List transferredVectors = - source.getFieldVectors().stream() - .map(v -> transferCopy(v)) - .collect(Collectors.toList()); + source.getFieldVectors().stream().map(v -> transferCopy(v)).collect(Collectors.toList()); VectorSchemaRoot result = new VectorSchemaRoot(transferredVectors); result.setRowCount(source.getRowCount()); return result; @@ -229,9 +224,7 @@ public static V deepCopy(V source, BufferAllocator alloc */ public static VectorSchemaRoot deepCopy(VectorSchemaRoot source) { List copiedVectors = - source.getFieldVectors().stream() - .map(v -> deepCopy(v)) - .collect(Collectors.toList()); + source.getFieldVectors().stream().map(v -> deepCopy(v)).collect(Collectors.toList()); VectorSchemaRoot result = new VectorSchemaRoot(copiedVectors); result.setRowCount(source.getRowCount()); return result; @@ -257,8 +250,7 @@ public static VectorSchemaRoot deepCopy(VectorSchemaRoot source, BufferAllocator private static void shareCopyInto(FieldVector source, FieldVector target) { List sourceBuffers = source.getFieldBuffers(); - ArrowFieldNode node = - new ArrowFieldNode(source.getValueCount(), source.getNullCount()); + ArrowFieldNode node = new ArrowFieldNode(source.getValueCount(), source.getNullCount()); target.loadFieldBuffers(node, sourceBuffers); List sourceChildren = source.getChildrenFromFields(); @@ -284,8 +276,7 @@ private static void deepCopyInto(FieldVector source, FieldVector target) { copy.writerIndex(size); copiedBuffers.add(copy); } - ArrowFieldNode node = - new ArrowFieldNode(source.getValueCount(), source.getNullCount()); + ArrowFieldNode node = new ArrowFieldNode(source.getValueCount(), source.getNullCount()); target.loadFieldBuffers(node, copiedBuffers); for (ArrowBuf buf : copiedBuffers) { buf.close(); diff --git a/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java b/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java index bd3113f8b..6bbc15c8d 100644 --- a/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java +++ b/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java @@ -164,7 +164,7 @@ public void testAddVector() { VectorSchemaRoot newRecordBatch = original.addVector(1, intVector3); assertEquals(3, newRecordBatch.getFieldVectors().size()); - assertEquals(intVector3, newRecordBatch.getFieldVectors().get(1)); + assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(1).getField()); original.close(); newRecordBatch.close(); @@ -182,9 +182,9 @@ public void testAddVectorAtEnd() { VectorSchemaRoot newRecordBatch = original.addVector(2, intVector3); assertEquals(3, newRecordBatch.getFieldVectors().size()); - assertEquals(intVector1, newRecordBatch.getFieldVectors().get(0)); - assertEquals(intVector2, newRecordBatch.getFieldVectors().get(1)); - assertEquals(intVector3, newRecordBatch.getFieldVectors().get(2)); + assertEquals(intVector1.getField(), newRecordBatch.getFieldVectors().get(0).getField()); + assertEquals(intVector2.getField(), newRecordBatch.getFieldVectors().get(1).getField()); + assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(2).getField()); original.close(); newRecordBatch.close(); @@ -203,14 +203,87 @@ public void testRemoveVector() { VectorSchemaRoot newRecordBatch = original.removeVector(0); assertEquals(2, newRecordBatch.getFieldVectors().size()); - assertEquals(intVector2, newRecordBatch.getFieldVectors().get(0)); - assertEquals(intVector3, newRecordBatch.getFieldVectors().get(1)); + assertEquals(intVector2.getField(), newRecordBatch.getFieldVectors().get(0).getField()); + assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(1).getField()); original.close(); newRecordBatch.close(); } } + @Test + public void testAddVectorSharesCopy() { + try (final IntVector intVector1 = new IntVector("intVector1", allocator); + final IntVector intVector2 = new IntVector("intVector2", allocator); + final IntVector intVector3 = new IntVector("intVector3", allocator)) { + for (int i = 0; i < 5; i++) { + intVector1.setSafe(i, i * 10); + intVector2.setSafe(i, i * 20); + intVector3.setSafe(i, i * 30); + } + intVector1.setValueCount(5); + intVector2.setValueCount(5); + intVector3.setValueCount(5); + + VectorSchemaRoot original = new VectorSchemaRoot(Arrays.asList(intVector1, intVector2)); + original.setRowCount(5); + + VectorSchemaRoot result = original.addVector(1, intVector3); + + // Close the original root and the added vector -- the result should still have valid data + original.close(); + intVector3.close(); + + assertEquals(3, result.getFieldVectors().size()); + IntVector resultVec0 = (IntVector) result.getVector("intVector1"); + IntVector resultVec1 = (IntVector) result.getVector("intVector3"); + IntVector resultVec2 = (IntVector) result.getVector("intVector2"); + assertEquals(0, resultVec0.get(0)); + assertEquals(10, resultVec0.get(1)); + assertEquals(0, resultVec1.get(0)); + assertEquals(30, resultVec1.get(1)); + assertEquals(0, resultVec2.get(0)); + assertEquals(20, resultVec2.get(1)); + + result.close(); + } + } + + @Test + public void testRemoveVectorSharesCopy() { + try (final IntVector intVector1 = new IntVector("intVector1", allocator); + final IntVector intVector2 = new IntVector("intVector2", allocator); + final IntVector intVector3 = new IntVector("intVector3", allocator)) { + for (int i = 0; i < 5; i++) { + intVector1.setSafe(i, i * 10); + intVector2.setSafe(i, i * 20); + intVector3.setSafe(i, i * 30); + } + intVector1.setValueCount(5); + intVector2.setValueCount(5); + intVector3.setValueCount(5); + + VectorSchemaRoot original = + new VectorSchemaRoot(Arrays.asList(intVector1, intVector2, intVector3)); + original.setRowCount(5); + + VectorSchemaRoot result = original.removeVector(1); + + // Close the original root -- the result should still have valid data + original.close(); + + assertEquals(2, result.getFieldVectors().size()); + IntVector resultVec0 = (IntVector) result.getVector("intVector1"); + IntVector resultVec1 = (IntVector) result.getVector("intVector3"); + assertEquals(0, resultVec0.get(0)); + assertEquals(10, resultVec0.get(1)); + assertEquals(0, resultVec1.get(0)); + assertEquals(30, resultVec1.get(1)); + + result.close(); + } + } + @Test public void testSlice() { try (final IntVector intVector = new IntVector("intVector", allocator); diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java index 1dff8ef87..5a4025fb7 100644 --- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java +++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorOps.java @@ -176,8 +176,7 @@ public void testTransferCopyVectorSchemaRoot() { intVec.set(1, 8); intVec.setValueCount(2); - VectorSchemaRoot source = - new VectorSchemaRoot(Arrays.asList((FieldVector) intVec)); + VectorSchemaRoot source = new VectorSchemaRoot(Arrays.asList((FieldVector) intVec)); source.setRowCount(2); try (VectorSchemaRoot transferred = VectorOps.transferCopy(source)) {