
For review
viirya committed May 27, 2024
1 parent 0eb2ca6 commit 0bc4933
Showing 5 changed files with 83 additions and 71 deletions.
73 changes: 73 additions & 0 deletions common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.arrow.c;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;

/**
 * A simple wrapper around Arrow Java's `SchemaImporter`. `SchemaImporter` is package-private in
 * Arrow Java, so this class lives in the `org.apache.arrow.c` package to make it accessible from
 * Comet.
 */
public class CometSchemaImporter {
  private final BufferAllocator allocator;
  private final SchemaImporter importer;
  private final CDataDictionaryProvider provider = new CDataDictionaryProvider();

  public CometSchemaImporter(BufferAllocator allocator) {
    this.allocator = allocator;
    this.importer = new SchemaImporter(allocator);
  }

  public BufferAllocator getAllocator() {
    return allocator;
  }

  public CDataDictionaryProvider getProvider() {
    return provider;
  }

  public Field importField(ArrowSchema schema) {
    try {
      return importer.importField(schema, provider);
    } finally {
      schema.release();
      schema.close();
    }
  }

  /**
   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as
   * Java Arrow's `Data.importVector`, except that `Data.importVector` instantiates a new
   * `SchemaImporter` internally, which is used to assign dictionary ids to dictionary-encoded
   * vectors. Each call to `Data.importVector` starts assigning dictionary ids from 0 again, so
   * separate calls would overwrite each other's dictionary ids. To avoid that, the same
   * `SchemaImporter` instance must be reused across all calls to `importVector`.
   */
  public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
    Field field = importField(schema);
    FieldVector vector = field.createVector(allocator);
    Data.importIntoVector(allocator, array, vector, provider);

    return vector;
  }

  public void close() {
    provider.close();
  }
}
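
For illustration (not part of the commit): a minimal sketch of how a caller might share one importer across several imports so that dictionary ids remain distinct. The `allocator` and the native `arrayAddr`/`schemaAddr` addresses are assumed to come from the surrounding code (e.g. Comet's native decoder); the names are hypothetical.

CometSchemaImporter importer = new CometSchemaImporter(allocator);
try {
  // Import the first batch; dictionary ids are assigned starting from 0.
  try (ArrowArray array = ArrowArray.wrap(arrayAddr);
      ArrowSchema schema = ArrowSchema.wrap(schemaAddr)) {
    FieldVector first = importer.importVector(array, schema);
    // ... consume `first` ...
  }
  // Import a second batch with the SAME importer: dictionary ids continue
  // from where the previous import left off instead of restarting at 0,
  // so the first batch's dictionaries are not overwritten.
  try (ArrowArray array = ArrowArray.wrap(arrayAddr2);
      ArrowSchema schema = ArrowSchema.wrap(schemaAddr2)) {
    FieldVector second = importer.importVector(array, schema);
    // ... consume `second` ...
  }
} finally {
  importer.close(); // releases the shared CDataDictionaryProvider
}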
common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -108,6 +108,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable
   private MessageType requestedSchema;
   private CometVector[] vectors;
   private AbstractColumnReader[] columnReaders;
+  private CometSchemaImporter importer;
   private ColumnarBatch currentBatch;
   private Future<Option<Throwable>> prefetchTask;
   private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
@@ -519,6 +520,10 @@ public void close() throws IOException {
       fileReader.close();
       fileReader = null;
     }
+    if (importer != null) {
+      importer.close();
+      importer = null;
+    }
   }
 
   @SuppressWarnings("deprecation")
@@ -556,7 +561,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
       numRowGroupsMetric.add(1);
     }
 
-    CometSchemaImporter importer = new CometSchemaImporter(ALLOCATOR);
+    if (importer != null) importer.close();
+    importer = new CometSchemaImporter(ALLOCATOR);
 
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     for (int i = 0; i < columns.size(); i++) {
common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -39,7 +39,6 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.spark.sql.comet.util.Utils$;
 import org.apache.spark.sql.types.DataType;
 
 import org.apache.comet.CometConf;
@@ -206,8 +205,7 @@ public CometDecodedVector loadVector() {
 
     try (ArrowArray array = ArrowArray.wrap(addresses[0]);
         ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector =
-          Utils$.MODULE$.importVector(importer.getAllocator(), importer, array, schema);
+      FieldVector vector = importer.importVector(array, schema);
 
       DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
44 changes: 0 additions & 44 deletions common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala

This file was deleted.

25 changes: 2 additions & 23 deletions common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -25,8 +25,7 @@ import java.nio.channels.Channels
 
 import scala.collection.JavaConverters._
 
-import org.apache.arrow.c.{ArrowArray, ArrowSchema, CDataDictionaryProvider, CometSchemaImporter, Data}
-import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.c.CDataDictionaryProvider
 import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
 import org.apache.arrow.vector.complex.MapVector
 import org.apache.arrow.vector.dictionary.DictionaryProvider
@@ -39,7 +38,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
-import org.apache.comet.vector._
+import org.apache.comet.vector.CometVector

 object Utils {
   def getConfPath(confFileName: String): String = {
@@ -265,24 +264,4 @@ object Utils {
       throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
     }
   }
-
-  /**
-   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as
-   * Java Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally
-   * which is used to fill dictionary ids for dictionary encoded vectors. Every call to
-   * `importVector` will begin with dictionary ids starting from 0. So, separate calls to
-   * `importVector` will overwrite dictionary ids. To avoid this, we need to use the same
-   * `SchemaImporter` instance for all calls to `importVector`.
-   */
-  def importVector(
-      allocator: BufferAllocator,
-      importer: CometSchemaImporter,
-      array: ArrowArray,
-      schema: ArrowSchema): FieldVector = {
-    val field = importer.importField(schema)
-    val vector = field.createVector(allocator)
-    Data.importIntoVector(allocator, array, vector, importer.getProvider())
-
-    vector
-  }
 }
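
The net effect on call sites, as seen in the ColumnReader hunk above: the Scala helper is gone and its logic now lives on the importer itself.

// Before this commit: a Spark-side Scala helper, with the importer and
// allocator passed in explicitly.
FieldVector vector =
    Utils$.MODULE$.importVector(importer.getAllocator(), importer, array, schema);

// After this commit: a method on CometSchemaImporter itself.
FieldVector vector = importer.importVector(array, schema);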
