Skip to content

[pulsar-common] MaxMessageSize to user configuration#4063

Closed
ambition119 wants to merge 1 commit intoapache:masterfrom
ambition119:issue-4054
Closed

[pulsar-common] MaxMessageSize to user configuration#4063
ambition119 wants to merge 1 commit intoapache:masterfrom
ambition119:issue-4054

Conversation

@ambition119
Copy link
Contributor

Motivation

Fixes #4054

Modifications

PulsarDecoder use common.conf set MaxMessageSize value

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxMessageSize should be agreed between client, broker and bookie. So a better approach would be:

  1. adding a setting MaxMessageSize setting to broker configuration.
  2. introduce a mechanism between client and broker to fetch some broker configurations when the client establishes a connection to broker. in this case, broker can return the MaxMessageSize it supports. so the client knows exactly the MaxMessageSize setting which it can use.

in any case, since we are changing a default value that is impacting end-users directly, a PIP is required.

public static int MaxMessageSize;
public static int MaxFrameSize;

private static String commonConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/common.conf";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can a client load this setting?

I am not sure it is a good approach to load such settings from a config file. A system property or environment variable (e.g. org.apache.pulsar.MaxMessageSize = xyz) is better.

}

@Test
public void testMaxSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to have tests for when there is a mismatch between client and broker settings. Right now, a client won't attempt to send out a message bigger than 5Mb.

This should be negotiated in the connect/connected phase.

Also, what would happen if a consumer has a smaller limit and the broker is trying to push a bigger message? Current behavior would be to force close connection and immediately reconnect, in a loop. That won't be appropriate in case of a conf mismatch.

@ambition119
Copy link
Contributor Author

MaxMessageSize should be agreed between client, broker and bookie. So a better approach would be:

  1. adding a setting MaxMessageSize setting to broker configuration.
  2. introduce a mechanism between client and broker to fetch some broker configurations when the client establishes a connection to broker. in this case, broker can return the MaxMessageSize it supports. so the client knows exactly the MaxMessageSize setting which it can use.

in any case, since we are changing a default value that is impacting end-users directly, a PIP is required.

I simply think client and broker use PulsarDecoder.MaxFrameSize or PulsarDecoder.MaxMessageSize config,like:

[client]
if (payloadSize > PulsarDecoder.MaxMessageSize) 
if (compressedSize > PulsarDecoder.MaxMessageSize) 
[broker]
new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4))

I think bookie can through bookkeeper.conf set nettyMaxFrameSizeBytes can resolve,but run java example found it did not work, occured exception :

[bookie-io-1-6] ERROR org.apache.bookkeeper.proto.BookieRequestHandler - Unhandled exception occurred in I/O thread or handler
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 5242880: 32637007 - discarded
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:387) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:430) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) ~[netty-codec-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

so nettyMaxFrameSizeBytes config to use Is the focus of the problem, tks.

@hellozepp
Copy link
Contributor

@ambition119 HTAL
I tried to modify the configuration of bk_server to nettyMaxFrameSizeBytes=52428800(50M). Sending 5M data addEntry can be successful, but the following exception occurred when readEntries:

2019-04-29 15:10:37,498 - ERROR - [bookie-io-1-2:BookieRequestHandler@70] - Unhandled exception occurred in I/O thread or handler
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

test source code:

  bkc = new BookKeeper(conf);
 ledger = bkc.createLedger(3, 2, DigestType.CRC32, "foobar".getBytes());
...
long ledgerId = ledger.getId();
        log.info("Writing to ledger: {}", ledgerId);
        for (int i = 0; i < 10; i++) {
            String content = "entry-" + i;
//            ledger.addEntry(new byte[5 * 1024 * 1024 + 1]);
            ledger.addEntry(new byte[5 * 1024 * 1024 ]);
        }
        ledger.close();
        LedgerHandle lh2 = bkc.openLedger(ledger.getId(), DigestType.CRC32, "foobar".getBytes());
        Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);

Then try to modify to less than 5M to get the information.

@ambition119
Copy link
Contributor Author

@ambition119 HTAL
I tried to modify the configuration of bk_server to nettyMaxFrameSizeBytes=52428800(50M). Sending 5M data addEntry can be successful, but the following exception occurred when readEntries:

2019-04-29 15:10:37,498 - ERROR - [bookie-io-1-2:BookieRequestHandler@70] - Unhandled exception occurred in I/O thread or handler
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

test source code:

  bkc = new BookKeeper(conf);
 ledger = bkc.createLedger(3, 2, DigestType.CRC32, "foobar".getBytes());
...
long ledgerId = ledger.getId();
        log.info("Writing to ledger: {}", ledgerId);
        for (int i = 0; i < 10; i++) {
            String content = "entry-" + i;
//            ledger.addEntry(new byte[5 * 1024 * 1024 + 1]);
            ledger.addEntry(new byte[5 * 1024 * 1024 ]);
        }
        ledger.close();
        LedgerHandle lh2 = bkc.openLedger(ledger.getId(), DigestType.CRC32, "foobar".getBytes());
        Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);

Then try to modify to less than 5M to get the information.

pulsar config's bookkeeper.conf set nettyMaxFrameSizeBytes value is it effective?

@hellozepp
Copy link
Contributor

@ambition119 I modified the bookie server-side configuration file conf/bk_server.conf, and then restart the bookkeeper service

@hellozepp
Copy link
Contributor

It seems more appropriate to modify the environment variables,I'm interested in trying to solve this issue, tks @sijie

@ambition119
Copy link
Contributor Author

It seems more appropriate to modify the environment variables,I'm interested in trying to solve this issue, tks @sijie

Thank you for your interest in this issue, I closed this PR.

@ambition119 ambition119 closed this May 6, 2019
@sijie
Copy link
Member

sijie commented May 6, 2019

@hellozepp I think @zymap is also interested in working at this issue. It might be worth checking with him :)

@hellozepp
Copy link
Contributor

@sijie got it,I'll get in touch with him

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Exposing MaxMessageSize to user configuration

4 participants