Skip to content

Commit

Permalink
ORC-577: Allow row-level filtering
Browse files Browse the repository at this point in the history
Fixes #475
  • Loading branch information
Panagiotis Garefalakis authored and jcamachor committed May 26, 2020
1 parent 5dd3b29 commit 8c6b178
Show file tree
Hide file tree
Showing 16 changed files with 2,826 additions and 194 deletions.
40 changes: 40 additions & 0 deletions java/core/src/java/org/apache/orc/Reader.java
Expand Up @@ -22,9 +22,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

/**
* The interface for reading ORC files.
Expand Down Expand Up @@ -187,6 +189,8 @@ class Options implements Cloneable {
private Boolean useZeroCopy = null;
private Boolean skipCorruptRecords = null;
private TypeDescription schema = null;
private String[] preFilterColumns = null;
Consumer<VectorizedRowBatch> skipRowCallback = null;
private DataReader dataReader = null;
private Boolean tolerateMissingSchema = null;
private boolean forcePositionalEvolution;
Expand Down Expand Up @@ -237,6 +241,34 @@ public Options schema(TypeDescription schema) {
return this;
}

/**
 * Install a row-level filter that runs during {@code RecordReader.nextBatch}.
 *
 * This is an advanced feature: the named columns are materialized first, the
 * callback is invoked on the partially-filled batch, and only the rows the
 * callback leaves in the batch's selected array are fully read. Callers must
 * therefore expect the batches returned by the reader to have the selected
 * array populated by their filter.
 *
 * Typical use cases are predicates that a SearchArgument cannot express,
 * for example relationships between columns (eg. columnA == columnB).
 *
 * @param filterColumnNames comma separated list of column names read before
 *                          the filter is applied; only top level columns of
 *                          the reader's schema may appear here, and each name
 *                          must appear at most once
 * @param filterCallback    callback executed for each batch during
 *                          RecordReader.nextBatch; it should not reference
 *                          static fields nor mutate the supplied
 *                          ColumnVectors, and must publish its decision
 *                          through the batch's selected array
 * @return this
 */
public Options setRowFilter(String[] filterColumnNames, Consumer<VectorizedRowBatch> filterCallback) {
  this.skipRowCallback = filterCallback;
  this.preFilterColumns = filterColumnNames;
  return this;
}

/**
* Set search argument for predicate push down.
* @param sarg the search argument
Expand Down Expand Up @@ -336,6 +368,14 @@ public SearchArgument getSearchArgument() {
return sarg;
}

/**
 * Returns the row-filter callback registered via setRowFilter, or null when
 * no row-level filtering was requested.
 */
public Consumer<VectorizedRowBatch> getFilterCallback() {
  return this.skipRowCallback;
}

/**
 * Returns the names of the columns that are read ahead of the row filter,
 * as supplied to setRowFilter, or null when no filter is configured.
 */
public String[] getPreFilterColumnNames() {
  return this.preFilterColumns;
}

/**
 * Returns the configured column names for this read, or null when none
 * were set on these options.
 */
public String[] getColumnNames() {
  return this.columnNames;
}
Expand Down
42 changes: 39 additions & 3 deletions java/core/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -21,6 +21,7 @@
import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.io.filter.FilterContext;

public final class BitFieldReader {
private final RunLengthByteReader input;
Expand Down Expand Up @@ -50,6 +51,38 @@ public int next() throws IOException {
return ((current >>> (8 - currentIdx)) & 1);
}

/**
 * Fill only the selected rows of {@code previous} with the next decoded bits,
 * skipping the encoded values that belong to unselected rows so the stream
 * stays positioned at the end of the run.
 *
 * When the vector carries nulls, only non-null rows have encoded bits, so the
 * skip distances are computed by counting non-null rows in each gap.
 *
 * @param previous      vector to populate; isRepeating is cleared
 * @param filterContext supplies the selected row indexes (ascending) and count
 * @param previousLen   total number of rows covered by this call
 * @throws IOException if reading the underlying stream fails
 */
public void nextVector(LongColumnVector previous,
    FilterContext filterContext,
    long previousLen) throws IOException {
  previous.isRepeating = false;
  // Hoist loop invariants: the selection does not change while we iterate.
  final int[] selected = filterContext.getSelected();
  final int selectedSize = filterContext.getSelectedSize();
  int previousIdx = 0;
  if (previous.noNulls) {
    for (int i = 0; i < selectedSize; i++) {
      final int idx = selected[i];
      if (idx - previousIdx > 0) {
        // Skip the bits of the unselected rows between the last read and idx.
        skip(idx - previousIdx);
      }
      previous.vector[idx] = next();
      previousIdx = idx + 1;
    }
    // Consume the remainder of the run past the last selected row.
    skip(previousLen - previousIdx);
  } else {
    for (int i = 0; i < selectedSize; i++) {
      final int idx = selected[i];
      if (idx - previousIdx > 0) {
        // Null rows have no encoded bits; skip only the non-null ones.
        skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, idx));
      }
      if (!previous.isNull[idx]) {
        previous.vector[idx] = next();
      } else {
        // Convention for null slots in this vector: store 1 as a placeholder.
        previous.vector[idx] = 1;
      }
      previousIdx = idx + 1;
    }
    skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, (int) previousLen));
  }
}

public void nextVector(LongColumnVector previous,
long previousLen) throws IOException {
previous.isRepeating = true;
Expand Down Expand Up @@ -95,8 +128,11 @@ public void skip(long totalBits) throws IOException {
} else {
final long bitsToSkip = (totalBits - availableBits);
input.skip(bitsToSkip / 8);
current = input.next();
currentIdx = (byte) (bitsToSkip % 8);
// Edge case: when skipping the last bits of a bitField there is nothing more to read!
if (input.hasNext()) {
current = input.next();
currentIdx = (byte) (bitsToSkip % 8);
}
}
}

Expand Down

0 comments on commit 8c6b178

Please sign in to comment.