Skip to content

Commit

Permalink
[TUBEMQ-512] Add package length control based on Topic
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang authored and gosonzhang committed Jan 16, 2021
1 parent b3b7803 commit cfd3bf7
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.tubemq.corerpc.service.BrokerReadService;
import org.apache.tubemq.corerpc.service.BrokerWriteService;
import org.apache.tubemq.server.Server;
import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
Expand Down Expand Up @@ -621,7 +621,8 @@ public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
builder.setErrMsg(result.errInfo);
return builder.build();
}
final String topicName = (String) result.retData1;
final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
final String topicName = topicMetadata.getTopic();
String msgType = null;
int msgTypeCode = -1;
if (TStringUtils.isNotBlank(request.getMsgType())) {
Expand All @@ -635,10 +636,10 @@ public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
builder.setErrMsg("data length is zero!");
return builder.build();
}
if (dataLength > ClusterConfigHolder.getMaxMsgSize()) {
if (dataLength > topicMetadata.getMaxMsgSize()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg(strBuffer.append("data length over max length, allowed max length is ")
.append(ClusterConfigHolder.getMaxMsgSize())
.append(topicMetadata.getMaxMsgSize())
.append(", data length is ").append(dataLength).toString());
return builder.build();
}
Expand Down Expand Up @@ -1137,7 +1138,8 @@ public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B request,
builder.setErrMsg(result.errInfo);
return builder.build();
}
final String topicName = (String) result.retData1;
final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
final String topicName = topicMetadata.getTopic();
String partStr = getPartStr(groupName, topicName, partitionId);
ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
if (consumerNodeInfo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.tubemq.corebase.utils.MixedUtils;

import org.apache.tubemq.corebase.utils.Tuple2;


public class ClusterConfigHolder {
Expand All @@ -46,14 +46,11 @@ public static void updClusterSetting(ClientMaster.ClusterConfig clusterConfig) {
if (configId.get() != clusterConfig.getConfigId()) {
configId.set(clusterConfig.getConfigId());
if (clusterConfig.hasMaxMsgSize()) {
int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(),
TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+ TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
if (tmpMaxSize != maxMsgSize.get()) {
maxMsgSize.set(tmpMaxSize);
minMemCacheSize.set(tmpMaxSize +
(tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST);
Tuple2<Integer, Integer> calcResult =
calcMaxMsgSize(clusterConfig.getMaxMsgSize());
if (calcResult.getF0() != maxMsgSize.get()) {
maxMsgSize.set(calcResult.getF0());
minMemCacheSize.set(calcResult.getF1());
}
}
}
Expand All @@ -71,4 +68,14 @@ public static int getMinMemCacheSize() {
return minMemCacheSize.get();
}

public static Tuple2<Integer, Integer> calcMaxMsgSize(int maxMsgSize) {
int tmpMaxSize = MixedUtils.mid(maxMsgSize,
TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+ TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
int tmpMinMemCacheSize = tmpMaxSize +
(tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST;
return new Tuple2<>(tmpMaxSize, tmpMinMemCacheSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TStatusConstants;


Expand Down Expand Up @@ -62,6 +63,10 @@ public class TopicMetadata {
private int memCacheMsgCnt = 5 * 1024;
// the max interval(milliseconds) that topic's memory cache will flush to disk.
private int memCacheFlushIntvl = 20000;
// the allowed max message size
private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
// the allowed min memory cache size
private int minMemCacheSize = TBaseConstants.META_VALUE_UNDEFINED;

/***
* Build TopicMetadata from brokerDefMetadata(default config) and topicMetaConfInfo(custom config).
Expand Down Expand Up @@ -141,6 +146,17 @@ public TopicMetadata(final BrokerDefMetadata brokerDefMetadata, String topicMeta
} else {
this.memCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
}
this.maxMsgSize = ClusterConfigHolder.getMaxMsgSize();
this.minMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
if (topicConfInfoArr.length > 14) {
if (TStringUtils.isNotBlank(topicConfInfoArr[14])) {
int maxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
Tuple2<Integer, Integer> calcResult =
ClusterConfigHolder.calcMaxMsgSize(maxMsgSize);
this.maxMsgSize = calcResult.getF0();
this.minMemCacheSize = calcResult.getF1();
}
}
}

private TopicMetadata(String topic, int unflushThreshold,
Expand All @@ -149,7 +165,8 @@ private TopicMetadata(String topic, int unflushThreshold,
int numPartitions, boolean acceptPublish,
boolean acceptSubscribe, int statusId,
int numTopicStores, int memCacheMsgSize,
int memCacheMsgCnt, int memCacheFlushIntvl) {
int memCacheMsgCnt, int memCacheFlushIntvl,
int maxMsgSize, int minMemCacheSize) {
this.topic = topic;
this.unflushThreshold = unflushThreshold;
this.unflushInterval = unflushInterval;
Expand All @@ -165,6 +182,8 @@ private TopicMetadata(String topic, int unflushThreshold,
this.memCacheMsgSize = memCacheMsgSize;
this.memCacheMsgCnt = memCacheMsgCnt;
this.memCacheFlushIntvl = memCacheFlushIntvl;
this.maxMsgSize = maxMsgSize;
this.minMemCacheSize = minMemCacheSize;
}

@Override
Expand All @@ -175,7 +194,8 @@ public TopicMetadata clone() {
this.numPartitions, this.acceptPublish,
this.acceptSubscribe, this.statusId,
this.numTopicStores, this.memCacheMsgSize,
this.memCacheMsgCnt, this.memCacheFlushIntvl);
this.memCacheMsgCnt, this.memCacheFlushIntvl,
this.maxMsgSize, this.minMemCacheSize);
}

public boolean isAcceptPublish() {
Expand Down Expand Up @@ -304,6 +324,14 @@ public int getMemCacheFlushIntvl() {
return memCacheFlushIntvl;
}

public int getMaxMsgSize() {
return maxMsgSize;
}

public int getMinMemCacheSize() {
return minMemCacheSize;
}

@Override
public int hashCode() {
final int prime = 31;
Expand All @@ -323,6 +351,8 @@ public int hashCode() {
result = prime * result + this.memCacheMsgSize;
result = prime * result + this.memCacheMsgCnt;
result = prime * result + this.memCacheFlushIntvl;
result = prime * result + this.maxMsgSize;
result = prime * result + this.minMemCacheSize;
return result;
}

Expand Down Expand Up @@ -400,6 +430,12 @@ public boolean equals(Object obj) {
if (this.memCacheFlushIntvl != other.memCacheFlushIntvl) {
return false;
}
if (this.maxMsgSize != other.maxMsgSize) {
return false;
}
if (this.minMemCacheSize != other.minMemCacheSize) {
return false;
}

return true;
}
Expand All @@ -418,6 +454,7 @@ public boolean isPropertyEquals(final TopicMetadata other) {
&& this.memCacheMsgSize == other.memCacheMsgSize
&& this.memCacheMsgCnt == other.memCacheMsgCnt
&& this.memCacheFlushIntvl == other.memCacheFlushIntvl
&& this.maxMsgSize == other.maxMsgSize
&& this.deletePolicy.equals(other.deletePolicy));
}

Expand All @@ -438,6 +475,8 @@ public String toString() {
.append(", memCacheMsgSizeInMs=").append(this.memCacheMsgSize / 1024 / 512)
.append(", memCacheMsgCntInK=").append(this.memCacheMsgCnt / 512)
.append(", memCacheFlushIntvl=").append(this.memCacheFlushIntvl)
.append(", maxMsgSize=").append(this.maxMsgSize)
.append(", minMemCacheSize=").append(this.minMemCacheSize)
.append("]").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public MessageStore(final MessageStoreManager messageStoreManager,
this.unflushThreshold.set(topicMetadata.getUnflushThreshold());
this.unflushDataHold.set(topicMetadata.getUnflushDataHold());
this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000));
Expand Down Expand Up @@ -419,7 +419,7 @@ public void refreshUnflushThreshold(TopicMetadata topicMetadata) {
writeCacheMutex.readLock().lock();
try {
writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
} finally {
writeCacheMutex.readLock().unlock();
Expand Down Expand Up @@ -601,13 +601,15 @@ private long parseDeletePolicy(String delPolicy) {
}
}

private int validAndGetMemCacheSize(int memCacheSize) {
if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) {
private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
int memCacheSize = topicMetadata.getMemCacheMsgSize();
if (memCacheSize <= topicMetadata.getMinMemCacheSize()) {
logger.info(new StringBuilder(512)
.append("[Data Store] writeCacheMaxSize changed, from ")
.append("[Data Store] ").append(getTopic())
.append(" writeCacheMaxSize changed, from ")
.append(memCacheSize).append(" to ")
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
memCacheSize = ClusterConfigHolder.getMinMemCacheSize();
memCacheSize = topicMetadata.getMinMemCacheSize();
}
return memCacheSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public final class TServerConstants {
public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl";
public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config";
public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize";

public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ public static boolean getTopicNamePartIdInfo(String topicName, int partitionId,
strBuffer.delete(0, strBuffer.length());
return result.success;
}
result.setSuccResult(tmpValue);
result.setSuccResult(topicMetadata);
return result.success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,23 @@ public void setMemCacheFlushIntvl(final int memCacheFlushIntvl) {
String.valueOf(memCacheFlushIntvl));
}

public int getMaxMsgSize() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
TServerConstants.TOKEN_MAX_MSG_SIZE);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
return TBaseConstants.META_VALUE_UNDEFINED;
}

public void setMaxMsgSize(int maxMsgSize) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
TServerConstants.TOKEN_MAX_MSG_SIZE,
String.valueOf(maxMsgSize));
}

public void appendAttributes(String attrKey, String attrVal) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes, attrKey, attrVal);
Expand Down Expand Up @@ -375,6 +392,7 @@ public StringBuilder toJsonString(final StringBuilder sBuilder) {
.append(",\"memCacheMsgCntInK\":").append(getMemCacheMsgCntInK())
.append(",\"memCacheMsgSizeInMB\":").append(getMemCacheMsgSizeInMB())
.append(",\"memCacheFlushIntvl\":").append(getMemCacheFlushIntvl())
.append(",\"maxMsgSize\":").append(getMaxMsgSize())
.append(",\"dataPath\":\"").append(dataPath)
.append("\",\"createUser\":\"").append(createUser)
.append("\",\"createDate\":\"")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,12 @@ private List<String> innGetTopicStrConfigInfo(BdbBrokerConfEntity brokerConfEnti
} else {
sbuffer.append(TokenConstants.ATTR_SEP).append(topicEntity.getMemCacheFlushIntvl());
}
int maxMsgSize = topicEntity.getMaxMsgSize();
if (maxMsgSize == TBaseConstants.META_VALUE_UNDEFINED) {
sbuffer.append(TokenConstants.ATTR_SEP).append(" ");
} else {
sbuffer.append(TokenConstants.ATTR_SEP).append(maxMsgSize);
}
brokerTopicStrConfSet.add(sbuffer.toString());
sbuffer.delete(0, sbuffer.length());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.CheckSum;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
Expand Down Expand Up @@ -730,6 +732,8 @@ private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo,
int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB;
int tmpmemCacheMsgCntInK = memCacheMsgCntInK;
int tmpmemCacheFlushIntvl = memCacheFlushIntvl;
int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize();
int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
if (!TStringUtils.isBlank(topicConfInfoArr[11])) {
tmpmemCacheMsgSizeInMB = Integer.parseInt(topicConfInfoArr[11]);
}
Expand All @@ -739,9 +743,20 @@ private StringBuilder getBrokerAndTopicConfJsonInfo(String brokerConfInfo,
if (!TStringUtils.isBlank(topicConfInfoArr[13])) {
tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
}
if (topicConfInfoArr.length > 14) {
if (!TStringUtils.isNotBlank(topicConfInfoArr[14])) {
tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
Tuple2<Integer, Integer> calcResult =
ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize);
tmpMaxMsgSize = calcResult.getF0();
tmpMinMemCacheSize = calcResult.getF1();
}
}
strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB);
strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK);
strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl);
strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize);
strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize);
strBuffer.append(",\"topicStatusId\":").append(topicStatusId);
strBuffer.append("}");
}
Expand Down
Loading

0 comments on commit cfd3bf7

Please sign in to comment.