Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix peek message metadata broker while enable broker entry metadata. #9255

Merged

Conversation

codelipenghui
Copy link
Contributor

Motivation

Fix peek message metadata broker while enable broker entry metadata.

When enabled the broker entry metadata, following error occurs:

22:09:57.802 [broker-topic-workers-OrderedScheduler-4-0:org.apache.pulsar.common.protocol.Commands@1658] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://public/default/__consumer_offsets-partition-0, name=reader-31a9742e6c}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:415) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1653) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:82) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:232) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:178) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_261]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_261]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]

The root cause is peeking message metadata does not skip the broker entry metadata.

Modifications

Skip the broker entry metadata if exists when peek message metadata.

Verifying this change

Tests added.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui self-assigned this Jan 21, 2021
@codelipenghui codelipenghui added the type/bug The PR fixed a bug or issue reported a bug label Jan 21, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Jan 21, 2021
Comment on lines 1674 to 1676
return metadata.getPartitionKey().getBytes();
}
return key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change getBytes() to getBytes(StandardCharsets.UTF_8) to fix spotbugs check. It looks like that pulsar-broker module doesn't enable spotbugs check yet so the same code passed the check in pulsar-broker but failed in pulsar-common.

@BeforeClass
protected void setup() throws Exception {
conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor to test multiple BrokerEntryMetadataInterceptors?

skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
byte[] key = NONE_KEY.getBytes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also use getBytes(StandardCharsets.UTF_8) here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no key, instead of calling getBytes() each time, we should just keep a static ref to the byte[]

Copy link
Member

@jiazhai jiazhai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 overall lgtm.

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

2 similar comments
@hangc0276
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants