Skip to content

Commit af41bfd

Browse files
committed
Add ErrorHandlerFunc
1 parent cd0e0b7 commit af41bfd

File tree

4 files changed

+118
-14
lines changed

4 files changed

+118
-14
lines changed

benchmark/benchmark_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ func BenchmarkStructDaichirata(b *testing.B) {
140140
panic(err)
141141
}
142142
defer logger.Close()
143+
logger.ErrorHandler = daichirata.ErrorHandlerFunc(func(err error, _ []byte) error {
144+
panic(err)
145+
})
143146

144147
b.ResetTimer()
145148
for i := 0; i < b.N; i++ {
@@ -170,6 +173,9 @@ func BenchmarkMapDaichirata(b *testing.B) {
170173
panic(err)
171174
}
172175
defer logger.Close()
176+
logger.ErrorHandler = daichirata.ErrorHandlerFunc(func(err error, _ []byte) error {
177+
panic(err)
178+
})
173179

174180
b.ResetTimer()
175181
for i := 0; i < b.N; i++ {

buffer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ func (buffer *buffer) Remove() []*Message {
3838
}
3939

4040
var messages []*Message
41-
messages = append(messages, buffer.new...)
4241
messages = append(messages, buffer.pending...)
42+
messages = append(messages, buffer.new...)
4343

44-
buffer.new = buffer.new[:0]
4544
buffer.pending = buffer.pending[:0]
45+
buffer.new = buffer.new[:0]
4646
return messages
4747
}
4848

error_handler.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package fluent
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
9+
"gopkg.in/vmihailenco/msgpack.v2"
10+
)
11+
12+
type ErrorHandler interface {
13+
HandleError(error, []byte) error
14+
}
15+
16+
type ErrorHandlerFunc func(error, []byte) error
17+
18+
func (f ErrorHandlerFunc) HandleError(err error, data []byte) error {
19+
return f(err, data)
20+
}
21+
22+
type FallbackHandler struct {
23+
logger *Logger
24+
}
25+
26+
func NewFallbackHandler(logger *Logger) *FallbackHandler {
27+
return &FallbackHandler{
28+
logger: logger,
29+
}
30+
}
31+
32+
func (h *FallbackHandler) HandleError(_ error, data []byte) error {
33+
return h.logger.writeWithBreaker(data)
34+
}
35+
36+
type FallbackJSONHandler struct {
37+
io io.Writer
38+
}
39+
40+
func NewFallbackJSONHandler(io io.Writer) *FallbackJSONHandler {
41+
return &FallbackJSONHandler{io: io}
42+
}
43+
44+
func (h *FallbackJSONHandler) HandleError(_ error, data []byte) error {
45+
r := bytes.NewReader(data)
46+
d := msgpack.NewDecoder(r)
47+
48+
for r.Len() > 0 {
49+
var tag string
50+
var timestamp uint64
51+
iRecord := map[interface{}]interface{}{}
52+
53+
message := []interface{}{&tag, &timestamp, &iRecord}
54+
if err := d.Decode(&message); err != nil {
55+
return err
56+
}
57+
record := h.castMap(iRecord)
58+
59+
str, err := json.Marshal([]interface{}{tag, timestamp, record})
60+
if err != nil {
61+
return err
62+
}
63+
_, err = h.io.Write(append(str, "\n"...))
64+
if err != nil {
65+
return err
66+
}
67+
}
68+
69+
return nil
70+
}
71+
72+
func (h *FallbackJSONHandler) castMap(in map[interface{}]interface{}) map[string]interface{} {
73+
res := make(map[string]interface{})
74+
for k, v := range in {
75+
res[fmt.Sprintf("%v", k)] = h.castValue(v)
76+
}
77+
return res
78+
}
79+
80+
func (h *FallbackJSONHandler) castValue(iv interface{}) interface{} {
81+
switch val := iv.(type) {
82+
case map[interface{}]interface{}:
83+
return h.castMap(val)
84+
case []interface{}:
85+
ia := make([]interface{}, len(val))
86+
for i, v := range val {
87+
ia[i] = h.castValue(v)
88+
}
89+
return ia
90+
default:
91+
return iv
92+
}
93+
}

logger.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"fmt"
45
"io"
56
"net"
67
"strings"
@@ -19,8 +20,9 @@ var (
1920
type Config struct {
2021
Address string
2122
ConnectionTimeout time.Duration
22-
FlushInterval time.Duration
2323
FailureThreshold int64
24+
FlushInterval time.Duration
25+
PendingLimit int
2426
}
2527

2628
func withDefaultConfig(c Config) Config {
@@ -37,6 +39,8 @@ func withDefaultConfig(c Config) Config {
3739
}
3840

3941
type Logger struct {
42+
ErrorHandler ErrorHandler
43+
4044
conf Config
4145
conn io.WriteCloser
4246
buf buffer
@@ -153,10 +157,17 @@ func (logger *Logger) send() error {
153157
for _, m := range messages {
154158
data = append(data, m.buf.Bytes()...)
155159
}
156-
err := logger.write(data)
160+
161+
err := logger.writeWithBreaker(data)
157162
if err != nil {
158-
logger.buf.Back(messages)
159-
return err
163+
if logger.ErrorHandler != nil && len(messages) > logger.conf.PendingLimit {
164+
err = logger.ErrorHandler.HandleError(err, data)
165+
}
166+
if err != nil {
167+
fmt.Println(err)
168+
logger.buf.Back(messages)
169+
return err
170+
}
160171
}
161172

162173
for _, m := range messages {
@@ -173,18 +184,12 @@ func (logger *Logger) start() {
173184
for {
174185
select {
175186
case <-logger.done:
176-
err := logger.send()
177-
if err != nil {
178-
panic(err)
179-
}
187+
logger.send()
180188
return
181189
case <-logger.buf.Dirty:
182190
case <-ticker.C:
183191
}
184-
err := logger.send()
185-
if err != nil {
186-
panic(err)
187-
}
192+
logger.send()
188193
}
189194
}()
190195
}

0 commit comments

Comments
 (0)