Skip to content

Commit 38301f4

Browse files
committed
Fix to write fluent asynchronously
1 parent 884b834 commit 38301f4

File tree

1 file changed

+47
-9
lines changed

1 file changed

+47
-9
lines changed

logger.go

+47-9
Original file line numberDiff line numberDiff line change
@@ -13,37 +13,48 @@ import (
1313
)
1414

1515
var (
16-
defaultAddress = "127.0.0.1:24224"
16+
defaultAddress = "127.0.0.1:24224"
17+
defaultFlushInterval = 5 * time.Second
1718
)
1819

1920
type Config struct {
2021
Address string
2122
ConnectionTimeout time.Duration
23+
FlushInterval time.Duration
2224
}
2325

2426
func withDefaultConfig(c Config) Config {
2527
if c.Address == "" {
2628
c.Address = defaultAddress
2729
}
30+
if c.FlushInterval == 0 {
31+
c.FlushInterval = defaultFlushInterval
32+
}
2833
return c
2934
}
3035

3136
type Logger struct {
32-
conf Config
33-
conn io.WriteCloser
34-
bmu sync.Mutex
35-
cmu sync.Mutex
36-
buf []byte
37+
conf Config
38+
conn io.WriteCloser
39+
bmu sync.Mutex
40+
cmu sync.Mutex
41+
buf []byte
42+
wg sync.WaitGroup
43+
done chan struct{}
44+
dirty chan struct{}
3745
}
3846

3947
func NewLogger(c Config) (*Logger, error) {
4048
logger := &Logger{
41-
conf: withDefaultConfig(c),
42-
buf: []byte{},
49+
conf: withDefaultConfig(c),
50+
buf: []byte{},
51+
done: make(chan struct{}),
52+
dirty: make(chan struct{}),
4353
}
4454
if err := logger.connect(); err != nil {
4555
return nil, err
4656
}
57+
logger.start()
4758
return logger, nil
4859
}
4960

@@ -69,10 +80,14 @@ func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) err
6980
logger.buf = append(logger.buf, raw...)
7081
logger.bmu.Unlock()
7182

72-
return logger.send()
83+
go func() {
84+
logger.dirty <- struct{}{}
85+
}()
86+
return nil
7387
}
7488

7589
func (logger *Logger) Close() error {
90+
logger.stop()
7691
return logger.disconnect()
7792
}
7893

@@ -137,3 +152,26 @@ func (logger *Logger) send() error {
137152

138153
return err
139154
}
155+
156+
func (logger *Logger) start() {
157+
ticker := time.NewTicker(logger.conf.FlushInterval)
158+
logger.wg.Add(1)
159+
go func() {
160+
defer logger.wg.Done()
161+
for {
162+
select {
163+
case <-logger.done:
164+
logger.send()
165+
return
166+
case <-logger.dirty:
167+
case <-ticker.C:
168+
}
169+
logger.send()
170+
}
171+
}()
172+
}
173+
174+
func (logger *Logger) stop() {
175+
close(logger.done)
176+
logger.wg.Wait()
177+
}

0 commit comments

Comments
 (0)