forked from zrepl/zrepl
/
logging_outlets.go
174 lines (147 loc) · 3.56 KB
/
logging_outlets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package logging
import (
"bytes"
"context"
"crypto/tls"
"io"
"log/syslog"
"net"
"time"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/logger"
)
type EntryFormatter interface {
SetMetadataFlags(flags MetadataFlags)
Format(e *logger.Entry) ([]byte, error)
}
type WriterOutlet struct {
formatter EntryFormatter
writer io.Writer
}
func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := h.formatter.Format(&entry)
if err != nil {
return err
}
_, err = h.writer.Write(bytes)
if err != nil {
return err
}
_, err = h.writer.Write([]byte("\n"))
return err
}
type TCPOutlet struct {
formatter EntryFormatter
// Specifies how much time must pass between a connection error and a reconnection attempt
// Log entries written to the outlet during this time interval are silently dropped.
connect func(ctx context.Context) (net.Conn, error)
entryChan chan *bytes.Buffer
}
func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {
connect := func(ctx context.Context) (conn net.Conn, err error) {
deadl, ok := ctx.Deadline()
if !ok {
deadl = time.Time{}
}
dialer := net.Dialer{
Deadline: deadl,
}
if tlsConfig != nil {
conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
} else {
conn, err = dialer.DialContext(ctx, network, address)
}
return
}
entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previos is in io.Copy()
o := &TCPOutlet{
formatter: formatter,
connect: connect,
entryChan: entryChan,
}
go o.outLoop(retryInterval)
return o
}
// FIXME: use this method
func (h *TCPOutlet) Close() {
close(h.entryChan)
}
func (h *TCPOutlet) outLoop(retryInterval time.Duration) {
var retry time.Time
var conn net.Conn
for msg := range h.entryChan {
var err error
for conn == nil {
time.Sleep(time.Until(retry))
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
conn, err = h.connect(ctx)
cancel()
if err != nil {
retry = time.Now().Add(retryInterval)
conn = nil
}
}
err = conn.SetWriteDeadline(time.Now().Add(retryInterval))
if err == nil {
_, err = io.Copy(conn, msg)
}
if err != nil {
retry = time.Now().Add(retryInterval)
conn.Close()
conn = nil
}
}
}
func (h *TCPOutlet) WriteEntry(e logger.Entry) error {
ebytes, err := h.formatter.Format(&e)
if err != nil {
return err
}
buf := new(bytes.Buffer)
buf.Write(ebytes)
buf.WriteString("\n")
select {
case h.entryChan <- buf:
return nil
default:
return errors.New("connection broken or not fast enough")
}
}
type SyslogOutlet struct {
Formatter EntryFormatter
RetryInterval time.Duration
Facility syslog.Priority
writer *syslog.Writer
lastConnectAttempt time.Time
}
func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := o.Formatter.Format(&entry)
if err != nil {
return err
}
s := string(bytes)
if o.writer == nil {
now := time.Now()
if now.Sub(o.lastConnectAttempt) < o.RetryInterval {
return nil // not an error toward logger
}
o.writer, err = syslog.New(o.Facility, "zrepl")
o.lastConnectAttempt = time.Now()
if err != nil {
o.writer = nil
return err
}
}
switch entry.Level {
case logger.Debug:
return o.writer.Debug(s)
case logger.Info:
return o.writer.Info(s)
case logger.Warn:
return o.writer.Warning(s)
case logger.Error:
return o.writer.Err(s)
default:
return o.writer.Err(s) // write as error as reaching this case is in fact an error
}
}