Skip to content

Commit d0cdf96

Browse files
committed
Change to use a pool of byte buffer to encode data
1 parent e0f40b1 commit d0cdf96

File tree

3 files changed

+61
-19
lines changed

3 files changed

+61
-19
lines changed

buffer.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,43 @@ import (
55
)
66

77
type buffer struct {
8-
buf []byte
8+
buf []*Message
99
mu sync.Mutex
1010
Dirty chan struct{}
1111
}
1212

1313
func newBuffer() buffer {
1414
return buffer{
15-
buf: []byte{},
1615
Dirty: make(chan struct{}),
1716
}
1817
}
1918

20-
func (buffer *buffer) Add(raw []byte) {
19+
func (buffer *buffer) Add(message *Message) {
2120
buffer.mu.Lock()
2221
defer buffer.mu.Unlock()
2322

24-
buffer.buf = append(buffer.buf, raw...)
23+
buffer.buf = append(buffer.buf, message)
2524
go func() {
2625
buffer.Dirty <- struct{}{}
2726
}()
2827
}
2928

30-
func (buffer *buffer) Remove() []byte {
29+
func (buffer *buffer) Remove() []*Message {
3130
buffer.mu.Lock()
3231
defer buffer.mu.Unlock()
3332

3433
if len(buffer.buf) == 0 {
3534
return nil
3635
}
3736

38-
data := buffer.buf
37+
m := buffer.buf
3938
buffer.buf = buffer.buf[:0]
40-
return data
39+
return m
4140
}
4241

43-
func (buffer *buffer) Back(raw []byte) {
42+
func (buffer *buffer) Back(messages []*Message) {
4443
buffer.mu.Lock()
4544
defer buffer.mu.Unlock()
4645

47-
buffer.buf = append(buffer.buf, raw...)
46+
buffer.buf = append(buffer.buf, messages...)
4847
}

logger.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88
"time"
99

10-
"bytes"
11-
1210
"gopkg.in/vmihailenco/msgpack.v2"
1311
)
1412

@@ -66,12 +64,12 @@ func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) err
6664
obj,
6765
}
6866

69-
buf := bytes.NewBuffer([]byte{})
70-
enc := msgpack.NewEncoder(buf)
67+
m := getMessage()
68+
enc := msgpack.NewEncoder(m.buf)
7169
if err := enc.Encode(record); err != nil {
7270
return err
7371
}
74-
logger.buf.Add(buf.Bytes())
72+
logger.buf.Add(m)
7573
return nil
7674
}
7775

@@ -119,24 +117,31 @@ func (logger *Logger) disconnect() error {
119117
const maxWriteAttempts = 3
120118

121119
func (logger *Logger) send() error {
122-
data := logger.buf.Remove()
123-
if len(data) == 0 {
120+
messages := logger.buf.Remove()
121+
if len(messages) == 0 {
124122
return nil
125123
}
124+
var data []byte
125+
for _, m := range messages {
126+
data = append(data, m.buf.Bytes()...)
127+
}
126128

127129
var err error
128130
for i := 0; i < maxWriteAttempts; i++ {
129131
err = logger.connect()
130132
if err == nil {
131133
_, err := logger.conn.Write(data)
132134
if err == nil {
135+
for _, m := range messages {
136+
putMessage(m)
137+
}
133138
break
134139
}
135140
}
136141
logger.disconnect()
137142
}
138143
if err != nil {
139-
logger.buf.Back(data)
144+
logger.buf.Back(messages)
140145
}
141146
return err
142147
}
@@ -149,12 +154,18 @@ func (logger *Logger) start() {
149154
for {
150155
select {
151156
case <-logger.done:
152-
logger.send()
157+
err := logger.send()
158+
if err != nil {
159+
panic(err)
160+
}
153161
return
154162
case <-logger.buf.Dirty:
155163
case <-ticker.C:
156164
}
157-
logger.send()
165+
err := logger.send()
166+
if err != nil {
167+
panic(err)
168+
}
158169
}
159170
}()
160171
}

message.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package fluent
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
)
7+
8+
type Message struct {
9+
buf *bytes.Buffer
10+
}
11+
12+
var messagePool = sync.Pool{
13+
New: func() interface{} {
14+
return newMessage()
15+
},
16+
}
17+
18+
func getMessage() *Message {
19+
message := messagePool.Get().(*Message)
20+
message.buf.Reset()
21+
return message
22+
}
23+
24+
func putMessage(message *Message) {
25+
messagePool.Put(message)
26+
}
27+
28+
func newMessage() *Message {
29+
return &Message{
30+
buf: bytes.NewBuffer([]byte{}),
31+
}
32+
}

0 commit comments

Comments
 (0)