Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-8023: Convert java.sql.Date between UTC/local time zone normaliz… #4781

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -149,6 +150,12 @@ private Object normalizeValue(final Object value) throws SQLException {
return null;
}

if (value instanceof java.sql.Date) {
// Date objects should be stored in records as UTC normalized dates (UTC 00:00:00)
// but they come from the driver in JVM's local time zone 00:00:00 and need to be converted.
return DataTypeUtils.convertDateToUTC((java.sql.Date) value);
}

if (value instanceof List) {
return ((List) value).toArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1085,6 +1089,32 @@ public static java.sql.Date toDate(final Object value, final Supplier<DateFormat
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
}

/**
* Converts a java.sql.Date object in local time zone (typically coming from a java.sql.ResultSet and having 00:00:00 time part)
* to UTC normalized form (storing the epoch corresponding to the UTC time with the same date/time as the input).
*
* @param dateLocalTZ java.sql.Date in local time zone
* @return java.sql.Date in UTC normalized form
*/
public static Date convertDateToUTC(Date dateLocalTZ) {
ZonedDateTime zdtLocalTZ = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
ZonedDateTime zdtUTC = zdtLocalTZ.withZoneSameLocal(ZoneOffset.UTC);
return new Date(zdtUTC.toInstant().toEpochMilli());
}

/**
* Converts a java.sql.Date object in UTC normalized form
* to local time zone (storing the epoch corresponding to the local time with the same date/time as the input).
*
* @param dateUTC java.sql.Date in UTC normalized form
* @return java.sql.Date in local time zone
*/
public static Date convertDateToLocalTZ(Date dateUTC) {
ZonedDateTime zdtUTC = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneOffset.UTC);
ZonedDateTime zdtLocalTZ = zdtUTC.withZoneSameLocal(ZoneId.systemDefault());
return new Date(zdtLocalTZ.toInstant().toEpochMilli());
}

public static boolean isDateTypeCompatible(final Object value, final String format) {
if (value == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -27,35 +26,61 @@
import org.mockito.junit.MockitoJUnitRunner;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ResultSetRecordSetTest {

private static final String COLUMN_NAME_VARCHAR = "varchar";
private static final String COLUMN_NAME_BIGINT = "bigint";
private static final String COLUMN_NAME_ROWID = "rowid";
private static final String COLUMN_NAME_BIT = "bit";
private static final String COLUMN_NAME_BOOLEAN = "boolean";
private static final String COLUMN_NAME_CHAR = "char";
private static final String COLUMN_NAME_DATE = "date";
private static final String COLUMN_NAME_INTEGER = "integer";
private static final String COLUMN_NAME_DOUBLE = "double";
private static final String COLUMN_NAME_REAL = "real";
private static final String COLUMN_NAME_FLOAT = "float";
private static final String COLUMN_NAME_SMALLINT = "smallint";
private static final String COLUMN_NAME_TINYINT = "tinyint";
private static final String COLUMN_NAME_BIG_DECIMAL_1 = "bigDecimal1";
private static final String COLUMN_NAME_BIG_DECIMAL_2 = "bigDecimal2";
private static final String COLUMN_NAME_BIG_DECIMAL_3 = "bigDecimal3";
private static final String COLUMN_NAME_BIG_DECIMAL_4 = "bigDecimal4";

private static final Object[][] COLUMNS = new Object[][] {
// column number; column label / name / schema field; column type; schema data type;
{1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()},
{2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()},
{3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()},
{4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
{5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
{6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()},
{7, "date", Types.DATE, RecordFieldType.DATE.getDataType()},
{8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()},
{9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
{10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()},
{11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
{12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
{13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()},
{14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
{15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
{16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
{17, "bigDecimal4", Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
{1, COLUMN_NAME_VARCHAR, Types.VARCHAR, RecordFieldType.STRING.getDataType()},
{2, COLUMN_NAME_BIGINT, Types.BIGINT, RecordFieldType.LONG.getDataType()},
{3, COLUMN_NAME_ROWID, Types.ROWID, RecordFieldType.LONG.getDataType()},
{4, COLUMN_NAME_BIT, Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
{5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
{6, COLUMN_NAME_CHAR, Types.CHAR, RecordFieldType.CHAR.getDataType()},
{7, COLUMN_NAME_DATE, Types.DATE, RecordFieldType.DATE.getDataType()},
{8, COLUMN_NAME_INTEGER, Types.INTEGER, RecordFieldType.INT.getDataType()},
{9, COLUMN_NAME_DOUBLE, Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
{10, COLUMN_NAME_REAL, Types.REAL, RecordFieldType.DOUBLE.getDataType()},
{11, COLUMN_NAME_FLOAT, Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
{12, COLUMN_NAME_SMALLINT, Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
{13, COLUMN_NAME_TINYINT, Types.TINYINT, RecordFieldType.BYTE.getDataType()},
{14, COLUMN_NAME_BIG_DECIMAL_1, Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
{15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
{16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
{17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
};

@Mock
Expand All @@ -66,26 +91,26 @@ public class ResultSetRecordSetTest {

@Before
public void setUp() throws SQLException {
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);

for (final Object[] column : COLUMNS) {
Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col");
Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((String) (column[1]));
when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);

if(column[3] instanceof DecimalDataType) {
DecimalDataType ddt = (DecimalDataType)column[3];
Mockito.when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision());
Mockito.when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale());
when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision());
when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale());
}
}

// Big decimal values are necessary in order to determine precision and scale
Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));

// This will be handled by a dedicated branch for Java Objects, needs some further details
Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
}

@Test
Expand Down Expand Up @@ -124,7 +149,7 @@ public void testCreateSchemaWhenOtherType() throws SQLException {
final RecordSchema resultSchema = testSubject.getSchema();

// then
Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
}

@Test
Expand All @@ -137,17 +162,88 @@ public void testCreateSchemaWhenOtherTypeWithoutSchema() throws SQLException {
final RecordSchema resultSchema = testSubject.getSchema();

// then
Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
}

@Test
public void testCreateRecord() throws SQLException {
// given
final RecordSchema recordSchema = givenRecordSchema();

LocalDate testDate = LocalDate.of(2021, 1, 26);

final String varcharValue = "varchar";
final Long bigintValue = 1234567890123456789L;
final Long rowidValue = 11111111L;
final Boolean bitValue = Boolean.FALSE;
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
final Double realValue = 3.45;
final Float floatValue = 6.78F;
final Short smallintValue = 12345;
final Byte tinyintValue = 123;
final BigDecimal bigDecimal1Value = new BigDecimal("1234.567");
final BigDecimal bigDecimal2Value = new BigDecimal("1234");
final BigDecimal bigDecimal3Value = new BigDecimal("1234567890.1");
final BigDecimal bigDecimal4Value = new BigDecimal("1234567.089");

when(resultSet.getObject(COLUMN_NAME_VARCHAR)).thenReturn(varcharValue);
when(resultSet.getObject(COLUMN_NAME_BIGINT)).thenReturn(bigintValue);
when(resultSet.getObject(COLUMN_NAME_ROWID)).thenReturn(rowidValue);
when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue);
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
when(resultSet.getObject(COLUMN_NAME_FLOAT)).thenReturn(floatValue);
when(resultSet.getObject(COLUMN_NAME_SMALLINT)).thenReturn(smallintValue);
when(resultSet.getObject(COLUMN_NAME_TINYINT)).thenReturn(tinyintValue);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_1)).thenReturn(bigDecimal1Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_2)).thenReturn(bigDecimal2Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_3)).thenReturn(bigDecimal3Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_4)).thenReturn(bigDecimal4Value);

// when
ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
Record record = testSubject.createRecord(resultSet);

// then
assertEquals(varcharValue, record.getAsString(COLUMN_NAME_VARCHAR));
assertEquals(bigintValue, record.getAsLong(COLUMN_NAME_BIGINT));
assertEquals(rowidValue, record.getAsLong(COLUMN_NAME_ROWID));
assertEquals(bitValue, record.getAsBoolean(COLUMN_NAME_BIT));
assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN));
assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR));

// Date is expected in UTC normalized form
Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));

assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));
assertEquals(realValue, record.getAsDouble(COLUMN_NAME_REAL));
assertEquals(floatValue, record.getAsFloat(COLUMN_NAME_FLOAT));
assertEquals(smallintValue.shortValue(), record.getAsInt(COLUMN_NAME_SMALLINT).shortValue());
assertEquals(tinyintValue.byteValue(), record.getAsInt(COLUMN_NAME_TINYINT).byteValue());
assertEquals(bigDecimal1Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_1));
assertEquals(bigDecimal2Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_2));
assertEquals(bigDecimal3Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_3));
assertEquals(bigDecimal4Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_4));
}

private ResultSet givenResultSetForOther() throws SQLException {
final ResultSet resultSet = Mockito.mock(ResultSet.class);
final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1);
Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
when(resultSetMetaData.getColumnCount()).thenReturn(1);
when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
when(resultSetMetaData.getColumnName(1)).thenReturn("column");
when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
return resultSet;
}

Expand All @@ -162,10 +258,10 @@ private RecordSchema givenRecordSchema() {
}

private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
Assert.assertNotNull(resultSchema);
assertNotNull(resultSchema);

for (final Object[] column : COLUMNS) {
Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -874,4 +879,44 @@ public void testIsFittingNumberType() {
assertTrue(DataTypeUtils.isFittingNumberType(9D, RecordFieldType.DOUBLE));
assertFalse(DataTypeUtils.isFittingNumberType(9, RecordFieldType.DOUBLE));
}

@Test
public void testConvertDateToUTC() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateLocalTZ = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.systemDefault()).toInstant().toEpochMilli());

Date dateUTC = DataTypeUtils.convertDateToUTC(dateLocalTZ);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneId.of("UTC"));
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}

@Test
public void testConvertDateToLocalTZ() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateUTC = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.of("UTC")).toInstant().toEpochMilli());

Date dateLocalTZ = DataTypeUtils.convertDateToLocalTZ(dateUTC);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}
}