Skip to content

Commit

Permalink
[FLINK-19872][csv] Fix CSV format is unable to parse millisecond for …
Browse files Browse the repository at this point in the history
…TIME type

This closes #13834
  • Loading branch information
pyscala authored and wuchong committed Nov 12, 2020
1 parent bf16a7d commit 6dd7a16
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -39,7 +40,6 @@
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalTime;

Expand Down Expand Up @@ -143,7 +143,7 @@ private CsvToRowDataConverter createConverter(LogicalType type) {
case DATE:
return this::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return this::convertToTime;
return convertToTime((TimeType) type);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
Expand Down Expand Up @@ -221,12 +221,28 @@ private int convertToDate(JsonNode jsonNode) {
return (int) Date.valueOf(jsonNode.asText()).toLocalDate().toEpochDay();
}

private int convertToTime(JsonNode jsonNode) {
private CsvToRowDataConverter convertToTime(TimeType timeType) {
final int precision = timeType.getPrecision();
// csv currently is using Time.valueOf() to parse time string
LocalTime localTime = Time.valueOf(jsonNode.asText()).toLocalTime();
// TODO: FLINK-17525 support millisecond and nanosecond
// get number of milliseconds of the day
return localTime.toSecondOfDay() * 1000;
if (precision > 3) {
throw new IllegalArgumentException("Csv does not support TIME type " +
"with precision: " + precision + ", it only supports precision 0 ~ 3.");
}
return jsonNode -> {
LocalTime localTime = LocalTime.parse(jsonNode.asText());
int mills = (int) (localTime.toNanoOfDay() / 1000_000L);
// this is for rounding off values out of precision
if (precision == 2) {
mills = mills / 10 * 10;
} else if (precision == 1) {
mills = mills / 100 * 100;
} else if (precision == 0) {
mills = mills / 1000 * 1000;
}
return mills;
};
}

private TimestampData convertToTimestamp(JsonNode jsonNode) {
Expand Down Expand Up @@ -284,7 +300,7 @@ private static void validateArity(int expected, int actual, boolean ignoreParseE

/**
* Exception which refers to parse errors in converters.
* */
*/
private static final class CsvParseException extends RuntimeException {
private static final long serialVersionUID = 1L;
public CsvParseException(String message, Throwable cause) {
Expand Down
Expand Up @@ -244,7 +244,7 @@ private static JsonNode convertDate(int days, ContainerNode<?> container) {
}

private static JsonNode convertTime(int millisecond, ContainerNode<?> container) {
LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
return container.textNode(ISO_LOCAL_TIME.format(time));
}

Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalTime;
import java.util.function.Consumer;

import static org.apache.flink.table.api.DataTypes.ARRAY;
Expand Down Expand Up @@ -107,6 +108,22 @@ public void testSerializeDeserialize() throws Exception {
BYTES(),
"awML",
new byte[] {107, 3, 11});
testNullableField(
TIME(3),
"12:12:12.232",
LocalTime.parse("12:12:12.232"));
testNullableField(
TIME(2),
"12:12:12.23",
LocalTime.parse("12:12:12.23"));
testNullableField(
TIME(1),
"12:12:12.2",
LocalTime.parse("12:12:12.2"));
testNullableField(
TIME(0),
"12:12:12",
LocalTime.parse("12:12:12"));
}

@Test
Expand All @@ -124,13 +141,13 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception {
.setArrayElementDelimiter(":")
.setFieldDelimiter(';');

testField(STRING(), "123*'4**", "123'4*", deserConfig, ";");
testField(STRING(), "'123''4**'", "123'4*", serConfig, deserConfig, ";");
testField(STRING(), "'a;b*'c'", "a;b'c", deserConfig, ";");
testFieldDeserialization(STRING(), "123*'4**", "123'4*", deserConfig, ";");
testField(STRING(), "'123''4**'", "'123''4**'", serConfig, deserConfig, ";");
testFieldDeserialization(STRING(), "'a;b*'c'", "a;b'c", deserConfig, ";");
testField(STRING(), "'a;b''c'", "a;b'c", serConfig, deserConfig, ";");
testField(INT(), " 12 ", 12, deserConfig, ";");
testFieldDeserialization(INT(), " 12 ", 12, deserConfig, ";");
testField(INT(), "12", 12, serConfig, deserConfig, ";");
testField(
testFieldDeserialization(
ROW(FIELD("f0", STRING()), FIELD("f1", STRING())),
"1:hello", Row.of("1", "hello"),
deserConfig,
Expand All @@ -150,6 +167,26 @@ public void testSerializeDeserializeCustomizedProperties() throws Exception {
deserConfig,
";");
testField(STRING(), "null", "null", serConfig, deserConfig, ";"); // string because null literal has not been set
testFieldDeserialization(TIME(3), "12:12:12.232", LocalTime.parse("12:12:12.232"), deserConfig, ";");
testFieldDeserialization(TIME(3), "12:12:12.232342", LocalTime.parse("12:12:12.232"), deserConfig, ";");
testFieldDeserialization(TIME(3), "12:12:12.23", LocalTime.parse("12:12:12.23"), deserConfig, ";");
testFieldDeserialization(TIME(2), "12:12:12.23", LocalTime.parse("12:12:12.23"), deserConfig, ";");
testFieldDeserialization(TIME(2), "12:12:12.232312", LocalTime.parse("12:12:12.23"), deserConfig, ";");
testFieldDeserialization(TIME(2), "12:12:12.2", LocalTime.parse("12:12:12.2"), deserConfig, ";");
testFieldDeserialization(TIME(1), "12:12:12.2", LocalTime.parse("12:12:12.2"), deserConfig, ";");
testFieldDeserialization(TIME(1), "12:12:12.2235", LocalTime.parse("12:12:12.2"), deserConfig, ";");
testFieldDeserialization(TIME(1), "12:12:12", LocalTime.parse("12:12:12"), deserConfig, ";");
testFieldDeserialization(TIME(0), "12:12:12", LocalTime.parse("12:12:12"), deserConfig, ";");
testFieldDeserialization(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";");
int precision = 5;
try {
testFieldDeserialization(TIME(5), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";");
fail();
} catch (Exception e) {
assertEquals(
"Csv does not support TIME type with precision: 5, it only supports precision 0 ~ 3.",
e.getMessage());
}
}

@Test
Expand Down Expand Up @@ -306,7 +343,7 @@ private void testField(
}

@SuppressWarnings("unchecked")
private void testField(
private void testFieldDeserialization(
DataType fieldType,
String csvValue,
Object value,
Expand Down

0 comments on commit 6dd7a16

Please sign in to comment.