-
Notifications
You must be signed in to change notification settings - Fork 40
/
log_rate_limiter.go
127 lines (112 loc) · 4.17 KB
/
log_rate_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package log_streamer
import (
"context"
"fmt"
"sync/atomic"
"time"
loggingclient "code.cloudfoundry.org/diego-logging-client"
"golang.org/x/time/rate"
)
const (
// AppInstanceExceededLogRateLimitCount is the name of the counter that is
// incremented through the metron client when an over-limit condition is
// reported for an app instance.
AppInstanceExceededLogRateLimitCount = "AppInstanceExceededLogRateLimitCount"
// LogRateLimitExceededLogInterval is a one-second duration.
// NOTE(review): it is not referenced in the visible part of this file;
// presumably it was meant to space out "limit exceeded" reports - confirm
// against callers before relying on it.
LogRateLimitExceededLogInterval = time.Second
)
// A logRateLimiter is used by the streamDestination to limit logs from an app instance.
// This can be done by limiting the number of lines per second,
// or by limiting the number of bytes per second.
//
// Besides deciding whether a log may pass (see Limit), it periodically emits
// the observed byte rate through the metron client and reports over-limit
// conditions as a counter increment plus an app log message.
type logRateLimiter struct {
// ctx stops the metrics-emitting goroutine once it is done.
ctx context.Context
// metronClient receives the app log rate, the over-limit counter
// increments and the over-limit app log messages.
metronClient loggingclient.IngressClient
// maxLogLinesPerSecond is the configured line limit; values <= 0 mean
// the line rate is not limited.
maxLogLinesPerSecond int
// maxLogBytesPerSecond is the configured byte limit; 0 rejects every log
// and negative values mean the byte rate is not limited.
maxLogBytesPerSecond int64
// maxLogLinesPerSecondLimiter enforces maxLogLinesPerSecond.
maxLogLinesPerSecondLimiter *rate.Limiter
// maxLogBytesPerSecondLimiter enforces maxLogBytesPerSecond.
maxLogBytesPerSecondLimiter *rate.Limiter
// NOTE(review): metricReportLimiter and logReportLimiter are neither
// assigned nor read in the visible code - possibly leftovers; confirm
// before removing.
metricReportLimiter *rate.Limiter
logReportLimiter *rate.Limiter
// logMetricsEmitInterval is the period of the log-rate metric; values
// <= 0 disable the emission.
logMetricsEmitInterval time.Duration
// bytesEmittedLastInterval accumulates the bytes of accepted logs; it is
// added to in Limit and swapped back to 0 on every metric emission, always
// through sync/atomic functions.
// NOTE(review): sync/atomic requires 64-bit alignment for such fields on
// 32-bit platforms, which this field position does not guarantee - confirm
// only 64-bit targets are supported.
bytesEmittedLastInterval uint64
// needToReportOverlimitMessage holds a bool: true means the next rejected
// log should be reported. It is flipped to false when a report is sent and
// back to true whenever a log is accepted.
needToReportOverlimitMessage atomic.Value
// tags are attached to the emitted log rate and to over-limit app logs.
tags map[string]string
}
// NewLogRateLimiter builds a logRateLimiter and starts its metrics goroutine.
//
// Parameters:
//   - ctx: cancelling it stops the background goroutine that emits the log rate.
//   - metronClient: client used for metrics and over-limit reports.
//   - tags: tags attached to emitted metrics and over-limit app logs.
//   - maxLogLinesPerSecond: line limit; values <= 0 disable line limiting.
//   - maxLogBytesPerSecond: byte limit; 0 makes Limit reject every log and
//     negative values disable byte limiting.
//   - logMetricsEmitInterval: period of the log-rate metric; values <= 0
//     disable the emission (the goroutine returns immediately).
//
// Both limiters use a burst equal to their per-second limit, so a single log
// larger than maxLogBytesPerSecond can never pass.
func NewLogRateLimiter(
	ctx context.Context,
	metronClient loggingclient.IngressClient,
	tags map[string]string,
	maxLogLinesPerSecond int,
	maxLogBytesPerSecond int64,
	logMetricsEmitInterval time.Duration,
) *logRateLimiter {
	limiter := &logRateLimiter{
		ctx:                      ctx,
		metronClient:             metronClient,
		maxLogLinesPerSecond:     maxLogLinesPerSecond,
		maxLogBytesPerSecond:     maxLogBytesPerSecond,
		logMetricsEmitInterval:   logMetricsEmitInterval,
		bytesEmittedLastInterval: 0,
		tags:                     tags,
	}

	// An atomic.Value must not be copied after first use, so the initial
	// flag is stored directly into the struct field instead of into a local
	// that is then copied into the struct literal.
	limiter.needToReportOverlimitMessage.Store(true)

	if maxLogLinesPerSecond > 0 {
		limiter.maxLogLinesPerSecondLimiter = rate.NewLimiter(rate.Limit(maxLogLinesPerSecond), maxLogLinesPerSecond)
	} else {
		// rate.Inf allows every event regardless of the burst size.
		limiter.maxLogLinesPerSecondLimiter = rate.NewLimiter(rate.Inf, 0)
	}

	if limiter.maxLogBytesPerSecond > -1 {
		limiter.maxLogBytesPerSecondLimiter = rate.NewLimiter(rate.Limit(maxLogBytesPerSecond), int(maxLogBytesPerSecond))
	} else {
		limiter.maxLogBytesPerSecondLimiter = rate.NewLimiter(rate.Inf, 0)
	}

	// The goroutine ends when ctx is done (or at once if emission is disabled).
	go limiter.emitMetrics()

	return limiter
}
// Limit is called before logging to determine if the log should be dropped (returns err) or logged (returns nil).
//
// sourceName is used as the source of the over-limit app log, and logLength is
// the size of the log in bytes. The byte limit is checked first, then the line
// limit; a byte limit of 0 rejects every log without reporting. When a limit
// is exceeded, the condition is reported at most once until a log is accepted
// again, and the returned error carries the same message as the report.
func (r *logRateLimiter) Limit(sourceName string, logLength int) error {
	if r.maxLogBytesPerSecond == 0 {
		return fmt.Errorf("Not allowed to log")
	}

	if !r.maxLogBytesPerSecondLimiter.AllowN(time.Now(), logLength) {
		reportMessage := fmt.Sprintf("app instance exceeded log rate limit (%d bytes/sec)", r.maxLogBytesPerSecond)
		r.reportOverlimit(sourceName, reportMessage)
		// The message is data, not a format string: pass it through "%s" so
		// it is never re-interpreted by fmt (and go vet stays clean).
		return fmt.Errorf("%s", reportMessage)
	}

	if !r.maxLogLinesPerSecondLimiter.Allow() {
		reportMessage := fmt.Sprintf("app instance exceeded log rate limit (%d log-lines/sec) set by platform operator", r.maxLogLinesPerSecond)
		r.reportOverlimit(sourceName, reportMessage)
		return fmt.Errorf("%s", reportMessage)
	}

	// Accepted: account for the bytes in the current metrics interval and
	// re-arm the over-limit report for the next rejection.
	atomic.AddUint64(&r.bytesEmittedLastInterval, uint64(logLength))
	r.needToReportOverlimitMessage.Store(true)
	return nil
}
// emitMetrics periodically sends the app log rate to the metron client until
// the limiter's context is done. On every tick the bytes accumulated since the
// previous tick are reset to zero and reported as a per-second average,
// together with the configured byte limit and the tags. A non-positive
// interval disables the emission entirely.
func (r *logRateLimiter) emitMetrics() {
	interval := r.logMetricsEmitInterval
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Used to turn a per-interval byte count into a per-second rate.
	secondsPerInterval := interval.Seconds()

	for {
		select {
		case <-r.ctx.Done():
			return
		case <-ticker.C:
			emitted := atomic.SwapUint64(&r.bytesEmittedLastInterval, 0)
			bytesPerSecond := float64(emitted) / secondsPerInterval
			r.metronClient.SendAppLogRate(bytesPerSecond, float64(r.maxLogBytesPerSecond), r.tags)
		}
	}
}
// reportOverlimit reports an exceeded limit as a counter increment and an app
// log, but only if the "need to report" flag is currently true. The flag is
// atomically switched to false by the report, so further rejections stay
// silent until the flag is set to true again.
func (r *logRateLimiter) reportOverlimit(sourceName string, reportMessage string) {
	alreadyReported := !r.needToReportOverlimitMessage.CompareAndSwap(true, false)
	if alreadyReported {
		return
	}

	r.reportLogRateLimitExceededMetric()
	r.reportLogRateLimitExceededLog(sourceName, reportMessage)
}
// reportLogRateLimitExceededMetric increments the
// AppInstanceExceededLogRateLimitCount counter on the metron client.
func (r *logRateLimiter) reportLogRateLimitExceededMetric() {
	client := r.metronClient
	// Best effort: the result of the increment is deliberately discarded.
	_ = client.IncrementCounter(AppInstanceExceededLogRateLimitCount)
}
// reportLogRateLimitExceededLog sends reportMessage as an app log with the
// given source name and the limiter's tags.
func (r *logRateLimiter) reportLogRateLimitExceededLog(sourceName string, reportMessage string) {
	client := r.metronClient
	// Best effort: the result of the send is deliberately discarded.
	_ = client.SendAppLog(reportMessage, sourceName, r.tags)
}