diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 6c2a987d29e..2879b1fc59a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; @@ -148,12 +149,15 @@ private RegisterBrokerResult registerBroker( requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); + requestHeader.setCompressed(true); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); - + if (request.getVersion() <= 0) { + request.setVersion(MQVersion.CURRENT_VERSION); + } RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); - request.setBody(requestBody.encode()); + request.setBody(requestBody.encode(true)); if (oneway) { try { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java index c220927c23b..b39dad170af 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java @@ -17,11 +17,28 @@ package org.apache.rocketmq.common.protocol.body; +import com.alibaba.fastjson.JSON; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RegisterBrokerBody extends RemotingSerializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class); + private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); private List filterServerList = new ArrayList(); @@ -40,4 +57,122 @@ public List getFilterServerList() { public void setFilterServerList(List filterServerList) { this.filterServerList = filterServerList; } + + public byte[] encode(boolean compress) { + + if (!compress) { + return encode(); + } + long start = System.currentTimeMillis(); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); + DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); + ConcurrentMap topicConfigTable = topicConfigSerializeWrapper.getTopicConfigTable(); + assert topicConfigTable != null; + try { + byte[] buffer = dataVersion.encode(); + + // write data version + outputStream.write(convertIntToByteArray(buffer.length)); + outputStream.write(buffer); + + int topicNumber = topicConfigTable.size(); + + // write number of topic configs + outputStream.write(convertIntToByteArray(topicNumber)); + + // write topic config entry one by one. + for (ConcurrentMap.Entry next : topicConfigTable.entrySet()) { + buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET); + outputStream.write(convertIntToByteArray(buffer.length)); + outputStream.write(buffer); + } + + buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET); + + // write filter server list json length + outputStream.write(convertIntToByteArray(buffer.length)); + + // write filter server list json + outputStream.write(buffer); + + outputStream.finish(); + long interval = System.currentTimeMillis() - start; + if (interval > 50) { + LOGGER.info("Compressing takes {}ms", interval); + } + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + LOGGER.error("Failed to compress RegisterBrokerBody object", e); + } + + return null; + } + + public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException { + if (!compressed) { + return RegisterBrokerBody.decode(data, RegisterBrokerBody.class); + } + long start = System.currentTimeMillis(); + InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data)); + int dataVersionLength = readInt(inflaterInputStream); + byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength); + DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class); + + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion); + ConcurrentMap topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable(); + + + int topicConfigNumber = readInt(inflaterInputStream); + LOGGER.debug("{} topic configs to extract", topicConfigNumber); + + for (int i = 0; i < topicConfigNumber; i++) { + int topicConfigJsonLength = readInt(inflaterInputStream); + + byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength); + TopicConfig topicConfig = new TopicConfig(); + String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET); + topicConfig.decode(topicConfigJson); + topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + int filterServerListJsonLength = readInt(inflaterInputStream); + + byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength); + String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET); + List filterServerList = JSON.parseArray(filterServerListJson, String.class); + registerBrokerBody.setFilterServerList(filterServerList); + long interval = System.currentTimeMillis() - start; + if (interval > 50) { + LOGGER.info("Decompressing takes {}ms", interval); + } + return registerBrokerBody; + } + + private static byte[] convertIntToByteArray(int n) { + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + byteBuffer.putInt(n); + return byteBuffer.array(); + } + + private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException { + byte[] buffer = new byte[length]; + int bytesRead = 0; + while (bytesRead < length) { + int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead); + if (len == -1) { + throw new IOException("End of compressed data has reached"); + } else { + bytesRead += len; + } + } + return buffer; + } + + private static int readInt(InflaterInputStream inflaterInputStream) throws IOException { + byte[] buffer = readBytes(inflaterInputStream, 4); + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); + return byteBuffer.getInt(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index 45d5b6e9ebd..de4043212fa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -36,6 +36,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { @CFNotNull private Long brokerId; + private boolean compressed; + @Override public void checkFields() throws RemotingCommandException { } @@ -79,4 +81,12 @@ public Long getBrokerId() { public void setBrokerId(Long brokerId) { this.brokerId = brokerId; } + + public boolean isCompressed() { + return compressed; + } + + public void setCompressed(boolean compressed) { + this.compressed = compressed; + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java new file mode 100644 index 00000000000..594819e2b10 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.junit.Assert; +import org.junit.Test; + +public class RegisterBrokerBodyTest { + + @Test + public void encode() throws Exception { + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + List filterServerList = new ArrayList(); + for (int i = 0; i < 100; i++) { + filterServerList.add("localhost:10911"); + } + registerBrokerBody.setFilterServerList(filterServerList); + + TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper(); + + ConcurrentMap topicConfigs = wrapper.getTopicConfigTable(); + + NumberFormat numberFormat = new DecimalFormat("0000000000"); + List topics = new ArrayList(); + for (int i = 0; i < 1024; i++) { + TopicConfig topicConfig = new TopicConfig("Topic" + numberFormat.format(i)); + topicConfigs.put(topicConfig.getTopicName(), topicConfig); + topics.add(topicConfig.getTopicName()); + } + registerBrokerBody.setTopicConfigSerializeWrapper(wrapper); + + byte[] compressed = registerBrokerBody.encode(true); + + RegisterBrokerBody registerBrokerBodyBackUp = RegisterBrokerBody.decode(compressed, true); + ConcurrentMap backupMap = registerBrokerBodyBackUp.getTopicConfigSerializeWrapper().getTopicConfigTable(); + + List backupTopics = new ArrayList(); + for (ConcurrentMap.Entry next : backupMap.entrySet()) { + backupTopics.add(next.getKey()); + } + Collections.sort(topics); + Collections.sort(backupTopics); + Assert.assertEquals(topics, backupTopics); + } +} \ No newline at end of file diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 9647684224b..316fbe046d3 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.namesrv.processor; import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; @@ -185,11 +186,16 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); - RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + RegisterBrokerBody registerBrokerBody; if (request.getBody() != null) { - registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class); + try { + registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); + } catch (IOException e) { + throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); + } } else { + registerBrokerBody = new RegisterBrokerBody(); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 7479fcc5f79..62a82eef985 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -152,7 +152,7 @@ public RegisterBrokerResult registerBroker( channel, haServerAddr)); if (null == prevBrokerLiveInfo) { - log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); + log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } if (filterServerList != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java index f80ff14c107..13fdb1f0797 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java @@ -24,10 +24,7 @@ public abstract class RemotingSerializable { public static byte[] encode(final Object obj) { final String json = toJson(obj, false); - if (json != null) { - return json.getBytes(CHARSET_UTF8); - } - return null; + return json.getBytes(CHARSET_UTF8); } public static String toJson(final Object obj, boolean prettyFormat) { diff --git a/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java new file mode 100644 index 00000000000..bea4dd35264 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java @@ -0,0 +1,45 @@ +package org.apache.rocketmq.test.namesrv; + +import com.google.common.util.concurrent.RateLimiter; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.junit.Test; + +public class NamesrvStressTest { + + @Test + public void stressCompress() { + NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv(); + String nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort(); + + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + BrokerOuterAPI brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); + brokerOuterAPI.updateNameServerAddressList(nsAddr); + brokerOuterAPI.start(); + TopicConfigSerializeWrapper topicConfigWrapper = new TopicConfigSerializeWrapper(); + ConcurrentMap topicConfigs = topicConfigWrapper.getTopicConfigTable(); + NumberFormat numberFormat = new DecimalFormat("0000000000"); + List topics = new ArrayList(); + for (int i = 0; i < 10240; i++) { + TopicConfig topicConfig = new TopicConfig("Topic" + numberFormat.format(i)); + topicConfigs.put(topicConfig.getTopicName(), topicConfig); + topics.add(topicConfig.getTopicName()); + } + + RateLimiter rateLimiter = RateLimiter.create(256); + for (int i = 0; i < 32; i++) { + rateLimiter.acquire(); + brokerOuterAPI.registerBrokerAll("TestCluster", "localhost:10911", "broker-a", + i % 2 , "localhost:10912", topicConfigWrapper, null, false, 3000); + } + } +}