forked from metallb/metallb
/
main.go
227 lines (197 loc) · 8.36 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"reflect"
"go.universe.tf/metallb/internal/allocator"
"go.universe.tf/metallb/internal/config"
"go.universe.tf/metallb/internal/k8s"
"go.universe.tf/metallb/internal/k8s/controllers"
"go.universe.tf/metallb/internal/k8s/epslices"
"go.universe.tf/metallb/internal/logging"
"go.universe.tf/metallb/internal/version"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
v1 "k8s.io/api/core/v1"
)
// Service offers methods to mutate a Kubernetes service object.
type service interface {
UpdateStatus(svc *v1.Service) error
Infof(svc *v1.Service, desc, msg string, args ...interface{})
Errorf(svc *v1.Service, desc, msg string, args ...interface{})
}
type controller struct {
client service
pools map[string]*config.Pool
ips *allocator.Allocator
}
func (c *controller) SetBalancer(l log.Logger, name string, svcRo *v1.Service, _ epslices.EpsOrSlices) controllers.SyncState {
level.Debug(l).Log("event", "startUpdate", "msg", "start of service update")
defer level.Debug(l).Log("event", "endUpdate", "msg", "end of service update")
if svcRo == nil {
c.deleteBalancer(l, name)
// There might be other LBs stuck waiting for an IP, so when
// we delete a balancer we should reprocess all of them to
// check for newly feasible balancers.
return controllers.SyncStateReprocessAll
}
if c.pools == nil {
// Config hasn't been read, nothing we can do just yet.
level.Debug(l).Log("event", "noConfig", "msg", "not processing, still waiting for config")
return controllers.SyncStateSuccess
}
// Making a copy unconditionally is a bit wasteful, since we don't
// always need to update the service. But, making an unconditional
// copy makes the code much easier to follow, and we have a GC for
// a reason.
svc := svcRo.DeepCopy()
successRes := controllers.SyncStateSuccess
wasAllocated := c.isServiceAllocated(name)
if !c.convergeBalancer(l, name, svc) {
return controllers.SyncStateError
}
if wasAllocated && !c.isServiceAllocated(name) { // convergeBalancer may deallocate our service and this means it did it.
// if the service was deallocated, it may have have left room
// for another one, so we reprocess
level.Info(l).Log("event", "serviceUpdated", "msg", "removed loadbalancer from service, services will be reprocessed")
successRes = controllers.SyncStateReprocessAll
}
if reflect.DeepEqual(svcRo, svc) {
level.Debug(l).Log("event", "noChange", "msg", "service converged, no change")
return successRes
}
toWrite := svcRo.DeepCopy()
if !reflect.DeepEqual(svcRo.Status, svc.Status) {
toWrite.Status = svc.Status
}
if !reflect.DeepEqual(svcRo.Annotations, svc.Annotations) {
toWrite.Annotations = svc.Annotations
}
if !reflect.DeepEqual(toWrite, svcRo) {
if err := c.client.UpdateStatus(svc); err != nil {
level.Error(l).Log("op", "updateServiceStatus", "error", err, "msg", "failed to update service")
return controllers.SyncStateError
}
level.Info(l).Log("event", "serviceUpdated", "msg", "updated service object")
}
level.Info(l).Log("event", "serviceUpdated", "msg", "service is not updated")
return successRes
}
func (c *controller) deleteBalancer(l log.Logger, name string) {
if c.ips.Unassign(name) {
level.Info(l).Log("event", "serviceDeleted", "msg", "service deleted")
}
}
func (c *controller) SetPools(l log.Logger, pools map[string]*config.Pool) controllers.SyncState {
level.Debug(l).Log("event", "startUpdate", "msg", "start of config update")
defer level.Debug(l).Log("event", "endUpdate", "msg", "end of config update")
if pools == nil {
level.Error(l).Log("op", "setConfig", "error", "no MetalLB configuration in cluster", "msg", "configuration is missing, MetalLB will not function")
return controllers.SyncStateErrorNoRetry
}
if err := c.ips.SetPools(pools); err != nil {
level.Error(l).Log("op", "setConfig", "error", err, "msg", "applying new configuration failed")
return controllers.SyncStateError
}
c.pools = pools
return controllers.SyncStateReprocessAll
}
func main() {
var (
port = flag.Int("port", 7472, "HTTP listening port for Prometheus metrics")
namespace = flag.String("namespace", os.Getenv("METALLB_NAMESPACE"), "config / memberlist secret namespace")
mlSecret = flag.String("ml-secret-name", os.Getenv("METALLB_ML_SECRET_NAME"), "name of the memberlist secret to create")
deployName = flag.String("deployment", os.Getenv("METALLB_DEPLOYMENT"), "name of the MetalLB controller Deployment")
logLevel = flag.String("log-level", "info", fmt.Sprintf("log level. must be one of: [%s]", logging.Levels.String()))
disableEpSlices = flag.Bool("disable-epslices", false, "Disable the usage of EndpointSlices and default to Endpoints instead of relying on the autodiscovery mechanism")
enablePprof = flag.Bool("enable-pprof", false, "Enable pprof profiling")
disableCertRotation = flag.Bool("disable-cert-rotation", false, "disable automatic generation and rotation of webhook TLS certificates/keys")
certDir = flag.String("cert-dir", "/tmp/k8s-webhook-server/serving-certs", "The directory where certs are stored")
certServiceName = flag.String("cert-service-name", "webhook-service", "The service name used to generate the TLS cert's hostname")
loadBalancerClass = flag.String("lb-class", "", "load balancer class. When enabled, metallb will handle only services whose spec.loadBalancerClass matches the given lb class")
webhookMode = flag.String("webhook-mode", "enabled", "webhook mode: can be enabled, disabled or only webhook if we want the controller to act as webhook endpoint only")
)
flag.Parse()
logger, err := logging.Init(*logLevel)
if err != nil {
fmt.Printf("failed to initialize logging: %s\n", err)
os.Exit(1)
}
level.Info(logger).Log("version", version.Version(), "commit", version.CommitHash(), "branch", version.Branch(), "goversion", version.GoString(), "msg", "MetalLB controller starting "+version.String())
if *namespace == "" {
bs, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
level.Error(logger).Log("op", "startup", "msg", "Unable to get namespace from pod service account data, please specify --namespace or METALLB_NAMESPACE", "error", err)
os.Exit(1)
}
*namespace = string(bs)
}
c := &controller{
ips: allocator.New(),
}
bgpType, present := os.LookupEnv("METALLB_BGP_TYPE")
if !present {
bgpType = "native"
}
validation := config.ValidationFor(bgpType)
cfg := &k8s.Config{
ProcessName: "metallb-controller",
MetricsPort: *port,
EnablePprof: *enablePprof,
Logger: logger,
DisableEpSlices: *disableEpSlices,
Namespace: *namespace,
Listener: k8s.Listener{
ServiceChanged: c.SetBalancer,
PoolChanged: c.SetPools,
},
ValidateConfig: validation,
EnableWebhook: true,
DisableCertRotation: *disableCertRotation,
CertDir: *certDir,
CertServiceName: *certServiceName,
LoadBalancerClass: *loadBalancerClass,
}
switch *webhookMode {
case "enabled":
case "disabled":
cfg.EnableWebhook = false
case "onlywebhook":
cfg.Listener = k8s.Listener{}
default:
level.Error(logger).Log("op", "startup", "error", "invalid webhookmode value", "value", *webhookMode)
os.Exit(1)
}
client, err := k8s.New(cfg)
if err != nil {
level.Error(logger).Log("op", "startup", "error", err, "msg", "failed to create k8s client")
os.Exit(1)
}
if *mlSecret != "" {
err = client.CreateMlSecret(*namespace, *deployName, *mlSecret)
if err != nil {
level.Error(logger).Log("op", "startup", "error", err, "msg", "failed to create memberlist secret")
os.Exit(1)
}
}
c.client = client
if err := client.Run(nil); err != nil {
level.Error(logger).Log("op", "startup", "error", err, "msg", "failed to run k8s client")
os.Exit(1)
}
}