Skip to content

Commit

Permalink
[ISSUE #568] Update lastPullTime use atomic.Value as same with lastCo…
Browse files Browse the repository at this point in the history
…nsumeTime and lastLockTime (#613)
  • Loading branch information
jerry-tao committed Mar 16, 2021
1 parent 05929da commit d6e66a2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
19 changes: 15 additions & 4 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type processQueue struct {
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap *treemap.Map
dropped *uatomic.Bool
lastPullTime time.Time
lastPullTime atomic.Value
lastConsumeTime atomic.Value
locked *uatomic.Bool
lastLockTime atomic.Value
Expand All @@ -69,9 +69,12 @@ func newProcessQueue(order bool) *processQueue {
lastLockTime := atomic.Value{}
lastLockTime.Store(time.Now())

lastPullTime := atomic.Value{}
lastPullTime.Store(time.Now())

pq := &processQueue{
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: time.Now(),
lastPullTime: lastPullTime,
lastConsumeTime: lastConsumeTime,
lastLockTime: lastLockTime,
msgCh: make(chan []*primitive.MessageExt, 32),
Expand Down Expand Up @@ -157,6 +160,14 @@ func (pq *processQueue) LastLockTime() time.Time {
return pq.lastLockTime.Load().(time.Time)
}

func (pq *processQueue) LastPullTime() time.Time {
return pq.lastPullTime.Load().(time.Time)
}

func (pq *processQueue) UpdateLastPullTime() {
pq.lastPullTime.Store(time.Now())
}

func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
pq.mutex.Lock()
for _, msg := range messages {
Expand Down Expand Up @@ -199,7 +210,7 @@ func (pq *processQueue) isLockExpired() bool {
}

func (pq *processQueue) isPullExpired() bool {
return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
}

func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
Expand Down Expand Up @@ -360,7 +371,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
TryUnlockTimes: pq.tryUnlockTimes,
LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
Dropped: pq.dropped.Load(),
LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
LastPullTimestamp: pq.LastPullTime().UnixNano() / int64(time.Millisecond),
LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
}

Expand Down
2 changes: 1 addition & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}
// reset time
sleepTime = pc.option.PullInterval
pq.lastPullTime = time.Now()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
rlog.Warning("consumer state error", map[string]interface{}{
Expand Down

0 comments on commit d6e66a2

Please sign in to comment.