44 changes: 44 additions & 0 deletions common/src/main/scala/org/apache/comet/vector/ExportedBatch.scala
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector

import org.apache.arrow.c.ArrowArray
import org.apache.arrow.c.ArrowSchema

/**
* A wrapper class to hold the exported Arrow arrays and schemas.
*
* @param batch
* an array containing the number of rows followed by pairs of memory addresses in the format
* of (address of Arrow array, address of Arrow schema)
* @param arrowSchemas
* the exported Arrow schemas, which need to be deallocated after being moved by the native executor
* @param arrowArrays
* the exported Arrow arrays, which need to be deallocated after being moved by the native executor
*/
case class ExportedBatch(
batch: Array[Long],
arrowSchemas: Array[ArrowSchema],
arrowArrays: Array[ArrowArray]) {
def close(): Unit = {
arrowSchemas.foreach(_.close())
arrowArrays.foreach(_.close())
}
}
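
For orientation, here is a minimal sketch (not part of the PR) of how the packed `batch` array can be decoded on the consuming side; the `decodeBatch` helper is hypothetical:

  // Hypothetical decoder for the packed layout described above:
  // element 0 is the row count; each following pair is
  // (Arrow array address, Arrow schema address) for one column.
  def decodeBatch(batch: Array[Long]): (Long, Seq[(Long, Long)]) = {
    val numRows = batch(0)
    val pairs = batch.drop(1).grouped(2).collect {
      case Array(arrayAddr, schemaAddr) => (arrayAddr, schemaAddr)
    }.toSeq
    (numRows, pairs)
  }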
68 changes: 39 additions & 29 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -44,43 +44,53 @@ class NativeUtil {
* @param batch
* the input Comet columnar batch
* @return
* a list containing number of rows + pairs of memory addresses in the format of (address of
* Arrow array, address of Arrow schema)
* an ExportedBatch object wrapping an array that contains the number of rows followed by
* pairs of memory addresses in the format of (address of Arrow array, address of Arrow schema)
*/
def exportBatch(batch: ColumnarBatch): Array[Long] = {
def exportBatch(batch: ColumnarBatch): ExportedBatch = {
val exportedVectors = mutable.ArrayBuffer.empty[Long]
exportedVectors += batch.numRows()

// Run checks prior to exporting the batch
(0 until batch.numCols()).foreach { index =>
val c = batch.column(index)
if (!c.isInstanceOf[CometVector]) {
batch.close()
throw new SparkException(
"Comet execution only takes Arrow Arrays, but got " +
s"${c.getClass}")
}
}

val arrowSchemas = mutable.ArrayBuffer.empty[ArrowSchema]
val arrowArrays = mutable.ArrayBuffer.empty[ArrowArray]

(0 until batch.numCols()).foreach { index =>
batch.column(index) match {
case a: CometVector =>
val valueVector = a.getValueVector

val provider = if (valueVector.getField.getDictionary != null) {
a.getDictionaryProvider
} else {
null
}

Review thread (on the moved type check):

Member: Unrelated change.

Member (Author): This is for moving the sanity check for column types prior to the actual construction of the Arrow C data structures. It is tricky to release already constructed FFI data structures before raising the exception.

Member: Oh, okay.

val arrowSchema = ArrowSchema.allocateNew(allocator)
val arrowArray = ArrowArray.allocateNew(allocator)
Data.exportVector(
allocator,
getFieldVector(valueVector, "export"),
provider,
arrowArray,
arrowSchema)

exportedVectors += arrowArray.memoryAddress()
exportedVectors += arrowSchema.memoryAddress()
case c =>
throw new SparkException(
"Comet execution only takes Arrow Arrays, but got " +
s"${c.getClass}")
val cometVector = batch.column(index).asInstanceOf[CometVector]
val valueVector = cometVector.getValueVector

val provider = if (valueVector.getField.getDictionary != null) {
cometVector.getDictionaryProvider
} else {
null
}

val arrowSchema = ArrowSchema.allocateNew(allocator)
val arrowArray = ArrowArray.allocateNew(allocator)
arrowSchemas += arrowSchema
arrowArrays += arrowArray
Data.exportVector(
allocator,
getFieldVector(valueVector, "export"),
provider,
arrowArray,
arrowSchema)

exportedVectors += arrowArray.memoryAddress()
exportedVectors += arrowSchema.memoryAddress()
}

exportedVectors.toArray
Review thread (on the return value):

Member: You can return ExportedBatch without any of the above changes.

Member (Author): If you don't like the restructuring that moves the sanity checks, I can revert it to the original control flow.

ExportedBatch(exportedVectors.toArray, arrowSchemas.toArray, arrowArrays.toArray)
}

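
The restructuring discussed above is a two-phase pattern: validate every column before allocating any FFI structure, so a bad column can simply throw with nothing to unwind. A minimal, self-contained sketch with stand-in types (not the PR's exact code):

  // Two-phase export: check first, allocate second. `NativeStruct`,
  // `isValid`, and `allocate` are illustrative stand-ins.
  final case class NativeStruct(address: Long)

  def exportAll(
      columns: Seq[Any],
      isValid: Any => Boolean,
      allocate: Any => NativeStruct): Seq[NativeStruct] = {
    // Phase 1: fail fast while no native memory has been allocated yet.
    columns.find(c => !isValid(c)).foreach { c =>
      throw new IllegalArgumentException(s"unsupported column: ${c.getClass}")
    }
    // Phase 2: allocate only after all checks pass, so no partial
    // cleanup of FFI structures is ever needed.
    columns.map(allocate)
  }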
@@ -215,12 +215,8 @@ object Utils {
val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out))
writer.start()
writer.writeBatch()

root.clear()
writer.end()

out.flush()
out.close()
writer.close()
Review thread (on closing the writer):

Member: Closing the writer will close the dictionary provider. If the dictionary arrays are shared across batches, you will close them and empty later batches. I remember we hit this issue before.

Member (Author): Hmm... I need to take a further look at this to fully understand whether this fix is correct.

Member (Author, Kontinuation, Aug 29, 2024): This should be correct. The dictionary provider held by the writer contains copied vectors, so closing them does not interfere with the rest of the system.

Member: I remember closing it caused some errors in CI due to the reason I mentioned. Let's see if CI passes.

Member (Author, Kontinuation, Aug 30, 2024): The CI passes on all commits of this PR; it has run 3 rounds with no problem.

Member: Yeah, maybe there were some other changes before. Anyway, it is good to be able to close the writer without issue.


if (out.size() > 0) {
(batch.numRows(), cbbos.toChunkedByteBuffer)
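
A sketch of the simplified write path after this change; the Arrow Java calls (`ArrowStreamWriter`, `start`, `writeBatch`, `close`) are the standard API, while the surrounding setup is assumed:

  import java.io.ByteArrayOutputStream
  import java.nio.channels.Channels

  import org.apache.arrow.vector.VectorSchemaRoot
  import org.apache.arrow.vector.dictionary.DictionaryProvider
  import org.apache.arrow.vector.ipc.ArrowStreamWriter

  def serialize(root: VectorSchemaRoot, provider: DictionaryProvider): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out))
    try {
      writer.start()
      writer.writeBatch()
    } finally {
      // close() ends the stream and releases the writer's dictionary
      // vectors -- safe here because, per the thread above, the provider
      // holds copies rather than dictionary arrays shared across batches.
      writer.close()
    }
    out.toByteArray
  }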
21 changes: 20 additions & 1 deletion spark/src/main/java/org/apache/comet/CometBatchIterator.java
@@ -23,6 +23,7 @@

import org.apache.spark.sql.vectorized.ColumnarBatch;

import org.apache.comet.vector.ExportedBatch;
import org.apache.comet.vector.NativeUtil;

/**
@@ -34,9 +35,12 @@ public class CometBatchIterator {
final Iterator<ColumnarBatch> input;
final NativeUtil nativeUtil;

private ExportedBatch lastBatch;

CometBatchIterator(Iterator<ColumnarBatch> input, NativeUtil nativeUtil) {
this.input = input;
this.nativeUtil = nativeUtil;
this.lastBatch = null;
}

/**
@@ -45,12 +49,27 @@ public class CometBatchIterator {
* indicating the end of the iterator.
*/
public long[] next() {
// Native side already copied the content of ArrowSchema and ArrowArray. We should deallocate
// the ArrowSchema and ArrowArray base structures allocated in JVM.
if (lastBatch != null) {
lastBatch.close();
lastBatch = null;
}

boolean hasBatch = input.hasNext();

if (!hasBatch) {
return new long[] {-1};
}

return nativeUtil.exportBatch(input.next());
lastBatch = nativeUtil.exportBatch(input.next());
return lastBatch.batch();
}

public void close() {
if (lastBatch != null) {
lastBatch.close();
lastBatch = null;
}
}
}
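
The deferred-close pattern above, rendered as a small self-contained Scala sketch (the real iterator returns address arrays to native code; here the payload is just any AutoCloseable):

  // The previous batch's JVM-side structs stay alive until the native
  // consumer has moved their contents, i.e. until the next call to next().
  class DeferredCloseIterator[B <: AutoCloseable](produce: () => Option[B]) {
    private var last: Option[B] = None

    def next(): Option[B] = {
      // By now the native side has copied the previous batch, so its
      // JVM-side structs can be freed.
      last.foreach(_.close())
      last = produce()
      last
    }

    def close(): Unit = {
      last.foreach(_.close())
      last = None
    }
  }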
2 changes: 2 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -159,6 +159,8 @@ class CometExecIterator(
}
nativeLib.releasePlan(plan)

cometBatchIterators.foreach(_.close())

// The allocator thinks the exported ArrowArray and ArrowSchema structs are not released,
// so it will report:
// Caused by: java.lang.IllegalStateException: Memory was leaked by query.
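
For context on the leak report quoted in that comment, a minimal assumed example of Arrow's allocator accounting: every outstanding buffer must be closed before the allocator, or `close()` throws the IllegalStateException mentioned above.

  import org.apache.arrow.memory.RootAllocator

  val allocator = new RootAllocator(Long.MaxValue)
  val buf = allocator.buffer(64) // an outstanding allocation
  // ... export / use the buffer ...
  buf.close()       // release all outstanding allocations first
  allocator.close() // succeeds; would report "Memory was leaked by query" if buf were still open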