Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-8023: Convert java.sql.Date between UTC/local time zone normaliz… #4781

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,12 +27,12 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Array;
import java.sql.Date;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing java.util.Date to java.sql.Date changes the value of DATE_CLASS_NAME, does that have any other implications that should be considered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was intentional because it seemed to be a bug. ResultSet.getMetaData().getColumnClassName() would never return java.util.Date but the SQL date/time types.
However, now I see that this code section is related to QueryRecord and it may use java.util.Date. I'll revert it back.
Thanks for the heads up that made me double check it in more detail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for responding regarding the column class name handling. I will take another look once you push the next set of updates.

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -149,6 +150,12 @@ private Object normalizeValue(final Object value) throws SQLException {
return null;
}

if (value instanceof Date) {
// Date objects should be stored in records as UTC normalized dates (UTC 00:00:00)
// but they come from the driver in JVM's local time zone 00:00:00 and need to be converted.
return DataTypeUtils.convertDateToUTC((Date) value);
}

if (value instanceof List) {
return ((List) value).toArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1085,6 +1087,28 @@ public static java.sql.Date toDate(final Object value, final Supplier<DateFormat
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
}

/**
* Converts a java.sql.Date object with 00:00:00 in local time zone (typically coming from a java.sql.ResultSet)
* to UTC normalized form (storing epoch corresponding to UTC 00:00:00 on the given day).
*
* @param date java.sql.Date with local time zone 00:00:00
* @return java.sql.Date with UTC 00:00:00
*/
public static Date convertDateToUTC(Date date) {
return new Date(date.toLocalDate().atStartOfDay(ZoneId.of("UTC")).toInstant().toEpochMilli());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the ZoneId.of("UTC") reference can be replaced with ZoneOffset.UTC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

atStartOfDay() has ZoneId parameter only.
In my understanding ZoneOffset is DST-agnostic, so the ZoneId is the right choice in any case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the documentation, ZoneOffset extends ZoneId so would ZoneOffset.UTC function the same as ZoneId.of("UTC")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I did not notice that ZoneOffset extends ZoneId. Just focused on that ZoneId references an offset and the DST info.
As there is no DST in UTC, it is a simple offset so ZoneOffset.UTC will be fine. Thanks!

}

/**
* Converts a java.sql.Date object with 00:00:00 in UTC
* to local time zone normalized form (storing epoch corresponding to 00:00:00 in local time zone on the given day).
*
* @param date java.sql.Date with UTC 00:00:00
* @return java.sql.Date with local time zone 00:00:00
*/
public static Date convertDateToLocalTZ(Date date) {
return Date.valueOf(Instant.ofEpochMilli(date.getTime()).atZone(ZoneId.of("UTC")).toLocalDate());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method naming and implementation are a little confusing on initial read. The atZone() method returns a ZonedDateTime and toLocalDate() return the java.time.LocalDate portion, but does that actually convert the date to the system default time zone? It may be helpful to break this into multiple lines as it is a bit hard to follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed... what do you think about this?

    public static Date convertDateToUTC(Date dateLocalTZ) {
        ZonedDateTime zdtLocalTZ = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
        ZonedDateTime zdtUTC = zdtLocalTZ.withZoneSameLocal(ZoneId.of("UTC"));
        Date dateUTC = new Date(zdtUTC.toInstant().toEpochMilli());
        return dateUTC;
    }

No LocaDate and I believe withZoneSameLocal() in the middle of the method really grasps what is going on here.
The same can be applied to convertDateToLocalTZ() similarly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updated suggestion, breaking things out read much more easily, and the withZoneSameLocal() method makes more sense now. Based on that snippet, I would suggest just returning the final Date object instead of declaring and returning.

}

public static boolean isDateTypeCompatible(final Object value, final String format) {
if (value == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -874,4 +879,44 @@ public void testIsFittingNumberType() {
assertTrue(DataTypeUtils.isFittingNumberType(9D, RecordFieldType.DOUBLE));
assertFalse(DataTypeUtils.isFittingNumberType(9, RecordFieldType.DOUBLE));
}

@Test
public void testConvertDateToUTC() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateLocalTZ = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.systemDefault()).toInstant().toEpochMilli());

Date dateUTC = DataTypeUtils.convertDateToUTC(dateLocalTZ);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneId.of("UTC"));
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}

