HIVE-25104: Backward incompatible timestamp serialization in Parquet for certain timezones (Stamatis Zampetakis, reviewed by Jesus Camacho Rodriguez)

Closes #2282
zabetak committed Jun 4, 2021
1 parent ac9acdf commit f1ff996
Showing 22 changed files with 575 additions and 103 deletions.
9 changes: 7 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2215,8 +2215,13 @@ public static enum ConfVars {
"This value controls whether date type in Parquet files was written using the hybrid or proleptic\n" +
"calendar. Hybrid is the default."),
HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.legacy.conversion.enabled", true,
"This value controls whether we use former Java time API to convert between timezones on files where timezone\n" +
"is not encoded in the metadata. This is for debugging."),
"Whether to use former Java date/time APIs to convert between timezones when reading timestamps from " +
"Parquet files. The property has no effect when the file contains explicit metadata about the conversion " +
"used to write the data; in this case reading conversion is chosen based on the metadata."),
HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.write.legacy.conversion.enabled", false,
"Whether to use former Java date/time APIs to convert between timezones when writing timestamps in " +
"Parquet files. Once data are written to the file the effect is permanent (also reflected in the metadata)." +
"Changing the value of this property affects only new data written to the file."),
HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION("hive.avro.timestamp.skip.conversion", false,
"Some older Hive implementations (pre-3.1) wrote Avro timestamps in a UTC-normalized" +
"manner, while from version 3.1 until now Hive wrote time zone agnostic timestamps. " +
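
A minimal sketch of how the two properties above can be read and toggled programmatically through HiveConf (the class name is illustrative, not part of the commit). In a HiveQL session the same switches are typically flipped with SET hive.parquet.timestamp.legacy.conversion.enabled=... and SET hive.parquet.timestamp.write.legacy.conversion.enabled=...:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class TimestampConversionFlags {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();

        // Read-side flag: consulted only for files whose footer carries no conversion metadata.
        boolean readLegacy = HiveConf.getBoolVar(
            conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);

        // Write-side flag: selects the conversion used (and recorded in the footer) for newly written files.
        HiveConf.setBoolVar(
            conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED, true);

        System.out.println("legacy conversion on read (default): " + readLegacy);
      }
    }
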
@@ -421,7 +421,7 @@ Table Parameters:
previous_metadata_location hdfs://### HDFS PATH ###
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 2626
totalSize 2899
#### A masked pattern was here ####
write.format.default parquet

@@ -333,7 +333,7 @@ Table Parameters:
previous_metadata_location hdfs://### HDFS PATH ###
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 1503
totalSize 1659
#### A masked pattern was here ####
write.format.default parquet

@@ -249,7 +249,7 @@ Table Parameters:
rawDataSize 116
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 568
totalSize 607
#### A masked pattern was here ####
write.format.default parquet

5 changes: 5 additions & 0 deletions pom.xml
@@ -432,6 +432,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
5 changes: 5 additions & 0 deletions ql/pom.xml
@@ -560,6 +560,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
@@ -20,6 +20,7 @@
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -142,8 +143,11 @@ protected ParquetInputSplit getSplit(
skipProlepticConversion = HiveConf.getBoolVar(
conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
}
legacyConversionEnabled = HiveConf.getBoolVar(
conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
legacyConversionEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
if (fileMetaData.getKeyValueMetaData().containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
legacyConversionEnabled = Boolean.parseBoolean(
fileMetaData.getKeyValueMetaData().get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY));
}

split = new ParquetInputSplit(finalPath,
splitStart,
@@ -16,19 +16,23 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
import org.apache.hadoop.hive.common.type.CalendarUtils;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -699,10 +703,11 @@ protected TimestampWritableV2 convert(Binary binary) {
// time zone in order to emulate time zone agnostic behavior.
boolean skipConversion = Boolean.parseBoolean(
metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname));
boolean legacyConversion = Boolean.parseBoolean(
metadata.get(ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname));
Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion,
DataWritableReadSupport.getWriterTimeZoneId(metadata), legacyConversion);
String legacyConversion = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
assert legacyConversion != null;
ZoneId targetZone = skipConversion ? ZoneOffset.UTC : MoreObjects
.firstNonNull(DataWritableReadSupport.getWriterTimeZoneId(metadata), TimeZone.getDefault().toZoneId());
Timestamp ts = NanoTimeUtils.getTimestamp(nt, targetZone, Boolean.parseBoolean(legacyConversion));
return new TimestampWritableV2(ts);
}
};
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
@@ -303,7 +304,7 @@ public static Boolean getWriterDateProleptic(Map<String, String> metadata) {

return null;
}

/**
* Return the columns which contains required nested attribute level
* E.g., given struct a:<x:int, y:int> while 'x' is required and 'y' is not, the method will return
@@ -523,10 +524,30 @@ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration conf
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT)));
}

String legacyConversion = ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname;
if (!metadata.containsKey(legacyConversion)) {
metadata.put(legacyConversion, String.valueOf(HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED)));
if (!metadata.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
final String legacyConversion;
if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
// If there is meta about the legacy conversion then the file should be read in the same way it was written.
legacyConversion = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
} else if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_TIMEZONE)) {
// If there is no meta about the legacy conversion but there is meta about the timezone then we can infer the
// file was written with the new rules.
legacyConversion = "false";
} else {
// If there is no meta at all then it is not possible to determine which rules were used to write the file.
// Choose between old/new rules using the respective configuration property.
legacyConversion = String.valueOf(
HiveConf.getBoolVar(configuration, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED));
}
metadata.put(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY, legacyConversion);
} else {
String ctxMeta = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
String fileMeta = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
if (!Objects.equals(ctxMeta, fileMeta)) {
throw new IllegalStateException(
"Different values for " + DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY + " metadata: context ["
+ ctxMeta + "], file [" + fileMeta + "].");
}
}

return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
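
The branching above reduces to a single precedence rule; a standalone sketch of it follows (class and method names are illustrative, not part of the commit):

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;

    final class ReadConversionResolution {
      // Explicit conversion metadata wins; a writer timezone alone implies the new java.time rules;
      // only files with no metadata at all fall back to the configuration property.
      static boolean resolveLegacyConversion(Map<String, String> fileMetadata, Configuration configuration) {
        if (fileMetadata.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
          return Boolean.parseBoolean(fileMetadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY));
        }
        if (fileMetadata.containsKey(DataWritableWriteSupport.WRITER_TIMEZONE)) {
          return false; // the timezone key is only written by versions that already use the new rules
        }
        return HiveConf.getBoolVar(configuration,
            HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
      }
    }

The exception branch above additionally guards against a read context carrying a different conversion value than the file itself declares.
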
@@ -50,28 +50,11 @@ private static Calendar getGMTCalendar() {
return parquetGMTCalendar.get();
}

public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
return getNanoTime(ts, skipConversion, null);
}

/**
* Gets a NanoTime object, which represents timestamps as nanoseconds since epoch, from a
* Timestamp object. Parquet will store this NanoTime object as int96.
*
* If skipConversion flag is on, the timestamp will be converted to NanoTime as-is, i.e.
* timeZoneId argument will be ignored.
* If skipConversion is off, timestamp can be converted from a given time zone (timeZoneId) to UTC
* if timeZoneId is present, and if not present: from system time zone to UTC, before being
* converted to NanoTime.
* (See TimestampDataWriter#write for current Hive writing procedure.)
* Converts a timestamp from the specified timezone to UTC and returns its representation in NanoTime.
*/
public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId timeZoneId) {
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
timeZoneId = TimeZone.getDefault().toZoneId();
}
ts = TimestampTZUtil.convertTimestampToZone(ts, timeZoneId, ZoneOffset.UTC);
public static NanoTime getNanoTime(Timestamp ts, ZoneId sourceZone, boolean legacyConversion) {
ts = TimestampTZUtil.convertTimestampToZone(ts, sourceZone, ZoneOffset.UTC, legacyConversion);

Calendar calendar = getGMTCalendar();
calendar.setTimeInMillis(ts.toEpochMilli());
@@ -95,32 +78,17 @@ public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId
return new NanoTime(days, nanosOfDay);
}

public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
return getTimestamp(nt, skipConversion, null, false);
public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone) {
return getTimestamp(nt, targetZone, false);
}

/**
* Gets a Timestamp object from a NanoTime object, which represents timestamps as nanoseconds
* since epoch. Parquet stores these as int96.
* Converts a NanoTime representation in UTC to a timestamp in the specified timezone.
*
* Before converting to NanoTime, we may convert the timestamp to a desired time zone
* (timeZoneId). This will only happen if skipConversion flag is off.
* If skipConversion is off and timeZoneId is not found, then convert the timestamp to system
* time zone.
*
* For skipConversion to be true it must be set in conf AND the parquet file must NOT be written
* by parquet's java library (parquet-mr). This is enforced in ParquetRecordReaderBase#getSplit.
* @param legacyConversion when true the conversion to the target timezone is done with legacy (backwards compatible)
* method.
*/
public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId timeZoneId,
boolean legacyConversionEnabled) {
boolean legacyConversion = false;
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
legacyConversion = legacyConversionEnabled;
timeZoneId = TimeZone.getDefault().toZoneId();
}

public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone, boolean legacyConversion) {
int julianDay = nt.getJulianDay();
long nanosOfDay = nt.getTimeOfDayNanos();

@@ -151,7 +119,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId
calendar.set(Calendar.SECOND, seconds);

Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, timeZoneId, legacyConversion);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, targetZone, legacyConversion);
return ts;
}
}
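
A usage sketch for the reworked NanoTimeUtils API above; the zone, the timestamp literal and the class name are arbitrary illustrations:

    import java.time.ZoneId;

    import org.apache.hadoop.hive.common.type.Timestamp;
    import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
    import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

    public class NanoTimeRoundTrip {
      public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Kolkata");
        Timestamp original = Timestamp.valueOf("1880-01-01 00:00:00");

        // Write-side: convert from the source zone to UTC and encode as int96-style NanoTime.
        // Read-side: decode back to the same zone. Legacy (java.util based) rules on both sides:
        NanoTime legacyNt = NanoTimeUtils.getNanoTime(original, zone, true);
        Timestamp legacyBack = NanoTimeUtils.getTimestamp(legacyNt, zone, true);

        // New (java.time based) rules on both sides:
        NanoTime newNt = NanoTimeUtils.getNanoTime(original, zone, false);
        Timestamp newBack = NanoTimeUtils.getTimestamp(newNt, zone, false);

        // Matching write/read rules should round-trip the value; mixing them is the
        // incompatibility this commit addresses for zones with historical offset quirks.
        System.out.println(legacyBack + " / " + newBack);
      }
    }
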
@@ -53,8 +53,12 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Optional;
import java.util.TimeZone;

import static com.google.common.base.MoreObjects.firstNonNull;

/**
* Parquet file has self-describing schema which may differ from the user required schema (e.g.
@@ -1211,23 +1215,19 @@ private static byte[] convertToBytes(boolean value) {
* The reader who reads from the underlying Timestamp value value.
*/
public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
private boolean skipTimestampConversion = false;
private ZoneId writerTimezone;
private final ZoneId targetZone;
private boolean legacyConversionEnabled;

public TypesFromInt96PageReader(ValuesReader realReader, int length,
boolean skipTimestampConversion, ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(ValuesReader realReader, int length, ZoneId targetZone,
boolean legacyConversionEnabled) {
super(realReader, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion,
ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(Dictionary dict, int length, ZoneId targetZone, boolean legacyConversionEnabled) {
super(dict, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

@@ -1237,7 +1237,7 @@ private Timestamp convert(Binary binary) {
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
return NanoTimeUtils.getTimestamp(nt, targetZone, legacyConversionEnabled);
}

@Override
@@ -1907,9 +1907,11 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision,
hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
case INT96:
return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
skipTimestampConversion, writerTimezone, legacyConversionEnabled) : new
TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
ZoneId targetZone =
skipTimestampConversion ? ZoneOffset.UTC : firstNonNull(writerTimezone, TimeZone.getDefault().toZoneId());
return isDictionary ?
new TypesFromInt96PageReader(dictionary, length, targetZone, legacyConversionEnabled) :
new TypesFromInt96PageReader(valuesReader, length, targetZone, legacyConversionEnabled);
case BOOLEAN:
return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
TypesFromBooleanPageReader(valuesReader, length);
@@ -37,6 +37,7 @@ public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
public static final String WRITER_TIMEZONE = "writer.time.zone";
public static final String WRITER_DATE_PROLEPTIC = "writer.date.proleptic";
public static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";

private DataWritableWriter writer;
private MessageType schema;
@@ -60,6 +61,8 @@ public WriteContext init(final Configuration configuration) {
defaultDateProleptic = HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN);
metaData.put(WRITER_DATE_PROLEPTIC, String.valueOf(defaultDateProleptic));
metaData.put(WRITER_ZONE_CONVERSION_LEGACY, String.valueOf(HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED)));
return new WriteContext(schema, metaData);
}

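
An illustrative way (not part of the commit) to check which conversion an existing file declares is to read the new key from the Parquet footer, assuming parquet-hadoop is on the classpath; the class name is hypothetical:

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class FooterConversionCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]);
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
          Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
          // Older files may carry only writer.time.zone, or neither key; files written after this
          // change also record writer.zone.conversion.legacy.
          System.out.println("writer.time.zone = " + kv.get("writer.time.zone"));
          System.out.println("writer.zone.conversion.legacy = " + kv.get("writer.zone.conversion.legacy"));
        }
      }
    }
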
@@ -57,6 +57,7 @@

import java.util.List;
import java.util.Map;
import java.util.TimeZone;

/**
*
@@ -69,6 +70,7 @@ public class DataWritableWriter {
protected final RecordConsumer recordConsumer;
private final GroupType schema;
private final boolean defaultDateProleptic;
private final boolean isLegacyZoneConversion;
private Configuration conf;

/* This writer will be created when writing the first row in order to get
@@ -80,6 +82,8 @@ public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType s
this.recordConsumer = recordConsumer;
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.isLegacyZoneConversion =
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED.defaultBoolVal;
}

public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema,
@@ -88,6 +92,8 @@ public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType s
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.conf = conf;
this.isLegacyZoneConversion =
HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED);
}

/**
@@ -536,7 +542,8 @@ public void write(Object value) {
Long int64value = ParquetTimestampUtils.getInt64(ts, timeUnit);
recordConsumer.addLong(int64value);
} else {
recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
recordConsumer.addBinary(
NanoTimeUtils.getNanoTime(ts, TimeZone.getDefault().toZoneId(), isLegacyZoneConversion).toBinary());
}
}


