Skip to content

Commit

Permalink
[TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <gosonzhang@tencent.com>
  • Loading branch information
gosonzhang and gosonzhang committed May 22, 2020
1 parent 63f8286 commit bccceba
Show file tree
Hide file tree
Showing 25 changed files with 57 additions and 1,944 deletions.
12 changes: 4 additions & 8 deletions docs/http_access_API_definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -627,11 +627,10 @@ The flow control info is described in JSON format, for example:
```json
[{"type":0,"rule":[{"start":"08:00","end":"17:59","dltInM":1024,"limitInM":20,"freqInMs":1000},{"start":"18:00","end":"22:00","dltInM":1024,"limitInM":20,"freqInMs":5000}]},{"type":2,"rule":[{"start":"18:00","end":"23:59","dltStInM":20480,"dltEdInMM":2048}]},{"type":1,"rule":[{"zeroCnt":3,"freqInMs":300},{"zeroCnt":8,"freqInMs":1000}]},{"type":3,"rule":[{"normFreqInMs":0,"filterFreqInMs":100,"minDataFilterFreqInMs":400}]}]
```
The `type` has four values [0, 1, 2, 3]. 0: flow control, 1: frequency control, 2: delay to SSD storage control, 3: filter consumer frequency control,<br>
`[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `dltStInM` is consuming data delta when enabling SSD to storage <br>
`dltEdInM` is consuming data delta when terminating SSD to storage, `limitInM` is the flow control each minute, `freqInMs` is the interval for sending request
after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data, `normFreqInMs` is the interval of sequential pulling,<br>
`filterFreqInMs` is the interval of pulling filtered request.
The `type` has four values [0, 1, 3]. 0: flow control, 1: frequency control, 3: filter consumer frequency control,<br>
`[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `limitInM` is the flow control each minute, <br>
`freqInMs` is the interval for sending request after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data, <br>
`normFreqInMs` is the interval of sequential pulling, `filterFreqInMs` is the interval of pulling filtered request.

__Request__

Expand All @@ -658,7 +657,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no| the consuming priority Id. It is a composed field `A0B` with default value 301, <br>the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|flowCtrlInfo|yes|the flow control info in JSON format|String|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|

Expand Down Expand Up @@ -688,7 +686,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|

### `admin_upd_group_flow_control_rule`
Expand All @@ -705,7 +702,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|


Expand Down
Binary file modified docs/http_access_API_definition_cn.xls
Binary file not shown.
3 changes: 0 additions & 3 deletions docs/tubemq_config_introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ In addition to the back-end system configuration file, the Master also stores th
| consumerRegTimeoutMs | no | long | Consumer heartbeat timeout, optional, in milliseconds, default 30 seconds |
| socketRecvBuffer | no | long | Socket receives the size of the Buffer buffer SO_RCVBUF, the unit byte, the negative number is not set, the default value is |
| socketSendBuffer | no | long | Socket sends Buffer buffer SO_SNDBUF size, unit byte, negative number is not set, the default value is |
| secondDataPath | no | string | The SSD to storage location where the broker is located, optional field. The default is blank to indicate that the machine has no SSD. |
| maxSSDTotalFileCnt | no | int | The maximum number of Data files allowed by the SSD where the Broker is located, optional field, default 70 |
| maxSSDTotalFileSizes | no | long | The SSD where the Broker is located allows the maximum size of the data file to be saved. The optional field is 32G by default. |
| tcpWriteServiceThread | no | int | Broker supports the number of socket worker threads for TCP production services, optional fields, and defaults to 2 times the number of CPUs of the machine. |
| tcpReadServiceThread | no | int | Broker supports the number of socket worker threads for TCP consumer services, optional fields, defaults to 2 times the number of CPUs of the machine |
| logClearupDurationMs | no | long | The aging cleanup period of the message file, in milliseconds. The default is 3 minutes for a log cleanup operation. The minimum is 1 minutes. |
Expand Down
Binary file modified docs/tubemq_config_introduction_cn.doc
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,6 @@ private ClientMaster.RegisterRequestC2M createMasterRegisterRequest() throws Exc
builder.setSessionTime(this.consumeSubInfo.getSubscribedTime());
builder.addAllTopicList(this.consumeSubInfo.getSubscribedTopics());
builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
List<SubscribeInfo> subInfoList =
Expand Down Expand Up @@ -945,7 +944,6 @@ private ClientMaster.HeartRequestC2M createMasterHeartbeatRequest(ConsumerEvent
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReportSubscribeInfo(reportSubscribeInfo);
builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
if (event != null) {
Expand Down Expand Up @@ -989,7 +987,6 @@ private ClientBroker.RegisterRequestC2B createBrokerRegisterRequest(Partition pa
builder.setOpType(RpcConstants.MSG_OPTYPE_REGISTER);
builder.setTopicName(partition.getTopic());
builder.setPartitionId(partition.getPartitionId());
builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.setReadStatus(getGroupInitReadStatus(rmtDataCache.bookPartition(partition.getPartitionKey())));
TopicProcessor topicProcessor =
Expand Down Expand Up @@ -1042,7 +1039,6 @@ private ClientBroker.HeartBeatRequestC2B createBrokerHeartBeatRequest(
builder.setClientId(consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReadStatus(getGroupInitReadStatus(false));
builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.addAllPartitionInfo(partitionList);
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
Expand All @@ -1062,16 +1058,12 @@ private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
TBaseConstants.META_VALUE_UNDEFINED,
groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
}
}
if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
}
if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
Expand All @@ -1093,17 +1085,13 @@ private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C res
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
TBaseConstants.META_VALUE_UNDEFINED,
groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn(
"[Heartbeat response] found parse group flowCtrl rules failure", e1);
}
}
if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
}
if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
Expand Down
Loading

0 comments on commit bccceba

Please sign in to comment.