From cfd3bf7816abfbe472b0c366410b6283d3f63673 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Fri, 15 Jan 2021 17:38:25 +0800 Subject: [PATCH] [TUBEMQ-512] Add package length control based on Topic --- .../server/broker/BrokerServiceServer.java | 12 +++--- .../broker/metadata/ClusterConfigHolder.java | 25 +++++++---- .../server/broker/metadata/TopicMetadata.java | 43 ++++++++++++++++++- .../server/broker/msgstore/MessageStore.java | 14 +++--- .../server/common/TServerConstants.java | 1 + .../common/paramcheck/PBParameterUtils.java | 2 +- .../bdbentitys/BdbTopicConfEntity.java | 18 ++++++++ .../nodebroker/BrokerConfManager.java | 6 +++ .../nodebroker/BrokerSyncStatusInfo.java | 15 +++++++ .../handler/WebBrokerTopicConfHandler.java | 43 +++++++++++++++++-- 10 files changed, 153 insertions(+), 26 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java index c9cfba4ba93..a95fa09c207 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java @@ -55,8 +55,8 @@ import org.apache.tubemq.corerpc.service.BrokerReadService; import org.apache.tubemq.corerpc.service.BrokerWriteService; import org.apache.tubemq.server.Server; -import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.metadata.MetadataManager; +import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult; @@ -621,7 +621,8 @@ public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request, builder.setErrMsg(result.errInfo); return builder.build(); } - final String topicName = (String) result.retData1; + final TopicMetadata topicMetadata = (TopicMetadata) result.retData1; + final String topicName = topicMetadata.getTopic(); String msgType = null; int msgTypeCode = -1; if (TStringUtils.isNotBlank(request.getMsgType())) { @@ -635,10 +636,10 @@ public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request, builder.setErrMsg("data length is zero!"); return builder.build(); } - if (dataLength > ClusterConfigHolder.getMaxMsgSize()) { + if (dataLength > topicMetadata.getMaxMsgSize()) { builder.setErrCode(TErrCodeConstants.BAD_REQUEST); builder.setErrMsg(strBuffer.append("data length over max length, allowed max length is ") - .append(ClusterConfigHolder.getMaxMsgSize()) + .append(topicMetadata.getMaxMsgSize()) .append(", data length is ").append(dataLength).toString()); return builder.build(); } @@ -1137,7 +1138,8 @@ public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B request, builder.setErrMsg(result.errInfo); return builder.build(); } - final String topicName = (String) result.retData1; + final TopicMetadata topicMetadata = (TopicMetadata) result.retData1; + final String topicName = topicMetadata.getTopic(); String partStr = getPartStr(groupName, topicName, partitionId); ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr); if (consumerNodeInfo == null) { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java index 1b370d5391b..7d89dba6e8f 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java @@ -22,7 +22,7 @@ import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.protobuf.generated.ClientMaster; import org.apache.tubemq.corebase.utils.MixedUtils; - +import org.apache.tubemq.corebase.utils.Tuple2; public class ClusterConfigHolder { @@ -46,14 +46,11 @@ public static void updClusterSetting(ClientMaster.ClusterConfig clusterConfig) { if (configId.get() != clusterConfig.getConfigId()) { configId.set(clusterConfig.getConfigId()); if (clusterConfig.hasMaxMsgSize()) { - int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(), - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT) - + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE; - if (tmpMaxSize != maxMsgSize.get()) { - maxMsgSize.set(tmpMaxSize); - minMemCacheSize.set(tmpMaxSize + - (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST); + Tuple2 calcResult = + calcMaxMsgSize(clusterConfig.getMaxMsgSize()); + if (calcResult.getF0() != maxMsgSize.get()) { + maxMsgSize.set(calcResult.getF0()); + minMemCacheSize.set(calcResult.getF1()); } } } @@ -71,4 +68,14 @@ public static int getMinMemCacheSize() { return minMemCacheSize.get(); } + public static Tuple2 calcMaxMsgSize(int maxMsgSize) { + int tmpMaxSize = MixedUtils.mid(maxMsgSize, + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT) + + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE; + int tmpMinMemCacheSize = tmpMaxSize + + (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST; + return new Tuple2<>(tmpMaxSize, tmpMinMemCacheSize); + } + } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java index 800254b00fc..6e47d63f74e 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java @@ -25,6 +25,7 @@ import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.common.TStatusConstants; @@ -62,6 +63,10 @@ public class TopicMetadata { private int memCacheMsgCnt = 5 * 1024; // the max interval(milliseconds) that topic's memory cache will flush to disk. private int memCacheFlushIntvl = 20000; + // the allowed max message size + private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED; + // the allowed min memory cache size + private int minMemCacheSize = TBaseConstants.META_VALUE_UNDEFINED; /*** * Build TopicMetadata from brokerDefMetadata(default config) and topicMetaConfInfo(custom config). @@ -141,6 +146,17 @@ public TopicMetadata(final BrokerDefMetadata brokerDefMetadata, String topicMeta } else { this.memCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]); } + this.maxMsgSize = ClusterConfigHolder.getMaxMsgSize(); + this.minMemCacheSize = ClusterConfigHolder.getMinMemCacheSize(); + if (topicConfInfoArr.length > 14) { + if (TStringUtils.isNotBlank(topicConfInfoArr[14])) { + int maxMsgSize = Integer.parseInt(topicConfInfoArr[14]); + Tuple2 calcResult = + ClusterConfigHolder.calcMaxMsgSize(maxMsgSize); + this.maxMsgSize = calcResult.getF0(); + this.minMemCacheSize = calcResult.getF1(); + } + } } private TopicMetadata(String topic, int unflushThreshold, @@ -149,7 +165,8 @@ private TopicMetadata(String topic, int unflushThreshold, int numPartitions, boolean acceptPublish, boolean acceptSubscribe, int statusId, int numTopicStores, int memCacheMsgSize, - int memCacheMsgCnt, int memCacheFlushIntvl) { + int memCacheMsgCnt, int memCacheFlushIntvl, + int maxMsgSize, int minMemCacheSize) { this.topic = topic; this.unflushThreshold = unflushThreshold; this.unflushInterval = unflushInterval; @@ -165,6 +182,8 @@ private TopicMetadata(String topic, int unflushThreshold, this.memCacheMsgSize = memCacheMsgSize; this.memCacheMsgCnt = memCacheMsgCnt; this.memCacheFlushIntvl = memCacheFlushIntvl; + this.maxMsgSize = maxMsgSize; + this.minMemCacheSize = minMemCacheSize; } @Override @@ -175,7 +194,8 @@ public TopicMetadata clone() { this.numPartitions, this.acceptPublish, this.acceptSubscribe, this.statusId, this.numTopicStores, this.memCacheMsgSize, - this.memCacheMsgCnt, this.memCacheFlushIntvl); + this.memCacheMsgCnt, this.memCacheFlushIntvl, + this.maxMsgSize, this.minMemCacheSize); } public boolean isAcceptPublish() { @@ -304,6 +324,14 @@ public int getMemCacheFlushIntvl() { return memCacheFlushIntvl; } + public int getMaxMsgSize() { + return maxMsgSize; + } + + public int getMinMemCacheSize() { + return minMemCacheSize; + } + @Override public int hashCode() { final int prime = 31; @@ -323,6 +351,8 @@ public int hashCode() { result = prime * result + this.memCacheMsgSize; result = prime * result + this.memCacheMsgCnt; result = prime * result + this.memCacheFlushIntvl; + result = prime * result + this.maxMsgSize; + result = prime * result + this.minMemCacheSize; return result; } @@ -400,6 +430,12 @@ public boolean equals(Object obj) { if (this.memCacheFlushIntvl != other.memCacheFlushIntvl) { return false; } + if (this.maxMsgSize != other.maxMsgSize) { + return false; + } + if (this.minMemCacheSize != other.minMemCacheSize) { + return false; + } return true; } @@ -418,6 +454,7 @@ public boolean isPropertyEquals(final TopicMetadata other) { && this.memCacheMsgSize == other.memCacheMsgSize && this.memCacheMsgCnt == other.memCacheMsgCnt && this.memCacheFlushIntvl == other.memCacheFlushIntvl + && this.maxMsgSize == other.maxMsgSize && this.deletePolicy.equals(other.deletePolicy)); } @@ -438,6 +475,8 @@ public String toString() { .append(", memCacheMsgSizeInMs=").append(this.memCacheMsgSize / 1024 / 512) .append(", memCacheMsgCntInK=").append(this.memCacheMsgCnt / 512) .append(", memCacheFlushIntvl=").append(this.memCacheFlushIntvl) + .append(", maxMsgSize=").append(this.maxMsgSize) + .append(", minMemCacheSize=").append(this.minMemCacheSize) .append("]").toString(); } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java index 1987c32d7fe..a025f7c7ec1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java @@ -131,7 +131,7 @@ public MessageStore(final MessageStoreManager messageStoreManager, this.unflushThreshold.set(topicMetadata.getUnflushThreshold()); this.unflushDataHold.set(topicMetadata.getUnflushDataHold()); this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt(); - this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize()); + this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata); this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000)); @@ -419,7 +419,7 @@ public void refreshUnflushThreshold(TopicMetadata topicMetadata) { writeCacheMutex.readLock().lock(); try { writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt(); - writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize()); + writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata); writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); } finally { writeCacheMutex.readLock().unlock(); @@ -601,13 +601,15 @@ private long parseDeletePolicy(String delPolicy) { } } - private int validAndGetMemCacheSize(int memCacheSize) { - if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) { + private int validAndGetMemCacheSize(TopicMetadata topicMetadata) { + int memCacheSize = topicMetadata.getMemCacheMsgSize(); + if (memCacheSize <= topicMetadata.getMinMemCacheSize()) { logger.info(new StringBuilder(512) - .append("[Data Store] writeCacheMaxSize changed, from ") + .append("[Data Store] ").append(getTopic()) + .append(" writeCacheMaxSize changed, from ") .append(memCacheSize).append(" to ") .append(ClusterConfigHolder.getMinMemCacheSize()).toString()); - memCacheSize = ClusterConfigHolder.getMinMemCacheSize(); + memCacheSize = topicMetadata.getMinMemCacheSize(); } return memCacheSize; } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java index 57933647530..4e2547a0bcd 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java @@ -26,6 +26,7 @@ public final class TServerConstants { public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager"; public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl"; public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config"; + public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize"; public static final String TOKEN_BLANK_FILTER_CONDITION = ",,"; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java index 1c2d5ca3716..c27ad5bd20a 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java @@ -608,7 +608,7 @@ public static boolean getTopicNamePartIdInfo(String topicName, int partitionId, strBuffer.delete(0, strBuffer.length()); return result.success; } - result.setSuccResult(tmpValue); + result.setSuccResult(topicMetadata); return result.success; } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java index 3763ef929cc..de9f6efd4fd 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java @@ -345,6 +345,23 @@ public void setMemCacheFlushIntvl(final int memCacheFlushIntvl) { String.valueOf(memCacheFlushIntvl)); } + public int getMaxMsgSize() { + String atrVal = + TStringUtils.getAttrValFrmAttributes(this.attributes, + TServerConstants.TOKEN_MAX_MSG_SIZE); + if (atrVal != null) { + return Integer.parseInt(atrVal); + } + return TBaseConstants.META_VALUE_UNDEFINED; + } + + public void setMaxMsgSize(int maxMsgSize) { + this.attributes = + TStringUtils.setAttrValToAttributes(this.attributes, + TServerConstants.TOKEN_MAX_MSG_SIZE, + String.valueOf(maxMsgSize)); + } + public void appendAttributes(String attrKey, String attrVal) { this.attributes = TStringUtils.setAttrValToAttributes(this.attributes, attrKey, attrVal); @@ -375,6 +392,7 @@ public StringBuilder toJsonString(final StringBuilder sBuilder) { .append(",\"memCacheMsgCntInK\":").append(getMemCacheMsgCntInK()) .append(",\"memCacheMsgSizeInMB\":").append(getMemCacheMsgSizeInMB()) .append(",\"memCacheFlushIntvl\":").append(getMemCacheFlushIntvl()) + .append(",\"maxMsgSize\":").append(getMaxMsgSize()) .append(",\"dataPath\":\"").append(dataPath) .append("\",\"createUser\":\"").append(createUser) .append("\",\"createDate\":\"") diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java index 3eae7dccd22..56f1914f7c1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java @@ -1205,6 +1205,12 @@ private List innGetTopicStrConfigInfo(BdbBrokerConfEntity brokerConfEnti } else { sbuffer.append(TokenConstants.ATTR_SEP).append(topicEntity.getMemCacheFlushIntvl()); } + int maxMsgSize = topicEntity.getMaxMsgSize(); + if (maxMsgSize == TBaseConstants.META_VALUE_UNDEFINED) { + sbuffer.append(TokenConstants.ATTR_SEP).append(" "); + } else { + sbuffer.append(TokenConstants.ATTR_SEP).append(maxMsgSize); + } brokerTopicStrConfSet.add(sbuffer.toString()); sbuffer.delete(0, sbuffer.length()); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java index 176c39a2dd7..83e2794113d 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java @@ -26,6 +26,8 @@ import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.CheckSum; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; @@ -730,6 +732,8 @@ private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo, int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB; int tmpmemCacheMsgCntInK = memCacheMsgCntInK; int tmpmemCacheFlushIntvl = memCacheFlushIntvl; + int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize(); + int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize(); if (!TStringUtils.isBlank(topicConfInfoArr[11])) { tmpmemCacheMsgSizeInMB = Integer.parseInt(topicConfInfoArr[11]); } @@ -739,9 +743,20 @@ private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo, if (!TStringUtils.isBlank(topicConfInfoArr[13])) { tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]); } + if (topicConfInfoArr.length > 14) { + if (!TStringUtils.isNotBlank(topicConfInfoArr[14])) { + tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]); + Tuple2 calcResult = + ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize); + tmpMaxMsgSize = calcResult.getF0(); + tmpMinMemCacheSize = calcResult.getF1(); + } + } strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB); strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK); strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl); + strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize); + strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize); strBuffer.append(",\"topicStatusId\":").append(topicStatusId); strBuffer.append("}"); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java index 8bef5ab346e..9a4cf042afd 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java @@ -25,11 +25,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.cluster.BrokerInfo; import org.apache.tubemq.corebase.cluster.TopicInfo; +import org.apache.tubemq.corebase.utils.SettingValidUtils; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; @@ -212,6 +214,11 @@ public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) throws Exce WebParameterUtils.validIntDataParameter("memCacheFlushIntvl", req.getParameter("memCacheFlushIntvl"), false, defmemCacheFlushIntvl, 4000); + int maxMsgSizeInMB = + WebParameterUtils.validIntDataParameter("maxMsgSizeInMB", + req.getParameter("maxMsgSizeInMB"), + false, TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB, + TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB); String attributes = strBuffer.append(TokenConstants.TOKEN_STORE_NUM) .append(TokenConstants.EQ).append(numTopicStores) .append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD) @@ -221,7 +228,11 @@ public StringBuilder adminAddTopicEntityInfo(HttpServletRequest req) throws Exce .append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE) .append(TokenConstants.EQ).append(memCacheMsgSizeInMB) .append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL) - .append(TokenConstants.EQ).append(memCacheFlushIntvl).toString(); + .append(TokenConstants.EQ).append(memCacheFlushIntvl) + .append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE) + .append(TokenConstants.EQ) + .append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB)) + .toString(); strBuffer.delete(0, strBuffer.length()); for (String itemTopicName : batchAddTopicNames) { batchAddBdbTopicEntities.add(new BdbTopicConfEntity(oldEntity.getBrokerId(), @@ -359,6 +370,11 @@ public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) throws WebParameterUtils.validIntDataParameter("memCacheFlushIntvl", jsonObject.get("memCacheFlushIntvl"), false, brokerConfEntity.getDftMemCacheFlushIntvl(), 4000); + int maxMsgSizeInMB = + WebParameterUtils.validIntDataParameter("maxMsgSizeInMB", + jsonObject.get("maxMsgSizeInMB"), + false, TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB, + TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB); String itemCreateUser = WebParameterUtils.validStringParameter("createUser", jsonObject.get("createUser"), @@ -381,7 +397,11 @@ public StringBuilder adminBatchAddTopicEntityInfo(HttpServletRequest req) throws .append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE) .append(TokenConstants.EQ).append(memCacheMsgSizeInMB) .append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL) - .append(TokenConstants.EQ).append(memCacheFlushIntvl).toString(); + .append(TokenConstants.EQ).append(memCacheFlushIntvl) + .append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE) + .append(TokenConstants.EQ) + .append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB)) + .toString(); strBuffer.delete(0, strBuffer.length()); batchAddItemKeys.add(inputKey); batchAddBdbTopicEntities.add(new BdbTopicConfEntity(brokerConfEntity.getBrokerId(), @@ -595,6 +615,8 @@ public StringBuilder adminQueryTopicCfgEntityAndRunInfo(HttpServletRequest req) .append(",\"memCacheMsgSizeInMB\":").append(entity.getMemCacheMsgSizeInMB()) .append(",\"memCacheFlushIntvl\":").append(entity.getMemCacheFlushIntvl()) .append(",\"memCacheMsgCntInK\":").append(entity.getMemCacheMsgCntInK()) + .append(",\"maxMsgSizeInMB\":") + .append(entity.getMaxMsgSize() / TBaseConstants.META_MB_UNIT_SIZE) .append(",\"createUser\":\"").append(entity.getCreateUser()) .append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate())) .append("\",\"modifyUser\":\"").append(entity.getModifyUser()) @@ -1334,13 +1356,19 @@ public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) throws E int memCacheMsgSizeInMB = WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB", req.getParameter("memCacheMsgSizeInMB"), false, TBaseConstants.META_VALUE_UNDEFINED, 2); - memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : memCacheMsgSizeInMB; + memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048); int memCacheFlushIntvl = WebParameterUtils.validIntDataParameter("memCacheFlushIntvl", req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000); int unFlushDataHold = WebParameterUtils.validIntDataParameter("unflushDataHold", req.getParameter("unflushDataHold"), false, TBaseConstants.META_VALUE_UNDEFINED, 0); + int maxMsgSizeInMB = + WebParameterUtils.validIntDataParameter("maxMsgSizeInMB", + req.getParameter("maxMsgSizeInMB"), + false, TBaseConstants.META_VALUE_UNDEFINED, + TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB); + List batchModBdbTopicEntities = new ArrayList<>(); for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) { if (tgtEntity == null) { @@ -1418,6 +1446,15 @@ public StringBuilder adminModifyTopicEntityInfo(HttpServletRequest req) throws E newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_MSG_SIZE, String.valueOf(memCacheMsgSizeInMB)); } + if (maxMsgSizeInMB > 0) { + int maxMsgSizeInB = + SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB); + if (maxMsgSizeInB != oldEntity.getMaxMsgSize()) { + foundChange = true; + newEntity.appendAttributes(TServerConstants.TOKEN_MAX_MSG_SIZE, + String.valueOf(maxMsgSizeInB)); + } + } if (memCacheFlushIntvl >= 0 && memCacheFlushIntvl != oldEntity.getMemCacheFlushIntvl()) { foundChange = true; newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,