HIVE-25104: Backward incompatible timestamp serialization in Parquet for certain timezones (Stamatis Zampetakis, reviewed by Jesus Camacho Rodriguez)

Closes apache#2282
zabetak authored and sankarh committed Jun 25, 2021
1 parent 50fbd22 commit 0f0d512
Showing 15 changed files with 574 additions and 97 deletions.
9 changes: 7 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1959,8 +1959,13 @@ public static enum ConfVars {
"This value controls whether date type in Parquet files was written using the hybrid or proleptic\n" +
"calendar. Hybrid is the default."),
HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.legacy.conversion.enabled", true,
"This value controls whether we use former Java time API to convert between timezones on files where timezone\n" +
"is not encoded in the metadata. This is for debugging."),
"Whether to use former Java date/time APIs to convert between timezones when reading timestamps from " +
"Parquet files. The property has no effect when the file contains explicit metadata about the conversion " +
"used to write the data; in this case reading conversion is chosen based on the metadata."),
HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED("hive.parquet.timestamp.write.legacy.conversion.enabled", false,
"Whether to use former Java date/time APIs to convert between timezones when writing timestamps in " +
"Parquet files. Once data are written to the file the effect is permanent (also reflected in the metadata)." +
"Changing the value of this property affects only new data written to the file."),
HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION("hive.avro.timestamp.skip.conversion", false,
"Some older Hive implementations (pre-3.1) wrote Avro timestamps in a UTC-normalized" +
"manner, while from version 3.1 until now Hive wrote time zone agnostic timestamps. " +
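A hedged usage sketch (not part of the commit): the two properties above can be toggled programmatically on a Hadoop Configuration before a write or read job runs. Only the ConfVars entries come from the diff; the class and method around them are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class TimestampConversionConfSketch {
  public static Configuration newConf() {
    Configuration conf = new Configuration();
    // Write-side: record timestamps with the legacy (pre-java.time) zone conversion;
    // the writer also stamps this choice into the Parquet file metadata.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED, true);
    // Read-side fallback: only consulted when the file carries no conversion metadata.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED, true);
    return conf;
  }
}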
5 changes: 5 additions & 0 deletions pom.xml
@@ -355,6 +355,11 @@
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -134,8 +135,11 @@ protected ParquetInputSplit getSplit(
skipProlepticConversion = HiveConf.getBoolVar(
conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
}
legacyConversionEnabled = HiveConf.getBoolVar(
conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
legacyConversionEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
if (fileMetaData.getKeyValueMetaData().containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
legacyConversionEnabled = Boolean.parseBoolean(
fileMetaData.getKeyValueMetaData().get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY));
}

split = new ParquetInputSplit(finalPath,
splitStart,
Original file line number Diff line number Diff line change
@@ -14,16 +14,20 @@
package org.apache.hadoop.hive.ql.io.parquet.convert;

import java.math.BigDecimal;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Map;
import java.util.TimeZone;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.common.type.CalendarUtils;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -611,10 +615,11 @@ protected TimestampWritableV2 convert(Binary binary) {
// time zone in order to emulate time zone agnostic behavior.
boolean skipConversion = Boolean.parseBoolean(
metadata.get(HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION.varname));
boolean legacyConversion = Boolean.parseBoolean(
metadata.get(ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname));
Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion,
DataWritableReadSupport.getWriterTimeZoneId(metadata), legacyConversion);
String legacyConversion = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
assert legacyConversion != null;
ZoneId targetZone = skipConversion ? ZoneOffset.UTC : MoreObjects
.firstNonNull(DataWritableReadSupport.getWriterTimeZoneId(metadata), TimeZone.getDefault().toZoneId());
Timestamp ts = NanoTimeUtils.getTimestamp(nt, targetZone, Boolean.parseBoolean(legacyConversion));
return new TimestampWritableV2(ts);
}
};
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
@@ -303,7 +304,7 @@ public static Boolean getWriterDateProleptic(Map<String, String> metadata) {

return null;
}

/**
* Return the columns which contains required nested attribute level
* E.g., given struct a:<x:int, y:int> while 'x' is required and 'y' is not, the method will return
@@ -523,10 +524,30 @@ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration conf
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT)));
}

String legacyConversion = ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED.varname;
if (!metadata.containsKey(legacyConversion)) {
metadata.put(legacyConversion, String.valueOf(HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED)));
if (!metadata.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
final String legacyConversion;
if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
// If there is meta about the legacy conversion then the file should be read in the same way it was written.
legacyConversion = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
} else if(keyValueMetaData.containsKey(DataWritableWriteSupport.WRITER_TIMEZONE)) {
// If there is no meta about the legacy conversion but there is meta about the timezone then we can infer the
// file was written with the new rules.
legacyConversion = "false";
} else {
// If there is no meta at all then it is not possible to determine which rules were used to write the file.
// Choose between old/new rules using the respective configuration property.
legacyConversion = String.valueOf(
HiveConf.getBoolVar(configuration, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED));
}
metadata.put(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY, legacyConversion);
} else {
String ctxMeta = metadata.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
String fileMeta = keyValueMetaData.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY);
if (!Objects.equals(ctxMeta, fileMeta)) {
throw new IllegalStateException(
"Different values for " + DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY + " metadata: context ["
+ ctxMeta + "], file [" + fileMeta + "].");
}
}

return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
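The branching above encodes a three-level precedence for choosing the read-side conversion. A standalone sketch of that rule follows; the helper name is illustrative, while the metadata keys and the getBoolVar call are the ones used in the diff.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;

final class LegacyConversionResolverSketch {
  // 1) explicit writer.zone.conversion.legacy metadata wins;
  // 2) a recorded writer.time.zone implies the file was written with the new java.time rules;
  // 3) otherwise fall back to hive.parquet.timestamp.legacy.conversion.enabled.
  static boolean resolve(Map<String, String> fileMeta, Configuration conf) {
    if (fileMeta.containsKey(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY)) {
      return Boolean.parseBoolean(fileMeta.get(DataWritableWriteSupport.WRITER_ZONE_CONVERSION_LEGACY));
    }
    if (fileMeta.containsKey(DataWritableWriteSupport.WRITER_TIMEZONE)) {
      return false;
    }
    return HiveConf.getBoolVar(conf, ConfVars.HIVE_PARQUET_TIMESTAMP_LEGACY_CONVERSION_ENABLED);
  }
}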
Original file line number Diff line number Diff line change
@@ -50,28 +50,11 @@ private static Calendar getGMTCalendar() {
return parquetGMTCalendar.get();
}

public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion) {
return getNanoTime(ts, skipConversion, null);
}

/**
* Gets a NanoTime object, which represents timestamps as nanoseconds since epoch, from a
* Timestamp object. Parquet will store this NanoTime object as int96.
*
* If skipConversion flag is on, the timestamp will be converted to NanoTime as-is, i.e.
* timeZoneId argument will be ignored.
* If skipConversion is off, timestamp can be converted from a given time zone (timeZoneId) to UTC
* if timeZoneId is present, and if not present: from system time zone to UTC, before being
* converted to NanoTime.
* (See TimestampDataWriter#write for current Hive writing procedure.)
* Converts a timestamp from the specified timezone to UTC and returns its representation in NanoTime.
*/
public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId timeZoneId) {
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
timeZoneId = TimeZone.getDefault().toZoneId();
}
ts = TimestampTZUtil.convertTimestampToZone(ts, timeZoneId, ZoneOffset.UTC);
public static NanoTime getNanoTime(Timestamp ts, ZoneId sourceZone, boolean legacyConversion) {
ts = TimestampTZUtil.convertTimestampToZone(ts, sourceZone, ZoneOffset.UTC, legacyConversion);

Calendar calendar = getGMTCalendar();
calendar.setTimeInMillis(ts.toEpochMilli());
@@ -94,32 +77,17 @@ public static NanoTime getNanoTime(Timestamp ts, boolean skipConversion, ZoneId
return new NanoTime(days, nanosOfDay);
}

public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion) {
return getTimestamp(nt, skipConversion, null, false);
public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone) {
return getTimestamp(nt, targetZone, false);
}

/**
* Gets a Timestamp object from a NanoTime object, which represents timestamps as nanoseconds
* since epoch. Parquet stores these as int96.
* Converts a nanotime representation in UTC, to a timestamp in the specified timezone.
*
* Before converting to NanoTime, we may convert the timestamp to a desired time zone
* (timeZoneId). This will only happen if skipConversion flag is off.
* If skipConversion is off and timeZoneId is not found, then convert the timestamp to system
* time zone.
*
* For skipConversion to be true it must be set in conf AND the parquet file must NOT be written
* by parquet's java library (parquet-mr). This is enforced in ParquetRecordReaderBase#getSplit.
* @param legacyConversion when true the conversion to the target timezone is done with legacy (backwards compatible)
* method.
*/
public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId timeZoneId,
boolean legacyConversionEnabled) {
boolean legacyConversion = false;
if (skipConversion) {
timeZoneId = ZoneOffset.UTC;
} else if (timeZoneId == null) {
legacyConversion = legacyConversionEnabled;
timeZoneId = TimeZone.getDefault().toZoneId();
}

public static Timestamp getTimestamp(NanoTime nt, ZoneId targetZone, boolean legacyConversion) {
int julianDay = nt.getJulianDay();
long nanosOfDay = nt.getTimeOfDayNanos();

@@ -149,7 +117,7 @@ public static Timestamp getTimestamp(NanoTime nt, boolean skipConversion, ZoneId
calendar.set(Calendar.SECOND, seconds);

Timestamp ts = Timestamp.ofEpochMilli(calendar.getTimeInMillis(), (int) nanos);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, timeZoneId, legacyConversion);
ts = TimestampTZUtil.convertTimestampToZone(ts, ZoneOffset.UTC, targetZone, legacyConversion);
return ts;
}
}
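With the simplified signatures above, a write/read round trip reduces to supplying a zone and the legacy flag explicitly. A minimal sketch, with illustrative zone and timestamp values:

import java.time.ZoneId;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;

public class NanoTimeRoundTripSketch {
  public static void main(String[] args) {
    ZoneId zone = ZoneId.of("US/Pacific");
    Timestamp ts = Timestamp.valueOf("1880-01-01 00:00:00"); // old value where legacy/new rules can diverge
    // Write path: convert from the source zone to UTC and encode as an int96 NanoTime.
    NanoTime nt = NanoTimeUtils.getNanoTime(ts, zone, false);
    // Read path: decode from UTC back into the target zone using the same conversion rules.
    Timestamp back = NanoTimeUtils.getTimestamp(nt, zone, false);
    System.out.println(ts + " -> " + back);
  }
}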
Original file line number Diff line number Diff line change
@@ -45,8 +45,13 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;

import java.util.TimeZone;

import static com.google.common.base.MoreObjects.firstNonNull;

/**
* Parquet file has self-describing schema which may differ from the user required schema (e.g.
* schema evolution). This factory is used to retrieve user required typed data via corresponding
@@ -1177,23 +1182,19 @@ private static byte[] convertToBytes(boolean value) {
* The reader who reads from the underlying Timestamp value value.
*/
public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
private boolean skipTimestampConversion = false;
private ZoneId writerTimezone;
private final ZoneId targetZone;
private boolean legacyConversionEnabled;

public TypesFromInt96PageReader(ValuesReader realReader, int length,
boolean skipTimestampConversion, ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(ValuesReader realReader, int length, ZoneId targetZone,
boolean legacyConversionEnabled) {
super(realReader, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion,
ZoneId writerTimezone, boolean legacyConversionEnabled) {
public TypesFromInt96PageReader(Dictionary dict, int length, ZoneId targetZone, boolean legacyConversionEnabled) {
super(dict, length);
this.skipTimestampConversion = skipTimestampConversion;
this.writerTimezone = writerTimezone;
this.targetZone = targetZone;
this.legacyConversionEnabled = legacyConversionEnabled;
}

@@ -1203,7 +1204,7 @@ private Timestamp convert(Binary binary) {
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
return NanoTimeUtils.getTimestamp(nt, targetZone, legacyConversionEnabled);
}

@Override
@@ -1536,9 +1537,11 @@ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean i
return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision,
hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
case INT96:
return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
skipTimestampConversion, writerTimezone, legacyConversionEnabled) : new
TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion, writerTimezone, legacyConversionEnabled);
ZoneId targetZone =
skipTimestampConversion ? ZoneOffset.UTC : firstNonNull(writerTimezone, TimeZone.getDefault().toZoneId());
return isDictionary ?
new TypesFromInt96PageReader(dictionary, length, targetZone, legacyConversionEnabled) :
new TypesFromInt96PageReader(valuesReader, length, targetZone, legacyConversionEnabled);
case BOOLEAN:
return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
TypesFromBooleanPageReader(valuesReader, length);
Original file line number Diff line number Diff line change
@@ -36,10 +36,12 @@ public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
public static final String WRITER_TIMEZONE = "writer.time.zone";
public static final String WRITER_DATE_PROLEPTIC = "writer.date.proleptic";
public static final String WRITER_ZONE_CONVERSION_LEGACY = "writer.zone.conversion.legacy";

private DataWritableWriter writer;
private MessageType schema;
private boolean defaultDateProleptic;
private Configuration conf;

public static void setSchema(final MessageType schema, final Configuration configuration) {
configuration.set(PARQUET_HIVE_SCHEMA, schema.toString());
@@ -51,18 +53,21 @@ public static MessageType getSchema(final Configuration configuration) {

@Override
public WriteContext init(final Configuration configuration) {
conf = configuration;
schema = getSchema(configuration);
Map<String, String> metaData = new HashMap<>();
metaData.put(WRITER_TIMEZONE, TimeZone.getDefault().toZoneId().toString());
defaultDateProleptic = HiveConf.getBoolVar(
configuration, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN);
metaData.put(WRITER_DATE_PROLEPTIC, String.valueOf(defaultDateProleptic));
metaData.put(WRITER_ZONE_CONVERSION_LEGACY, String
.valueOf(HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED)));
return new WriteContext(schema, metaData);
}

@Override
public void prepareForWrite(final RecordConsumer recordConsumer) {
writer = new DataWritableWriter(recordConsumer, schema, defaultDateProleptic);
writer = new DataWritableWriter(recordConsumer, schema, defaultDateProleptic, conf);
}

@Override
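Since the effect is baked into each file, the new writer.zone.conversion.legacy entry can be checked directly in a file's footer metadata. A hedged inspection sketch using parquet-hadoop; the reader API usage here is an assumption, not part of this commit.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterMetadataSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(args[0]), conf))) {
      Map<String, String> meta = reader.getFooter().getFileMetaData().getKeyValueMetaData();
      // Keys written by DataWritableWriteSupport#init:
      //   writer.time.zone, writer.date.proleptic, writer.zone.conversion.legacy
      System.out.println("writer.zone.conversion.legacy = " + meta.get("writer.zone.conversion.legacy"));
    }
  }
}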
Original file line number Diff line number Diff line change
@@ -13,9 +13,11 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.write;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.common.type.CalendarUtils;
@@ -52,6 +54,7 @@

import java.util.List;
import java.util.Map;
import java.util.TimeZone;

/**
*
@@ -64,6 +67,8 @@ public class DataWritableWriter {
protected final RecordConsumer recordConsumer;
private final GroupType schema;
private final boolean defaultDateProleptic;
private final boolean isLegacyZoneConversion;
private Configuration conf;

/* This writer will be created when writing the first row in order to get
information about how to inspect the record data. */
@@ -74,6 +79,18 @@ public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType s
this.recordConsumer = recordConsumer;
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.isLegacyZoneConversion =
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED.defaultBoolVal;
}

public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema,
final boolean defaultDateProleptic, final Configuration conf) {
this.recordConsumer = recordConsumer;
this.schema = schema;
this.defaultDateProleptic = defaultDateProleptic;
this.conf = conf;
this.isLegacyZoneConversion =
HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_WRITE_LEGACY_CONVERSION_ENABLED);
}

/**
@@ -500,7 +517,7 @@ public TimestampDataWriter(TimestampObjectInspector inspector) {
@Override
public void write(Object value) {
Timestamp ts = inspector.getPrimitiveJavaObject(value);
recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, TimeZone.getDefault().toZoneId(), isLegacyZoneConversion).toBinary());
}
}

Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertFalse;
@@ -211,7 +212,7 @@ protected static boolean getBooleanValue(
protected static NanoTime getNanoTime(int index) {
Timestamp ts = new Timestamp();
ts.setTimeInMillis(index);
return NanoTimeUtils.getNanoTime(ts, false);
return NanoTimeUtils.getNanoTime(ts, TimeZone.getDefault().toZoneId(), false);
}

protected static HiveDecimal getDecimal(
