Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc ip-masq-agent improvements #11317

Merged
merged 3 commits into from May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion Documentation/cmdref/cilium-agent.md
Expand Up @@ -108,7 +108,6 @@ cilium-agent [flags]
--install-iptables-rules Install base iptables rules for cilium to mainly interact with kube-proxy (and masquerading) (default true)
--ip-allocation-timeout duration Time after which an incomplete CIDR allocation is considered failed (default 2m0s)
--ip-masq-agent-config-path string ip-masq-agent configuration file path (default "/etc/config/ip-masq-agent")
--ip-masq-agent-sync-period duration ip-masq-agent configuration file synchronization period (default 1m0s)
--ipam string Backend to use for IPAM (default "hostscope-legacy")
--ipsec-key-file string Path to IPSec key file
--ipv4-node string IPv4 address of node (default "auto")
Expand Down
10 changes: 5 additions & 5 deletions daemon/cmd/daemon_main.go
Expand Up @@ -547,9 +547,6 @@ func init() {
flags.String(option.IPMasqAgentConfigPath, "/etc/config/ip-masq-agent", "ip-masq-agent configuration file path")
option.BindEnv(option.IPMasqAgentConfigPath)

flags.Duration(option.IPMasqAgentSyncPeriod, 60*time.Second, "ip-masq-agent configuration file synchronization period")
option.BindEnv(option.IPMasqAgentSyncPeriod)

flags.Bool(option.InstallIptRules, true, "Install base iptables rules for cilium to mainly interact with kube-proxy (and masquerading)")
option.BindEnv(option.InstallIptRules)

Expand Down Expand Up @@ -1295,8 +1292,11 @@ func runDaemon() {
}

if option.Config.EnableIPMasqAgent {
ipmasq.Start(option.Config.IPMasqAgentConfigPath,
option.Config.IPMasqAgentSyncPeriod)
ipmasqAgent, err := ipmasq.NewIPMasqAgent(option.Config.IPMasqAgentConfigPath)
if err != nil {
log.WithError(err).Fatal("Failed to create ip-masq-agent")
}
ipmasqAgent.Start()
}

if !option.Config.DryMode {
Expand Down
110 changes: 83 additions & 27 deletions pkg/ipmasq/ipmasq.go
Expand Up @@ -15,17 +15,17 @@
package ipmasq

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"time"

"gopkg.in/yaml.v2"
"github.com/fsnotify/fsnotify"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/ipmasq"
Expand All @@ -38,10 +38,11 @@ var (
// ipnet is a wrapper type for net.IPNet to enable de-serialization of CIDRs
type Ipnet net.IPNet

func (c *Ipnet) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
func (c *Ipnet) UnmarshalJSON(json []byte) error {
str := string(json)

if json[0] != '"' {
return fmt.Errorf("Invalid CIDR: %s", str)
}

ip, n, err := net.ParseCIDR(strings.Trim(str, `"`))
Expand All @@ -54,12 +55,11 @@ func (c *Ipnet) UnmarshalYAML(unmarshal func(interface{}) error) error {

*c = Ipnet(*n)
return nil

}

// config represents the ip-masq-agent configuration file encoded as YAML
type config struct {
NonMasqCIDRs []Ipnet `yaml:"nonMasqueradeCIDRs"`
NonMasqCIDRs []Ipnet `json:"nonMasqueradeCIDRs"`
}

// IPMasqMap is an interface describing methods for manipulating an ipmasq map
Expand All @@ -75,35 +75,86 @@ type IPMasqAgent struct {
nonMasqCIDRsFromConfig map[string]net.IPNet
nonMasqCIDRsInMap map[string]net.IPNet
ipMasqMap IPMasqMap
watcher *fsnotify.Watcher
stop chan struct{}
handlerFinished chan struct{}
}

// Start starts the "ip-masq-agent" controller which is used to sync the ipmasq
// BPF maps.
func Start(configPath string, syncPeriod time.Duration) {
start(configPath, syncPeriod, &ipmasq.IPMasqBPFMap{}, controller.NewManager())
func NewIPMasqAgent(configPath string) (*IPMasqAgent, error) {
return newIPMasqAgent(configPath, &ipmasq.IPMasqBPFMap{})
}

func start(configPath string, syncPeriod time.Duration,
ipMasqMap IPMasqMap, manager *controller.Manager) {
func newIPMasqAgent(configPath string, ipMasqMap IPMasqMap) (*IPMasqAgent, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("Failed to create fsnotify watcher: %s", err)
}

configDir := filepath.Dir(configPath)
// The directory of the config should exist at this time, otherwise
// the watcher will fail to add
if err := watcher.Add(configDir); err != nil {
watcher.Close()
return nil, fmt.Errorf("Failed to add %q dir to fsnotify watcher: %s", configDir, err)
brb marked this conversation as resolved.
Show resolved Hide resolved
}

a := &IPMasqAgent{
configPath: configPath,
nonMasqCIDRsInMap: map[string]net.IPNet{},
ipMasqMap: ipMasqMap,
watcher: watcher,
}

return a, nil
}

// Start starts the ip-masq-agent goroutine which tracks the config file and
// updates the BPF map accordingly.
func (a *IPMasqAgent) Start() {
if err := a.restore(); err != nil {
log.WithError(err).Warn("ip-masq-agent failed to restore")
log.WithError(err).Warn("Failed to restore")
}
if err := a.Update(); err != nil {
log.WithError(err).Warn("Failed to update")
}

manager.UpdateController("ip-masq-agent",
controller.ControllerParams{
DoFunc: func(ctx context.Context) error {
return a.Update()
},
RunInterval: syncPeriod,
},
)
a.stop = make(chan struct{})
a.handlerFinished = make(chan struct{})

go func() {
for {
select {
case event := <-a.watcher.Events:
log.Debugf("Received fsnotify event: %+v", event)

if event.Name != a.configPath {
continue
}

switch event.Op {
case fsnotify.Create, fsnotify.Write, fsnotify.Chmod, fsnotify.Remove, fsnotify.Rename:
if err := a.Update(); err != nil {
log.WithError(err).Warn("Failed to update")
}
default:
log.Warnf("Watcher received unknown event: %s. Ignoring.", event)
}
case err := <-a.watcher.Errors:
log.WithError(err).Warn("Watcher received an error")
case <-a.stop:
log.Info("Stopping ip-masq-agent")
close(a.handlerFinished)
return
}
}
}()
}

// Stop stops the ip-masq-agent goroutine and the watcher.
func (a *IPMasqAgent) Stop() {
close(a.stop)
<-a.handlerFinished
a.watcher.Close()
}

// Update updates the ipmasq BPF map entries with ones from the config file.
Expand Down Expand Up @@ -146,8 +197,13 @@ func (a *IPMasqAgent) readConfig() error {
return fmt.Errorf("Failed to read %s: %s", a.configPath, err)
}

if err := yaml.Unmarshal(raw, &cfg); err != nil {
return fmt.Errorf("Failed to de-serialize yaml: %s", err)
jsonStr, err := yaml.ToJSON(raw)
if err != nil {
return fmt.Errorf("Failed to convert to json: %s", err)
}

if err := json.Unmarshal(jsonStr, &cfg); err != nil {
return fmt.Errorf("Failed to de-serialize json: %s", err)
}

nonMasqCIDRs := map[string]net.IPNet{}
Expand Down
106 changes: 84 additions & 22 deletions pkg/ipmasq/ipmasq_test.go
Expand Up @@ -26,66 +26,91 @@ import (

"gopkg.in/check.v1"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/lock"
)

func Test(t *testing.T) {
check.TestingT(t)
}

type ipMasqMapMock struct {
lock.RWMutex
cidrs map[string]net.IPNet
}

func (m *ipMasqMapMock) Update(cidr net.IPNet) error {
m.Lock()
defer m.Unlock()

cidrStr := cidr.String()
if _, ok := m.cidrs[cidrStr]; ok {
return fmt.Errorf("CIDR already exists: %s", cidrStr)
}
m.cidrs[cidrStr] = cidr

return nil
}

func (m *ipMasqMapMock) Delete(cidr net.IPNet) error {
m.Lock()
defer m.Unlock()

cidrStr := cidr.String()
if _, ok := m.cidrs[cidrStr]; !ok {
return fmt.Errorf("CIDR not found: %s", cidrStr)
}
delete(m.cidrs, cidrStr)

return nil
}

func (m *ipMasqMapMock) Dump() ([]net.IPNet, error) {
m.RLock()
defer m.RUnlock()

cidrs := make([]net.IPNet, 0, len(m.cidrs))
for _, cidr := range m.cidrs {
cidrs = append(cidrs, cidr)
}

return cidrs, nil
}

func (m *ipMasqMapMock) dumpToSet() map[string]struct{} {
m.RLock()
defer m.RUnlock()

cidrs := make(map[string]struct{}, len(m.cidrs))
for cidrStr := range m.cidrs {
cidrs[cidrStr] = struct{}{}
}

return cidrs
}

type IPMasqTestSuite struct {
ipMasqMap *ipMasqMapMock
manager *controller.Manager
configFile *os.File
ipMasqMap *ipMasqMapMock
ipMasqAgent *IPMasqAgent
configFile *os.File
}

var _ = check.Suite(&IPMasqTestSuite{})

func (i *IPMasqTestSuite) SetUpTest(c *check.C) {
i.ipMasqMap = &ipMasqMapMock{cidrs: map[string]net.IPNet{}}
i.manager = controller.NewManager()

configFile, err := ioutil.TempFile("", "ipmasq-test")
c.Assert(err, check.IsNil)
i.configFile = configFile

start(configFile.Name(), 100*time.Millisecond, i.ipMasqMap, i.manager)
agent, err := newIPMasqAgent(configFile.Name(), i.ipMasqMap)
c.Assert(err, check.IsNil)
i.ipMasqAgent = agent
i.ipMasqAgent.Start()
}

func (i *IPMasqTestSuite) TearDownTest(c *check.C) {
err := i.manager.RemoveController("ip-masq-agent")
c.Assert(err, check.IsNil)

i.ipMasqAgent.Stop()
os.Remove(i.configFile.Name())
}

Expand All @@ -94,10 +119,11 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) {
c.Assert(err, check.IsNil)
time.Sleep(300 * time.Millisecond)

c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2)
_, ok := i.ipMasqMap.cidrs["1.1.1.1/32"]
ipnets := i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 2)
_, ok := ipnets["1.1.1.1/32"]
c.Assert(ok, check.Equals, true)
_, ok = i.ipMasqMap.cidrs["2.2.0.0/16"]
_, ok = ipnets["2.2.0.0/16"]
c.Assert(ok, check.Equals, true)

// Write new config
Expand All @@ -107,35 +133,71 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) {
c.Assert(err, check.IsNil)
time.Sleep(300 * time.Millisecond)

c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2)
_, ok = i.ipMasqMap.cidrs["8.8.0.0/16"]
ipnets = i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 2)
_, ok = ipnets["8.8.0.0/16"]
c.Assert(ok, check.Equals, true)
_, ok = i.ipMasqMap.cidrs["2.2.0.0/16"]
_, ok = ipnets["2.2.0.0/16"]
c.Assert(ok, check.Equals, true)

// Write new config in JSON
_, err = i.configFile.Seek(0, 0)
c.Assert(err, check.IsNil)
_, err = i.configFile.WriteString(`{"nonMasqueradeCIDRs": ["8.8.0.0/16", "1.1.2.3/16"]}`)
c.Assert(err, check.IsNil)
time.Sleep(300 * time.Millisecond)

ipnets = i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 2)
_, ok = ipnets["8.8.0.0/16"]
c.Assert(ok, check.Equals, true)
_, ok = ipnets["1.1.0.0/16"]
c.Assert(ok, check.Equals, true)

// Delete file, should remove the CIDRs
err = os.Remove(i.configFile.Name())
c.Assert(err, check.IsNil)
err = i.configFile.Close()
c.Assert(err, check.IsNil)
time.Sleep(300 * time.Millisecond)
c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 0)
ipnets = i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 0)
}

