-
Notifications
You must be signed in to change notification settings - Fork 687
/
accesslog.go
80 lines (74 loc) · 2.14 KB
/
accesslog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package test
import (
"fmt"
"sync"
"time"
alf "github.com/datawire/ambassador/pkg/api/envoy/data/accesslog/v2"
als "github.com/datawire/ambassador/pkg/api/envoy/service/accesslog/v2"
alsgrpc "github.com/datawire/ambassador/pkg/api/envoy/service/accesslog/v2"
)
// AccessLogService buffers access logs from the remote Envoy nodes.
type AccessLogService struct {
entries []string
mu sync.Mutex
}
func (svc *AccessLogService) log(entry string) {
svc.mu.Lock()
defer svc.mu.Unlock()
svc.entries = append(svc.entries, entry)
}
// Dump releases the collected log entries and clears the log entry list.
func (svc *AccessLogService) Dump(f func(string)) {
svc.mu.Lock()
defer svc.mu.Unlock()
for _, entry := range svc.entries {
f(entry)
}
svc.entries = nil
}
// StreamAccessLogs implements the access log service.
func (svc *AccessLogService) StreamAccessLogs(stream alsgrpc.AccessLogService_StreamAccessLogsServer) error {
var logName string
for {
msg, err := stream.Recv()
if err != nil {
return nil
}
if msg.Identifier != nil {
logName = msg.Identifier.LogName
}
switch entries := msg.LogEntries.(type) {
case *als.StreamAccessLogsMessage_HttpLogs:
for _, entry := range entries.HttpLogs.LogEntry {
if entry != nil {
common := entry.CommonProperties
req := entry.Request
resp := entry.Response
if common == nil {
common = &alf.AccessLogCommon{}
}
if req == nil {
req = &alf.HTTPRequestProperties{}
}
if resp == nil {
resp = &alf.HTTPResponseProperties{}
}
svc.log(fmt.Sprintf("[%s%s] %s %s %s %d %s %s",
logName, time.Now().Format(time.RFC3339), req.Authority, req.Path, req.Scheme,
resp.ResponseCode.GetValue(), req.RequestId, common.UpstreamCluster))
}
}
case *als.StreamAccessLogsMessage_TcpLogs:
for _, entry := range entries.TcpLogs.LogEntry {
if entry != nil {
common := entry.CommonProperties
if common == nil {
common = &alf.AccessLogCommon{}
}
svc.log(fmt.Sprintf("[%s%s] tcp %s %s",
logName, time.Now().Format(time.RFC3339), common.UpstreamLocalAddress, common.UpstreamCluster))
}
}
}
}
}