Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
Expand Down Expand Up @@ -148,12 +149,15 @@ private RegisterBrokerResult registerBroker(
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(true);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

if (request.getVersion() <= 0) {
request.setVersion(MQVersion.CURRENT_VERSION);
}
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
request.setBody(requestBody.encode());
request.setBody(requestBody.encode(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea!
How about setting a threshold for compression being triggered, because if the data you compress is small, it overhead is higher than the benefits you get from the compression?


if (oneway) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,28 @@

package org.apache.rocketmq.common.protocol.body;

import com.alibaba.fastjson.JSON;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RegisterBrokerBody extends RemotingSerializable {

private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class);

private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();

Expand All @@ -40,4 +57,122 @@ public List<String> getFilterServerList() {
public void setFilterServerList(List<String> filterServerList) {
this.filterServerList = filterServerList;
}

public byte[] encode(boolean compress) {

if (!compress) {
return encode();
}
long start = System.currentTimeMillis();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION));
DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigSerializeWrapper.getTopicConfigTable();
assert topicConfigTable != null;
try {
byte[] buffer = dataVersion.encode();

// write data version
outputStream.write(convertIntToByteArray(buffer.length));
outputStream.write(buffer);

int topicNumber = topicConfigTable.size();

// write number of topic configs
outputStream.write(convertIntToByteArray(topicNumber));

// write topic config entry one by one.
for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
outputStream.write(convertIntToByteArray(buffer.length));
outputStream.write(buffer);
}

buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);

// write filter server list json length
outputStream.write(convertIntToByteArray(buffer.length));

// write filter server list json
outputStream.write(buffer);

outputStream.finish();
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Compressing takes {}ms", interval);
}
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
LOGGER.error("Failed to compress RegisterBrokerBody object", e);
}

return null;
}

public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException {
if (!compressed) {
return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
}
long start = System.currentTimeMillis();
InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
int dataVersionLength = readInt(inflaterInputStream);
byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);

RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();


int topicConfigNumber = readInt(inflaterInputStream);
LOGGER.debug("{} topic configs to extract", topicConfigNumber);

for (int i = 0; i < topicConfigNumber; i++) {
int topicConfigJsonLength = readInt(inflaterInputStream);

byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
TopicConfig topicConfig = new TopicConfig();
String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
topicConfig.decode(topicConfigJson);
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

int filterServerListJsonLength = readInt(inflaterInputStream);

byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
List<String> filterServerList = JSON.parseArray(filterServerListJson, String.class);
registerBrokerBody.setFilterServerList(filterServerList);
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Decompressing takes {}ms", interval);
}
return registerBrokerBody;
}

private static byte[] convertIntToByteArray(int n) {
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
byteBuffer.putInt(n);
return byteBuffer.array();
}

private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException {
byte[] buffer = new byte[length];
int bytesRead = 0;
while (bytesRead < length) {
int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead);
if (len == -1) {
throw new IOException("End of compressed data has reached");
} else {
bytesRead += len;
}
}
return buffer;
}

private static int readInt(InflaterInputStream inflaterInputStream) throws IOException {
byte[] buffer = readBytes(inflaterInputStream, 4);
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
return byteBuffer.getInt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long brokerId;

private boolean compressed;

@Override
public void checkFields() throws RemotingCommandException {
}
Expand Down Expand Up @@ -79,4 +81,12 @@ public Long getBrokerId() {
public void setBrokerId(Long brokerId) {
this.brokerId = brokerId;
}

public boolean isCompressed() {
return compressed;
}

public void setCompressed(boolean compressed) {
this.compressed = compressed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.protocol.body;

import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.junit.Assert;
import org.junit.Test;

public class RegisterBrokerBodyTest {

@Test
public void encode() throws Exception {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
List<String> filterServerList = new ArrayList<String>();
for (int i = 0; i < 100; i++) {
filterServerList.add("localhost:10911");
}
registerBrokerBody.setFilterServerList(filterServerList);

TopicConfigSerializeWrapper wrapper = new TopicConfigSerializeWrapper();

ConcurrentMap<String, TopicConfig> topicConfigs = wrapper.getTopicConfigTable();

NumberFormat numberFormat = new DecimalFormat("0000000000");
List<String> topics = new ArrayList<String>();
for (int i = 0; i < 1024; i++) {
TopicConfig topicConfig = new TopicConfig("Topic" + numberFormat.format(i));
topicConfigs.put(topicConfig.getTopicName(), topicConfig);
topics.add(topicConfig.getTopicName());
}
registerBrokerBody.setTopicConfigSerializeWrapper(wrapper);

byte[] compressed = registerBrokerBody.encode(true);

RegisterBrokerBody registerBrokerBodyBackUp = RegisterBrokerBody.decode(compressed, true);
ConcurrentMap<String, TopicConfig> backupMap = registerBrokerBodyBackUp.getTopicConfigSerializeWrapper().getTopicConfigTable();

List<String> backupTopics = new ArrayList<String>();
for (ConcurrentMap.Entry<String, TopicConfig> next : backupMap.entrySet()) {
backupTopics.add(next.getKey());
}
Collections.sort(topics);
Collections.sort(backupTopics);
Assert.assertEquals(topics, backupTopics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.namesrv.processor;

import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -185,11 +186,16 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx,
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
RegisterBrokerBody registerBrokerBody;

if (request.getBody() != null) {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (IOException e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody = new RegisterBrokerBody();
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public RegisterBrokerResult registerBroker(
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

if (filterServerList != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ public abstract class RemotingSerializable {

public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
return json.getBytes(CHARSET_UTF8);
}

public static String toJson(final Object obj, boolean prettyFormat) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.rocketmq.test.namesrv;

import com.google.common.util.concurrent.RateLimiter;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.junit.Test;

public class NamesrvStressTest {

@Test
public void stressCompress() {
NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv();
String nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();

NettyClientConfig nettyClientConfig = new NettyClientConfig();
BrokerOuterAPI brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
brokerOuterAPI.updateNameServerAddressList(nsAddr);
brokerOuterAPI.start();
TopicConfigSerializeWrapper topicConfigWrapper = new TopicConfigSerializeWrapper();
ConcurrentMap<String, TopicConfig> topicConfigs = topicConfigWrapper.getTopicConfigTable();
NumberFormat numberFormat = new DecimalFormat("0000000000");
List<String> topics = new ArrayList<String>();
for (int i = 0; i < 10240; i++) {
TopicConfig topicConfig = new TopicConfig("Topic" + numberFormat.format(i));
topicConfigs.put(topicConfig.getTopicName(), topicConfig);
topics.add(topicConfig.getTopicName());
}

RateLimiter rateLimiter = RateLimiter.create(256);
for (int i = 0; i < 32; i++) {
rateLimiter.acquire();
brokerOuterAPI.registerBrokerAll("TestCluster", "localhost:10911", "broker-a",
i % 2 , "localhost:10912", topicConfigWrapper, null, false, 3000);
}
}
}