Skip to content

Commit

Permalink
DRILL-7069: Moved version checks outside loops in transformBinaryInMetadataCache
Browse files Browse the repository at this point in the history

closes #1667
  • Loading branch information
Ben-Zvi authored and karthik committed Mar 8, 2019
1 parent 6bd31f3 commit 5abcd88
Showing 1 changed file with 50 additions and 22 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.hadoop.util.VersionUtil;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ColumnTypeMetadata_v2;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ParquetTableMetadata_v2;
Expand Down Expand Up @@ -239,9 +241,11 @@ public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetT
new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ?
DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
boolean mdVersion_1_0 = new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()));
boolean mdVersion_2_0 = new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()));
// Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
String[] names = new String[0];
if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
if (mdVersion_2_0) {
for (ColumnTypeMetadata_v2 columnTypeMetadata :
((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
Expand All @@ -256,7 +260,7 @@ public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetT
Long rowCount = rowGroupMetadata.getRowCount();
for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
// Setting Min/Max values for ParquetTableMetadata_v1
if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
if (mdVersion_1_0) {
OriginalType originalType = columnMetadata.getOriginalType();
if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue(rowCount) &&
(Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
Expand All @@ -266,10 +270,11 @@ public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetT
}
}
// Setting Max values for ParquetTableMetadata_v2
else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
columnMetadata.hasSingleValue(rowCount) && (Integer) columnMetadata.getMaxValue() >
ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
else if (mdVersion_2_0 &&
columnMetadata.getName() != null &&
Arrays.equals(columnMetadata.getName(), names) &&
columnMetadata.hasSingleValue(rowCount) &&
(Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
columnMetadata.setMax(newMax);
}
Expand All @@ -292,30 +297,53 @@ public static void transformBinaryInMetadataCache(ParquetTableMetadataBase parqu
Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);

// Setting Min / Max values for ParquetTableMetadata_v1
if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
Long rowCount = rowGroupMetadata.getRowCount();
for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
}
}
}
}
return;
}

// Variables needed for debugging only
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
int maxRowGroups = 0;
int minRowGroups = Integer.MAX_VALUE;
int maxNumColumns = 0;

// Setting Min / Max values for V2 and V3 versions; for versions V3_3 and above need to do decoding
boolean needDecoding = new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0;
for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
if ( timer != null ) { // for debugging only
maxRowGroups = Math.max(maxRowGroups, file.getRowGroups().size());
minRowGroups = Math.min(minRowGroups, file.getRowGroups().size());
}
for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
Long rowCount = rowGroupMetadata.getRowCount();
if ( timer != null ) { // for debugging only
maxNumColumns = Math.max(maxNumColumns, rowGroupMetadata.getColumns().size());
}
for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
// Setting Min / Max values for ParquetTableMetadata_v1
if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
|| columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
}
}
// Setting Min / Max values for V2 and all V3 versions prior to V3_3
else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) < 0
&& columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
}
// Setting Min / Max values for V3_3 and all next versions
else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0
&& columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, true);
if (columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, needDecoding);
}
}
}
}

if (timer != null) { // log a debug message and stop the timer
String reportRG = 1 == maxRowGroups ? "1 rowgroup" : "between " + minRowGroups + "-" + maxRowGroups + "rowgroups";
logger.debug("Transforming binary in metadata cache took {} ms ({} files, {} per file, max {} columns)", timer.elapsed(TimeUnit.MILLISECONDS),
parquetTableMetadata.getFiles().size(), reportRG, maxNumColumns);
timer.stop();
}
}

/**
Expand Down

0 comments on commit 5abcd88

Please sign in to comment.