/
action_status.go
139 lines (115 loc) · 4 KB
/
action_status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package controller
import (
"net"
"sort"
"strings"
"github.com/caddyserver/ingress/internal/k8s"
"go.uber.org/zap"
"gopkg.in/go-playground/pool.v3"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/client-go/kubernetes"
)
// dispatchSync is run every syncInterval duration to sync ingress source address fields.
// It only enqueues a SyncStatusAction on the controller's sync queue; the actual work
// happens when the queue worker calls SyncStatusAction.handle.
func (c *CaddyController) dispatchSync() {
	c.syncQueue.Add(SyncStatusAction{})
}
// SyncStatusAction provides an implementation of the action interface.
// It carries no data; it is a marker value whose handle method triggers a
// full resync of ingress status source addresses.
type SyncStatusAction struct {
}
// handle is run when a syncStatusAction appears in the queue.
// It delegates to syncStatus with the controller's currently known Ingresses.
func (r SyncStatusAction) handle(c *CaddyController) error {
	return c.syncStatus(c.resourceStore.Ingresses)
}
// syncStatus ensures that the ingress source address points to this ingress
// controller's IP address. It resolves the addresses of the pod the controller
// runs in and pushes them onto every monitored Ingress whose status differs.
func (c *CaddyController) syncStatus(ings []*networkingv1.Ingress) error {
	podAddrs, err := k8s.GetAddresses(c.resourceStore.CurrentPod, c.kubeClient)
	if err != nil {
		return err
	}

	c.logger.Debugf("Syncing %d Ingress resources source addresses", len(ings))
	c.updateIngStatuses(sliceToLoadBalancerIngress(podAddrs), ings)

	return nil
}
// updateIngStatuses starts a bounded worker pool and enqueues an update job for
// every monitored Ingress whose status source address differs from the one that
// the ingress controller is running on. This is called by the syncStatus queue.
func (c *CaddyController) updateIngStatuses(controllerAddresses []networkingv1.IngressLoadBalancerIngress, ings []*networkingv1.Ingress) {
	workers := pool.NewLimited(10)
	defer workers.Close()

	jobs := workers.Batch()
	sort.SliceStable(controllerAddresses, lessLoadBalancerIngress(controllerAddresses))

	for _, ing := range ings {
		existing := ing.Status.LoadBalancer.Ingress
		sort.SliceStable(existing, lessLoadBalancerIngress(existing))

		// Skip Ingresses whose source addresses already match the controller's.
		if ingressSliceEqual(existing, controllerAddresses) {
			c.logger.Debugf("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
			continue
		}

		jobs.Queue(runUpdate(c.logger, ing, controllerAddresses, c.kubeClient))
	}

	jobs.QueueComplete()
	jobs.WaitAll()
}
// runUpdate returns a pool work function that updates the given Ingress's
// status field to the provided load balancer addresses. Update failures are
// logged as warnings rather than propagated; the work function always reports
// success to the pool.
func runUpdate(logger *zap.SugaredLogger, ing *networkingv1.Ingress, status []networkingv1.IngressLoadBalancerIngress, client *kubernetes.Clientset) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}

		updated, err := k8s.UpdateIngressStatus(client, ing, status)
		if err != nil {
			logger.Warnf("error updating ingress rule: %v", err)
			return true, nil
		}

		logger.Debugf(
			"updating Ingress %v/%v status from %v to %v",
			ing.Namespace,
			ing.Name,
			ing.Status.LoadBalancer.Ingress,
			updated.Status.LoadBalancer.Ingress,
		)
		return true, nil
	}
}
// ingressSliceEqual determines if the ingress source matches the ingress
// controller's. Two slices are equal when they have the same length and every
// position agrees on both IP and Hostname. Callers are expected to sort both
// slices beforehand; other fields (e.g. Ports) are not compared.
func ingressSliceEqual(lhs, rhs []networkingv1.IngressLoadBalancerIngress) bool {
	if len(lhs) != len(rhs) {
		return false
	}
	for i, l := range lhs {
		if l.IP != rhs[i].IP || l.Hostname != rhs[i].Hostname {
			return false
		}
	}
	return true
}
// lessLoadBalancerIngress returns a comparison closure for sort.SliceStable
// that orders load balancer ingress entries by Hostname first, falling back
// to IP when the hostnames are equal.
func lessLoadBalancerIngress(addrs []networkingv1.IngressLoadBalancerIngress) func(int, int) bool {
	return func(i, j int) bool {
		if c := strings.Compare(addrs[i].Hostname, addrs[j].Hostname); c != 0 {
			return c < 0
		}
		return addrs[i].IP < addrs[j].IP
	}
}
// sliceToLoadBalancerIngress converts a slice of IP and/or hostnames to LoadBalancerIngress.
// Entries that parse as an IP address populate the IP field; everything else is
// treated as a hostname. The result is sorted with lessLoadBalancerIngress so it
// uses the same ordering (Hostname, then IP) as the comparisons performed in
// updateIngStatuses — the previous IP-only sort left hostname entries (whose IP
// is empty) in arbitrary relative order.
func sliceToLoadBalancerIngress(endpoints []string) []networkingv1.IngressLoadBalancerIngress {
	lbi := make([]networkingv1.IngressLoadBalancerIngress, 0, len(endpoints))
	for _, ep := range endpoints {
		if net.ParseIP(ep) == nil {
			lbi = append(lbi, networkingv1.IngressLoadBalancerIngress{Hostname: ep})
		} else {
			lbi = append(lbi, networkingv1.IngressLoadBalancerIngress{IP: ep})
		}
	}

	// Sort consistently with the rest of the file so equality checks are stable.
	sort.SliceStable(lbi, lessLoadBalancerIngress(lbi))
	return lbi
}