Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type #13834

Merged
merged 1 commit into from Nov 12, 2020

Conversation

pyscala
Copy link
Contributor

@pyscala pyscala commented Oct 29, 2020

What is the purpose of the change

Support to parse millisecond for TIME type in CSV format

Brief change log

Support to parse millisecond for TIME type in CSV format

Verifying this change

This change added tests and can be verified as follows:
testSerializeDeserializeForTime()

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 9a45aa4 (Thu Oct 29 06:44:57 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 29, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@pyscala
Copy link
Contributor Author

pyscala commented Oct 29, 2020

CC @wuchong

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution. I left some comments.

throw new IllegalArgumentException("Csv does not support TIME type " +
"with precision: " + precision + ", it only supports precision 0 or 3.");
}
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

		final int precision = timeType.getPrecision();
		if (precision > 3) {
			throw new IllegalArgumentException("CSV format does not support TIME type " +
				"with precision: " + precision + ", it only supports precision 0 ~ 3.");
		}
		// get number of milliseconds of the day
		return jsonNode -> {
			LocalTime localTime = LocalTime.parse(jsonNode.asText());
			if (precision == 0) {
				return localTime.toSecondOfDay() * 1000L;
			} else {
				return (int) (localTime.toNanoOfDay() / 1000_000L);
			}
		};

Throws exception in compile phase instead during runtime.
Besides, we can also support precision 0~3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will optimize.

@@ -128,7 +128,7 @@ private static RowFieldConverter createRowFieldConverter(LogicalType fieldType)
case DATE:
return (csvMapper, container, row, pos) -> convertDate(row.getInt(pos), container);
case TIME_WITHOUT_TIME_ZONE:
return (csvMapper, container, row, pos) -> convertTime(row.getInt(pos), container);
return (csvMapper, container, row, pos) -> convertTime(row.getLong(pos), container);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still use getInt, because we always use int to represent number of milliseconds of the day in Flink SQL internally. See Javadoc of RowData.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will overflow when supporting nanosecond precision in the future

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why we don't support precision > 3 for now.
When the planner supports precision > 3 in the future, we can update csv format to use the higer precision data structure (maybe long or otheres).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it

"12:12:12.232",
"12:12:12.232",
(deserSchema) -> deserSchema.setNullLiteral("null"),
(serSchema) -> serSchema.setNullLiteral("null"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reuse the testNullableField method?

Copy link
Contributor Author

@pyscala pyscala Oct 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, the input is "12:12:12.232", but the precision is 2, then the result should be "12:12:12.23". The existing test named testNullableField must be modified to meet the needs of this test. But the change affects other test cases. So i add a new test methed named testFieldForTime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For such case, I think you can use the test method only validate the deserialization.

testField(
			DataType fieldType,
			String csvValue,
			Object value,
			Consumer<CsvRowDataDeserializationSchema.Builder> deserializationConfig,
			String fieldDelimiter)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my carelessness, I will optimize it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wuchong
the following code
Row actualRow = (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(deserializedRow);
in methed
testField( DataType fieldType, String csvValue, Object value, Consumer<CsvRowDataDeserializationSchema.Builder> deserializationConfig, String fieldDelimiter)
will change TIME(n) (0<=n<=3) to TIME(0) .Then the test will fail.
I still use testFieldForTime().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works in my environment:

		testField(
			TIME(3),
			"12:12:12.232421",
			LocalTime.parse("12:12:12.232"),
			(deserSchema) -> deserSchema.setNullLiteral("null"),
			",");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it

(serSchema) -> serSchema.setNullLiteral("null"),
",");
testFieldForTime(
TIME(0),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more tests for TIME(1), TIME(2).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Comment on lines 153 to 157
",");
} catch (Exception e) {
actualMessage = e.getCause().getMessage();
}
assertEquals(expectedMessage, actualMessage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
",");
} catch (Exception e) {
actualMessage = e.getCause().getMessage();
}
assertEquals(expectedMessage, actualMessage);
",");
fail("Exception should be thrown.");
} catch (Exception e) {
assertEquals(expectedMessage, e.getCause().getMessage());
}

@pyscala pyscala requested a review from wuchong November 2, 2020 02:38
Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pyscala , I think the pull request is in a good shape now.

I only left some comments to improve the tests.

@@ -320,6 +339,34 @@ private void testField(
assertEquals(expectedCsv, new String(serializedRow));
}

private void testFieldForTime(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will clean up

testField(TIME(0), "12:12:12", LocalTime.parse("12:12:12") , deserConfig , ";");
testField(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12") , deserConfig , ";");
int precision = 5;
String expectedMessage = String.format("Csv does not support TIME type with precision: %d, it only supports precision 0 ~ 3.", precision);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not hard code the exception message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it

LocalTime localTime = LocalTime.parse(jsonNode.asText());
int mills = (int) (localTime.toNanoOfDay() / 1000_000L);
if (precision == 2) {
mills = mills / 10 * 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment "this is for rounding off values out of precision"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

int precision = 5;
String expectedMessage = String.format("Csv does not support TIME type with precision: %d, it only supports precision 0 ~ 3.", precision);
try {
testField(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12") , deserConfig , ";");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not fail. Because it is not TIME(5).

You should add fail(); if you want to verify it should fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it

Comment on lines 154 to 164
testField(TIME(3), "12:12:12.232", LocalTime.parse("12:12:12.232") , deserConfig , ";");
testField(TIME(3), "12:12:12.232342", LocalTime.parse("12:12:12.232") , deserConfig , ";");
testField(TIME(3), "12:12:12.23", LocalTime.parse("12:12:12.23") , deserConfig , ";");
testField(TIME(2), "12:12:12.23", LocalTime.parse("12:12:12.23") , deserConfig , ";");
testField(TIME(2), "12:12:12.232312", LocalTime.parse("12:12:12.23") , deserConfig , ";");
testField(TIME(2), "12:12:12.2", LocalTime.parse("12:12:12.2") , deserConfig , ";");
testField(TIME(1), "12:12:12.2", LocalTime.parse("12:12:12.2") , deserConfig , ";");
testField(TIME(1), "12:12:12.2235", LocalTime.parse("12:12:12.2") , deserConfig , ";");
testField(TIME(1), "12:12:12", LocalTime.parse("12:12:12") , deserConfig , ";");
testField(TIME(0), "12:12:12", LocalTime.parse("12:12:12") , deserConfig , ";");
testField(TIME(0), "12:12:12.45", LocalTime.parse("12:12:12") , deserConfig , ";");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these only test the deserialization. We should also add tests for serialization. I think we can use testNullableField for this purpose.

Besides, we can rename method testField(fieldType, csvValue, value, deserializationConfig, fieldDelimiter) to testFieldSerialization to be more specific.

Copy link
Contributor Author

@pyscala pyscala Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename method testField(fieldType, csvValue, value, deserializationConfig, fieldDelimiter) to testFieldSerialization ? or testFieldDeserialization ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for not using testNullableField can be seen in our previous dialogue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to testFieldDeserialization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only use testNullableField to test conversion without precision loss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @pyscala . LGTM.

@wuchong wuchong changed the title [FLINK-19872][Formats/Csv] Support to parse millisecond for TIME type in CSV format [FLINK-19872][csv] Fix CSV format is unable to parse millisecond for TIME type Nov 12, 2020
@wuchong
Copy link
Member

wuchong commented Nov 12, 2020

Merging...

@wuchong wuchong merged commit e86b409 into apache:master Nov 12, 2020
rkhachatryan pushed a commit to rkhachatryan/flink that referenced this pull request Nov 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants