Commit 63b6134

HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732)
(Adam Szita, reviewed by Laszlo Pinter)

szlta committed Nov 7, 2022
1 parent 72ce0c9 commit 63b6134
Showing 9 changed files with 185 additions and 14 deletions.

HiveBatchContext.java (new file, package org.apache.iceberg.mr.hive.vector)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.vector;

import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.iceberg.io.CloseableIterator;

/**
* Wraps a Hive VectorizedRowBatch (VRB) and holds corresponding metadata about it, such as the VRB context
* (e.g. type information) and the file row offset.
*/
public class HiveBatchContext {

private final VectorizedRowBatch batch;
private final VectorizedRowBatchCtx vrbCtx;
/**
* File row position of the first row in this batch. Long.MIN_VALUE if unknown.
*/
private final long fileRowOffset;

public HiveBatchContext(VectorizedRowBatch batch, VectorizedRowBatchCtx vrbCtx, long fileRowOffset) {
this.batch = batch;
this.vrbCtx = vrbCtx;
this.fileRowOffset = fileRowOffset;
}

public VectorizedRowBatch getBatch() {
return batch;
}

public RowIterator rowIterator() throws IOException {
throw new UnsupportedOperationException("Not implemented yet");
}

// TODO: implement row iterator
class RowIterator implements CloseableIterator {

@Override
public void close() throws IOException {
}

@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}
}
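
Illustrative aside (not part of this commit): the fileRowOffset carried by HiveBatchContext is the file position of the batch's first row, so a consumer could expand it into per-row file positions, for example when matching Iceberg positional deletes. A minimal sketch under that assumption; the helper class and method names are hypothetical.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

final class RowPositionSketch {
  // Expands a batch's first-row file offset into per-row file positions.
  static long[] filePositions(VectorizedRowBatch batch, long fileRowOffset) {
    long[] positions = new long[batch.size];
    for (int i = 0; i < batch.size; i++) {
      // If a filter already ran, 'selected' maps logical row i to its physical slot in the batch.
      int physicalRow = batch.selectedInUse ? batch.selected[i] : i;
      positions[i] = fileRowOffset + physicalRow;
    }
    return positions;
  }
}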
HiveBatchIterator.java (renamed from VectorizedRowBatchIterator.java)
@@ -22,6 +22,7 @@
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -30,7 +31,7 @@
/**
* Iterator wrapper around Hive's VectorizedRowBatch-producing (MRv1) record readers.
*/
-public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
+public final class HiveBatchIterator implements CloseableIterator<HiveBatchContext> {

private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
private final NullWritable key;
@@ -39,8 +40,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
private final int[] partitionColIndices;
private final Object[] partitionValues;
private boolean advanced = false;
private long rowOffset = Long.MIN_VALUE;

-VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
+HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
int[] partitionColIndices, Object[] partitionValues) {
this.recordReader = recordReader;
this.key = recordReader.createKey();
@@ -62,6 +64,11 @@ private void advance() {
if (!recordReader.next(key, batch)) {
batch.size = 0;
}

if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
}

// Fill partition values
if (partitionColIndices != null) {
for (int i = 0; i < partitionColIndices.length; ++i) {
@@ -86,9 +93,9 @@ public boolean hasNext() {
}

@Override
-public VectorizedRowBatch next() {
+public HiveBatchContext next() {
advance();
advanced = false;
-return batch;
+return new HiveBatchContext(batch, vrbCtx, rowOffset);
}
}
HiveIcebergVectorizedRecordReader.java
@@ -46,7 +46,8 @@ public HiveIcebergVectorizedRecordReader(
public boolean next(Void key, VectorizedRowBatch value) throws IOException {
try {
if (innerReader.nextKeyValue()) {
-VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
+HiveBatchContext currentValue = (HiveBatchContext) innerReader.getCurrentValue();
+VectorizedRowBatch newBatch = currentValue.getBatch();
value.cols = newBatch.cols;
value.endOfFile = newBatch.endOfFile;
value.numCols = newBatch.numCols;
HiveVectorizedReader.java
@@ -76,8 +76,9 @@ private HiveVectorizedReader() {

}

-public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
-TaskAttemptContext context, Expression residual) {
+public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
+Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {

// Tweaks on jobConf here are relevant to this task only, so we need to copy it first, as the context's conf is reused.
JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
@@ -171,8 +172,9 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);

// If LLAP is enabled, try to retrieve an LLAP record reader - this might yield null in some special cases
// TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
-LlapProxy.getIo() != null) {
+LlapProxy.getIo() != null && task.deletes().isEmpty()) {
recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
job, start, length, reporter);
}
@@ -220,14 +222,14 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade
return inputFormat.getRecordReader(split, job, reporter);
}

-private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
+private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
Object[] partitionValues) {

-VectorizedRowBatchIterator iterator =
-new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
+HiveBatchIterator iterator =
+new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);

-return new CloseableIterable<D>() {
+return new CloseableIterable<HiveBatchContext>() {

@Override
public CloseableIterator iterator() {
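
Illustrative aside (not part of this commit): a sketch of how the CloseableIterable<HiveBatchContext> returned by HiveVectorizedReader.reader(...) might be consumed downstream. The BatchConsumerSketch class and the process(...) helper are hypothetical.

import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.hive.vector.HiveBatchContext;

final class BatchConsumerSketch {
  static void consume(CloseableIterable<HiveBatchContext> batches) throws IOException {
    // CloseableIterable is Closeable, so try-with-resources releases the underlying record reader.
    try (CloseableIterable<HiveBatchContext> closing = batches) {
      for (HiveBatchContext batchContext : closing) {
        VectorizedRowBatch vrb = batchContext.getBatch();
        process(vrb); // hypothetical downstream handling, e.g. positional delete filtering
      }
    }
  }

  private static void process(VectorizedRowBatch vrb) {
    // placeholder for downstream processing
  }
}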
RowPositionAwareVectorizedRecordReader.java (new file, package org.apache.hadoop.hive.ql.io)
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.io;

import java.io.IOException;

public interface RowPositionAwareVectorizedRecordReader {
/**
* Returns the row position (in the file) of the first row in the last returned batch.
* @return row position
* @throws IOException if the row position cannot be determined due to an I/O error in the underlying reader
*/
long getRowNumber() throws IOException;
}
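
Illustrative aside (not part of this commit): a toy reader showing the getRowNumber() contract, namely that it reports the file row position of the first row of the most recently returned batch. Everything except the interface itself is hypothetical.

import java.io.IOException;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;

final class ToyPositionAwareReader implements RowPositionAwareVectorizedRecordReader {
  private long nextRowInFile = 0;   // file position of the first row of the next batch
  private long lastBatchStart = -1; // file position of the first row of the last returned batch

  // Pretend a batch of 'rowsRead' rows was just produced.
  boolean nextBatch(int rowsRead) {
    lastBatchStart = nextRowInFile;
    nextRowInFile += rowsRead;
    return rowsRead > 0;
  }

  @Override
  public long getRowNumber() throws IOException {
    return lastBatchStart;
  }
}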
VectorizedOrcInputFormat.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
@@ -55,7 +56,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
SelfDescribingInputFormatInterface {

static class VectorizedOrcRecordReader
-implements RecordReader<NullWritable, VectorizedRowBatch> {
+implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
private final long offset;
private final long length;
@@ -171,6 +172,11 @@ public void close() throws IOException {
public float getProgress() throws IOException {
return progress;
}

@Override
public long getRowNumber() throws IOException {
return reader.getRowNumber();
}
}

public VectorizedOrcInputFormat() {
VectorizedParquetRecordReader.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
@@ -78,8 +79,10 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

@@ -90,7 +93,7 @@
* from Apache Spark and Apache Parquet.
*/
public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
-implements RecordReader<NullWritable, VectorizedRowBatch> {
+implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);

private List<Integer> colsToInclude;
@@ -128,6 +131,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private ZoneId writerTimezone;
private final BucketIdentifier bucketIdentifier;

// number of rows returned with the last batch
private int lastReturnedRowCount = -1;

// row number (within the current row group) of the first row returned in the last batch
private long currentRowNumInRowGroup = -1;

// index of the current row group, incremented after each reader.readNextRowGroup() call
private int currentRowGroupIndex = -1;

private Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();

// LLAP cache integration
// TODO: also support fileKey in splits, like OrcSplit does
private Object cacheKey = null;
@@ -216,7 +230,11 @@ public void initialize(
offsets.add(offset);
}
blocks = new ArrayList<>();
long allRowsInFile = 0;
int blockIndex = 0;
for (BlockMetaData block : parquetMetadata.getBlocks()) {
rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
allRowsInFile += block.getRowCount();
if (offsets.contains(block.getStartingPos())) {
blocks.add(block);
}
@@ -365,10 +383,17 @@ public float getProgress() throws IOException {
return 0;
}

@Override
public long getRowNumber() throws IOException {
return rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
}

/**
* Advances to the next batch of rows. Returns false if there are no more.
*/
private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
currentRowNumInRowGroup += lastReturnedRowCount;

columnarBatch.reset();
if (rowsReturned >= totalRowCount) {
return false;
@@ -391,6 +416,7 @@ private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
columnTypesList.get(colsToInclude.get(i)));
}
}
lastReturnedRowCount = num;
rowsReturned += num;
columnarBatch.size = num;
return true;
@@ -430,6 +456,9 @@ private void checkEndOfRowGroup() throws IOException {
}
}

currentRowNumInRowGroup = 0;
currentRowGroupIndex++;

totalCountLoadedSoFar += pages.getRowCount();
}

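
Illustrative aside (not part of this commit): the Parquet reader's bookkeeping above maps each row group index to the file row at which that group starts, then adds the offset of the current batch within the row group. A worked sketch with made-up row counts follows.

import java.util.HashMap;
import java.util.Map;

final class RowGroupPositionSketch {
  public static void main(String[] args) {
    long[] rowGroupRowCounts = {1000, 1000, 500}; // hypothetical per-row-group row counts
    Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
    long allRowsInFile = 0;
    for (int blockIndex = 0; blockIndex < rowGroupRowCounts.length; blockIndex++) {
      rowGroupNumToRowPos.put(blockIndex, allRowsInFile);
      allRowsInFile += rowGroupRowCounts[blockIndex];
    }
    // A batch starting 200 rows into row group 2 begins at file row 2000 + 200 = 2200.
    int currentRowGroupIndex = 2;
    long currentRowNumInRowGroup = 200;
    long rowNumber = rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
    System.out.println(rowNumber); // prints 2200
  }
}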
@@ -119,6 +119,11 @@ public void decimalRead() throws Exception {
stringReadDecimal(isDictionaryEncoding);
}

@Test
public void verifyBatchOffsets() throws Exception {
super.verifyBatchOffsets();
}

private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
public TestVectorizedParquetRecordReader(
org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
VectorizedColumnReaderTestBase.java
@@ -1067,4 +1067,24 @@ protected void decimalRead(boolean isDictionaryEncoding) throws Exception {
reader.close();
}
}

protected void verifyBatchOffsets() throws Exception {
Configuration c = new Configuration();
c.set(IOConstants.COLUMNS, "int64_field");
c.set(IOConstants.COLUMNS_TYPES, "bigint");
c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
VectorizedParquetRecordReader reader =
createTestParquetReader("message test { required int64 int64_field;}", c);
VectorizedRowBatch previous = reader.createValue();
try {
int batchCount = 0;
while (reader.next(NullWritable.get(), previous)) {
assertEquals(VectorizedRowBatch.DEFAULT_SIZE * batchCount++, reader.getRowNumber());
}
assertEquals(reader.getRowNumber(), nElements);
} finally {
reader.close();
}
}
}
