From 467830498ad60e0acd98d3f33254a1d63acb74a5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 27 May 2024 10:22:23 -0700 Subject: [PATCH] For review --- .../apache/arrow/c/CometSchemaImporter.java | 73 +++++++++++++++++++ .../org/apache/comet/parquet/BatchReader.java | 8 +- .../apache/comet/parquet/ColumnReader.java | 4 +- .../apache/arrow/c/CometSchemaImporter.scala | 44 ----------- .../apache/spark/sql/comet/util/Utils.scala | 23 +----- 5 files changed, 82 insertions(+), 70 deletions(-) create mode 100644 common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java delete mode 100644 common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java new file mode 100644 index 000000000..32955f1ac --- /dev/null +++ b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.arrow.c; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.types.pojo.Field; + +/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */ +public class CometSchemaImporter { + private final BufferAllocator allocator; + private final SchemaImporter importer; + private final CDataDictionaryProvider provider = new CDataDictionaryProvider(); + + public CometSchemaImporter(BufferAllocator allocator) { + this.allocator = allocator; + this.importer = new SchemaImporter(allocator); + } + + public BufferAllocator getAllocator() { + return allocator; + } + + public CDataDictionaryProvider getProvider() { + return provider; + } + + public Field importField(ArrowSchema schema) { + try { + return importer.importField(schema, provider); + } finally { + schema.release(); + schema.close(); + } + } + + /** + * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java + * Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is + * used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will + * begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite + * dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls + * to `importVector`. + */ + public FieldVector importVector(ArrowArray array, ArrowSchema schema) { + Field field = importField(schema); + FieldVector vector = field.createVector(allocator); + Data.importIntoVector(allocator, array, vector, provider); + + return vector; + } + + public void close() { + provider.close(); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index e3303e8ac..bf8e6e550 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -108,6 +108,7 @@ public class BatchReader extends RecordReader implements Cl private MessageType requestedSchema; private CometVector[] vectors; private AbstractColumnReader[] columnReaders; + private CometSchemaImporter importer; private ColumnarBatch currentBatch; private Future> prefetchTask; private LinkedBlockingQueue> prefetchQueue; @@ -519,6 +520,10 @@ public void close() throws IOException { fileReader.close(); fileReader = null; } + if (importer != null) { + importer.close(); + importer = null; + } } @SuppressWarnings("deprecation") @@ -556,7 +561,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { numRowGroupsMetric.add(1); } - CometSchemaImporter importer = new CometSchemaImporter(ALLOCATOR); + if (importer != null) importer.close(); + importer = new CometSchemaImporter(ALLOCATOR); List columns = requestedSchema.getColumns(); for (int i = 0; i < columns.size(); i++) { diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 033ca589b..3d4cb3aa5 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -39,7 +39,6 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.spark.sql.comet.util.Utils$; import org.apache.spark.sql.types.DataType; import org.apache.comet.CometConf; @@ -206,8 +205,7 @@ public CometDecodedVector loadVector() { try (ArrowArray array = ArrowArray.wrap(addresses[0]); ArrowSchema schema = ArrowSchema.wrap(addresses[1])) { - FieldVector vector = - Utils$.MODULE$.importVector(importer.getAllocator(), importer, array, schema); + FieldVector vector = importer.importVector(array, schema); DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary(); diff --git a/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala b/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala deleted file mode 100644 index 3dedcddfd..000000000 --- a/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.arrow.c - -import org.apache.arrow.memory.BufferAllocator -import org.apache.arrow.vector.types.pojo.Field - -/** - * This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. - */ -class CometSchemaImporter(allocator: BufferAllocator) { - val importer = new SchemaImporter(allocator) - val provider = new CDataDictionaryProvider() - - def getAllocator(): BufferAllocator = allocator - - def getProvider(): CDataDictionaryProvider = provider - - def importField(schema: ArrowSchema): Field = { - try { - importer.importField(schema, provider) - } finally { - schema.release(); - schema.close(); - } - } -} diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 5569b4b1a..298cbab6b 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -25,8 +25,7 @@ import java.nio.channels.Channels import scala.collection.JavaConverters._ -import org.apache.arrow.c.{ArrowArray, ArrowSchema, CDataDictionaryProvider, CometSchemaImporter, Data} -import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.c.CDataDictionaryProvider import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot} import org.apache.arrow.vector.complex.MapVector import org.apache.arrow.vector.dictionary.DictionaryProvider @@ -265,24 +264,4 @@ object Utils { throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}") } } - - /** - * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as - * Java Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally - * which is used to fill dictionary ids for dictionary encoded vectors. Every call to - * `importVector` will begin with dictionary ids starting from 0. So, separate calls to - * `importVector` will overwrite dictionary ids. To avoid this, we need to use the same - * `SchemaImporter` instance for all calls to `importVector`. - */ - def importVector( - allocator: BufferAllocator, - importer: CometSchemaImporter, - array: ArrowArray, - schema: ArrowSchema): FieldVector = { - val field = importer.importField(schema) - val vector = field.createVector(allocator) - Data.importIntoVector(allocator, array, vector, importer.getProvider()) - - vector - } }