Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Ensure timestamp type is provided when up-converting messages #2483

Closed

Conversation

hachikuji
Copy link
Contributor

No description provided.

@hachikuji
Copy link
Contributor Author

cc @ijuma

I ran into a couple issues with up-conversion when working with EoS message format changes. Seems we should be a little safer with the handling of NO_TIMESTAMP_TYPE.

@hachikuji hachikuji changed the title MINOR: Ensure timestamp type is provided when upconverting messages MINOR: Ensure timestamp type is provided when up-converting messages Feb 2, 2017
@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1421/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1418/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1418/
Test PASSed (JDK 7 and Scala 2.10).

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, looks good overall. I left some comments and questions.

I had made a note to check this more carefully when reviewing the Record refactoring PRs so one less thing for me to do. :)

@@ -316,7 +316,7 @@ public void testConvertNonCompressedToMagic1() throws IOException {
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(records);
fileRecords.flush();
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for picking CREATE_TIME instead of LOG_APPEND_TIME? I guess either is wrong since the timestamp field will be -1 in this case, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason. We have to choose one of them when up-converting. This just verifies that it uses what we set.

def testCreateTimeUpConversion() {
val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val validatedResults =
LogValidator.validateMessagesAndAssignOffsets(records,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: shouldn't this be in the previous line?

@@ -186,7 +186,7 @@ private[kafka] object LogValidator {
if (record.magic != messageFormatVersion)
inPlaceAssignment = false

validatedRecords += record.convert(messageFormatVersion)
validatedRecords += record.convert(messageFormatVersion, messageTimestampType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is messageTimestampType never NO_TIMESTAMP if messageFormatVersion > 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. It seems it may actually be possible for the user to use "NO_TIMESTAMP_TYPE" for "message.timestamp.type." If so, it's probably worth checking for that and raising an exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked into this. Our config definition constrains message.timestamp.type to be one of "CreateTime" or "LogAppendTime," so I think we are safe.

throw new IllegalArgumentException("Cannot up-convert using timestamp type " + upconvertTimestampType);
timestampType = upconvertTimestampType;
} else {
timestampType = TimestampType.forAttributes(attributes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we don't check wrapperRecordTimestampType any more (which could have been done by simply calling timestampType() btw)?

Also, can you search for timstamp in this file and fix an existing typo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you are right. I'll fix. Hmm... wonder why no tests failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I guess it's because we would never have a wrapper timestamp type if we're converting from magic 0 to magic 1, and of course it wouldn't matter when converting in the other direction. But we will need to consider it when up-converting to magic 2.

@hachikuji hachikuji force-pushed the minor-upconvert-timestamp-safety branch from 1e0d210 to b42804b Compare February 2, 2017 21:46
Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, LGTM. Will merge to trunk.

@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1446/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1443/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit asfgit closed this in ea70a9b Feb 2, 2017
@asfbot
Copy link

asfbot commented Feb 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1443/
Test FAILed (JDK 7 and Scala 2.10).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes apache#2483 from hachikuji/minor-upconvert-timestamp-safety
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants