-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
pod_cidr.go
114 lines (98 loc) · 2.89 KB
/
pod_cidr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package speaker
import (
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
metallbbgp "go.universe.tf/metallb/pkg/bgp"
metallbspr "go.universe.tf/metallb/pkg/speaker"
"github.com/cilium/cilium/pkg/cidr"
)
var (
emptyAdverts = []*metallbbgp.Advertisement{}
)
// CidrSlice is a slice of Cidr strings with a method set for
// converting them to MetalLB advertisements.
type CidrSlice []string
// ToAdvertisements converts the CidrSlice into metallb Advertisements.
//
// If a cidr cannot be parsed it is omitted from the array of Advertisements
// returned an an error is logged.
func (cs CidrSlice) ToAdvertisements() []*metallbbgp.Advertisement {
var (
l = log.WithFields(logrus.Fields{
"component": "CidrSlice.ToAdvertisements",
})
)
adverts := make([]*metallbbgp.Advertisement, 0, len(cs))
for _, c := range cs {
parsed, err := cidr.ParseCIDR(c)
if err != nil {
continue
}
// only advertise ipv4 addresses
if parsed.IP.To4() == nil {
// log error and continue
l.WithField("advertisement", parsed.IP.String()).
Error("cannot advertise non-v4 prefix")
continue
}
adverts = append(adverts, &metallbbgp.Advertisement{
Prefix: parsed.IPNet,
})
}
return adverts
}
func (s *MetalLBSpeaker) withdraw() {
var (
l = log.WithFields(logrus.Fields{
"component": "MetalLBSpeaker.withdraw",
})
)
// flip this bool so we start rejecting new events from
// entering the queue.
atomic.AddInt32(&s.shutdown, 1)
var wg sync.WaitGroup // waitgroup here since we don't care about errors
for _, session := range s.speaker.PeerSessions() {
wg.Add(1)
go func(sess metallbspr.Session) { // Need an outer closure to capture session.
defer wg.Done()
// providing an empty array of advertisements will
// provoke the BGP controller to withdrawal any currently
// advertised bgp routes.
err := sess.Set(emptyAdverts...)
if err != nil {
l.Error("Failed to gracefully remove BGP routes.")
}
}(session)
}
wg.Wait()
}
// announcePodCidrs will announce the list of cidrs to any
// established BGP sessions.
//
// returning an error from this method will requeue the event
// which triggered the invocation.
//
// This function is not thread safe and should not be called while other functions that modify the underlying speaker
// are being called.
func (s *MetalLBSpeaker) announcePodCIDRs(cidrs CidrSlice) error {
var (
l = log.WithFields(logrus.Fields{
"component": "MetalLBSpeaker.announcePodCidrs",
})
)
ctl := s.speaker.GetBGPController()
// create advertisements
adverts := cidrs.ToAdvertisements()
if len(adverts) == 0 {
// we logged the error above, but return nil
// since we don't want to requeue this event.
return nil
}
l.WithField("advertisements", cidrs).
Info("Advertising CIDRs to all available session")
ctl.SvcAds["podcidr"] = adverts
return ctl.UpdateAds()
}