-
Notifications
You must be signed in to change notification settings - Fork 51
/
event_retry_handler.go
104 lines (95 loc) · 2.64 KB
/
event_retry_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package k8smonitor
import (
"context"
"time"
"go.aporeto.io/enforcerd/internal/extractors/containermetadata"
"go.uber.org/zap"
)
// retryWaittimeUnit is the base unit that calculateWaitTime multiplies by its
// backoff factor to obtain the wait time before a retry.
var retryWaittimeUnit = time.Second

// retryTimeout is the timeout applied to the context that is handed to the
// start event handler on every retry.
var retryTimeout = 30 * time.Second
// startEventRetryFunc is the signature of the handler that retries a start event.
// It receives the metadata of the container the start event was for and the
// number of the retry attempt.
type startEventRetryFunc func(containermetadata.CommonKubernetesContainerMetadata, uint)
// newStartEventRetryFunc builds the handler that retries the processing of a start event.
//
// The returned function:
//   - only acts on pod sandboxes and returns immediately for any other kind,
//   - waits for the backoff returned by calculateWaitTime(retry), returning early
//     without retrying if mainCtx is done in the meantime,
//   - asks extractor whether the sandbox still exists and aborts if it does not,
//   - calls startEvent with a context derived from mainCtx that times out after
//     retryTimeout.
//
// An error returned by startEvent is logged and not propagated.
func newStartEventRetryFunc(mainCtx context.Context, extractor containermetadata.CommonContainerMetadataExtractor, startEvent startEventFunc) startEventRetryFunc {
	return func(kmd containermetadata.CommonKubernetesContainerMetadata, retry uint) {
		// we only care about pod sandboxes for restarts
		// make sure that we stick to that
		if kmd.Kind() != containermetadata.PodSandbox {
			zap.L().Debug(
				"K8sMonitor: startEventRetry: this is not a pod sandbox. Aborting retry...",
				zap.Uint("retry", retry),
				zap.String("kind", kmd.Kind().String()),
				zap.String("id", kmd.ID()),
			)
			return
		}

		// wait before we retry
		waitTime := calculateWaitTime(retry)
		zap.L().Debug(
			"K8sMonitor: startEventRetry: waiting before retry...",
			zap.Uint("retry", retry),
			zap.Duration("waitTime", waitTime),
			zap.String("id", kmd.ID()),
		)

		// Use an explicit timer instead of time.After so that it can be released
		// right away when the main context ends: a timer created by time.After
		// cannot be stopped and, before Go 1.23, is not reclaimed until it fires.
		timer := time.NewTimer(waitTime)
		select {
		case <-mainCtx.Done():
			// no point in continuing if the main context is done
			timer.Stop()
			return
		case <-timer.C:
		}

		// check if the sandbox still exists, otherwise we can abort the retries
		if !extractor.Has(containermetadata.NewRuncArguments(containermetadata.StartAction, kmd.ID())) {
			zap.L().Debug(
				"K8sMonitor: startEventRetry: container for start event does not exist any longer. Aborting...",
				zap.Uint("retry", retry),
				zap.String("id", kmd.ID()),
			)
			return
		}

		// now create a new context and retry
		// the recursion occurs within the startEvent
		ctx, cancel := context.WithTimeout(mainCtx, retryTimeout)
		defer cancel()
		if err := startEvent(ctx, kmd, retry); err != nil {
			zap.L().Error(
				"K8sMonitor: startEventRetry: failed to process start event on retry",
				zap.Uint("retry", retry),
				zap.Error(err),
				zap.String("id", kmd.ID()),
				zap.String("podUID", kmd.PodUID()),
				zap.String("podName", kmd.PodName()),
				zap.String("podNamespace", kmd.PodNamespace()),
			)
		}
	}
}
// calculateWaitTime returns how long to wait before the given retry attempt.
// The wait follows a fibonacci style backoff: retries 0 through 9 map to
// 0, 1, 1, 2, 3, 5, 8, 13, 21 and 34 units, and every later retry is capped
// at 55 units. One unit is `retryWaittimeUnit`.
func calculateWaitTime(retry uint) time.Duration {
	// backoff[i] is the number of units to wait before retry i
	backoff := [...]uint{0, 1, 1, 2, 3, 5, 8, 13, 21, 34}

	// anything beyond the table uses the maximum backoff
	units := uint(55)
	if retry < uint(len(backoff)) {
		units = backoff[retry]
	}
	return retryWaittimeUnit * time.Duration(units)
}