DRILL-4203: Fix date values written in parquet files created by Drill
Drill wrote non-standard dates into parquet files in all releases before
1.9.0. Drill itself has always read these values back correctly, but
external tools like Spark reading the files will see corrupted values for
all dates that have been written by Drill.

This change corrects the Drill parquet writer to store dates in the format
given in the parquet specification.

To maintain compatibility with old files, the parquet reader code has
been updated to check for the old format and automatically shift the
corrupted values to the correct ones.

The test cases included here should ensure that all files produced by
historical versions of Drill continue to return the same values they did
in previous releases. For compatibility with external tools, any old
files with corrupted dates can be re-written using the CREATE TABLE AS
command, as the writer now produces only specification-compliant values,
even when reading from older corrupt files.

While the old behavior consistently shifted dates into a range unlikely
to appear in a modern database (over 10,000 years in the future), these are
still valid date values. In case such values were written into files
intentionally, and the metadata cannot confirm that Drill produced the
files, an option is included to turn off the auto-correction (it surfaces
in this diff as the autoCorrectCorruptDates flag on the parquet format
config). Use of this option is expected to be extremely rare, but it is
included for completeness.

This patch was originally written against version 1.5.0; when rebasing,
the corruption threshold was updated to 1.9.0.

Added regenerated binary files, updated metadata cache files accordingly.

One small fix in ParquetGroupScan to accommodate changes in master that
altered when metadata is read.

Tests for bugs revealed by the regression suite.

Fix Drill version number in metadata file generation
jaltekruse authored and parthchandra committed Oct 14, 2016
1 parent 2f4b5ef commit ae34d5c
Showing 87 changed files with 1,607 additions and 87 deletions.
@@ -36,6 +36,7 @@
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
@@ -118,14 +119,19 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan
     final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);

     for(int rowGroupNum : rowGroupNums) {
+      // Drill has only ever written a single row group per file, only detect corruption
+      // in the first row group
+      ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
+          ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
       readers.add(new ParquetRecordReader(
           context,
           Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
           rowGroupNum, fs,
           CodecFactory.createDirectCodecFactory(fs.getConf(),
             new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
           parquetMetadata,
-          newColumns)
+          newColumns,
+          containsCorruptDates)
       );
       Map<String, String> implicitValues = Maps.newLinkedHashMap();
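For context, detectCorruptDates returns a ParquetReaderUtility.DateCorruptionStatus that readers branch on. A minimal sketch of such a tri-state status (META_SHOWS_CORRUPTION appears later in this diff; the other constant names here are assumptions for illustration):

    // Sketch only: the three possible outcomes of date-corruption detection.
    public enum DateCorruptionStatus {
      META_SHOWS_CORRUPTION,    // metadata proves dates are corrupt: auto-correct them
      META_SHOWS_NO_CORRUPTION, // metadata proves dates are clean: read values as-is
      META_UNCLEAR_TEST_VALUES  // metadata is inconclusive: sanity-check values while reading
    }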
@@ -156,12 +156,12 @@ public void writeField() throws IOException {
 <#elseif minor.class == "Date">
 <#if mode.prefix == "Repeated" >
             reader.read(i, holder);
-            consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
+            consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
 <#else>
     consumer.startField(fieldName, fieldId);
     reader.read(holder);
     // convert from internal Drill date format to Julian Day centered around Unix Epoc
-    consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
+    consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
     consumer.endField(fieldName, fieldId);
 </#if>
 <#elseif
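The sign flip above is the entire bug: the writer added JULIAN_DAY_EPOC (the Julian day number of the Unix epoch, 2440588) where the parquet spec requires subtracting it to get days since 1970-01-01. Every corrupt value is therefore exactly 2 * 2440588 days too large, which is both why corrupted dates land over 10,000 years in the future and why the reader can undo the damage with a constant subtraction. A small self-contained illustration of the arithmetic (variable values are made up):

    public class DateShiftDemo {
      // Julian day number of the Unix epoch, 1970-01-01.
      static final int JULIAN_DAY_EPOC = 2440588;

      public static void main(String[] args) {
        int julianDay = 2457300;                    // Julian day of some date in 2015
        int correct = julianDay - JULIAN_DAY_EPOC;  // days since epoch, per the parquet spec
        int corrupt = julianDay + JULIAN_DAY_EPOC;  // what pre-1.9 Drill wrote instead

        // The corruption is a fixed shift, so a constant subtraction recovers the value:
        System.out.println(corrupt - 2 * JULIAN_DAY_EPOC == correct); // true
      }
    }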
@@ -110,7 +110,10 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
         return notSupported(tableName);
       }

-      Metadata.createMeta(fs, selectionRoot);
+      if (!(formatConfig instanceof ParquetFormatConfig)) {
+        formatConfig = new ParquetFormatConfig();
+      }
+      Metadata.createMeta(fs, selectionRoot, (ParquetFormatConfig) formatConfig);
       return direct(true, "Successfully updated metadata for table %s.", tableName);

     } catch(Exception e) {
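With the extra parameter, code that refreshes the metadata cache now threads the format configuration through to the Metadata helpers. A hypothetical call site (fragment; fs is an org.apache.hadoop.fs.FileSystem already in scope, and the path is made up for illustration):

    // Hypothetical usage: refresh the metadata cache for a table directory.
    ParquetFormatConfig formatConfig = new ParquetFormatConfig(); // autoCorrectCorruptDates defaults to true
    Metadata.createMeta(fs, "/data/events", formatConfig);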
@@ -26,6 +26,8 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.MetadataContext;
@@ -80,6 +82,7 @@ public class Metadata {
   public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";

   private final FileSystem fs;
+  private final ParquetFormatConfig formatConfig;

   private ParquetTableMetadataBase parquetTableMetadata;
   private ParquetTableMetadataDirs parquetTableMetadataDirs;
@@ -91,8 +94,8 @@ public class Metadata {
    * @param path
    * @throws IOException
    */
-  public static void createMeta(FileSystem fs, String path) throws IOException {
-    Metadata metadata = new Metadata(fs);
+  public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.createMetaFilesRecursively(path);
   }

@@ -104,9 +107,9 @@ public static void createMeta(FileSystem fs, String path) throws IOException {
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path)
+  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig)
       throws IOException {
-    Metadata metadata = new Metadata(fs);
+    Metadata metadata = new Metadata(fs, formatConfig);
     return metadata.getParquetTableMetadata(path);
   }

@@ -119,8 +122,8 @@ public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, Str
    * @throws IOException
    */
   public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs,
-      List<FileStatus> fileStatuses) throws IOException {
-    Metadata metadata = new Metadata(fs);
+      List<FileStatus> fileStatuses, ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     return metadata.getParquetTableMetadata(fileStatuses);
   }

@@ -132,20 +135,21 @@ public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs,
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
-    Metadata metadata = new Metadata(fs);
+  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, false, metaContext);
     return metadata.parquetTableMetadata;
   }

-  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext) throws IOException {
-    Metadata metadata = new Metadata(fs);
+  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, true, metaContext);
     return metadata.parquetTableMetadataDirs;
   }

-  private Metadata(FileSystem fs) {
+  private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {
     this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
+    this.formatConfig = formatConfig;
   }

   /**
@@ -345,6 +349,10 @@ private ParquetFileMetadata_v2 getParquetFileMetadata_v2(ParquetTableMetadata_v2

     List<RowGroupMetadata_v2> rowGroupMetadataList = Lists.newArrayList();

+    ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
+    ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+    boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates;
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
       List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList();
       long length = 0;
@@ -367,9 +375,13 @@
         if (statsAvailable) {
           // Write stats only if minVal==maxVal. Also, we then store only maxVal
           Object mxValue = null;
-          if (stats.genericGetMax() != null && stats.genericGetMin() != null && stats.genericGetMax()
-              .equals(stats.genericGetMin())) {
+          if (stats.genericGetMax() != null && stats.genericGetMin() != null &&
+              stats.genericGetMax().equals(stats.genericGetMin())) {
             mxValue = stats.genericGetMax();
+            if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION
+                && columnTypeMetadata.originalType == OriginalType.DATE) {
+              mxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) mxValue);
+            }
           }
           columnMetadata =
               new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls());
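Correcting the cached max value here keeps metadata-based pruning consistent with what the corrected reader will return. The commit does not show the body of ParquetReaderUtility.autoCorrectCorruptedDate, but given the fixed double-epoch shift described above it is presumably just a constant subtraction; a sketch under that assumption:

    // Sketch (assumed implementation): undo the writer's double-epoch shift.
    // 2440588 is the Julian day number of the Unix epoch (1970-01-01).
    public static int autoCorrectCorruptedDate(int corruptedDate) {
      return corruptedDate - 2 * 2440588;
    }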
@@ -521,7 +533,6 @@ private void readBlockMeta(String path,
    * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
    * the modification time of the metadata file
    *
-   * @param tableMetadata
    * @param metaFilePath
    * @return
    * @throws IOException
@@ -585,6 +596,7 @@ public static abstract class ParquetTableMetadataBase {
   @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName);

   @JsonIgnore public abstract ParquetTableMetadataBase clone();
+  @JsonIgnore public abstract String getDrillVersion();
 }

 public static abstract class ParquetFileMetadata {
@@ -618,6 +630,24 @@ public static abstract class ColumnMetadata {

   public abstract Object getMaxValue();

+  /**
+   * Set the max value recorded in the parquet metadata statistics.
+   *
+   * This object would be immutable, but due to DRILL-4203 we need to correct
+   * date values that had been corrupted by earlier versions of Drill.
+   */
+  public abstract void setMax(Object newMax);
+
+  /**
+   * Set the min value recorded in the parquet metadata statistics.
+   *
+   * This object would be immutable, but due to DRILL-4203 we need to correct
+   * date values that had been corrupted by earlier versions of Drill.
+   */
+  public abstract void setMin(Object newMin);
+
   public abstract PrimitiveTypeName getPrimitiveType();

   public abstract OriginalType getOriginalType();
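These setters exist so that cache-loading code can patch stats in place when a pre-1.9 cache is detected. A hypothetical use (sketch only; the actual call sites are not part of this excerpt):

    // Sketch: patch a cached DATE column's max value when corruption is detected.
    if (columnMetadata.getOriginalType() == OriginalType.DATE
        && containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION) {
      columnMetadata.setMax(ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue()));
    }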
@@ -681,6 +711,10 @@ public ParquetTableMetadata_v1(List<ParquetFileMetadata_v1> files, List<String>
   @JsonIgnore @Override public ParquetTableMetadataBase clone() {
     return new ParquetTableMetadata_v1(files, directories);
   }
+  @Override
+  public String getDrillVersion() {
+    return null;
+  }
 }




@@ -870,7 +904,6 @@ public void setMax(Object max) {
     return max;
   }

-
 }

 /**
@@ -885,16 +918,18 @@ public void setMax(Object max) {
   @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo;
   @JsonProperty List<ParquetFileMetadata_v2> files;
   @JsonProperty List<String> directories;
+  @JsonProperty String drillVersion;

   public ParquetTableMetadata_v2() {
-    super();
+    this.drillVersion = DrillVersionInfo.getVersion();
   }

   public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
       List<ParquetFileMetadata_v2> files, List<String> directories) {
     this.files = files;
     this.directories = directories;
     this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
+    this.drillVersion = DrillVersionInfo.getVersion();
   }

   public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
@@ -935,6 +970,11 @@ public ColumnTypeMetadata_v2 getColumnTypeInfo(String[] name) {
   @JsonIgnore @Override public ParquetTableMetadataBase clone() {
     return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
   }
+  @Override
+  public String getDrillVersion() {
+    return drillVersion;
+  }
+
 }
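Recording the writing Drill version in the v2 cache gives corruption detection a cheap first check: a null version means the cache predates this fix (v1, or a v2 cache written before 1.9.0), while a recorded version means the dates are already spec-compliant. A sketch of how a reader might use it (assumed logic; the real check in ParquetReaderUtility may differ):

    // Sketch only: version-based gate before falling back to value-range checks.
    String drillVersion = parquetTableMetadata.getDrillVersion();
    if (drillVersion == null) {
      // unknown writer version: dates may carry the old shift, inspect further
    } else {
      // metadata written by 1.9.0+: dates are spec-compliant, no correction needed
    }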




@@ -1141,6 +1181,11 @@ public boolean hasSingleValue() {
     return mxValue;
   }

+  @Override
+  public void setMin(Object newMin) {
+    // noop - min value not stored in this version of the metadata
+  }
+
   @Override public PrimitiveTypeName getPrimitiveType() {
     return null;
   }
@@ -17,21 +17,33 @@
  */
 package org.apache.drill.exec.store.parquet;

+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.FormatPluginConfig;

 import com.fasterxml.jackson.annotation.JsonTypeName;

 @JsonTypeName("parquet")
 public class ParquetFormatConfig implements FormatPluginConfig{

+  public boolean autoCorrectCorruptDates = true;
+
   @Override
-  public int hashCode() {
-    return 7;
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ParquetFormatConfig that = (ParquetFormatConfig) o;
+
+    return autoCorrectCorruptDates == that.autoCorrectCorruptDates;
   }

   @Override
-  public boolean equals(Object obj) {
-    return obj instanceof ParquetFormatConfig;
+  public int hashCode() {
+    return (autoCorrectCorruptDates ? 1 : 0);
   }

 }
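The old constant hashCode()/equals() treated every ParquetFormatConfig as interchangeable; now two configs that differ only in autoCorrectCorruptDates compare unequal, so a plugin configured with the correction off is not conflated with the default. A quick illustration (a standalone demo class, assuming the Drill classes on the classpath):

    import org.apache.drill.exec.store.parquet.ParquetFormatConfig;

    public class ConfigEqualityDemo {
      public static void main(String[] args) {
        ParquetFormatConfig defaults = new ParquetFormatConfig(); // autoCorrectCorruptDates = true
        ParquetFormatConfig noFix = new ParquetFormatConfig();
        noFix.autoCorrectCorruptDates = false;

        System.out.println(defaults.equals(noFix)); // false - distinct configurations
        System.out.println(defaults.hashCode());    // 1
        System.out.println(noFix.hashCode());       // 0
      }
    }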
@@ -92,7 +92,7 @@ public ParquetFormatPlugin(String name, DrillbitContext context, Configuration f
       StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
     this.config = formatConfig;
-    this.formatMatcher = new ParquetFormatMatcher(this);
+    this.formatMatcher = new ParquetFormatMatcher(this, config);
     this.storageConfig = storageConfig;
     this.fsConf = fsConf;
     this.name = name == null ? DEFAULT_NAME : name;
@@ -196,8 +196,11 @@ public FormatMatcher getMatcher() {

   private static class ParquetFormatMatcher extends BasicFormatMatcher{

-    public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+    private final ParquetFormatConfig formatConfig;
+
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin, ParquetFormatConfig formatConfig) {
       super(plugin, PATTERNS, MAGIC_STRINGS);
+      this.formatConfig = formatConfig;
     }

     @Override
@@ -218,7 +221,7 @@ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
       // create a metadata context that will be used for the duration of the query for this table
       MetadataContext metaContext = new MetadataContext();

-      ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext);
+      ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext, formatConfig);
       if (mDirs.getDirectories().size() > 0) {
         FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
             selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