@Test
public void testConvertDateToLocalTZ() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateUTC = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.of("UTC")).toInstant().toEpochMilli());

Date dateLocalTZ = DataTypeUtils.convertDateToLocalTZ(dateUTC);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import javax.xml.bind.DatatypeConverter;

Expand Down Expand Up @@ -393,6 +394,18 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou
rec.put(i-1, value);
}

} else if (value instanceof java.sql.Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
// AvroTypeUtil.convertToAvroObject() expects java.sql.Date object as a UTC normalized date (UTC 00:00:00)
// but it comes from the driver in JVM's local time zone 00:00:00 and needs to be converted.
java.sql.Date normalizedDate = DataTypeUtils.convertDateToUTC((java.sql.Date) value);
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(normalizedDate, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}

} else if (value instanceof Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -679,11 +680,12 @@ public void testConvertToAvroStreamForDateTimeAsLogicalType() throws SQLExceptio

testConvertToAvroStreamForDateTime(options,
(record, date) -> {
java.sql.Date expected = DataTypeUtils.convertDateToUTC(date);
final int daysSinceEpoch = (int) record.get("date");
final long millisSinceEpoch = TimeUnit.MILLISECONDS.convert(daysSinceEpoch, TimeUnit.DAYS);
java.sql.Date actual = java.sql.Date.valueOf(Instant.ofEpochMilli(millisSinceEpoch).atZone(ZoneOffset.UTC).toLocalDate());
LOGGER.debug("comparing dates, expecting '{}', actual '{}'", date, actual);
assertEquals(date, actual);
java.sql.Date actual = new java.sql.Date(millisSinceEpoch);
LOGGER.debug("comparing dates, expecting '{}', actual '{}'", expected, actual);
assertEquals(expected, actual);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of performing multiple conversions for the test, could this comparison be simplified by comparing daysSinceEpoch to the actual number of expected days? That would make the assert much clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests (historically) do not assert the raw value (int / long) from the Avro record but reconstruct the Date/Time/Timestamp objects and compare them with the original objects (which were the inputs of the test cases).
I believe the reason was to avoid to calculate the days/millis/etc. from the test input data which would be a similarly complicated method.

As the test inputs are literals, it is not needed to calculate the expected value from code, but it can be determined once and hard coded as literals too. What do you think?
I mean:

int expectedDaysSinceEpoch = 17444;
assertDate.accept(record, expectedDaysSinceEpoch);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code snippet you suggested looks much clearer and avoids the conversion, thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not go on that way because testConvertToAvroStreamForDateTime() is used in two places and the other really expects a Date object.
However, I found a simpler way to convert the Date to the expected number of days, and then to assert the raw value from Avro against that.

},
(record, time) -> {
int millisSinceMidnight = (int) record.get("time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -483,16 +484,17 @@ public void testBytesDecimalConversion(){

@Test
public void testDateConversion() {
final Calendar c = Calendar.getInstance();
final Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.set(2019, Calendar.JANUARY, 1, 0, 0, 0);
c.set(Calendar.MILLISECOND, 0);
final long epochMillis = c.getTimeInMillis();

final LogicalTypes.Date dateType = LogicalTypes.date();
final Schema fieldSchema = Schema.create(Type.INT);
dateType.addToSchema(fieldSchema);
final Object convertedValue = AvroTypeUtil.convertToAvroObject(new Date(epochMillis), fieldSchema);
assertTrue(convertedValue instanceof Integer);
assertEquals((int) convertedValue, LocalDate.of(2019, 1, 1).toEpochDay());
assertEquals(LocalDate.of(2019, 1, 1).toEpochDay(), (int) convertedValue);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -71,6 +72,7 @@
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -685,10 +687,15 @@ private void executeDML(final ProcessContext context, final ProcessSession sessi

for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
final Object currentValue = values[currentFieldIndex];
Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex);
final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);

if (sqlType == Types.DATE && currentValue instanceof Date) {
// convert Date from the internal UTC normalized form to local time zone needed by database drivers
currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
}

// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, currentValue, sqlType);
Expand Down