func (i *IPMasqTestSuite) TestRestore(c *check.C) {
err := i.manager.RemoveController("ip-masq-agent")
c.Assert(err, check.IsNil)
// Stop ip-masq-agent goroutine (we can't use i.ipMasqMap.Stop(), as it stops
// the watcher)
close(i.ipMasqAgent.stop)

_, cidr, _ := net.ParseCIDR("3.3.3.0/24")
i.ipMasqMap.cidrs[cidr.String()] = *cidr
_, cidr, _ = net.ParseCIDR("4.4.0.0/16")
i.ipMasqMap.cidrs[cidr.String()] = *cidr

_, err = i.configFile.WriteString("nonMasqueradeCIDRs:\n- 4.4.0.0/16")
_, err := i.configFile.WriteString("nonMasqueradeCIDRs:\n- 4.4.0.0/16")
c.Assert(err, check.IsNil)

start(i.configFile.Name(), 100*time.Millisecond, i.ipMasqMap, i.manager)
i.ipMasqAgent.Start()
time.Sleep(300 * time.Millisecond)

c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 1)
_, ok := i.ipMasqMap.cidrs["4.4.0.0/16"]
ipnets := i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 1)
_, ok := ipnets["4.4.0.0/16"]
c.Assert(ok, check.Equals, true)

// Now stop the goroutine, and also remove the maps. It should bootstrap from
// the config
close(i.ipMasqAgent.stop)
i.ipMasqMap = &ipMasqMapMock{cidrs: map[string]net.IPNet{}}
i.ipMasqAgent.ipMasqMap = i.ipMasqMap
_, err = i.configFile.Seek(0, 0)
c.Assert(err, check.IsNil)
_, err = i.configFile.WriteString("nonMasqueradeCIDRs:\n- 3.3.0.0/16")
c.Assert(err, check.IsNil)
i.ipMasqAgent.Start()

ipnets = i.ipMasqMap.dumpToSet()
c.Assert(len(ipnets), check.Equals, 1)
_, ok = ipnets["3.3.0.0/16"]
c.Assert(ok, check.Equals, true)
}