Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3787: Preserve the message timestamp in mirror maker #1466

Closed
wants to merge 5 commits into from
Closed

KAFKA-3787: Preserve the message timestamp in mirror maker #1466

wants to merge 5 commits into from

Conversation

xiaotao183
Copy link
Contributor

The timestamp of messages consumed by mirror maker is not preserved after sending to target cluster. The correct behavior is to keep create timestamp the same in both source and target clusters.

@ijuma
Copy link
Contributor

ijuma commented Jun 3, 2016

@becketqin, any reason not to do this? cc @junrao

Assuming that this is the right thing to do, we should include a test.

@junrao
Copy link
Contributor

junrao commented Jun 3, 2016

Yes, that makes sense. We should include this in the 0.10.0 branch.

@ijuma
Copy link
Contributor

ijuma commented Jun 3, 2016

@xiaotao183, are you willing to add a test to cover the fix?

@xiaotao183
Copy link
Contributor Author

@ijuma Sure, will do

@xiaotao183
Copy link
Contributor Author

@ijuma test case added

@@ -673,9 +673,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}

private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can leave it as private[tools] instead of public.

@xiaotao183
Copy link
Contributor Author

@ijuma please review again

assertEquals("topic", producerRecord.topic())
assertNull(producerRecord.partition())
assertEquals("key", new String(producerRecord.key()))
assertEquals("value", new String(producerRecord.value()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nitpick: can you please remove the () from producerRecord.timestamp, producerRecord.topic, etc.? There are no side-effects, so there is no need for ().

@xiaotao183
Copy link
Contributor Author

@ijuma done

@ijuma
Copy link
Contributor

ijuma commented Jun 5, 2016

@xiaotao183 Thanks! LGTM, merging to 0.10.0 and trunk.

@ijuma
Copy link
Contributor

ijuma commented Jun 5, 2016

Sorry, I just realised that the test is in core/src/test/scala/kafka/tools/MirrorMakerTest.scala, but it should be under core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala. The package should still be kafka.tools. Can you please fix that?

@xiaotao183
Copy link
Contributor Author

@ijuma done

asfgit pushed a commit that referenced this pull request Jun 5, 2016
The timestamp of messages consumed by mirror maker is not preserved after sending to target cluster. The correct behavior is to keep create timestamp the same in both source and target clusters.

Author: Tao Xiao <xiaotao183@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1466 from xiaotao183/KAFKA-3787

(cherry picked from commit f4a263b)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
@asfgit asfgit closed this in f4a263b Jun 5, 2016
@ijuma
Copy link
Contributor

ijuma commented Jun 5, 2016

Thanks, tests passed locally. Merged to trunk and 0.10.0 branches.

granthenke pushed a commit to granthenke/kafka that referenced this pull request Oct 24, 2016
The timestamp of messages consumed by mirror maker is not preserved after sending to target cluster. The correct behavior is to keep create timestamp the same in both source and target clusters.

Author: Tao Xiao <xiaotao183@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes apache#1466 from xiaotao183/KAFKA-3787

(cherry picked from commit f4a263b)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants