diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java index aef3a38f41d2b..8ff2936c6c688 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -39,7 +40,6 @@ import java.lang.reflect.Array; import java.math.BigDecimal; import java.sql.Date; -import java.sql.Time; import java.sql.Timestamp; import java.time.LocalTime; @@ -143,7 +143,7 @@ private CsvToRowDataConverter createConverter(LogicalType type) { case DATE: return this::convertToDate; case TIME_WITHOUT_TIME_ZONE: - return this::convertToTime; + return convertToTime((TimeType) type); case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: return this::convertToTimestamp; @@ -221,12 +221,28 @@ private int convertToDate(JsonNode jsonNode) { return (int) Date.valueOf(jsonNode.asText()).toLocalDate().toEpochDay(); } - private int convertToTime(JsonNode jsonNode) { + private CsvToRowDataConverter convertToTime(TimeType timeType) { + final int precision = timeType.getPrecision(); // csv currently is using Time.valueOf() to parse time string - LocalTime localTime = Time.valueOf(jsonNode.asText()).toLocalTime(); // TODO: FLINK-17525 support millisecond and nanosecond // get number of milliseconds of the day - return localTime.toSecondOfDay() * 1000; + if (precision > 3) { + throw new IllegalArgumentException("Csv does not support TIME type " + + "with precision: " + precision + ", it only supports precision 0 ~ 3."); + } + return jsonNode -> { + LocalTime localTime = LocalTime.parse(jsonNode.asText()); + int mills = (int) (localTime.toNanoOfDay() / 1000_000L); + // this is for rounding off values out of precision + if (precision == 2) { + mills = mills / 10 * 10; + } else if (precision == 1) { + mills = mills / 100 * 100; + } else if (precision == 0) { + mills = mills / 1000 * 1000; + } + return mills; + }; } private TimestampData convertToTimestamp(JsonNode jsonNode) { @@ -284,7 +300,7 @@ private static void validateArity(int expected, int actual, boolean ignoreParseE /** * Exception which refers to parse errors in converters. - * */ + */ private static final class CsvParseException extends RuntimeException { private static final long serialVersionUID = 1L; public CsvParseException(String message, Throwable cause) { diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java index 6ab7f74fe6cb3..dfcf3780fc7e5 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java @@ -244,7 +244,7 @@ private static JsonNode convertDate(int days, ContainerNode container) { } private static JsonNode convertTime(int millisecond, ContainerNode container) { - LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L); + LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L); return container.textNode(ISO_LOCAL_TIME.format(time)); } diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java index d25472a770cf7..8051b3c4b8c3a 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java @@ -35,6 +35,7 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalTime; import java.util.function.Consumer; import static org.apache.flink.table.api.DataTypes.ARRAY; @@ -107,6 +108,22 @@ public void testSerializeDeserialize() throws Exception { BYTES(), "awML", new byte[] {107, 3, 11}); + testNullableField( + TIME(3), + "12:12:12.232", + LocalTime.parse("12:12:12.232")); + testNullableField( + TIME(2), + "12:12:12.23", + LocalTime.parse("12:12:12.23")); + testNullableField( + TIME(1), + "12:12:12.2", + LocalTime.parse("12:12:12.2")); + testNullableField( + TIME(0), + "12:12:12", + LocalTime.parse("12:12:12")); } @Test @@ -124,13 +141,13 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception { .setArrayElementDelimiter(":") .setFieldDelimiter(';'); - testField(STRING(), "123*'4**", "123'4*", deserConfig, ";"); - testField(STRING(), "'123''4**'", "123'4*", serConfig, deserConfig, ";"); - testField(STRING(), "'a;b*'c'", "a;b'c", deserConfig, ";"); + testFieldDeserialization(STRING(), "123*'4**", "123'4*", deserConfig, ";"); + testField(STRING(), "'123''4**'", "'123''4**'", serConfig, deserConfig, ";"); + testFieldDeserialization(STRING(), "'a;b*'c'", "a;b'c", deserConfig, ";"); testField(STRING(), "'a;b''c'", "a;b'c", serConfig, deserConfig, ";"); - testField(INT(), " 12 ", 12, deserConfig, ";"); + testFieldDeserialization(INT(), " 12 ", 12, deserConfig, ";"); testField(INT(), "12", 12, serConfig, deserConfig, ";"); - testField( + testFieldDeserialization( ROW(FIELD("f0", STRING()), FIELD("f1", STRING())), "1:hello", Row.of("1", "hello"), deserConfig, @@ -150,6 +167,26 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception { deserConfig, ";"); testField(STRING(), "null", "null", serConfig, deserConfig, ";"); // string because null literal has not been set + testFieldDeserialization(TIME(3), "12:12:12.232", LocalTime.parse("12:12:12.232"), deserConfig, ";"); + testFieldDeserialization(TIME(3), "12:12:12.232342", LocalTime.parse("12:12:12.232"), deserConfig, ";"); + testFieldDeserialization(TIME(3), "12:12:12.23", LocalTime.parse("12:12:12.23"), deserConfig, ";"); + testFieldDeserialization(TIME(2), "12:12:12.23", LocalTime.parse("12:12:12.23"), deserConfig, ";"); + testFieldDeserialization(TIME(2), "12:12:12.232312", LocalTime.parse("12:12:12.23"), deserConfig, ";"); + testFieldDeserialization(TIME(2), "12:12:12.2", LocalTime.parse("12:12:12.2"), deserConfig, ";"); + testFieldDeserialization(TIME(1), "12:12:12.2", LocalTime.parse("12:12:12.2"), deserConfig, ";"); + testFieldDeserialization(TIME(1), "12:12:12.2235", LocalTime.parse("12:12:12.2"), deserConfig, ";"); + testFieldDeserialization(TIME(1), "12:12:12", LocalTime.parse("12:12:12"), deserConfig, ";"); + testFieldDeserialization(TIME(0), "12:12:12", LocalTime.parse("12:12:12"), deserConfig, ";"); + testFieldDeserialization(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";"); + int precision = 5; + try { + testFieldDeserialization(TIME(5), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";"); + fail(); + } catch (Exception e) { + assertEquals( + "Csv does not support TIME type with precision: 5, it only supports precision 0 ~ 3.", + e.getMessage()); + } } @Test @@ -306,7 +343,7 @@ private void testField( } @SuppressWarnings("unchecked") - private void testField( + private void testFieldDeserialization( DataType fieldType, String csvValue, Object value,