HIVE-25104: Backward incompatible timestamp serialization in Parquet for certain timezones #2282

Merged: 12 commits, Jun 4, 2021

Changes from all commits
9 changes: 7 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2215,8 +2215,13 @@ public static enum ConfVars {
"This value controls whether date type in Parquet files was written using the hybrid or proleptic\n" +
"calendar. Hybrid is the default."),
HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.legacy.conversion.enabled", true,
"This value controls whether we use former Java time API to convert between timezones on files where timezone\n" +
"is not encoded in the metadata. This is for debugging."),
"Whether to use former Java date/time APIs to convert between timezones when reading timestamps from " +
"Parquet files. The property has no effect when the file contains explicit metadata about the conversion " +
"used to write the data; in this case reading conversion is chosen based on the metadata."),
HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.write.legacy.conversion.enabled", false,
"Whether to use former Java date/time APIs to convert between timezones when writing timestamps in " +
"Parquet files. Once data are written to the file the effect is permanent (also reflected in the metadata)." +
"Changing the value of this property affects only new data written to the file."),
HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION("hive.avro.timestamp.skip.conversion", false,
"Some older Hive implementations (pre-3.1) wrote Avro timestamps in a UTC-normalized" +
"manner, while from version 3.1 until now Hive wrote time zone agnostic timestamps. " +
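Beyond the diff itself: a minimal, illustrative sketch of how the two properties above could be toggled programmatically on a HiveConf (the equivalent session-level SET commands behave the same way). It assumes a Hive build that already contains this patch, since the write-side ConfVar does not exist in earlier releases; the property names come from the hunk above, everything else is made up for the example.

import org.apache.hadoop.hive.conf.HiveConf;

public class LegacyTimestampConversionFlags {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Read side: only consulted when a Parquet file carries no footer metadata
    // describing which conversion rules were used to write it.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED, true);

    // Write side: the chosen rules are recorded permanently in the footer of new files.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED, false);

    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED));
    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED));
  }
}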
@@ -421,7 +421,7 @@ Table Parameters:
previous_metadata_location hdfs://### HDFS PATH ###
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 2626
totalSize 2899
#### A masked pattern was here ####
write.format.default parquet

@@ -333,7 +333,7 @@ Table Parameters:
previous_metadata_location hdfs://### HDFS PATH ###
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 1503
totalSize 1659
#### A masked pattern was here ####
write.format.default parquet

@@ -249,7 +249,7 @@ Table Parameters:
rawDataSize 116
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize 568
totalSize 607
#### A masked pattern was here ####
write.format.default parquet

5 changes: 5 additions & 0 deletions pom.xml
@@ -432,6 +432,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
5 changes: 5 additions & 0 deletions ql/pom.xml
@@ -560,6 +560,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
@@ -20,6 +20,7 @@
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -142,8 +143,11 @@ protected ParquetInputSplit getSplit(
skipProlepticConversion = HiveConf.getBoolVar(
conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
}
legacyConversionEnabled = HiveConf.getBoolVar(
conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
legacyConversionEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
if (fileMetaData.getKeyValueMetaData().containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
legacyConversionEnabled = Boolean.parseBoolean(
fileMetaData.getKeyValueMetaData().get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY));
}

split = new ParquetInputSplit(finalPath,
splitStart,
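Since getSplit now lets the writer.zone.conversion.legacy footer entry override the session property, a quick way to see which rules a given file will get on read is to dump its key-value metadata. A hedged sketch (not part of the patch) using standard parquet-hadoop APIs; the file path is a placeholder.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class DumpTimestampConversionMetadata {
  public static void main(String[] args) throws Exception {
    Path file = new Path(args[0]); // path to one data file of the table being debugged
    Configuration conf = new Configuration();

    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
      Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
      // Written by DataWritableWriteSupport with this patch; absent for files
      // produced by older Hive releases or by non-Hive writers.
      System.out.println("writer.zone.conversion.legacy = " + kv.get("writer.zone.conversion.legacy"));
      System.out.println("writer.time.zone              = " + kv.get("writer.time.zone"));
    }
  }
}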
@@ -16,19 +16,23 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
import org.apache.hadoop.hive.common.type.CalendarUtils;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -699,10 +703,11 @@ protected TimestampWritableV2 convert(Binary binary) {
// time zone in order to emulate time zone agnostic behavior.
boolean skipConversion = Boolean.parseBoolean(
metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname));
boolean legacyConversion = Boolean.parseBoolean(
metadata.get(ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname));
Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion,
DataWritableReadSupport.getWriterTimeZoneId(metadata), legacyConversion);
String legacyConversion = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
assert legacyConversion != null;
ZoneId targetZone = skipConversion ? ZoneOffset.UTC : MoreObjects
.firstNonNull(DataWritableReadSupport.getWriterTimeZoneId(metadata), TimeZone.getDefault().toZoneId());
Timestamp ts = NanoTimeUtils.getTimestamp(nt, targetZone, Boolean.parseBoolean(legacyConversion));
return new TimestampWritableV2(ts);
}
};
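The converter above now collapses the old skipConversion/writerTimezone pair into a single target zone before calling NanoTimeUtils. A stand-alone restatement of that precedence, for readers who want it outside the diff (illustrative code, not part of the patch):

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.TimeZone;

public final class TargetZoneResolution {

  // Precedence used when decoding INT96 timestamps: skipConversion wins,
  // then the writer zone recorded in the footer, then the reader JVM's default zone.
  static ZoneId resolveTargetZone(boolean skipConversion, ZoneId writerZoneFromFooter) {
    if (skipConversion) {
      return ZoneOffset.UTC;
    }
    return writerZoneFromFooter != null ? writerZoneFromFooter : TimeZone.getDefault().toZoneId();
  }

  public static void main(String[] args) {
    System.out.println(resolveTargetZone(true, ZoneId.of("Europe/Rome")));  // Z (UTC)
    System.out.println(resolveTargetZone(false, ZoneId.of("Europe/Rome"))); // Europe/Rome
    System.out.println(resolveTargetZone(false, null));                     // JVM default zone
  }
}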
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
@@ -303,7 +304,7 @@ public static Boolean getWriterDateProleptic(Map<String, String> metadata) {

return null;
}

/**
* Return the columns which contains required nested attribute level
* E.g., given struct a:<x:int, y:int> while 'x' is required and 'y' is not, the method will return
@@ -523,10 +524,30 @@ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration conf
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT)));
}

String legacyConversion = ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname;
if (!metadata.containsKey(legacyConversion)) {
metadata.put(legacyConversion, String.valueOf(HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED)));
if (!metadata.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
final String legacyConversion;
if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
// If there is meta about the legacy conversion then the file should be read in the same way it was written.
legacyConversion = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
} else if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_TIMEZONE)) {
// If there is no meta about the legacy conversion but there is meta about the timezone then we can infer the
// file was written with the new rules.
legacyConversion = "false";
} else {
// If there is no meta at all then it is not possible to determine which rules were used to write the file.
// Choose between old/new rules using the respective configuration property.
legacyConversion = String.valueOf(
HiveConf.getBoolVar(configuration, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED));
}
metadata.put(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY, legacyConversion);
} else {
String ctxMeta = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
String fileMeta = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
if (!Objects.equals(ctxMeta, fileMeta)) {
throw new IllegalStateException(
"Different values for " + DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY + " metadata: context ["
+ ctxMeta + "], file [" + fileMeta + "].");
}
}

return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
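The new branching in prepareForRead is the heart of the fix; condensed into one function it reads as below (same metadata keys as in the diff, the rest is illustrative):

import java.util.Map;

public final class LegacyConversionDecision {
  static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";
  static final String WRITER_TIMEZONE = "writer.time.zone";

  // 1. Trust explicit footer metadata when present.
  // 2. Otherwise, a recorded writer time zone implies the file was written with the new rules.
  // 3. Otherwise, fall back to hive.parquet.timestamp.legacy.conversion.enabled.
  static boolean useLegacyConversion(Map<String, String> footerMetadata, boolean sessionDefault) {
    if (footerMetadata.containsKey(WRITER_ZONE_CONVERSION_LEGACY)) {
      return Boolean.parseBoolean(footerMetadata.get(WRITER_ZONE_CONVERSION_LEGACY));
    }
    if (footerMetadata.containsKey(WRITER_TIMEZONE)) {
      return false;
    }
    return sessionDefault;
  }
}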
@@ -50,28 +50,11 @@ private static Calendar getGMTCalendar() {
return parquetGMTCalendar.get();
}

public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
return getNanoTime(ts, skipConversion, null);
}

/**
* Gets a NanoTime object, which represents timestamps as nanoseconds since epoch, from a
* Timestamp object. Parquet will store this NanoTime object as int96.
*
* If skipConversion flag is on, the timestamp will be converted to NanoTime as-is, i.e.
* timeZoneId argument will be ignored.
* If skipConversion is off, timestamp can be converted from a given time zone (timeZoneId) to UTC
* if timeZoneId is present, and if not present: from system time zone to UTC, before being
* converted to NanoTime.
* (See TimestampDataWriter#write for current Hive writing procedure.)
* Converts a timestamp from the specified timezone to UTC and returns its representation in NanoTime.
*/
public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId timeZoneId) {
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
timeZoneId = TimeZone.getDefault().toZoneId();
}
ts = TimestampTZUtil.convertTimestampToZone(ts, timeZoneId, ZoneOffset.UTC);
public static NanoTime getNanoTime(Timestamp ts, ZoneId sourceZone, boolean legacyConversion) {
ts = TimestampTZUtil.convertTimestampToZone(ts, sourceZone, ZoneOffset.UTC, legacyConversion);

Calendar calendar = getGMTCalendar();
calendar.setTimeInMillis(ts.toEpochMilli());
@@ -95,32 +78,17 @@ public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId
return new NanoTime(days, nanosOfDay);
}

public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
return getTimestamp(nt, skipConversion, null, false);
public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone) {
return getTimestamp(nt, targetZone, false);
}

/**
* Gets a Timestamp object from a NanoTime object, which represents timestamps as nanoseconds
* since epoch. Parquet stores these as int96.
* Converts a NanoTime representation in UTC to a timestamp in the specified timezone.
*
* Before converting to NanoTime, we may convert the timestamp to a desired time zone
* (timeZoneId). This will only happen if skipConversion flag is off.
* If skipConversion is off and timeZoneId is not found, then convert the timestamp to system
* time zone.
*
* For skipConversion to be true it must be set in conf AND the parquet file must NOT be written
* by parquet's java library (parquet-mr). This is enforced in ParquetRecordReaderBase#getSplit.
* @param legacyConversion when true the conversion to the target timezone is done with legacy (backwards compatible)
* method.
*/
public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId timeZoneId,
boolean legacyConversionEnabled) {
boolean legacyConversion = false;
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
legacyConversion = legacyConversionEnabled;
timeZoneId = TimeZone.getDefault().toZoneId();
}

public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone, boolean legacyConversion) {
int julianDay = nt.getJulianDay();
long nanosOfDay = nt.getTimeOfDayNanos();

@@ -151,7 +119,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId
calendar.set(Calendar.SECOND, seconds);

Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, timeZoneId, legacyConversion);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, targetZone, legacyConversion);
return ts;
}
}
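A small round-trip sketch against the reworked NanoTimeUtils signatures shown above (requires a build with this patch; the zone, date and flag values are arbitrary):

import java.time.ZoneId;

import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

public class NanoTimeRoundTrip {
  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("US/Pacific");
    Timestamp original = Timestamp.valueOf("1880-01-01 00:00:00");

    // Write path: local wall-clock value -> UTC -> Julian day + nanos of day.
    NanoTime nt = NanoTimeUtils.getNanoTime(original, zone, /* legacyConversion */ false);

    // Read path: Julian day + nanos of day -> UTC -> local wall-clock value.
    Timestamp back = NanoTimeUtils.getTimestamp(nt, zone, /* legacyConversion */ false);

    // With the same zone and the same flag on both sides, well-defined local times
    // come back unchanged.
    System.out.println(original + " -> julian day " + nt.getJulianDay() + " -> " + back);
  }
}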
@@ -53,8 +53,12 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Optional;
import java.util.TimeZone;

import static com.google.common.base.MoreObjects.firstNonNull;

/**
* Parquet file has self-describing schema which may differ from the user required schema (e.g.
@@ -1211,23 +1215,19 @@ private static byte[] convertToBytes(boolean value) {
* The reader who reads from the underlying Timestamp value value.
*/
public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
private boolean skipTimestampConversion = false;
private ZoneId writerTimezone;
private final ZoneId targetZone;
private boolean legacyConversionEnabled;

public TypesFromInt96PageReader(ValuesReader realReader, int length,
boolean skipTimestampConversion, ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(ValuesReader realReader, int length, ZoneId targetZone,
boolean legacyConversionEnabled) {
super(realReader, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion,
ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(Dictionary dict, int length, ZoneId targetZone, boolean legacyConversionEnabled) {
super(dict, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

@@ -1237,7 +1237,7 @@ private Timestamp convert(Binary binary) {
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
return NanoTimeUtils.getTimestamp(nt, targetZone, legacyConversionEnabled);
}

@Override
@@ -1907,9 +1907,11 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision,
hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
case INT96:
return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
skipTimestampConversion, writerTimezone, legacyConversionEnabled) : new
TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
ZoneId targetZone =
skipTimestampConversion ? ZoneOffset.UTC : firstNonNull(writerTimezone, TimeZone.getDefault().toZoneId());
return isDictionary ?
new TypesFromInt96PageReader(dictionary, length, targetZone, legacyConversionEnabled) :
new TypesFromInt96PageReader(valuesReader, length, targetZone, legacyConversionEnabled);
case BOOLEAN:
return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
TypesFromBooleanPageReader(valuesReader, length);
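For reference while reading the vectorized path above: an INT96 timestamp is 12 little-endian bytes, eight bytes of nanos-of-day followed by four bytes of Julian day, which is exactly what convert(Binary) unpacks before delegating to NanoTimeUtils. A pure-JDK sketch of that layout with made-up values:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96TimestampLayout {
  public static void main(String[] args) {
    // Encode: nanos of day first, Julian day last (little-endian), 12 bytes total.
    ByteBuffer out = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(42_000_000_000L); // 42 seconds past midnight, in nanoseconds
    out.putInt(2_440_588);        // Julian day number of 1970-01-01
    byte[] int96 = out.array();

    // Decode the same way the page reader does.
    ByteBuffer in = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = in.getLong();
    int julianDay = in.getInt();
    System.out.println("julianDay=" + julianDay + ", nanosOfDay=" + timeOfDayNanos);
  }
}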
@@ -37,6 +37,7 @@ public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
public static final String WRITER_TIMEZONE = "writer.time.zone";
public static final String WRITER_DATE_PROLEPTIC = "writer.date.proleptic";
public static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";

private DataWritableWriter writer;
private MessageType schema;
@@ -60,6 +61,8 @@ public WriteContext init(final Configuration configuration) {
defaultDateProleptic = HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN);
metaData.put(WRITER_DATE_PROLEPTIC, String.valueOf(defaultDateProleptic));
metaData.put(WRITER_ZONE_CONVERSION_LEGACY, String
.valueOf(HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED)));
return new WriteContext(schema, metaData);
}

@@ -57,6 +57,7 @@

import java.util.List;
import java.util.Map;
import java.util.TimeZone;

/**
*
@@ -69,6 +70,7 @@ public class DataWritableWriter {
protected final RecordConsumer recordConsumer;
private final GroupType schema;
private final boolean defaultDateProleptic;
private final boolean isLegacyZoneConversion;
private Configuration conf;

/* This writer will be created when writing the first row in order to get
@@ -80,6 +82,8 @@ public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType s
this.recordConsumer = recordConsumer;
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.isLegacyZoneConversion =
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED.defaultBoolVal;
}

public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema,
@@ -88,6 +92,8 @@ public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType s
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.conf = conf;
this.isLegacyZoneConversion =
HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED);
}

/**
@@ -536,7 +542,8 @@ public void write(Object value) {
Long int64value = ParquetTimestampUtils.getInt64(ts, timeUnit);
recordConsumer.addLong(int64value);
} else {
recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
recordConsumer.addBinary(
NanoTimeUtils.getNanoTime(ts, TimeZone.getDefault().toZoneId(), isLegacyZoneConversion).toBinary());
}
}

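Finally, the reason the write-side flag exists at all (and the "certain timezones" in the title): the legacy and the java.time based conversions can map the same wall-clock value to different instants under some historical zone rules. A hedged sketch comparing the two paths through TimestampTZUtil, whose four-argument convertTimestampToZone appears in this diff; the zone and date are illustrative and the concrete outputs depend on the JDK's timezone data.

import java.time.ZoneId;
import java.time.ZoneOffset;

import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;

public class LegacyVsNewConversion {
  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("US/Pacific");
    Timestamp local = Timestamp.valueOf("1883-11-18 12:00:00");

    // The same wall-clock value converted to UTC with each of the two rule sets that
    // hive.parquet.timestamp.write.legacy.conversion.enabled selects between.
    Timestamp newRules = TimestampTZUtil.convertTimestampToZone(local, zone, ZoneOffset.UTC, false);
    Timestamp legacyRules = TimestampTZUtil.convertTimestampToZone(local, zone, ZoneOffset.UTC, true);

    // For most zones and dates the two agree; where they do not, the footer entry
    // writer.zone.conversion.legacy tells readers which rules produced the data.
    System.out.println("new rules    : " + newRules);
    System.out.println("legacy rules : " + legacyRules);
  }
}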