Skip to content

Commit

Permalink
fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140)
Browse files Browse the repository at this point in the history
* seperate interface and implement

* fix panic when close tracedispatcher
  • Loading branch information
wenxuwan committed Apr 23, 2024
1 parent 0e19ee6 commit ee5a175
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
12 changes: 6 additions & 6 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ type consumerOptions struct {
// Concurrently max span offset.it has no effect on sequential consumption
ConsumeConcurrentlyMaxSpan int

// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
// Flow control threshold on queue level, each message queue will cache at most 1024 messages by default,
// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
PullThresholdForQueue int64
PullThresholdForQueue atomic.Int64

// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
// Limit the cached message size on queue level, each message queue will cache at most 512 MiB messages by default,
// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
//
// The size of a message only measured by message body, so it's not accurate
PullThresholdSizeForQueue int
PullThresholdSizeForQueue atomic.Int32

// Flow control threshold on topic level, default value is -1(Unlimited)
//
Expand Down Expand Up @@ -198,13 +198,13 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {

func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
return func(options *consumerOptions) {
options.PullThresholdForQueue = pullThresholdForQueue
options.PullThresholdForQueue.Store(pullThresholdForQueue)
}
}

func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
return func(options *consumerOptions) {
options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue))
}
}

Expand Down
32 changes: 16 additions & 16 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,23 +518,23 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
if newVal == 0 {
newVal = 1
}
rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForTopic,
rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue.Load(),
rlog.LogKeyValueChangedTo: newVal,
})
pc.option.PullThresholdForTopic = newVal
pc.option.PullThresholdForQueue.Store(int64(newVal))
}

if pc.option.PullThresholdSizeForTopic != -1 {
newVal := pc.option.PullThresholdSizeForTopic / count
if newVal == 0 {
newVal = 1
}
rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic,
rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(),
rlog.LogKeyValueChangedTo: newVal,
})
pc.option.PullThresholdSizeForTopic = newVal
pc.option.PullThresholdSizeForQueue.Store(int32(newVal))
}
}
pc.client.SendHeartbeatToAllBrokerWithLock()
Expand Down Expand Up @@ -564,9 +564,9 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 {
if pc.option.PullThresholdForQueue.Load() == 0 {
pc.option.PullThresholdForQueue.Store(1024)
} else {
return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
}
Expand All @@ -580,9 +580,9 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 {
if pc.option.PullThresholdSizeForQueue.Load() == 0 {
pc.option.PullThresholdSizeForQueue.Store(512)
} else {
return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
Expand Down Expand Up @@ -693,10 +693,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}

cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
"PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
Expand All @@ -710,10 +710,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
goto NEXT
}

if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.cachedMsgCount,
Expand Down
1 change: 1 addition & 0 deletions internal/utils/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func UnCompress(data []byte) []byte {
if err != nil {
return data
}
defer r.Close()
retData, err := ioutil.ReadAll(r)
if err != nil {
return data
Expand Down

0 comments on commit ee5a175

Please sign in to comment.