Skip to content

Commit b81248d

Browse files
committed
Fix to separate newly added data and pending data in buffer
1 parent 2620523 commit b81248d

1 file changed

Lines changed: 19 additions & 12 deletions

File tree

buffer.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import (
55
)
66

77
type buffer struct {
8-
buf []*Message
9-
mu sync.Mutex
10-
Dirty chan struct{}
8+
new []*Message
9+
pending []*Message
10+
mu sync.Mutex
11+
Dirty chan struct{}
1112
}
1213

1314
func newBuffer() buffer {
@@ -20,28 +21,34 @@ func (buffer *buffer) Add(message *Message) {
2021
buffer.mu.Lock()
2122
defer buffer.mu.Unlock()
2223

23-
buffer.buf = append(buffer.buf, message)
24-
go func() {
25-
buffer.Dirty <- struct{}{}
26-
}()
24+
buffer.new = append(buffer.new, message)
25+
if len(buffer.new) == 1 {
26+
go func() {
27+
buffer.Dirty <- struct{}{}
28+
}()
29+
}
2730
}
2831

2932
func (buffer *buffer) Remove() []*Message {
3033
buffer.mu.Lock()
3134
defer buffer.mu.Unlock()
3235

33-
if len(buffer.buf) == 0 {
36+
if len(buffer.new) == 0 && len(buffer.pending) == 0 {
3437
return nil
3538
}
3639

37-
m := buffer.buf
38-
buffer.buf = buffer.buf[:0]
39-
return m
40+
var messages []*Message
41+
messages = append(messages, buffer.new...)
42+
messages = append(messages, buffer.pending...)
43+
44+
buffer.new = buffer.new[:0]
45+
buffer.pending = buffer.pending[:0]
46+
return messages
4047
}
4148

4249
func (buffer *buffer) Back(messages []*Message) {
4350
buffer.mu.Lock()
4451
defer buffer.mu.Unlock()
4552

46-
buffer.buf = append(buffer.buf, messages...)
53+
buffer.pending = append(buffer.pending, messages...)
4754
}

0 commit comments

Comments
 (0)