Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14795: Provide message formatter for RemoteLogMetadata #13362

Merged
merged 2 commits into from Mar 21, 2023

Conversation

ivanyu
Copy link
Contributor

@ivanyu ivanyu commented Mar 8, 2023

This commit introduces a formatter for RemoteLogMetadata.

Example usage:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __remote_log_metadata --from-beginning --formatter 'org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde$RemoteLogMetadataFormatter'

RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=M1z1YtfhQ5i7oqLNve_0UQ:topic1-0, id=iWtc1Z6xQu2_DJXTklzKxQ}, startOffset=97990, endOffset=98467, brokerId=0, maxTimestampMs=1678292889855, eventTimestampMs=1678292938280, segmentLeaderEpochs={0=97990}, segmentSizeInBytes=511460, state=COPY_SEGMENT_STARTED}

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ivanyu ivanyu marked this pull request as ready for review March 8, 2023 18:16

@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
output.println(remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here that key for a RemoteLogMetadataRecord is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@satishd satishd Mar 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to include offset and partition like below, which will be helpful for debugging.

output.printf("partition: %d, offset: %d, value: %s%n",record.partition(),  record.offset(), remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, done

This commit introduces a formatter for `RemoteLogMetadata`.

Example usage:
```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __remote_log_metadata --from-beginning --formatter 'org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde$RemoteLogMetadataFormatter'

RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=M1z1YtfhQ5i7oqLNve_0UQ:topic1-0, id=iWtc1Z6xQu2_DJXTklzKxQ}, startOffset=97990, endOffset=98467, brokerId=0, maxTimestampMs=1678292889855, eventTimestampMs=1678292938280, segmentLeaderEpochs={0=97990}, segmentSizeInBytes=511460, state=COPY_SEGMENT_STARTED}
```
@ivanyu ivanyu force-pushed the kafka-14795-remotelogmetadata-formatter branch from ce4cc73 to b85c79f Compare March 16, 2023 17:40
@divijvaidya
Copy link
Contributor

@satishd perhaps you would like to review this since it's associated with KIP405?

@showuon
Copy link
Contributor

showuon commented Mar 17, 2023

I'll take a look this week.

Copy link
Member

@satishd satishd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ivanyu for the PR, left a minor comment.


@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
output.println(remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());
Copy link
Member

@satishd satishd Mar 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to include offset and partition like below, which will be helpful for debugging.

output.printf("partition: %d, offset: %d, value: %s%n",record.partition(),  record.offset(), remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());

Copy link
Member

@satishd satishd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ivanyu for addressing the review comments. LGTM

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, the only comment is should we make the formatter as a separate file? @satishd , thoughts?

@satishd
Copy link
Member

satishd commented Mar 21, 2023

LGTM, the only comment is should we make the formatter as a separate file? @satishd , thoughts?

@showuon I am fine with formatter inside the serde class.

@showuon
Copy link
Contributor

showuon commented Mar 21, 2023

Failed tests are unrelated

    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
    Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
    Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT

@showuon showuon merged commit dc1cd00 into apache:trunk Mar 21, 2023
@ivanyu ivanyu deleted the kafka-14795-remotelogmetadata-formatter branch March 21, 2023 06:57
ivanyu added a commit to aiven/kafka that referenced this pull request May 12, 2023
…13362)

* KAFKA-14795: Provide message formatter for RemoteLogMetadata

This commit introduces a formatter for `RemoteLogMetadata`.

Example usage:
```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __remote_log_metadata --from-beginning --formatter 'org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde$RemoteLogMetadataFormatter'

RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=M1z1YtfhQ5i7oqLNve_0UQ:topic1-0, id=iWtc1Z6xQu2_DJXTklzKxQ}, startOffset=97990, endOffset=98467, brokerId=0, maxTimestampMs=1678292889855, eventTimestampMs=1678292938280, segmentLeaderEpochs={0=97990}, segmentSizeInBytes=511460, state=COPY_SEGMENT_STARTED}
```

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants