Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BookkeeperSchemaStorage NPE #9264

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jan 21, 2021

Motivation

The NullPointerException is thrown when the zookeeper had an OOM issue. After we increase the zookeeper memory and restart the zookeeper cluster, the broker still kept throwing NullPointerException. The exception was fixed after rolling restart all brokers.

07:54:13.142 [pulsar-io-25-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978] Subscribing on topic [topic] / [subscription]
07:54:13.143 [Thread-241] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978][topic][subscription] Failed to create consumer: null
java.util.concurrent.CompletionException: java.lang.NullPointerException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1005) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$getSchema$6(BookkeeperSchemaStorage.java:175) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.getSchema(BookkeeperSchemaStorage.java:169) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.get(BookkeeperSchemaStorage.java:126) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:95) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:81) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator.getSchema(SchemaRegistryServiceWithSchemaDataValidator.java:52) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.AbstractTopic.hasSchema(AbstractTopic.java:244) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.addSchemaIfIdleOrCheckCompatible(PersistentTopic.java:2144) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:920) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:902) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:852) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:239) ~[org.apache.pulsar-pulsar-common-2.6.2.jar:2.6.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.NullPointerException
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.openLedger(BookkeeperSchemaStorage.java:565) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.readSchemaEntry(BookkeeperSchemaStorage.java:470) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$4(BookkeeperSchemaStorage.java:185) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
	... 43 more

The problem is the bookkeeper client used by the BookkeeperSchemaStorage does not create success due to the zookeeper issue. Currently, start the broker will create and start the SchemaStorage, but if the SchemaStorage start failed, the broker only prints a log Unable to create schema registry storage. after this moment, the broker will continue the start process, if the subsequent steps do not throw any exceptions, the broker will start successfully, however, the bookkeeper client of the BookkeeperSchemaStorage will always be null.

Modifications

Make sure the SchemaStorage start success when starting the broker, if SchemaStorage starts failed, the broker also should be start failed.

Verifying this change

New test added.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui added type/bug The PR fixed a bug or issue reported a bug release/2.7.1 labels Jan 21, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Jan 21, 2021
@codelipenghui codelipenghui self-assigned this Jan 21, 2021
@codelipenghui codelipenghui changed the title Fix schema storage NPE Fix BookkeeperSchemaStorage NPE Jan 21, 2021
@jiazhai
Copy link
Member

jiazhai commented Jan 22, 2021

/pulsarbot run-failure-checks

@jiazhai
Copy link
Member

jiazhai commented Jan 22, 2021

/pulsarbot run-failure-checks

2 similar comments
@jiazhai
Copy link
Member

jiazhai commented Jan 22, 2021

/pulsarbot run-failure-checks

@hangc0276
Copy link
Contributor

/pulsarbot run-failure-checks

@zymap
Copy link
Member

zymap commented Jan 23, 2021

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit 3923643 into apache:master Feb 5, 2021
@codelipenghui codelipenghui deleted the penghui/fix-schema-storage-start-fail branch February 5, 2021 06:47
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Feb 5, 2021
codelipenghui added a commit that referenced this pull request Feb 5, 2021
### Motivation

The NullPointerException is thrown when the zookeeper had an OOM issue. After we increase the zookeeper memory and restart the zookeeper cluster, the broker still kept throwing NullPointerException. The exception was fixed after rolling restart all brokers.

```
07:54:13.142 [pulsar-io-25-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978] Subscribing on topic [topic] / [subscription]
07:54:13.143 [Thread-241] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978][topic][subscription] Failed to create consumer: null
java.util.concurrent.CompletionException: java.lang.NullPointerException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1005) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$getSchema$6(BookkeeperSchemaStorage.java:175) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.getSchema(BookkeeperSchemaStorage.java:169) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.get(BookkeeperSchemaStorage.java:126) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:95) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:81) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator.getSchema(SchemaRegistryServiceWithSchemaDataValidator.java:52) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.AbstractTopic.hasSchema(AbstractTopic.java:244) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.addSchemaIfIdleOrCheckCompatible(PersistentTopic.java:2144) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:920) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:902) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) ~[?:1.8.0_252]
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) ~[?:1.8.0_252]
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:852) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:239) ~[org.apache.pulsar-pulsar-common-2.6.2.jar:2.6.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.NullPointerException
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.openLedger(BookkeeperSchemaStorage.java:565) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.readSchemaEntry(BookkeeperSchemaStorage.java:470) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$4(BookkeeperSchemaStorage.java:185) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
	... 43 more
```

The problem is the bookkeeper client used by the `BookkeeperSchemaStorage` does not create success due to the zookeeper issue. Currently, start the broker will create and start the SchemaStorage, but if the SchemaStorage start failed, the broker only prints a log `Unable to create schema registry storage`. after this moment, the broker will continue the start process, if the subsequent steps do not throw any exceptions, the broker will start successfully, however, the bookkeeper client of the `BookkeeperSchemaStorage`  will always be null.

### Modifications

Make sure the SchemaStorage start success when starting the broker, if SchemaStorage starts failed, the broker also should be start failed.

(cherry picked from commit 3923643)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants