Skip to content

Commit

Permalink
Address comment of local_ip_detector_linux.go
Browse files Browse the repository at this point in the history
  • Loading branch information
tnqn committed Apr 7, 2021
1 parent c9b08b1 commit 7e9b1c6
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions pkg/agent/controller/egress/local_ip_detector_linux.go
Expand Up @@ -69,6 +69,17 @@ func (d *localIPDetector) notify(ip string, added bool) {
}

func (d *localIPDetector) listAndWatchIPAddresses(stopCh <-chan struct{}) {
// Subscribe IP address update before listing existing IP addresses to prevent event loss.
ch := make(chan netlink.AddrUpdate)
if err := netlink.AddrSubscribeWithOptions(ch, stopCh, netlink.AddrSubscribeOptions{
ErrorCallback: func(err error) {
klog.Errorf("Received error from IP address update subscription: %v", err)
},
}); err != nil {
klog.Errorf("Failed to subscribe IP address update: %v", err)
return
}

// List existing IP addresses first.
addresses, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
if err != nil {
Expand Down Expand Up @@ -98,21 +109,6 @@ func (d *localIPDetector) listAndWatchIPAddresses(stopCh <-chan struct{}) {
d.notify(addr, false)
}

// Subscribe IP address update.
// TODO: It's unlikely but possible that an IP is removed or added in-between the list and the watch calls, in which
// situation the update will be missing. Ideally we should subscribe the update with the option ListExisting to
// avoid it. However, "HasSynced" would not be deterministic as the subscribe function doesn't have a mechanism to
// notify that the list call is done yet. This should be fixed once the subscribe function is improved.
ch := make(chan netlink.AddrUpdate)
if err := netlink.AddrSubscribeWithOptions(ch, stopCh, netlink.AddrSubscribeOptions{
ErrorCallback: func(err error) {
klog.Errorf("Received error from IP address update subscription: %v", err)
},
}); err != nil {
klog.Errorf("Failed to subscribe IP address update: %v", err)
return
}

for {
select {
case <-stopCh:
Expand All @@ -126,11 +122,15 @@ func (d *localIPDetector) listAndWatchIPAddresses(stopCh <-chan struct{}) {
ip := addrUpdate.LinkAddress.IP.String()
d.mutex.Lock()
if addrUpdate.NewAddr {
d.localIPs.Insert(ip)
d.notify(ip, true)
if !d.localIPs.Has(ip) {
d.localIPs.Insert(ip)
d.notify(ip, true)
}
} else {
d.localIPs.Delete(ip)
d.notify(ip, false)
if d.localIPs.Has(ip) {
d.localIPs.Delete(ip)
d.notify(ip, false)
}
}
d.mutex.Unlock()
}
Expand Down

0 comments on commit 7e9b1c6

Please sign in to comment.