diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
new file mode 100644
index 000000000000..08d2f7325550
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Wraps a Hive VRB and holds corresponding metadata about it, such as the VRB context (e.g. type infos) and the
+ * file row offset.
+ */
+public class HiveBatchContext {
+
+  private final VectorizedRowBatch batch;
+  private final VectorizedRowBatchCtx vrbCtx;
+  /**
+   * File row position of the first row in this batch. Long.MIN_VALUE if unknown.
+   */
+  private final long fileRowOffset;
+
+  public HiveBatchContext(VectorizedRowBatch batch, VectorizedRowBatchCtx vrbCtx, long fileRowOffset) {
+    this.batch = batch;
+    this.vrbCtx = vrbCtx;
+    this.fileRowOffset = fileRowOffset;
+  }
+
+  public VectorizedRowBatch getBatch() {
+    return batch;
+  }
+
+  public RowIterator rowIterator() throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  // TODO: implement row iterator
+  class RowIterator implements CloseableIterator<Object> {
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    public Object next() {
+      return null;
+    }
+  }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
similarity index 83%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
index 1ad5180e58c7..22a42f2953e0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -30,7 +31,7 @@
 /**
  * Iterator wrapper around Hive's VectorizedRowBatch producer (MRv1 implementing) record readers.
  */
-public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
+public final class HiveBatchIterator implements CloseableIterator<HiveBatchContext> {

   private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
   private final NullWritable key;
@@ -39,8 +40,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch>
   private final VectorizedRowBatchCtx vrbCtx;
   private final int[] partitionColIndices;
   private final Object[] partitionValues;
+  private long rowOffset = Long.MIN_VALUE;

-  VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
+  HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
       int[] partitionColIndices, Object[] partitionValues) {
     this.recordReader = recordReader;
     this.key = recordReader.createKey();
@@ -62,6 +64,11 @@ private void advance() {
       if (!recordReader.next(key, batch)) {
         batch.size = 0;
       }
+
+      if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
+        rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
+      }
+
       // Fill partition values
       if (partitionColIndices != null) {
         for (int i = 0; i < partitionColIndices.length; ++i) {
@@ -86,9 +93,9 @@ public boolean hasNext() {
   }

   @Override
-  public VectorizedRowBatch next() {
+  public HiveBatchContext next() {
     advance();
     advanced = false;
-    return batch;
+    return new HiveBatchContext(batch, vrbCtx, rowOffset);
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
index 412f74780908..a70a37c22247 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -46,7 +46,8 @@ public HiveIcebergVectorizedRecordReader(
   public boolean next(Void key, VectorizedRowBatch value) throws IOException {
     try {
       if (innerReader.nextKeyValue()) {
-        VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
+        HiveBatchContext currentValue = (HiveBatchContext) innerReader.getCurrentValue();
+        VectorizedRowBatch newBatch = currentValue.getBatch();
         value.cols = newBatch.cols;
         value.endOfFile = newBatch.endOfFile;
         value.numCols = newBatch.numCols;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 2e0d11e59c58..22131062ddf7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -76,8 +76,9 @@ private HiveVectorizedReader() {

   }

-  public static CloseableIterable<VectorizedRowBatch> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
-      TaskAttemptContext context, Expression residual) {
+  public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
+      Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {
+    // Tweaks on the jobConf here are relevant to this task only, so we need to copy it first, as the context's conf is reused.
     JobConf job = new JobConf(context.getConfiguration());
     FileFormat format = task.file().format();
@@ -171,8 +172,9 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
         VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);

     // If LLAP enabled, try to retrieve an LLAP record reader - this might yield null in some special cases
+    // TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
-        LlapProxy.getIo() != null) {
+        LlapProxy.getIo() != null && task.deletes().isEmpty()) {
       recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
           job, start, length, reporter);
     }
@@ -220,14 +222,14 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade
     return inputFormat.getRecordReader(split, job, reporter);
   }

-  private static CloseableIterable<VectorizedRowBatch> createVectorizedRowBatchIterable(
+  private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
       RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
       Object[] partitionValues) {

-    VectorizedRowBatchIterator iterator =
-        new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
+    HiveBatchIterator iterator =
+        new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);

-    return new CloseableIterable<VectorizedRowBatch>() {
+    return new CloseableIterable<HiveBatchContext>() {

       @Override
       public CloseableIterator iterator() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
new file mode 100644
index 000000000000..1775787f1873
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+public interface RowPositionAwareVectorizedRecordReader {
+  /**
+   * Returns the row position (in the file) of the first row in the last returned batch.
+   * @return row position
+   * @throws IOException
+   */
+  long getRowNumber() throws IOException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index b7b48efbc74a..aea4f37180b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.NullWritable;
@@ -55,7 +56,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
   private static class VectorizedOrcRecordReader
-      implements RecordReader<NullWritable, VectorizedRowBatch> {
+      implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
     private final long offset;
     private final long length;
@@ -171,6 +172,11 @@ public void close() throws IOException {
     public float getProgress() throws IOException {
       return progress;
     }
+
+    @Override
+    public long getRowNumber() throws IOException {
+      return reader.getRowNumber();
+    }
   }

   public VectorizedOrcInputFormat() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f7b13cb3d6a8..c1c0a1206868 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
@@ -78,8 +79,10 @@
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;

@@ -90,7 +93,7 @@
  * from Apache Spark and Apache Parquet.
  */
 public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
-    implements RecordReader<NullWritable, VectorizedRowBatch> {
+    implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
   public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);

   private List<Integer> colsToInclude;
@@ -128,6 +131,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private ZoneId writerTimezone;
   private final BucketIdentifier bucketIdentifier;

+  // number of rows returned with the last batch
+  private int lastReturnedRowCount = -1;
+
+  // row number (within the current row group) of the first row returned in the last batch
+  private long currentRowNumInRowGroup = -1;
+
+  // index of the current row group, incremented after reader.readNextRowGroup() calls
+  private int currentRowGroupIndex = -1;
+
+  private Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
+
   // LLAP cache integration
   // TODO: also support fileKey in splits, like OrcSplit does
   private Object cacheKey = null;
@@ -216,7 +230,11 @@ public void initialize(
         offsets.add(offset);
       }
       blocks = new ArrayList<>();
+      long allRowsInFile = 0;
+      int blockIndex = 0;
       for (BlockMetaData block : parquetMetadata.getBlocks()) {
+        rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
+        allRowsInFile += block.getRowCount();
         if (offsets.contains(block.getStartingPos())) {
           blocks.add(block);
         }
@@ -365,10 +383,17 @@ public float getProgress() throws IOException {
     return 0;
   }

+  @Override
+  public long getRowNumber() throws IOException {
+    return rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
+  }
+
   /**
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+    currentRowNumInRowGroup += lastReturnedRowCount;
+
     columnarBatch.reset();
     if (rowsReturned >= totalRowCount) {
       return false;
@@ -391,6 +416,7 @@ private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
             columnTypesList.get(colsToInclude.get(i)));
       }
     }
+    lastReturnedRowCount = num;
     rowsReturned += num;
     columnarBatch.size = num;
     return true;
@@ -430,6 +456,9 @@ private void checkEndOfRowGroup() throws IOException {
       }
     }

+    currentRowNumInRowGroup = 0;
+    currentRowGroupIndex++;
+
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index e290e332e7f3..0a0867fff9f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -119,6 +119,11 @@ public void decimalRead() throws Exception {
     stringReadDecimal(isDictionaryEncoding);
   }

+  @Test
+  public void verifyBatchOffsets() throws Exception {
+    super.verifyBatchOffsets();
+  }
+
   private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
     public TestVectorizedParquetRecordReader(
         org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index e29b6f0c7661..df78eb697586 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -1067,4 +1067,24 @@ protected
void decimalRead(boolean isDictionaryEncoding) throws Exception {
       reader.close();
     }
   }
+
+  protected void verifyBatchOffsets() throws Exception {
+    Configuration c = new Configuration();
+    c.set(IOConstants.COLUMNS, "int64_field");
+    c.set(IOConstants.COLUMNS_TYPES, "bigint");
+    c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createTestParquetReader("message test { required int64 int64_field;}", c);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      int batchCount = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        assertEquals(VectorizedRowBatch.DEFAULT_SIZE * batchCount++, reader.getRowNumber());
+      }
+      assertEquals(nElements, reader.getRowNumber());
+    } finally {
+      reader.close();
+    }
+  }
 }
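
How the pieces above fit together downstream: HiveBatchIterator asks the underlying record reader for its row position once per batch (via RowPositionAwareVectorizedRecordReader) and packs it into the HiveBatchContext it emits, so a consumer can map every row of a batch back to its absolute position in the data file, which is the lookup a positional-delete filter needs. Below is a minimal, self-contained sketch of that mapping, not part of the patch: it assumes the batch's file row offset is available to the caller (in the patch the fileRowOffset field is private and no getter is added yet), and the printRowPositions helper is hypothetical.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BatchPositionSketch {

  // Maps each row of a batch to its absolute row position in the file.
  // 'fileRowOffset' is the position of the batch's first row, as carried by
  // HiveBatchContext (Long.MIN_VALUE means the reader could not provide it).
  static void printRowPositions(VectorizedRowBatch batch, long fileRowOffset) {
    if (fileRowOffset == Long.MIN_VALUE) {
      throw new IllegalStateException("Reader did not report row positions");
    }
    for (int i = 0; i < batch.size; i++) {
      // With a selection vector in use, logical row i is batch.selected[i] within the batch.
      int rowInBatch = batch.selectedInUse ? batch.selected[i] : i;
      long filePos = fileRowOffset + rowInBatch;
      System.out.println("batch row " + rowInBatch + " -> file position " + filePos);
    }
  }
}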
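On the Parquet side, getRowNumber() stays O(1) because initialize() precomputes, per row group, the file-level position of that group's first row (rowGroupNumToRowPos), while checkEndOfRowGroup() and nextBatch() track the offset of the last returned batch within the current group. The following standalone sketch reproduces the same arithmetic with made-up row-group sizes; all values here are illustrative, not taken from the patch.

import java.util.HashMap;
import java.util.Map;

public class RowGroupOffsetSketch {
  public static void main(String[] args) {
    long[] rowGroupRowCounts = {1000, 1000, 500};  // hypothetical row counts per row group

    // Same bookkeeping as in VectorizedParquetRecordReader.initialize():
    // map each row-group index to the file-level position of its first row.
    Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
    long allRowsInFile = 0;
    int blockIndex = 0;
    for (long rowCount : rowGroupRowCounts) {
      rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
      allRowsInFile += rowCount;
    }
    // rowGroupNumToRowPos is now {0=0, 1=1000, 2=2000}

    // getRowNumber() resolves to:
    //   rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup
    int currentRowGroupIndex = 2;       // reading the third row group
    long currentRowNumInRowGroup = 42;  // first row of the last batch, within that group
    long rowNumber = rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
    System.out.println(rowNumber);      // prints 2042
  }
}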