-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-36: Support set message size in broker.conf #4247
Conversation
--- *Motivation* Currently Pulsar only support 5MB size of messages.But there are many cases will use more than 5MB message to transfer. https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size *Modifications* - Add message size in protocol - Automaticlly adjust client message size by server
I will add test later. @sijie PTAL. |
@@ -139,6 +140,15 @@ private static boolean argsContains(String[] args, String arg) { | |||
brokerConfig = loadConfig(starterArguments.brokerConfigFile); | |||
} | |||
|
|||
int maxFrameSize = brokerConfig.getMaxMessageSize() + (10 * 1024); | |||
if (maxFrameSize < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The annotations in ServiceConfiguration already have a way to enforce values limits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, will replace it
if (maxFrameSize < 0) { | ||
throw new IllegalArgumentException("Max message size need smaller than 5233640 bytes"); | ||
} | ||
if (maxFrameSize > DirectMemoryUtils.maxDirectMemory()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is not going to be very useful. Even if we have 1 single message of a size comparable to maxDirectMem, we'd be already in trouble
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will fix it as >=
@FieldContext(category = CATEGORY_SERVER, doc = "The size of messages.") | ||
private int maxMessageSize = 5 * 1024 * 1024; | ||
|
||
private int maxFrameSize = 5242880; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having 2 settings, which are ultimately strongly related, could be confusing to users. I think the max message size with some padding for headers would be easier in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
@@ -213,6 +213,7 @@ message CommandConnect { | |||
message CommandConnected { | |||
required string server_version = 1; | |||
optional int32 protocol_version = 2 [default = 0]; | |||
optional int32 max_message_size = 3 [default = 5242880]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this kind of numbers, I'd leave the default empty and let the client/broker assume the 5MB default.
In particular, if the default is in Java code, we can put in a constant shared between client and broker and keep it in a single place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you are right. will fix it
this.maxMessageSize = maxMessageSize; | ||
} | ||
|
||
private int maxMessageSize = 5242880; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be defined in a static constant, shared between client and broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
@@ -99,7 +99,8 @@ protected void initChannel(SocketChannel ch) throws Exception { | |||
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); | |||
} | |||
ch.pipeline().addLast("frameDecoder", | |||
new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); | |||
new LengthFieldBasedFrameDecoder(config.getMaxMessagesSize() + 10 * 1024, 0, 4, 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The amount of padding should also be defined in a constant (shared between all components)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
such as pulsar-common
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, somewhere like Commands.MESSAGE_SIZE_FRAME_PADDING
@@ -218,7 +218,8 @@ private void completeConnect() { | |||
// partitions metadata lookups | |||
state = State.ProxyLookupRequests; | |||
lookupProxyHandler = new LookupProxyHandler(service, this); | |||
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise)); | |||
ctx.writeAndFlush( | |||
Commands.newConnected(protocolVersionToAdvertise, service.getConfiguration().getMaxMessagesSize())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proxy should also take into consideration what's the min between proxy and broker.
Eg: if proxy has 5MB and broker has 1MB, proxy should downgrade itself to 1MB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, right, proxy is a client of broker so it need receive the max message size from broker and adjust itself. Will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the proxy should not even need the config option. Just take the broker value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, already fixed in the last commit
@@ -85,7 +85,8 @@ protected void initChannel(SocketChannel ch) throws Exception { | |||
} | |||
} | |||
|
|||
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); | |||
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( | |||
proxyService.getConfiguration().getMaxMessagesSize() + 10 * 1024, 0, 4, 0, 4)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
padding from constant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -62,6 +62,7 @@ | |||
private int connectionTimeoutMs = 10000; | |||
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100); | |||
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30); | |||
private int defaultMaxFrameSize = 5242880; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default should come from constant (and not be modifiable)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this is not needed anymore. The client will not need to configure this anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it need initial value to init defaultFrameDecoder
at first time and it will replace defaultFrameDecoder
when receive command Connected
immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I can use the value in Commands
@@ -312,15 +312,15 @@ public void sendAsync(Message<T> message, SendCallback callback) { | |||
|
|||
// validate msg-size (For batching this will be check at the batch completion size) | |||
int compressedSize = compressedPayload.readableBytes(); | |||
|
|||
if (compressedSize > PulsarDecoder.MaxMessageSize) { | |||
int maxMessageSize = ClientCnx.NUMBER_OF_MAX_MESSAGE_SIZE.get(this.cnx()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be exposed in the ClientCnx
directly, like: cnx.getMaxMessageSize()
@@ -275,7 +280,10 @@ protected void handleConnected(CommandConnected connected) { | |||
} | |||
|
|||
checkArgument(state == State.SentConnectFrame || state == State.Connecting); | |||
|
|||
int maxFrameSize = connected.getMaxMessageSize() - 10 * 1024; | |||
NUMBER_OF_MAX_MESSAGE_SIZE.compareAndSet(this, 0, maxFrameSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This compare and set is not needed here. The CommandConnected
is only received once, so we should just do the replace of the frame decoder.
@@ -275,7 +280,10 @@ protected void handleConnected(CommandConnected connected) { | |||
} | |||
|
|||
checkArgument(state == State.SentConnectFrame || state == State.Connecting); | |||
|
|||
int maxFrameSize = connected.getMaxMessageSize() - 10 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Padding from constant (and also it should be +
rather than -
Broker should also set the max entry size on the BK client instance |
@merlimat Thank you for your review. I will fix those problem as you said. :) |
--- *Motivation* When broker specify a `maxMessageSize` bookie should accept this value as `nettyFrameSize` *Modifications* - Use `cnx().getMaxMessageSize` - Discovery service only redirect so use the constant value `5 * 1024 * 1024` as message size - Put `MAX_METADATA_SIZE` as constant value in `InternalConfigurationData`
@merlimat PTAL. |
category = CATEGORY_SERVER, | ||
doc = "Max size of messages.", | ||
maxValue = Integer.MAX_VALUE - InternalConfigurationData.MESSAGE_META_SIZE) | ||
private int maxMessageSize = 5 * 1024 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use constant for default max size. eg: Commands.DEFAULT_MAX_MESSAGE_SIZE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -275,7 +282,9 @@ protected void handleConnected(CommandConnected connected) { | |||
} | |||
|
|||
checkArgument(state == State.SentConnectFrame || state == State.Connecting); | |||
|
|||
this.maxMessageSize = connected.getMaxMessageSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (connected.hasMaxMessageSize() {
. ..
} else {
maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for reminder. It' s the optional value in protocol, will fix
@@ -62,6 +62,7 @@ | |||
private int connectionTimeoutMs = 10000; | |||
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100); | |||
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30); | |||
private int defaultMaxFrameSize = 5242880; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this is not needed anymore. The client will not need to configure this anyway
@@ -23,6 +23,7 @@ | |||
|
|||
public class InternalConfigurationData { | |||
|
|||
public final static int MESSAGE_META_SIZE = 10 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is a POJO used to serialize to JSON.
I'd keep this constant in Commands
where all the binary protocol stuff is.
eg: Commands.MESSAGE_SIZE_FRAME_PADDING
along with Commands.DEFAULT_MAX_MESSAGE_SIZE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will fix
@@ -101,7 +102,7 @@ protected void handleConnect(CommandConnect connect) { | |||
return; | |||
} | |||
} | |||
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); | |||
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), ServiceConfig.MAX_MESSAGE_SIZE)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commands.DEFAULT_MAX_MESSAGE_SIZE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think discovery service only for looking up service so it's doesn't need set max message size, right?
@@ -34,6 +33,10 @@ | |||
*/ | |||
public class ServiceConfig implements PulsarConfiguration { | |||
|
|||
// Discovery service doesn't send any messages except Command connected. | |||
// So it's ok use a default value. | |||
public final static int MAX_MESSAGE_SIZE = 5 * 1024 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need to redefine the same constant. Just reuse Commands.DEFAULT_MAX_MESSAGE_SIZE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -111,6 +111,9 @@ | |||
) | |||
private String brokerWebServiceURLTLS; | |||
|
|||
@FieldContext(category = CATEGORY_BROKER_DISCOVERY, doc = "") | |||
private int maxMessagesSize = 5233640; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proxy should not have this config option. Just repeat what broker had.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. will fix
--- *Modifications* - use `Commands` to store default `MAX_MESSAGE_SIZE` and `MESSAGE_SIZE_FRAME_PADDING` - replace `LengthFieldBasedFrameDecoder` when has set message size - replace `PulsarDecoder.MaxMessageSize`
@merlimat I do some changes. Could you please take a look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just a couple of minor comments
@@ -23,6 +23,7 @@ | |||
|
|||
public class InternalConfigurationData { | |||
|
|||
public final static int MESSAGE_META_SIZE = 10 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
@@ -189,9 +194,16 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p | |||
return res; | |||
} | |||
|
|||
public static ByteBuf newConnected(int clientProtocolVersion) { | |||
public static ByteBuf newConnected(int clientProtocoVersion) { | |||
return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the INVALID_MAX_MESSAGE_SIZE
? Is this method called from unit tests? I'd rather just leave the method which requires the maxMessageSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it will be used when there no max message size in protocol, like this:
ChannelFuture channelFuture;
if (connected.hasMaxMessageSize()) {
channelFuture = inboundChannel.writeAndFlush(
Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize()));
} else {
channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
}
ParserProxyHandler.BACKEND_CONN, | ||
connected.getMaxMessageSize())); | ||
} else { | ||
inboundChannel.pipeline().addBefore("handler", "inboundParser", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the block should be indented
--- *Motivation* - Even if the cnx can't use, `maxMessageSize` should be used at compare message size. So it should as a static variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
run java8 tests |
Update the Reference > Pulsar configuration > broker for Pulsar 2.4.0 release apache#4247
This update is for the PR: #4247 Doc update for the master is approved and merged. Update the Reference > Pulsar configuration > broker for the following releases: 2.5.2, 2.5.1, 2.5.0, 2.4.2, 2.4.1, and 2.4.0.
This update is for the PR: apache#4247 Doc update for the master is approved and merged. Update the Reference > Pulsar configuration > broker for the following releases: 2.5.2, 2.5.1, 2.5.0, 2.4.2, 2.4.1, and 2.4.0.
This update is for the PR: apache#4247 Doc update for the master is approved and merged. Update the Reference > Pulsar configuration > broker for the following releases: 2.5.2, 2.5.1, 2.5.0, 2.4.2, 2.4.1, and 2.4.0.
Motivation
Currently Pulsar only support 5MB size of messages.But there are many cases will use more than 5MB message
to transfer.
https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size
Modifications