From 270ac2b70c1f7ac83bce5a5b41a72b70e88ee607 Mon Sep 17 00:00:00 2001 From: Martynas Pumputis Date: Mon, 4 May 2020 17:12:05 +0200 Subject: [PATCH 1/3] ipmasq: Allow to configure in JSON In addition to YAML, from now on users are able to configure the ip-masq-agent in JSON (to replicate the vanilla ip-masq-agent behaviour). Signed-off-by: Martynas Pumputis --- pkg/ipmasq/ipmasq.go | 24 +++++++++++++++--------- pkg/ipmasq/ipmasq_test.go | 13 +++++++++++++ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/pkg/ipmasq/ipmasq.go b/pkg/ipmasq/ipmasq.go index b4bddefdb0cb..e812189321c4 100644 --- a/pkg/ipmasq/ipmasq.go +++ b/pkg/ipmasq/ipmasq.go @@ -16,6 +16,7 @@ package ipmasq import ( "context" + "encoding/json" "fmt" "io/ioutil" "net" @@ -23,7 +24,7 @@ import ( "strings" "time" - "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/yaml" "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/logging" @@ -38,10 +39,11 @@ var ( // ipnet is a wrapper type for net.IPNet to enable de-serialization of CIDRs type Ipnet net.IPNet -func (c *Ipnet) UnmarshalYAML(unmarshal func(interface{}) error) error { - var str string - if err := unmarshal(&str); err != nil { - return err +func (c *Ipnet) UnmarshalJSON(json []byte) error { + str := string(json) + + if json[0] != '"' { + return fmt.Errorf("Invalid CIDR: %s", str) } ip, n, err := net.ParseCIDR(strings.Trim(str, `"`)) @@ -54,12 +56,11 @@ func (c *Ipnet) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = Ipnet(*n) return nil - } // config represents the ip-masq-agent configuration file encoded as YAML type config struct { - NonMasqCIDRs []Ipnet `yaml:"nonMasqueradeCIDRs"` + NonMasqCIDRs []Ipnet `json:"nonMasqueradeCIDRs"` } // IPMasqMap is an interface describing methods for manipulating an ipmasq map @@ -146,8 +147,13 @@ func (a *IPMasqAgent) readConfig() error { return fmt.Errorf("Failed to read %s: %s", a.configPath, err) } - if err := yaml.Unmarshal(raw, &cfg); err != nil { - return 
fmt.Errorf("Failed to de-serialize yaml: %s", err) + jsonStr, err := yaml.ToJSON(raw) + if err != nil { + return fmt.Errorf("Failed to convert to json: %s", err) + } + + if err := json.Unmarshal([]byte(jsonStr), &cfg); err != nil { + return fmt.Errorf("Failed to de-serialize json: %s", err) } nonMasqCIDRs := map[string]net.IPNet{} diff --git a/pkg/ipmasq/ipmasq_test.go b/pkg/ipmasq/ipmasq_test.go index 436a3e66f853..2296d844360b 100644 --- a/pkg/ipmasq/ipmasq_test.go +++ b/pkg/ipmasq/ipmasq_test.go @@ -113,6 +113,19 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) { _, ok = i.ipMasqMap.cidrs["2.2.0.0/16"] c.Assert(ok, check.Equals, true) + // Write new config in JSON + _, err = i.configFile.Seek(0, 0) + c.Assert(err, check.IsNil) + _, err = i.configFile.WriteString(`{"nonMasqueradeCIDRs": ["8.8.0.0/16", "1.1.2.3/16"]}`) + c.Assert(err, check.IsNil) + time.Sleep(300 * time.Millisecond) + + c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2) + _, ok = i.ipMasqMap.cidrs["8.8.0.0/16"] + c.Assert(ok, check.Equals, true) + _, ok = i.ipMasqMap.cidrs["1.1.0.0/16"] + c.Assert(ok, check.Equals, true) + // Delete file, should remove the CIDRs err = os.Remove(i.configFile.Name()) c.Assert(err, check.IsNil) From 345af05c6897c2e4792ed221f7d9ebf3a8f36e1d Mon Sep 17 00:00:00 2001 From: Martynas Pumputis Date: Mon, 4 May 2020 18:23:49 +0200 Subject: [PATCH 2/3] ipmasq: Fix concurrent map access in unit tests Protect the ipMasqMap by sync.RWMutex to avoid the race conditions: === RUN Test fatal error: concurrent map read and map write goroutine 33 [running]: runtime.throw(0xc67924, 0x21) /home/travis/.gimme/versions/go1.14.2.linux.amd64/src/runtime/panic.go:1116 +0x72 fp=0xc000574bb0 sp=0xc000574b80 pc=0x4370d2 runtime.mapaccess2_faststr(0xb78a00, 0xc00009e810, 0xc00003a320, 0xa, 0xc000574fb8, 0x0) /home/travis/.gimme/versions/go1.14.2.linux.amd64/src/runtime/map_faststr.go:116 +0x47c fp=0xc000574c20 sp=0xc000574bb0 pc=0x4155bc 
github.com/cilium/cilium/pkg/ipmasq.(*ipMasqMapMock).Update(0xc0001241c0, 0xc00003a2bc, 0x4, 0x4, 0xc00003a2b8, 0x4, 0x4, 0x0, 0x0) /home/travis/gopath/src/github.com/cilium/cilium/pkg/ipmasq/ipmasq_test.go:42 +0x6a fp=0xc000574c88 sp=0xc000574c20 pc=0xabc63a github.com/cilium/cilium/pkg/ipmasq.(*IPMasqAgent).Update(0xc00009e870, 0x22cf4b36, 0xc630b7c75d) Reported-by: Andre Martins Signed-off-by: Martynas Pumputis --- pkg/ipmasq/ipmasq_test.go | 55 ++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/pkg/ipmasq/ipmasq_test.go b/pkg/ipmasq/ipmasq_test.go index 2296d844360b..7e64796a5660 100644 --- a/pkg/ipmasq/ipmasq_test.go +++ b/pkg/ipmasq/ipmasq_test.go @@ -27,6 +27,7 @@ import ( "gopkg.in/check.v1" "github.com/cilium/cilium/pkg/controller" + "github.com/cilium/cilium/pkg/lock" ) func Test(t *testing.T) { @@ -34,35 +35,60 @@ func Test(t *testing.T) { } type ipMasqMapMock struct { + lock.RWMutex cidrs map[string]net.IPNet } func (m *ipMasqMapMock) Update(cidr net.IPNet) error { + m.Lock() + defer m.Unlock() + cidrStr := cidr.String() if _, ok := m.cidrs[cidrStr]; ok { return fmt.Errorf("CIDR already exists: %s", cidrStr) } m.cidrs[cidrStr] = cidr + return nil } func (m *ipMasqMapMock) Delete(cidr net.IPNet) error { + m.Lock() + defer m.Unlock() + cidrStr := cidr.String() if _, ok := m.cidrs[cidrStr]; !ok { return fmt.Errorf("CIDR not found: %s", cidrStr) } delete(m.cidrs, cidrStr) + return nil } func (m *ipMasqMapMock) Dump() ([]net.IPNet, error) { + m.RLock() + defer m.RUnlock() + cidrs := make([]net.IPNet, 0, len(m.cidrs)) for _, cidr := range m.cidrs { cidrs = append(cidrs, cidr) } + return cidrs, nil } +func (m *ipMasqMapMock) dumpToSet() map[string]struct{} { + m.RLock() + defer m.RUnlock() + + cidrs := make(map[string]struct{}, len(m.cidrs)) + for cidrStr := range m.cidrs { + cidrs[cidrStr] = struct{}{} + } + + return cidrs +} + type IPMasqTestSuite struct { ipMasqMap *ipMasqMapMock manager 
*controller.Manager @@ -94,10 +120,11 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) { c.Assert(err, check.IsNil) time.Sleep(300 * time.Millisecond) - c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2) - _, ok := i.ipMasqMap.cidrs["1.1.1.1/32"] + ipnets := i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 2) + _, ok := ipnets["1.1.1.1/32"] c.Assert(ok, check.Equals, true) - _, ok = i.ipMasqMap.cidrs["2.2.0.0/16"] + _, ok = ipnets["2.2.0.0/16"] c.Assert(ok, check.Equals, true) // Write new config @@ -107,10 +134,11 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) { c.Assert(err, check.IsNil) time.Sleep(300 * time.Millisecond) - c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2) - _, ok = i.ipMasqMap.cidrs["8.8.0.0/16"] + ipnets = i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 2) + _, ok = ipnets["8.8.0.0/16"] c.Assert(ok, check.Equals, true) - _, ok = i.ipMasqMap.cidrs["2.2.0.0/16"] + _, ok = ipnets["2.2.0.0/16"] c.Assert(ok, check.Equals, true) // Write new config in JSON @@ -120,17 +148,19 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) { c.Assert(err, check.IsNil) time.Sleep(300 * time.Millisecond) - c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 2) - _, ok = i.ipMasqMap.cidrs["8.8.0.0/16"] + ipnets = i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 2) + _, ok = ipnets["8.8.0.0/16"] c.Assert(ok, check.Equals, true) - _, ok = i.ipMasqMap.cidrs["1.1.0.0/16"] + _, ok = ipnets["1.1.0.0/16"] c.Assert(ok, check.Equals, true) // Delete file, should remove the CIDRs err = os.Remove(i.configFile.Name()) c.Assert(err, check.IsNil) time.Sleep(300 * time.Millisecond) - c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 0) + ipnets = i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 0) } func (i *IPMasqTestSuite) TestRestore(c *check.C) { @@ -148,7 +178,8 @@ func (i *IPMasqTestSuite) TestRestore(c *check.C) { start(i.configFile.Name(), 100*time.Millisecond, i.ipMasqMap, i.manager) time.Sleep(300 * time.Millisecond) - 
c.Assert(len(i.ipMasqMap.cidrs), check.Equals, 1) - _, ok := i.ipMasqMap.cidrs["4.4.0.0/16"] + ipnets := i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 1) + _, ok := ipnets["4.4.0.0/16"] c.Assert(ok, check.Equals, true) } From ac23b20d71f47e2a5698678f6cfff350a927545b Mon Sep 17 00:00:00 2001 From: Martynas Pumputis Date: Mon, 4 May 2020 21:49:54 +0200 Subject: [PATCH 3/3] ipmasq: Use fsnotify instead of periodic polling Instead of periodically reading the ip-masq-agent config file, subscribe to inotify events which are fired when the config is updated. This allows us to save a couple of CPU cycles, and also to get rid of the sync period flag. Signed-off-by: Martynas Pumputis --- Documentation/cmdref/cilium-agent.md | 1 - daemon/cmd/daemon_main.go | 10 ++-- pkg/ipmasq/ipmasq.go | 88 ++++++++++++++++++++++------ pkg/ipmasq/ipmasq_test.go | 44 ++++++++++---- pkg/option/config.go | 5 -- 5 files changed, 105 insertions(+), 43 deletions(-) diff --git a/Documentation/cmdref/cilium-agent.md b/Documentation/cmdref/cilium-agent.md index 7bbff1bc6d65..c63236fae179 100644 --- a/Documentation/cmdref/cilium-agent.md +++ b/Documentation/cmdref/cilium-agent.md @@ -108,7 +108,6 @@ cilium-agent [flags] --install-iptables-rules Install base iptables rules for cilium to mainly interact with kube-proxy (and masquerading) (default true) --ip-allocation-timeout duration Time after which an incomplete CIDR allocation is considered failed (default 2m0s) --ip-masq-agent-config-path string ip-masq-agent configuration file path (default "/etc/config/ip-masq-agent") - --ip-masq-agent-sync-period duration ip-masq-agent configuration file synchronization period (default 1m0s) --ipam string Backend to use for IPAM (default "hostscope-legacy") --ipsec-key-file string Path to IPSec key file --ipv4-node string IPv4 address of node (default "auto") diff --git a/daemon/cmd/daemon_main.go b/daemon/cmd/daemon_main.go index 4196cacb1caf..42ee1643b542 100644 --- a/daemon/cmd/daemon_main.go 
+++ b/daemon/cmd/daemon_main.go @@ -547,9 +547,6 @@ func init() { flags.String(option.IPMasqAgentConfigPath, "/etc/config/ip-masq-agent", "ip-masq-agent configuration file path") option.BindEnv(option.IPMasqAgentConfigPath) - flags.Duration(option.IPMasqAgentSyncPeriod, 60*time.Second, "ip-masq-agent configuration file synchronization period") - option.BindEnv(option.IPMasqAgentSyncPeriod) - flags.Bool(option.InstallIptRules, true, "Install base iptables rules for cilium to mainly interact with kube-proxy (and masquerading)") option.BindEnv(option.InstallIptRules) @@ -1295,8 +1292,11 @@ func runDaemon() { } if option.Config.EnableIPMasqAgent { - ipmasq.Start(option.Config.IPMasqAgentConfigPath, - option.Config.IPMasqAgentSyncPeriod) + ipmasqAgent, err := ipmasq.NewIPMasqAgent(option.Config.IPMasqAgentConfigPath) + if err != nil { + log.WithError(err).Fatal("Failed to create ip-masq-agent") + } + ipmasqAgent.Start() } if !option.Config.DryMode { diff --git a/pkg/ipmasq/ipmasq.go b/pkg/ipmasq/ipmasq.go index e812189321c4..5fa0463debb1 100644 --- a/pkg/ipmasq/ipmasq.go +++ b/pkg/ipmasq/ipmasq.go @@ -15,18 +15,17 @@ package ipmasq import ( - "context" "encoding/json" "fmt" "io/ioutil" "net" "os" + "path/filepath" "strings" - "time" + "github.com/fsnotify/fsnotify" "k8s.io/apimachinery/pkg/util/yaml" - "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/logging" "github.com/cilium/cilium/pkg/logging/logfields" "github.com/cilium/cilium/pkg/maps/ipmasq" @@ -76,35 +75,86 @@ type IPMasqAgent struct { nonMasqCIDRsFromConfig map[string]net.IPNet nonMasqCIDRsInMap map[string]net.IPNet ipMasqMap IPMasqMap + watcher *fsnotify.Watcher + stop chan struct{} + handlerFinished chan struct{} } -// Start starts the "ip-masq-agent" controller which is used to sync the ipmasq -// BPF maps. 
-func Start(configPath string, syncPeriod time.Duration) { - start(configPath, syncPeriod, &ipmasq.IPMasqBPFMap{}, controller.NewManager()) +func NewIPMasqAgent(configPath string) (*IPMasqAgent, error) { + return newIPMasqAgent(configPath, &ipmasq.IPMasqBPFMap{}) } -func start(configPath string, syncPeriod time.Duration, - ipMasqMap IPMasqMap, manager *controller.Manager) { +func newIPMasqAgent(configPath string, ipMasqMap IPMasqMap) (*IPMasqAgent, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("Failed to create fsnotify watcher: %s", err) + } + + configDir := filepath.Dir(configPath) + // The directory of the config should exist at this time, otherwise + // the watcher will fail to add + if err := watcher.Add(configDir); err != nil { + watcher.Close() + return nil, fmt.Errorf("Failed to add %q dir to fsnotify watcher: %s", configDir, err) + } a := &IPMasqAgent{ configPath: configPath, nonMasqCIDRsInMap: map[string]net.IPNet{}, ipMasqMap: ipMasqMap, + watcher: watcher, } + return a, nil +} + +// Start starts the ip-masq-agent goroutine which tracks the config file and +// updates the BPF map accordingly. 
+func (a *IPMasqAgent) Start() { if err := a.restore(); err != nil { - log.WithError(err).Warn("ip-masq-agent failed to restore") + log.WithError(err).Warn("Failed to restore") + } + if err := a.Update(); err != nil { + log.WithError(err).Warn("Failed to update") } - manager.UpdateController("ip-masq-agent", - controller.ControllerParams{ - DoFunc: func(ctx context.Context) error { - return a.Update() - }, - RunInterval: syncPeriod, - }, - ) + a.stop = make(chan struct{}) + a.handlerFinished = make(chan struct{}) + + go func() { + for { + select { + case event := <-a.watcher.Events: + log.Debugf("Received fsnotify event: %+v", event) + + if event.Name != a.configPath { + continue + } + + switch event.Op { + case fsnotify.Create, fsnotify.Write, fsnotify.Chmod, fsnotify.Remove, fsnotify.Rename: + if err := a.Update(); err != nil { + log.WithError(err).Warn("Failed to update") + } + default: + log.Warnf("Watcher received unknown event: %s. Ignoring.", event) + } + case err := <-a.watcher.Errors: + log.WithError(err).Warn("Watcher received an error") + case <-a.stop: + log.Info("Stopping ip-masq-agent") + close(a.handlerFinished) + return + } + } + }() +} + +// Stop stops the ip-masq-agent goroutine and the watcher. +func (a *IPMasqAgent) Stop() { + close(a.stop) + <-a.handlerFinished + a.watcher.Close() } // Update updates the ipmasq BPF map entries with ones from the config file. 
@@ -152,7 +202,7 @@ func (a *IPMasqAgent) readConfig() error { return fmt.Errorf("Failed to convert to json: %s", err) } - if err := json.Unmarshal([]byte(jsonStr), &cfg); err != nil { + if err := json.Unmarshal(jsonStr, &cfg); err != nil { return fmt.Errorf("Failed to de-serialize json: %s", err) } diff --git a/pkg/ipmasq/ipmasq_test.go b/pkg/ipmasq/ipmasq_test.go index 7e64796a5660..f1b4b019e9d7 100644 --- a/pkg/ipmasq/ipmasq_test.go +++ b/pkg/ipmasq/ipmasq_test.go @@ -26,7 +26,6 @@ import ( "gopkg.in/check.v1" - "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/lock" ) @@ -90,28 +89,28 @@ func (m *ipMasqMapMock) dumpToSet() map[string]struct{} { } type IPMasqTestSuite struct { - ipMasqMap *ipMasqMapMock - manager *controller.Manager - configFile *os.File + ipMasqMap *ipMasqMapMock + ipMasqAgent *IPMasqAgent + configFile *os.File } var _ = check.Suite(&IPMasqTestSuite{}) func (i *IPMasqTestSuite) SetUpTest(c *check.C) { i.ipMasqMap = &ipMasqMapMock{cidrs: map[string]net.IPNet{}} - i.manager = controller.NewManager() configFile, err := ioutil.TempFile("", "ipmasq-test") c.Assert(err, check.IsNil) i.configFile = configFile - start(configFile.Name(), 100*time.Millisecond, i.ipMasqMap, i.manager) + agent, err := newIPMasqAgent(configFile.Name(), i.ipMasqMap) + c.Assert(err, check.IsNil) + i.ipMasqAgent = agent + i.ipMasqAgent.Start() } func (i *IPMasqTestSuite) TearDownTest(c *check.C) { - err := i.manager.RemoveController("ip-masq-agent") - c.Assert(err, check.IsNil) - + i.ipMasqAgent.Stop() os.Remove(i.configFile.Name()) } @@ -158,28 +157,47 @@ func (i *IPMasqTestSuite) TestUpdate(c *check.C) { // Delete file, should remove the CIDRs err = os.Remove(i.configFile.Name()) c.Assert(err, check.IsNil) + err = i.configFile.Close() + c.Assert(err, check.IsNil) time.Sleep(300 * time.Millisecond) ipnets = i.ipMasqMap.dumpToSet() c.Assert(len(ipnets), check.Equals, 0) } func (i *IPMasqTestSuite) TestRestore(c *check.C) { - err := 
i.manager.RemoveController("ip-masq-agent") - c.Assert(err, check.IsNil) + // Stop ip-masq-agent goroutine (we can't use i.ipMasqAgent.Stop(), as it stops + // the watcher) + close(i.ipMasqAgent.stop) _, cidr, _ := net.ParseCIDR("3.3.3.0/24") i.ipMasqMap.cidrs[cidr.String()] = *cidr _, cidr, _ = net.ParseCIDR("4.4.0.0/16") i.ipMasqMap.cidrs[cidr.String()] = *cidr - _, err = i.configFile.WriteString("nonMasqueradeCIDRs:\n- 4.4.0.0/16") + _, err := i.configFile.WriteString("nonMasqueradeCIDRs:\n- 4.4.0.0/16") c.Assert(err, check.IsNil) - start(i.configFile.Name(), 100*time.Millisecond, i.ipMasqMap, i.manager) + i.ipMasqAgent.Start() time.Sleep(300 * time.Millisecond) ipnets := i.ipMasqMap.dumpToSet() c.Assert(len(ipnets), check.Equals, 1) _, ok := ipnets["4.4.0.0/16"] c.Assert(ok, check.Equals, true) + + // Now stop the goroutine, and also remove the maps. It should bootstrap from + // the config + close(i.ipMasqAgent.stop) + i.ipMasqMap = &ipMasqMapMock{cidrs: map[string]net.IPNet{}} + i.ipMasqAgent.ipMasqMap = i.ipMasqMap + _, err = i.configFile.Seek(0, 0) + c.Assert(err, check.IsNil) + _, err = i.configFile.WriteString("nonMasqueradeCIDRs:\n- 3.3.0.0/16") + c.Assert(err, check.IsNil) + i.ipMasqAgent.Start() + + ipnets = i.ipMasqMap.dumpToSet() + c.Assert(len(ipnets), check.Equals, 1) + _, ok = ipnets["3.3.0.0/16"] + c.Assert(ok, check.Equals, true) } diff --git a/pkg/option/config.go b/pkg/option/config.go index b1cd49fcf22f..ff176a2f7393 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -276,9 +276,6 @@ const ( // IPMasqAgentConfigPath is the configuration file path IPMasqAgentConfigPath = "ip-masq-agent-config-path" - // IPMasqAgentSyncPeriod is the configuration file synchronization period - IPMasqAgentSyncPeriod = "ip-masq-agent-sync-period" - // InstallIptRules sets whether Cilium should install any iptables in general InstallIptRules = "install-iptables-rules" @@ -1454,7 +1451,6 @@ type DaemonConfig struct { EnableBPFMasquerade bool 
EnableIPMasqAgent bool IPMasqAgentConfigPath string - IPMasqAgentSyncPeriod time.Duration InstallIptRules bool MonitorAggregation string PreAllocateMaps bool @@ -2231,7 +2227,6 @@ func (c *DaemonConfig) Populate() { c.EnableBPFMasquerade = viper.GetBool(EnableBPFMasquerade) c.EnableIPMasqAgent = viper.GetBool(EnableIPMasqAgent) c.IPMasqAgentConfigPath = viper.GetString(IPMasqAgentConfigPath) - c.IPMasqAgentSyncPeriod = viper.GetDuration(IPMasqAgentSyncPeriod) c.InstallIptRules = viper.GetBool(InstallIptRules) c.IPSecKeyFile = viper.GetString(IPSecKeyFileName) c.ModePreFilter = viper.GetString(PrefilterMode)