DRILL-6353: Upgrade Parquet MR dependencies
closes #1259
vrozov authored and ilooner committed Jun 14, 2018
1 parent 98dbc3a commit ac8e69847659582e36c89fd52bb0856ab3bfbd21
Showing 20 changed files with 209 additions and 234 deletions.
@@ -31,6 +31,20 @@
<packaging>jar</packaging>
<name>contrib/hive-storage-plugin/hive-exec-shaded</name>

+<properties>
+<hive.parquet.version>1.8.3</hive.parquet.version>
+</properties>
+
+<dependencyManagement>
+<dependencies>
+<dependency>
+<groupId>org.apache.parquet</groupId>
+<artifactId>parquet-hadoop-bundle</artifactId>
+<version>${hive.parquet.version}</version>
+</dependency>
+</dependencies>
+</dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
@@ -68,11 +82,6 @@
</exclusion>
</exclusions>
</dependency>
-<!--Once newer hive-exec version leverages parquet-column 1.9.0, this dependency can be deleted -->
-<dependency>
-<groupId>org.apache.parquet</groupId>
-<artifactId>parquet-column</artifactId>
-</dependency>
</dependencies>

<build>
@@ -83,7 +92,7 @@
<artifactSet>
<includes>
<include>org.apache.hive:hive-exec</include>
-<include>org.apache.parquet:parquet-column</include>
+<include>org.apache.parquet:parquet-hadoop-bundle</include>
<include>commons-codec:commons-codec</include>
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
@@ -117,6 +126,10 @@
<pattern>org.apache.parquet.</pattern>
<shadedPattern>hive.org.apache.parquet.</shadedPattern>
</relocation>
+<relocation>
+<pattern>shaded.parquet.</pattern>
+<shadedPattern>hive.shaded.parquet.</shadedPattern>
+</relocation>
<relocation>
<pattern>org.apache.avro.</pattern>
<shadedPattern>hive.org.apache.avro.</shadedPattern>
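
For context: together with the parquet-hadoop-bundle pin to 1.8.3 above, these relocations put Hive's bundled Parquet classes under a hive. prefix so they cannot clash with Drill's own Parquet classes. A minimal illustration of the effect, not part of the commit (the class name is just one that exists in both jars, and it assumes the shaded hive-exec jar is on the classpath):

public class RelocationCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // Drill's own Parquet class, left at its original coordinates.
    Class<?> drillParquet = Class.forName("org.apache.parquet.column.statistics.Statistics");
    // The relocated copy inside the shaded hive-exec jar; the prefix comes from
    // the <shadedPattern> elements above.
    Class<?> hiveParquet = Class.forName("hive.org.apache.parquet.column.statistics.Statistics");
    System.out.println(drillParquet != hiveParquet); // two distinct classes coexist
  }
}
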
@@ -249,92 +249,17 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format</artifactId>
<version>2.3.0-incubating</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
</dependency>
-<dependency>
-<groupId>org.apache.parquet</groupId>
-<artifactId>parquet-jackson</artifactId>
-<version>${parquet.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
-</dependency>
-<dependency>
-<groupId>org.apache.parquet</groupId>
-<artifactId>parquet-encoding</artifactId>
-<version>${parquet.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
-</dependency>
-<dependency>
-<groupId>org.apache.parquet</groupId>
-<artifactId>parquet-generator</artifactId>
-<version>${parquet.version}</version>
-<exclusions>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-client</artifactId>
-</exclusion>
-<exclusion>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-common</artifactId>
-</exclusion>
-</exclusions>
-</dependency>
<dependency>
<groupId>javax.inject</groupId>
@@ -113,7 +113,7 @@ private static <C extends Comparable<C>> LogicalExpression createEqualPredicate(
// can drop when left's max < right's min, or right's max < left's min
final C leftMin = leftStat.genericGetMin();
final C rightMin = rightStat.genericGetMin();
-return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
+return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
}) {
@Override
public String toString() {
@@ -132,7 +132,7 @@ private static <C extends Comparable<C>> LogicalExpression createGTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max <= right's min.
final C rightMin = rightStat.genericGetMin();
-return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+return leftStat.compareMaxToValue(rightMin) <= 0;
});
}

@@ -146,7 +146,7 @@ private static <C extends Comparable<C>> LogicalExpression createGEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max < right's min.
final C rightMin = rightStat.genericGetMin();
-return leftStat.genericGetMax().compareTo(rightMin) < 0;
+return leftStat.compareMaxToValue(rightMin) < 0;
});
}

@@ -160,7 +160,7 @@ private static <C extends Comparable<C>> LogicalExpression createLTPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max <= left's min.
final C leftMin = leftStat.genericGetMin();
-return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+return rightStat.compareMaxToValue(leftMin) <= 0;
});
}

@@ -173,7 +173,7 @@ private static <C extends Comparable<C>> LogicalExpression createLEPredicate(
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max < left's min.
final C leftMin = leftStat.genericGetMin();
-return rightStat.genericGetMax().compareTo(leftMin) < 0;
+return rightStat.compareMaxToValue(leftMin) < 0;
});
}

@@ -188,8 +188,8 @@ private static <C extends Comparable<C>> LogicalExpression createNEPredicate(
// can drop when there is only one unique value.
final C leftMax = leftStat.genericGetMax();
final C rightMax = rightStat.genericGetMax();
-return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
-leftStat.genericGetMax().compareTo(rightMax) == 0;
+return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
+leftStat.compareMaxToValue(rightMax) == 0;
});
}

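All five comparison predicates above make the same move: from genericGetMax().compareTo(...) on boxed values to compareMaxToValue(...)/compareMinToValue(...), which in parquet-column 1.10.x delegate to the column's type-aware PrimitiveComparator rather than raw Comparable ordering. A minimal sketch of the pattern (the class and helper names are illustrative, not Drill's):

import org.apache.parquet.column.statistics.Statistics;

final class RangeOverlapSketch {
  // `left = right` can never match when the two column ranges do not overlap,
  // so the row group can be dropped.
  static <C extends Comparable<C>> boolean canDropEquals(Statistics<C> left, Statistics<C> right) {
    return left.compareMaxToValue(right.genericGetMin()) < 0
        || right.compareMaxToValue(left.genericGetMin()) < 0;
  }
}
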
@@ -23,6 +23,7 @@
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;

import java.util.ArrayList;
@@ -114,7 +115,7 @@ private static <C extends Comparable<C>> LogicalExpression createIsNotNullPredic
private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is not true or if there are all nulls -> canDrop
-(exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
+(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
);
}

@@ -124,7 +125,7 @@ private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is not false or if there are all nulls -> canDrop
-(exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
+(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
);
}

@@ -134,7 +135,7 @@ private static LogicalExpression createIsFalsePredicate(LogicalExpression expr)
private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is not false or if there are no nulls -> canDrop
-(exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
);
}

@@ -144,7 +145,7 @@ private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr
private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is not true or if there are no nulls -> canDrop
-(exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
);
}

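The four boolean predicates above drop the boxed genericGetMin()/genericGetMax() plus equals() pattern in favor of the primitive accessors on BooleanStatistics. A hedged sketch of one case (the helper name is illustrative):

import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;

final class BooleanBoundsSketch {
  // IS TRUE can be dropped when the row group's max is false, i.e. no value is true.
  static boolean noTrueValues(Statistics<?> stat) {
    return !((BooleanStatistics) stat).getMax(); // primitive boolean, no boxing
  }
}
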
@@ -43,7 +43,7 @@ static boolean isNullOrEmpty(Statistics stat) {
* False if at least one row is not null.
*/
static boolean isAllNulls(Statistics stat, long rowCount) {
-return stat.getNumNulls() == rowCount;
+return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
}

/**
@@ -54,7 +54,7 @@ static boolean isAllNulls(Statistics stat, long rowCount) {
* False if the parquet file hasn't nulls.
*/
static boolean hasNoNulls(Statistics stat) {
-return stat.getNumNulls() == 0;
+return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
}

}
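
Both helpers now guard on isNumNullsSet(), because in parquet-mr 1.10 the null count is optional in file metadata (getNumNulls() reports -1 when it was never written, to my reading of the 1.10 API). A self-contained sketch of the guarded pattern (class name illustrative):

import org.apache.parquet.column.statistics.Statistics;

final class NullCountGuardsSketch {
  static boolean isAllNulls(Statistics<?> stat, long rowCount) {
    // An unset null count cannot prove "all nulls", so the row group is kept.
    return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
  }

  static boolean hasNoNulls(Statistics<?> stat) {
    // An unset null count is treated as "no nulls", mirroring the change above.
    return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
  }
}
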
@@ -33,11 +33,12 @@
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

@@ -50,6 +51,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

+import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
public abstract class AbstractParquetScanBatchCreator {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);

private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
-Configuration newConf = new Configuration(conf);
+conf = new Configuration(conf);
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+return reader.getFooter();
+}
}

private boolean isComplex(ParquetMetadata footer) {
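
The static ParquetFileReader.readFooter(...) overloads are deprecated in parquet-hadoop 1.10, so the method above opens the file through an InputFile instead. A self-contained sketch of just the new read path, assuming parquet-hadoop 1.10.x (Drill's PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED flag is omitted here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

final class FooterReaderSketch {
  static ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetadataFilter(NO_FILTER) // read the whole footer, no row-group filtering
        .build();
    // try-with-resources closes the underlying stream once the footer is out.
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
      return reader.getFooter();
    }
  }
}
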
@@ -21,14 +21,13 @@

import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;

public class ColumnDataReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public BytesInput getPageAsBytesInput(int pageLength) throws IOException{

public void loadPage(DrillBuf target, int pageLength) throws IOException {
target.clear();
-ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
-int lengthLeftToRead = pageLength;
-while (lengthLeftToRead > 0) {
-lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
-}
+HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
target.writerIndex(pageLength);
}

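CompatibilityUtil is gone from parquet-hadoop 1.10; HadoopStreams.wrap adapts a Hadoop FSDataInputStream to Parquet's SeekableInputStream. A hedged sketch of the replacement (it uses readFully, which loops internally until the buffer is filled, rather than the single read call above; class and method names are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

final class PageLoadSketch {
  static void loadInto(FSDataInputStream input, ByteBuffer pageBuffer) throws IOException {
    SeekableInputStream in = HadoopStreams.wrap(input);
    in.readFully(pageBuffer); // fills pageBuffer.remaining() bytes or throws EOFException
  }
}
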
@@ -42,6 +42,7 @@
import com.google.common.base.Preconditions;

import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class FooterGatherer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public static Footer readFooter(final Configuration config, final FileStatus sta
footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
}

-ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
+final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
Footer footer = new Footer(status.getPath(), metadata);
return footer;
}

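The single-argument readParquetMetadata overload is gone in 1.10: a MetadataFilter argument is now required, and NO_FILTER (statically imported above) preserves the old read-everything behaviour. A minimal sketch, assuming parquet-hadoop 1.10.x (class name illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

final class FooterBytesSketch {
  static ParquetMetadata parse(byte[] footerBytes) throws IOException {
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    return converter.readParquetMetadata(new ByteArrayInputStream(footerBytes), NO_FILTER);
  }
}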