HIVE-26694: Populate file row position information during vectorized Iceberg reads #3732

Merged 1 commit on Nov 7, 2022
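
For background, Iceberg positional delete files identify deleted rows by data file path and row position within that file, so applying them during a vectorized read requires knowing the file row position of every row in a batch. A minimal sketch of that matching idea, with made-up positions (not code from this PR):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of position-based delete matching (illustrative values only).
public class PositionalDeleteSketch {
  public static void main(String[] args) {
    Set<Long> deletedFilePositions = new HashSet<>(Arrays.asList(3L, 1027L));
    long batchFileRowOffset = 1024L;   // file row position of the first row in a batch
    int batchSize = 4;
    for (int i = 0; i < batchSize; i++) {
      long filePos = batchFileRowOffset + i;
      boolean deleted = deletedFilePositions.contains(filePos);
      System.out.println("row " + filePos + (deleted ? " -> filtered out" : " -> kept"));
    }
  }
}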
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.vector;

import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.iceberg.io.CloseableIterator;

/**
* Wraps a Hive VectorizedRowBatch (VRB) and holds corresponding metadata about it, such as the VRB context (e.g. type infos)
* and the file row offset.
*/
public class HiveBatchContext {

private final VectorizedRowBatch batch;
private final VectorizedRowBatchCtx vrbCtx;
/**
* File row position of the first row in this batch. Long.MIN_VALUE if unknown.
*/
private final long fileRowOffset;

public HiveBatchContext(VectorizedRowBatch batch, VectorizedRowBatchCtx vrbCtx, long fileRowOffset) {
this.batch = batch;
this.vrbCtx = vrbCtx;
this.fileRowOffset = fileRowOffset;
}

public VectorizedRowBatch getBatch() {
return batch;
}

public RowIterator rowIterator() throws IOException {
throw new UnsupportedOperationException("Not implemented yet");
}

// TODO: implement row iterator
class RowIterator implements CloseableIterator {

@Override
public void close() throws IOException {
}

@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}
}
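
To illustrate how the offset can be consumed (a sketch under assumptions, not part of this change): with no selection vector in use, the file row position of the i-th row in the batch is fileRowOffset + i, and Long.MIN_VALUE signals that the offset is unknown, e.g. because the underlying reader does not report positions.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Sketch only: compute absolute file row positions for a batch, assuming the batch
// has no selection vector and the offset was reported by the underlying reader.
final class BatchPositionSketch {
  static long[] absolutePositions(VectorizedRowBatch batch, long fileRowOffset) {
    if (fileRowOffset == Long.MIN_VALUE) {
      throw new IllegalStateException("File row offset is unknown for this batch");
    }
    long[] positions = new long[batch.size];
    for (int i = 0; i < batch.size; i++) {
      positions[i] = fileRowOffset + i;   // position of row i within the data file
    }
    return positions;
  }
}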
@@ -22,6 +22,7 @@
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -30,7 +31,7 @@
/**
* Iterator wrapper around Hive's VectorizedRowBatch-producing (MRv1-implementing) record readers.
*/
public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
public final class HiveBatchIterator implements CloseableIterator<HiveBatchContext> {

private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
private final NullWritable key;
@@ -39,8 +40,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
private final int[] partitionColIndices;
private final Object[] partitionValues;
private boolean advanced = false;
private long rowOffset = Long.MIN_VALUE;

VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
int[] partitionColIndices, Object[] partitionValues) {
this.recordReader = recordReader;
this.key = recordReader.createKey();
@@ -62,6 +64,11 @@ private void advance() {
if (!recordReader.next(key, batch)) {
batch.size = 0;
}

if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
}

// Fill partition values
if (partitionColIndices != null) {
for (int i = 0; i < partitionColIndices.length; ++i) {
@@ -86,9 +93,9 @@ public boolean hasNext() {
}

@Override
public VectorizedRowBatch next() {
public HiveBatchContext next() {
advance();
advanced = false;
return batch;
return new HiveBatchContext(batch, vrbCtx, rowOffset);
}
}
@@ -46,7 +46,8 @@ public HiveIcebergVectorizedRecordReader(
public boolean next(Void key, VectorizedRowBatch value) throws IOException {
try {
if (innerReader.nextKeyValue()) {
VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
HiveBatchContext currentValue = (HiveBatchContext) innerReader.getCurrentValue();
VectorizedRowBatch newBatch = currentValue.getBatch();
value.cols = newBatch.cols;
value.endOfFile = newBatch.endOfFile;
value.numCols = newBatch.numCols;
@@ -76,8 +76,9 @@ private HiveVectorizedReader() {

}

public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
TaskAttemptContext context, Expression residual) {
public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {

// Tweaks on jobConf here are relevant for this task only, so we need to copy it first, as the context's conf is reused.
JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
@@ -171,8 +172,9 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);

// If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases
// TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null) {
LlapProxy.getIo() != null && task.deletes().isEmpty()) {
recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
job, start, length, reporter);
}
@@ -220,14 +222,14 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade
return inputFormat.getRecordReader(split, job, reporter);
}

private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
Object[] partitionValues) {

VectorizedRowBatchIterator iterator =
new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
HiveBatchIterator iterator =
new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);

return new CloseableIterable<D>() {
return new CloseableIterable<HiveBatchContext>() {

@Override
public CloseableIterator iterator() {
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.io;

import java.io.IOException;

public interface RowPositionAwareVectorizedRecordReader {
/**
* Returns the row position (in the file) of the first row in the last returned batch.
* @return row position
* @throws IOException
*/
long getRowNumber() throws IOException;
}
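
A minimal illustration of the contract (hypothetical reader, not part of this patch): getRowNumber() reports the file row position of the first row of the batch most recently returned, so an implementation typically records the running row count just before handing a batch out.

import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;

// Hypothetical implementation sketch: track the start position of the last returned batch.
final class CountingReaderSketch implements RowPositionAwareVectorizedRecordReader {
  private long nextBatchStart = 0;   // file row position the next batch will start at
  private long lastBatchStart = 0;   // file row position of the last returned batch

  // A real reader would call this from its next(...) method after filling a batch.
  void batchReturned(int batchSize) {
    lastBatchStart = nextBatchStart;
    nextBatchStart += batchSize;
  }

  @Override
  public long getRowNumber() {
    return lastBatchStart;
  }
}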
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
@@ -55,7 +56,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
SelfDescribingInputFormatInterface {

static class VectorizedOrcRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch> {
implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
private final long offset;
private final long length;
@@ -171,6 +172,11 @@ public void close() throws IOException {
public float getProgress() throws IOException {
return progress;
}

@Override
public long getRowNumber() throws IOException {
return reader.getRowNumber();
}
}

public VectorizedOrcInputFormat() {
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
@@ -78,8 +79,10 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

@@ -90,7 +93,7 @@
* from Apache Spark and Apache Parquet.
*/
public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
implements RecordReader<NullWritable, VectorizedRowBatch> {
implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);

private List<Integer> colsToInclude;
@@ -128,6 +131,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private ZoneId writerTimezone;
private final BucketIdentifier bucketIdentifier;

// number of rows returned with the last batch
private int lastReturnedRowCount = -1;

// row number (in the file) of the first row returned in the last batch
private long currentRowNumInRowGroup = -1;

// index of the current rowgroup, incremented after reader.readNextRowGroup() calls
private int currentRowGroupIndex = -1;

// file row position of the first row of each row group, keyed by row group index
private Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();

// LLAP cache integration
// TODO: also support fileKey in splits, like OrcSplit does
private Object cacheKey = null;
@@ -216,7 +230,11 @@ public void initialize(
offsets.add(offset);
}
blocks = new ArrayList<>();
long allRowsInFile = 0;
int blockIndex = 0;
for (BlockMetaData block : parquetMetadata.getBlocks()) {
rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
allRowsInFile += block.getRowCount();
if (offsets.contains(block.getStartingPos())) {
blocks.add(block);
}
@@ -365,10 +383,17 @@ public float getProgress() throws IOException {
return 0;
}

@Override
public long getRowNumber() throws IOException {
return rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
}

/**
* Advances to the next batch of rows. Returns false if there are no more.
*/
private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
currentRowNumInRowGroup += lastReturnedRowCount;

columnarBatch.reset();
if (rowsReturned >= totalRowCount) {
return false;
@@ -391,6 +416,7 @@ private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
columnTypesList.get(colsToInclude.get(i)));
}
}
lastReturnedRowCount = num;
rowsReturned += num;
columnarBatch.size = num;
return true;
@@ -430,6 +456,9 @@ private void checkEndOfRowGroup() throws IOException {
}
}

currentRowNumInRowGroup = 0;
currentRowGroupIndex++;

totalCountLoadedSoFar += pages.getRowCount();
}

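A worked example of the bookkeeping above (values are made up): for a file with two row groups of 1,000 rows each, rowGroupNumToRowPos is built as {0 -> 0, 1 -> 1000}; once the reader has moved into the second row group and the last batch started at the 25th row of that group, currentRowGroupIndex is 1, currentRowNumInRowGroup is 24, and getRowNumber() returns 1000 + 24 = 1024.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the row position arithmetic used by getRowNumber().
public class RowNumberArithmeticSketch {
  public static void main(String[] args) {
    Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
    rowGroupNumToRowPos.put(0, 0L);      // row group 0 starts at file row 0
    rowGroupNumToRowPos.put(1, 1000L);   // row group 1 starts at file row 1000

    int currentRowGroupIndex = 1;        // reader is inside the second row group
    long currentRowNumInRowGroup = 24;   // first row of the last batch, relative to the group

    long rowNumber = rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
    System.out.println(rowNumber);       // prints 1024
  }
}
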
@@ -119,6 +119,11 @@ public void decimalRead() throws Exception {
stringReadDecimal(isDictionaryEncoding);
}

@Test
public void verifyBatchOffsets() throws Exception {
super.verifyBatchOffsets();
}

private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
public TestVectorizedParquetRecordReader(
org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
@@ -1067,4 +1067,24 @@ protected void decimalRead(boolean isDictionaryEncoding) throws Exception {
reader.close();
}
}

protected void verifyBatchOffsets() throws Exception {
Configuration c = new Configuration();
c.set(IOConstants.COLUMNS, "int64_field");
c.set(IOConstants.COLUMNS_TYPES, "bigint");
c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
VectorizedParquetRecordReader reader =
createTestParquetReader("message test { required int64 int64_field;}", c);
VectorizedRowBatch previous = reader.createValue();
try {
int batchCount = 0;
while (reader.next(NullWritable.get(), previous)) {
assertEquals(VectorizedRowBatch.DEFAULT_SIZE * batchCount++, reader.getRowNumber());
}
assertEquals(reader.getRowNumber(), nElements);
} finally {
reader.close();
}
}
}