Skip to content

Commit

Permalink
[ISSUE #139]add locker for updating data (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolftankk authored and zongtanghu committed Aug 13, 2019
1 parent 5e093c8 commit 08cd71e
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType)
}

func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
local.mutex.Lock()
defer local.mutex.Unlock()
rlog.Debugf("update offset: %s to %d", mq, offset)
localOffset, exist := local.OffsetTable[mq.Topic]
if !exist {
Expand Down Expand Up @@ -149,6 +151,8 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
if len(mqs) == 0 {
return
}
local.mutex.Lock()
defer local.mutex.Unlock()
table := make(map[string]map[int]*queueOffset)
for idx := range mqs {
mq := mqs[idx]
Expand Down

0 comments on commit 08cd71e

Please sign in to comment.