diff --git a/cmd/containerd/builtins/builtins.go b/cmd/containerd/builtins/builtins.go index 6ed9f3b0a6bc..c89817805ad3 100644 --- a/cmd/containerd/builtins/builtins.go +++ b/cmd/containerd/builtins/builtins.go @@ -23,6 +23,7 @@ import ( _ "github.com/containerd/containerd/gc/scheduler" _ "github.com/containerd/containerd/leases/plugin" _ "github.com/containerd/containerd/metadata/plugin" + _ "github.com/containerd/containerd/pkg/net/plugin" _ "github.com/containerd/containerd/pkg/nri/plugin" _ "github.com/containerd/containerd/plugins/sandbox" _ "github.com/containerd/containerd/plugins/streaming" diff --git a/contrib/fuzz/cri_server_fuzzer.go b/contrib/fuzz/cri_server_fuzzer.go index 2b6622bf8fa5..0fa1d649ab4b 100644 --- a/contrib/fuzz/cri_server_fuzzer.go +++ b/contrib/fuzz/cri_server_fuzzer.go @@ -37,7 +37,7 @@ func FuzzCRIServer(data []byte) int { } defer client.Close() - c, err := server.NewCRIService(criconfig.Config{}, client, nil) + c, err := server.NewCRIService(criconfig.Config{}, client, nil, nil) if err != nil { panic(err) } diff --git a/integration/image_pull_timeout_test.go b/integration/image_pull_timeout_test.go index a2473b181d80..220606bfa21d 100644 --- a/integration/image_pull_timeout_test.go +++ b/integration/image_pull_timeout_test.go @@ -471,5 +471,5 @@ func initLocalCRIPlugin(client *containerd.Client, tmpDir string, registryCfg cr RootDir: filepath.Join(criWorkDir, "root"), StateDir: filepath.Join(criWorkDir, "state"), } - return criserver.NewCRIService(cfg, client, nil) + return criserver.NewCRIService(cfg, client, nil, nil) } diff --git a/metadata/gc.go b/metadata/gc.go index aba72d56b47b..497103a6afea 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -49,6 +49,8 @@ const ( resourceEnd // ResourceStream specifies a stream ResourceStream + // ResourceNetwork specifies a network resource + ResourceNetwork ) const ( diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index a5e01b39945b..2caf9a808f0e 100644 --- a/pkg/cri/cri.go +++ 
b/pkg/cri/cri.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/cri/sbserver" + "github.com/containerd/containerd/pkg/net/compat" "github.com/containerd/containerd/pkg/nri" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" @@ -48,6 +49,7 @@ func init() { plugin.EventPlugin, plugin.ServicePlugin, plugin.NRIApiPlugin, + plugin.NetworkPlugin, }, InitFn: initCRIService, }) @@ -88,6 +90,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { var s server.CRIService var nrip nri.API + var netp compat.API if os.Getenv("ENABLE_CRI_SANDBOXES") != "" { log.G(ctx).Info("using experimental CRI Sandbox server - unset ENABLE_CRI_SANDBOXES to disable") s, err = sbserver.NewCRIService(c, client) @@ -99,7 +102,12 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { log.G(ctx).Info("NRI service not found, disabling NRI support") } - s, err = server.NewCRIService(c, client, nrip) + netp, err = getNetworkPlugin(ic) + if err == nil && netp != nil { // netp is nil (with a nil error) when ENABLE_NETWORK_SRV is unset + log.G(ctx).Info("using experimental network plugin") + } + + s, err = server.NewCRIService(c, client, nrip, netp) } if err != nil { return nil, fmt.Errorf("failed to create CRI service: %w", err) @@ -166,3 +174,28 @@ func getNRIPlugin(ic *plugin.InitContext) (nri.API, error) { return api, nil } + +// Get the Network plugin and verify its type. 
+func getNetworkPlugin(ic *plugin.InitContext) (compat.API, error) { + const ( + pluginType = plugin.NetworkPlugin + pluginName = "cni" + ) + + if os.Getenv("ENABLE_NETWORK_SRV") == "" { + return nil, nil + } + + p, err := ic.GetByID(pluginType, pluginName) + if err != nil { + return nil, err + } + + api, ok := p.(compat.API) + if !ok { + return nil, fmt.Errorf("network plugin (%s, %q) has incompatible type %T", + pluginType, pluginName, api) + } + + return api, nil +} diff --git a/pkg/cri/server/cni_conf_syncer.go b/pkg/cri/server/cni_conf_syncer.go index 9e2a459ec6ea..8a029785389f 100644 --- a/pkg/cri/server/cni_conf_syncer.go +++ b/pkg/cri/server/cni_conf_syncer.go @@ -22,7 +22,8 @@ import ( "path/filepath" "sync" - cni "github.com/containerd/go-cni" + "github.com/containerd/containerd/pkg/cri/util" + "github.com/containerd/containerd/pkg/net/compat" "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" ) @@ -36,12 +37,12 @@ type cniNetConfSyncer struct { watcher *fsnotify.Watcher confDir string - netPlugin cni.CNI - loadOpts []cni.Opt + netPlugin compat.CNI + loadOpts []compat.LoadOpt } // newCNINetConfSyncer creates cni network conf syncer. 
-func newCNINetConfSyncer(confDir string, netPlugin cni.CNI, loadOpts []cni.Opt) (*cniNetConfSyncer, error) { +func newCNINetConfSyncer(confDir string, netPlugin compat.CNI, loadOpts []compat.LoadOpt) (*cniNetConfSyncer, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, fmt.Errorf("failed to create fsnotify watcher: %w", err) @@ -69,7 +70,7 @@ func newCNINetConfSyncer(confDir string, netPlugin cni.CNI, loadOpts []cni.Opt) loadOpts: loadOpts, } - if err := syncer.netPlugin.Load(syncer.loadOpts...); err != nil { + if err := syncer.netPlugin.Load(util.NamespacedContext(), syncer.loadOpts...); err != nil { logrus.WithError(err).Error("failed to load cni during init, please check CRI plugin status before setting up network for pods") syncer.updateLastStatus(err) } @@ -97,7 +98,7 @@ func (syncer *cniNetConfSyncer) syncLoop() error { } logrus.Debugf("receiving change event from cni conf dir: %s", event) - lerr := syncer.netPlugin.Load(syncer.loadOpts...) + lerr := syncer.netPlugin.Load(util.NamespacedContext(), syncer.loadOpts...) if lerr != nil { logrus.WithError(lerr). Errorf("failed to reload cni configuration after receiving fs change event(%s)", event) diff --git a/pkg/cri/server/netapi_adaptor.go b/pkg/cri/server/netapi_adaptor.go new file mode 100644 index 000000000000..37107fd4ac63 --- /dev/null +++ b/pkg/cri/server/netapi_adaptor.go @@ -0,0 +1,224 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package server + +import ( + "context" + "reflect" + "runtime" + "strings" + + "github.com/containerd/containerd/pkg/net" + "github.com/containerd/containerd/pkg/net/compat" + "github.com/containerd/go-cni" +) + +// cniAdaptor is created to adapt the APIs of network plugin to their +// counterparts in go-cni. +type cniAdaptor struct { + adapt bool + g cni.CNI + c compat.CNI +} + +var _ compat.CNI = (*cniAdaptor)(nil) + +//nolint:nolintlint,unused +func newCNIAdaptor(netAPI compat.API, name string, opts ...compat.Opt) (*cniAdaptor, error) { + var err error + + c := &cniAdaptor{ + adapt: false, + } + + if netAPI == nil { + c.adapt = true + } + + if c.adapt { + dopts, err := convertOpts(opts) + if err != nil { + return nil, err + } + if c.g, err = cni.New(dopts...); err != nil { + return nil, err + } + } else { + if c.c, err = netAPI.NewCNI(name, opts...); err != nil { + return nil, err + } + } + + return c, nil +} + +func (c *cniAdaptor) Setup(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*compat.Result, error) { + if !c.adapt { + return c.c.Setup(ctx, id, path, opts...) + } + + dopts, err := convertNamespaceOpts(opts) + if err != nil { + return nil, err + } + + r, err := c.g.Setup(ctx, id, path, dopts...) + if err != nil { + return nil, err + } + + return compat.WrapResult(r), nil +} + +func (c *cniAdaptor) SetupSerially(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*compat.Result, error) { + if !c.adapt { + return c.c.SetupSerially(ctx, id, path, opts...) + } + + dopts, err := convertNamespaceOpts(opts) + if err != nil { + return nil, err + } + + r, err := c.g.SetupSerially(ctx, id, path, dopts...) // go-cni path must attach networks serially too, mirroring the non-adapted branch + if err != nil { + return nil, err + } + + return compat.WrapResult(r), nil +} + +func (c *cniAdaptor) Remove(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error { + if !c.adapt { + return c.c.Remove(ctx, id, path, opts...) 
+ } + + dopts, err := convertNamespaceOpts(opts) + if err != nil { + return err + } + + return c.g.Remove(ctx, id, path, dopts...) +} + +func (c *cniAdaptor) Check(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error { + if !c.adapt { + return c.c.Check(ctx, id, path, opts...) + } + + dopts, err := convertNamespaceOpts(opts) + if err != nil { + return err + } + + return c.g.Check(ctx, id, path, dopts...) +} + +func (c *cniAdaptor) Load(ctx context.Context, opts ...compat.LoadOpt) error { + if !c.adapt { + return c.c.Load(ctx, opts...) + } + + var dopts []cni.Opt + for _, o := range opts { + name := getFunctionName(o) + sl := strings.Split(name, "/") + switch sl[len(sl)-1] { + case "compat.WithLoNetwork": + dopts = append(dopts, cni.WithLoNetwork) + case "compat.WithDefaultConf": + dopts = append(dopts, cni.WithDefaultConf) + } + } + + return c.g.Load(dopts...) +} + +func (c *cniAdaptor) Status(ctx context.Context) error { + if !c.adapt { + return c.c.Status(ctx) + } + return c.g.Status() +} + +func (c *cniAdaptor) GetConfig(ctx context.Context) *cni.ConfigResult { + if !c.adapt { + return c.c.GetConfig(ctx) + } + return c.g.GetConfig() +} + +//nolint:nolintlint,unused +func convertOpts(opts []compat.Opt) ([]cni.Opt, error) { + var ( + cfg compat.Config + dopts []cni.Opt + ) + + for _, o := range opts { + if err := o(&cfg); err != nil { + return dopts, err + } + } + + if len(cfg.PluginDirs) > 0 { + dopts = append(dopts, cni.WithPluginDir(cfg.PluginDirs)) + } + if len(cfg.PluginConfDir) > 0 { + dopts = append(dopts, cni.WithPluginConfDir(cfg.PluginConfDir)) + } + if cfg.PluginMaxConfNum > 0 { + dopts = append(dopts, cni.WithPluginMaxConfNum(cfg.PluginMaxConfNum)) + } + if len(cfg.Prefix) > 0 { + dopts = append(dopts, cni.WithInterfacePrefix(cfg.Prefix)) + } + if cfg.NetworkCount > 0 { + dopts = append(dopts, cni.WithMinNetworkCount(cfg.NetworkCount)) + } + + return dopts, nil +} + +func convertNamespaceOpts(opts []net.AttachmentOpt) 
([]cni.NamespaceOpts, error) { + var dopts []cni.NamespaceOpts + + args := net.AttachmentArgs{ + CapabilityArgs: make(map[string]interface{}), + PluginArgs: make(map[string]string), + } + + for _, o := range opts { + if err := o(&args); err != nil { + return dopts, err + } + } + + for k, v := range args.PluginArgs { + dopts = append(dopts, cni.WithArgs(k, v)) + } + + for k, v := range args.CapabilityArgs { + dopts = append(dopts, cni.WithCapability(k, v)) + } + + return dopts, nil +} + +func getFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} diff --git a/pkg/cri/server/sandbox_run.go b/pkg/cri/server/sandbox_run.go index 5380a2664d19..c310986ae0a2 100644 --- a/pkg/cri/server/sandbox_run.go +++ b/pkg/cri/server/sandbox_run.go @@ -27,7 +27,9 @@ import ( "strings" "time" - cni "github.com/containerd/go-cni" + "github.com/containerd/containerd/pkg/net" + cni "github.com/containerd/containerd/pkg/net/compat" + gocni "github.com/containerd/go-cni" "github.com/containerd/typeurl" "github.com/davecgh/go-spew/spew" selinux "github.com/opencontainers/selinux/go-selinux" @@ -544,6 +546,13 @@ func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore. return errors.New("cni config not initialized") } + // a lease for the network resources is required until the gc labels are set on the owning container + ctx, done, err := c.client.WithLease(ctx) + if err != nil { + return err + } + defer done(ctx) + opts, err := cniNamespaceOpts(id, config) if err != nil { return fmt.Errorf("get cni namespace options: %w", err) @@ -561,26 +570,34 @@ func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore. 
networkPluginOperationsErrors.WithValues(networkSetUpOp).Inc() return err } + logDebugCNIResult(ctx, id, result) + + // set the gc labels + if len(result.Labels) > 0 { + if _, err = sandbox.Container.SetLabels(ctx, result.Labels); err != nil { + return err + } + } // Check if the default interface has IP config if configs, ok := result.Interfaces[defaultIfName]; ok && len(configs.IPConfigs) > 0 { sandbox.IP, sandbox.AdditionalIPs = selectPodIPs(ctx, configs.IPConfigs, c.config.IPPreference) - sandbox.CNIResult = result + sandbox.CNIResult = &result.Result return nil } return fmt.Errorf("failed to find network info for sandbox %q", id) } // cniNamespaceOpts get CNI namespace options from sandbox config. -func cniNamespaceOpts(id string, config *runtime.PodSandboxConfig) ([]cni.NamespaceOpts, error) { - opts := []cni.NamespaceOpts{ - cni.WithLabels(toCNILabels(id, config)), - cni.WithCapability(annotations.PodAnnotations, config.Annotations), +func cniNamespaceOpts(id string, config *runtime.PodSandboxConfig) ([]net.AttachmentOpt, error) { + opts := []net.AttachmentOpt{ + net.WithLabels(toCNILabels(id, config)), + net.WithCapability(annotations.PodAnnotations, config.Annotations), } portMappings := toCNIPortMappings(config.GetPortMappings()) if len(portMappings) > 0 { - opts = append(opts, cni.WithCapabilityPortMap(portMappings)) + opts = append(opts, net.WithCapabilityPortMap(portMappings)) } // Will return an error if the bandwidth limitation has the wrong unit @@ -590,12 +607,12 @@ func cniNamespaceOpts(id string, config *runtime.PodSandboxConfig) ([]cni.Namesp return nil, err } if bandWidth != nil { - opts = append(opts, cni.WithCapabilityBandWidth(*bandWidth)) + opts = append(opts, net.WithCapabilityBandWidth(*bandWidth)) } dns := toCNIDNS(config.GetDnsConfig()) if dns != nil { - opts = append(opts, cni.WithCapabilityDNS(*dns)) + opts = append(opts, net.WithCapabilityDNS(*dns)) } return opts, nil @@ -613,7 +630,7 @@ func toCNILabels(id string, config 
*runtime.PodSandboxConfig) map[string]string } // toCNIBandWidth converts CRI annotations to CNI bandwidth. -func toCNIBandWidth(annotations map[string]string) (*cni.BandWidth, error) { +func toCNIBandWidth(annotations map[string]string) (*gocni.BandWidth, error) { ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations) if err != nil { return nil, fmt.Errorf("reading pod bandwidth annotations: %w", err) @@ -623,7 +640,7 @@ func toCNIBandWidth(annotations map[string]string) (*cni.BandWidth, error) { return nil, nil } - bandWidth := &cni.BandWidth{} + bandWidth := &gocni.BandWidth{} if ingress != nil { bandWidth.IngressRate = uint64(ingress.Value()) @@ -639,13 +656,13 @@ func toCNIBandWidth(annotations map[string]string) (*cni.BandWidth, error) { } // toCNIPortMappings converts CRI port mappings to CNI. -func toCNIPortMappings(criPortMappings []*runtime.PortMapping) []cni.PortMapping { - var portMappings []cni.PortMapping +func toCNIPortMappings(criPortMappings []*runtime.PortMapping) []gocni.PortMapping { + var portMappings []gocni.PortMapping for _, mapping := range criPortMappings { if mapping.HostPort <= 0 { continue } - portMappings = append(portMappings, cni.PortMapping{ + portMappings = append(portMappings, gocni.PortMapping{ HostPort: mapping.HostPort, ContainerPort: mapping.ContainerPort, Protocol: strings.ToLower(mapping.Protocol.String()), @@ -656,11 +673,11 @@ func toCNIPortMappings(criPortMappings []*runtime.PortMapping) []cni.PortMapping } // toCNIDNS converts CRI DNSConfig to CNI. -func toCNIDNS(dns *runtime.DNSConfig) *cni.DNS { +func toCNIDNS(dns *runtime.DNSConfig) *gocni.DNS { if dns == nil { return nil } - return &cni.DNS{ + return &gocni.DNS{ Servers: dns.GetServers(), Searches: dns.GetSearches(), Options: dns.GetOptions(), @@ -668,11 +685,11 @@ func toCNIDNS(dns *runtime.DNSConfig) *cni.DNS { } // selectPodIPs select an ip from the ip list. 
-func selectPodIPs(ctx context.Context, configs []*cni.IPConfig, preference string) (string, []string) { +func selectPodIPs(ctx context.Context, configs []*gocni.IPConfig, preference string) (string, []string) { if len(configs) == 1 { return ipString(configs[0]), nil } - toStrings := func(ips []*cni.IPConfig) (o []string) { + toStrings := func(ips []*gocni.IPConfig) (o []string) { for _, i := range ips { o = append(o, ipString(i)) } @@ -705,7 +722,7 @@ func selectPodIPs(ctx context.Context, configs []*cni.IPConfig, preference strin return all[0], all[1:] } -func ipString(ip *cni.IPConfig) string { +func ipString(ip *gocni.IPConfig) string { return ip.IP.String() } diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 74f0636191d5..26cadb226af8 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -30,10 +30,10 @@ import ( "github.com/containerd/containerd/oci" "github.com/containerd/containerd/pkg/cri/streaming" "github.com/containerd/containerd/pkg/kmutex" + "github.com/containerd/containerd/pkg/net/compat" "github.com/containerd/containerd/pkg/nri" "github.com/containerd/containerd/plugin" runtime_alpha "github.com/containerd/containerd/third_party/k8s.io/cri-api/pkg/apis/runtime/v1alpha2" - cni "github.com/containerd/go-cni" "github.com/sirupsen/logrus" "google.golang.org/grpc" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -98,7 +98,7 @@ type criService struct { // snapshotStore stores information of all snapshots. snapshotStore *snapshotstore.Store // netPlugin is used to setup and teardown network when run/stop pod sandbox. - netPlugin map[string]cni.CNI + netPlugin map[string]compat.CNI // client is an instance of the containerd client client *containerd.Client // streamServer is the streaming server serves container streaming request. 
@@ -122,13 +122,16 @@ type criService struct { unpackDuplicationSuppressor kmutex.KeyedLocker nri *nriAPI + + netAPI compat.API + // containerEventsChan is used to capture container events and send them // to the caller of GetContainerEvents. containerEventsChan chan runtime.ContainerEventResponse } // NewCRIService returns a new instance of CRIService -func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API) (CRIService, error) { +func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri.API, netp compat.API) (CRIService, error) { var err error labels := label.NewStore() c := &criService{ @@ -142,7 +145,8 @@ func NewCRIService(config criconfig.Config, client *containerd.Client, nrip nri. sandboxNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(), initialized: atomic.NewBool(false), - netPlugin: make(map[string]cni.CNI), + netPlugin: make(map[string]compat.CNI), + netAPI: netp, unpackDuplicationSuppressor: kmutex.New(), } diff --git a/pkg/cri/server/service_linux.go b/pkg/cri/server/service_linux.go index 612bff87dd63..e016087d8b7f 100644 --- a/pkg/cri/server/service_linux.go +++ b/pkg/cri/server/service_linux.go @@ -21,8 +21,8 @@ import ( "github.com/container-orchestrated-devices/container-device-interface/pkg/cdi" "github.com/containerd/containerd/pkg/cap" + cni "github.com/containerd/containerd/pkg/net/compat" "github.com/containerd/containerd/pkg/userns" - cni "github.com/containerd/go-cni" "github.com/opencontainers/selinux/go-selinux" "github.com/sirupsen/logrus" ) @@ -71,13 +71,16 @@ func (c *criService) initPlatform() (err error) { // hence networkAttachCount is 2. If there are more network configs the // pod will be attached to all the networks but we will only use the ip // of the default network interface as the pod IP. 
- i, err := cni.New(cni.WithMinNetworkCount(networkAttachCount), + i, err := newCNIAdaptor(c.netAPI, name, + cni.WithMinNetworkCount(networkAttachCount), cni.WithPluginConfDir(dir), cni.WithPluginMaxConfNum(max), cni.WithPluginDir([]string{c.config.NetworkPluginBinDir})) + if err != nil { return fmt.Errorf("failed to initialize cni: %w", err) } + c.netPlugin[name] = i } @@ -100,6 +103,6 @@ func (c *criService) initPlatform() (err error) { } // cniLoadOptions returns cni load options for the linux. -func (c *criService) cniLoadOptions() []cni.Opt { - return []cni.Opt{cni.WithLoNetwork, cni.WithDefaultConf} +func (c *criService) cniLoadOptions() []cni.LoadOpt { + return []cni.LoadOpt{cni.WithLoNetwork, cni.WithDefaultConf} } diff --git a/pkg/cri/server/service_other.go b/pkg/cri/server/service_other.go index 40e864a027b2..d21a88a4dad4 100644 --- a/pkg/cri/server/service_other.go +++ b/pkg/cri/server/service_other.go @@ -19,7 +19,7 @@ package server import ( - cni "github.com/containerd/go-cni" + cni "github.com/containerd/containerd/pkg/net/compat" ) // initPlatform handles initialization for the CRI service on non-Windows and non-Linux @@ -29,6 +29,6 @@ func (c *criService) initPlatform() error { } // cniLoadOptions returns cni load options for non-Windows and non-Linux platforms. 
-func (c *criService) cniLoadOptions() []cni.Opt { - return []cni.Opt{} +func (c *criService) cniLoadOptions() []cni.LoadOpt { + return []cni.LoadOpt{} } diff --git a/pkg/cri/server/service_test.go b/pkg/cri/server/service_test.go index 3143cedf6c62..dbf737262eed 100644 --- a/pkg/cri/server/service_test.go +++ b/pkg/cri/server/service_test.go @@ -22,7 +22,6 @@ import ( "testing" "github.com/containerd/containerd/oci" - "github.com/containerd/go-cni" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,6 +32,7 @@ import ( "github.com/containerd/containerd/pkg/cri/store/label" sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" snapshotstore "github.com/containerd/containerd/pkg/cri/store/snapshot" + "github.com/containerd/containerd/pkg/net/compat" ostesting "github.com/containerd/containerd/pkg/os/testing" "github.com/containerd/containerd/pkg/registrar" ) @@ -40,6 +40,10 @@ import ( // newTestCRIService creates a fake criService for test. func newTestCRIService() *criService { labels := label.NewStore() + netp := &cniAdaptor{ + adapt: true, + g: servertesting.NewFakeCNIPlugin(), + } return &criService{ config: testConfig, imageFSPath: testImageFSPath, @@ -50,8 +54,8 @@ func newTestCRIService() *criService { sandboxNameIndex: registrar.NewRegistrar(), containerStore: containerstore.NewStore(labels), containerNameIndex: registrar.NewRegistrar(), - netPlugin: map[string]cni.CNI{ - defaultNetworkPlugin: servertesting.NewFakeCNIPlugin(), + netPlugin: map[string]compat.CNI{ + defaultNetworkPlugin: netp, }, } } diff --git a/pkg/cri/server/service_windows.go b/pkg/cri/server/service_windows.go index e71373164662..6107f6f5a345 100644 --- a/pkg/cri/server/service_windows.go +++ b/pkg/cri/server/service_windows.go @@ -19,7 +19,7 @@ package server import ( "fmt" - cni "github.com/containerd/go-cni" + cni "github.com/containerd/containerd/pkg/net/compat" ) // windowsNetworkAttachCount is the minimum number of networks the 
PodSandbox @@ -50,7 +50,8 @@ func (c *criService) initPlatform() error { // If there are more network configs the pod will be attached to all the // networks but we will only use the ip of the default network interface // as the pod IP. - i, err := cni.New(cni.WithMinNetworkCount(windowsNetworkAttachCount), + i, err := newCNIAdaptor(c.netAPI, name, + cni.WithMinNetworkCount(windowsNetworkAttachCount), cni.WithPluginConfDir(dir), cni.WithPluginMaxConfNum(max), cni.WithPluginDir([]string{c.config.NetworkPluginBinDir})) @@ -64,6 +65,6 @@ func (c *criService) initPlatform() error { } // cniLoadOptions returns cni load options for the windows. -func (c *criService) cniLoadOptions() []cni.Opt { - return []cni.Opt{cni.WithDefaultConf} +func (c *criService) cniLoadOptions() []cni.LoadOpt { + return []cni.LoadOpt{cni.WithDefaultConf} } diff --git a/pkg/cri/server/status.go b/pkg/cri/server/status.go index 7a702c57caaa..cb51bec32384 100644 --- a/pkg/cri/server/status.go +++ b/pkg/cri/server/status.go @@ -44,7 +44,7 @@ func (c *criService) Status(ctx context.Context, r *runtime.StatusRequest) (*run netPlugin := c.netPlugin[defaultNetworkPlugin] // Check the status of the cni initialization if netPlugin != nil { - if err := netPlugin.Status(); err != nil { + if err := netPlugin.Status(ctx); err != nil { networkCondition.Status = false networkCondition.Reason = networkNotReadyReason networkCondition.Message = fmt.Sprintf("Network plugin returns error: %v", err) @@ -71,7 +71,7 @@ func (c *criService) Status(ctx context.Context, r *runtime.StatusRequest) (*run resp.Info["golang"] = string(versionByt) if netPlugin != nil { - cniConfig, err := json.Marshal(netPlugin.GetConfig()) + cniConfig, err := json.Marshal(netPlugin.GetConfig(ctx)) if err != nil { log.G(ctx).WithError(err).Errorf("Failed to marshal CNI config %v", err) } diff --git a/pkg/cri/server/update_runtime_config.go b/pkg/cri/server/update_runtime_config.go index 52246746c824..733534784b5e 100644 --- 
a/pkg/cri/server/update_runtime_config.go +++ b/pkg/cri/server/update_runtime_config.go @@ -77,7 +77,7 @@ func (c *criService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateR } netStart := time.Now() - err = netPlugin.Status() + err = netPlugin.Status(ctx) networkPluginOperations.WithValues(networkStatusOp).Inc() networkPluginOperationsLatency.WithValues(networkStatusOp).UpdateSince(netStart) if err == nil { @@ -85,7 +85,7 @@ func (c *criService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateR return &runtime.UpdateRuntimeConfigResponse{}, nil } networkPluginOperationsErrors.WithValues(networkStatusOp).Inc() - if err := netPlugin.Load(c.cniLoadOptions()...); err == nil { + if err := netPlugin.Load(ctx, c.cniLoadOptions()...); err == nil { log.G(ctx).Infof("CNI config is successfully loaded, skip generating cni config from template %q", confTemplate) return &runtime.UpdateRuntimeConfigResponse{}, nil } diff --git a/pkg/cri/server/update_runtime_config_test.go b/pkg/cri/server/update_runtime_config_test.go index 6cb535ca4c8b..d3f54e6e2b91 100644 --- a/pkg/cri/server/update_runtime_config_test.go +++ b/pkg/cri/server/update_runtime_config_test.go @@ -119,8 +119,8 @@ func TestUpdateRuntimeConfig(t *testing.T) { req.RuntimeConfig.NetworkConfig.PodCidr = "" } if !test.networkReady { - c.netPlugin[defaultNetworkPlugin].(*servertesting.FakeCNIPlugin).StatusErr = errors.New("random error") - c.netPlugin[defaultNetworkPlugin].(*servertesting.FakeCNIPlugin).LoadErr = errors.New("random error") + c.netPlugin[defaultNetworkPlugin].(*cniAdaptor).g.(*servertesting.FakeCNIPlugin).StatusErr = errors.New("random error") + c.netPlugin[defaultNetworkPlugin].(*cniAdaptor).g.(*servertesting.FakeCNIPlugin).LoadErr = errors.New("random error") } _, err = c.UpdateRuntimeConfig(context.Background(), req) assert.NoError(t, err) diff --git a/pkg/net/api.go b/pkg/net/api.go new file mode 100644 index 000000000000..150762cbe72e --- /dev/null +++ b/pkg/net/api.go @@ -0,0 
+1,115 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "context" + + "github.com/containernetworking/cni/libcni" + types100 "github.com/containernetworking/cni/pkg/types/100" +) + +// API defines the top level interfaces exposed by the Networks Plugin. +// Clients will use these interfaces to create or locate Network Managers +type API interface { + // NewManager creates a Network Manager identified by the given name + NewManager(name string, opts ...ManagerOpt) (Manager, error) + + // Manager returns a Network Manager identified by the given name + Manager(name string) Manager +} + +// Manager defines the interfaces of a Network Manager. A network manager +// manages networks and attachments. +type Manager interface { + // Name returns the name of the network manager + Name() string + + // Create a network definition + Create(ctx context.Context, opts ...NetworkOpt) (Network, error) + + // Network returns the network identified by the name + Network(ctx context.Context, name string) (Network, error) + + // List all network definitions managed by this manager instance + List(ctx context.Context) []Network + + // Attachment returns a network attachment identified by id + Attachment(ctx context.Context, id string) (Attachment, error) +} + +// Network defines the interfaces of a Network definition. 
+// A network is a CNI concept that is modeled by a conf/conflist json file +type Network interface { + // Name returns the name of the network + Name() string + + // Manager returns the network manager that manages this network + Manager() string + + // Config returns the CNI NetworkConfigList that describes the network + Config() *libcni.NetworkConfigList + + // Update the network configuration + Update(ctx context.Context, opts ...NetworkOpt) error + + // Delete the network + Delete(ctx context.Context) error + + // Labels returns the labels associated with the network + Labels() map[string]string + + // Attach the network to a container/sandbox + Attach(ctx context.Context, opts ...AttachmentOpt) (Attachment, error) + + // List all attachments for this network + List(ctx context.Context) []Attachment +} + +// Attachment is a CNI concept that allows a container to “join” a network +type Attachment interface { + // ID returns the id of the attachment + ID() string + + // Manager returns the network manager that owns the attachment + Manager() string + + // Network returns the name of the network that this attachment is created for + Network() string + + // Container returns the id of the container/sandbox for this attachment + Container() string + + // IFName returns the interface name inside the NS namespace for this attachment + IFName() string + + // NSPath returns the network namespace for this attachment + NSPath() string + + // Result returns the CNI attachment result + Result() *types100.Result + + // GCOwnerLables returns the labels that should be applied to the owner object + // such as container/sandbox to establish ownership models for garbage collection + GCOwnerLables() map[string]string + + // Remove the attachment + Remove(ctx context.Context) error + + // Check whether the current status of the attachment is as expected + Check(ctx context.Context) error +} diff --git a/pkg/net/attachment.go b/pkg/net/attachment.go new file mode 100644 index 
000000000000..74509351fbe2 --- /dev/null +++ b/pkg/net/attachment.go @@ -0,0 +1,131 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "context" + "fmt" + "strings" + + "github.com/containerd/containerd/log" + "github.com/containernetworking/cni/libcni" + types100 "github.com/containernetworking/cni/pkg/types/100" + "github.com/sirupsen/logrus" +) + +type attachment struct { + attachmentRecord + cni libcni.CNI + store Store +} + +var _ Attachment = (*attachment)(nil) + +func (a *attachment) ID() string { + return a.id +} + +func (a *attachment) Manager() string { + return a.manager +} + +func (a *attachment) Network() string { + return a.network.Name +} + +func (a *attachment) Container() string { + return a.args.ContainerID +} + +func (a *attachment) IFName() string { + return a.args.IFName +} + +func (a *attachment) NSPath() string { + return a.args.NSPath +} + +func (a *attachment) Result() *types100.Result { + return a.result +} + +func (a *attachment) GCOwnerLables() map[string]string { + key := fmt.Sprintf("%s.%s/%s", GCRefPrefix, a.network.Name, a.args.IFName) + val := fmt.Sprintf("%s/%s", a.manager, a.id) + return map[string]string{key: val} +} + +func (a *attachment) Remove(ctx context.Context) error { + log.G(ctx).WithFields(logrus.Fields{ + "manager": a.Manager(), + "network": a.Network(), + "id": a.ID(), + }).Debugf("remove") + + return a.store.DeleteAttachment(ctx, a.manager, a.ID(), + func(ctx 
// isNotFoundDelError reports whether err, as returned by DelNetworkList, only
// indicates that the network resource no longer exists and can therefore be
// ignored during removal.
//
// Two cases are treated as "not found":
//   - nsPath is empty and the error text contains "no such file or directory"
//     (best-effort cleanup without a network namespace);
//   - the error text contains "not found", regardless of nsPath.
//
// A nil err is never a not-found error.
// see: https://github.com/containerd/go-cni/blob/f108694a587347b7b24ec27a1f9b709423faafd3/cni.go#L252
func isNotFoundDelError(nsPath string, err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	if nsPath == "" && strings.Contains(msg, "no such file or directory") {
		return true
	}
	return strings.Contains(msg, "not found")
}
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "context" + "testing" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAttachmentFromRecord(t *testing.T) { + rec := testAttachmentRecord(t) + att := attachmentFromRecord(rec, testCNI(t), testMemStore(t)) + assert.NotNil(t, att) + assert.Equal(t, att.attachmentRecord, *rec) +} + +func TestAttachmentObject(t *testing.T) { + obj := testAttachment(t) + assert.Equal(t, obj.ID(), TestAttachment) + assert.Equal(t, obj.Manager(), TestManager) + assert.Equal(t, obj.Network(), TestNetwork) + assert.Equal(t, obj.Container(), TestContainer) + assert.Equal(t, obj.IFName(), TestInterface) + assert.Equal(t, obj.NSPath(), TestNSPath) + if assert.Contains(t, obj.GCOwnerLables(), TestGCLBKey) { + assert.Equal(t, obj.GCOwnerLables()[TestGCLBKey], TestGCLBVal) + } +} + +func TestAttachmentRemove(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + att := testAttachment(t) + + c, ok := att.cni.(*mockCNI) + require.True(t, ok) + s, ok := att.store.(*mockStore) + require.True(t, ok) + + c.err = nil + assert.Error(t, att.Remove(ctx)) + + c.err = nil + s.atts[att.id] = &att.attachmentRecord + assert.NoError(t, att.Remove(ctx)) + assert.NotContains(t, s.atts, att.id) + + c.err = errdefs.ErrNotFound + s.atts[att.id] = &att.attachmentRecord + assert.NoError(t, att.Remove(ctx)) + assert.NotContains(t, s.atts, att.id) + + c.err = errdefs.ErrUnknown + s.atts[att.id] = &att.attachmentRecord 
// API defines the APIs for clients to create/locate
// go-cni style objects
type API interface {
	// NewCNI creates a CNI object registered under name, configured by opts.
	// NOTE(review): behaviour when name is already in use is not visible
	// here; confirm against the implementation.
	NewCNI(name string, opts ...Opt) (CNI, error)
	// CNI locates the CNI object registered under name.
	// NOTE(review): presumably returns nil when name is unknown; confirm.
	CNI(name string) CNI
}
+// This interface is created to ease the effort of adapting the +// existing CRI implementation to the Network Plugin. +type CNI interface { + Setup(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*Result, error) + SetupSerially(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*Result, error) + Remove(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error + Check(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error + Load(ctx context.Context, opts ...LoadOpt) error + Status(ctx context.Context) error + GetConfig(ctx context.Context) *gocni.ConfigResult +} + +type Config struct { + PluginDirs []string + PluginConfDir string + PluginMaxConfNum int + Prefix string + NetworkCount int // minimum network plugin configurations needed to initialize cni +} + +type impl struct { + Config + m net.Manager + sync.RWMutex +} + +var _ CNI = (*impl)(nil) + +func defaultCNIConfig() *impl { + return &impl{ + Config: Config{ + PluginDirs: []string{DefaultCNIDir}, + PluginConfDir: DefaultNetDir, + PluginMaxConfNum: DefaultMaxConfNum, + Prefix: DefaultPrefix, + NetworkCount: 1, + }, + } +} + +func New(m net.Manager, config ...Opt) (CNI, error) { + cni := defaultCNIConfig() + + var err error + for _, c := range config { + if err = c(&cni.Config); err != nil { + return nil, err + } + } + + cni.m = m + return cni, nil +} + +func (c *impl) Setup(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*Result, error) { + if err := c.Status(ctx); err != nil { + return nil, err + } + + opts = append(opts, net.WithContainer(id), net.WithNSPath(path)) + + results, err := c.attachNetworks(ctx, opts...) 
+ if err != nil { + return nil, err + } + + return c.createResult(results) +} + +type attachResult struct { + r *types100.Result + l map[string]string +} + +type asynchAttachResult struct { + index int + res attachResult + err error +} + +func (c *impl) asynchAttach(ctx context.Context, index int, n net.Network, wg *sync.WaitGroup, rc chan asynchAttachResult, opts ...net.AttachmentOpt) { + defer wg.Done() + r, err := n.Attach(ctx, opts...) + + if err != nil { + rc <- asynchAttachResult{index: index, err: err} + return + } + + rc <- asynchAttachResult{ + index: index, + res: attachResult{ + r: r.Result(), + l: r.GCOwnerLables(), + }, + err: err, + } +} + +func (c *impl) attachNetworks(ctx context.Context, opts ...net.AttachmentOpt) ([]*attachResult, error) { + var wg sync.WaitGroup + var firstError error + + netlist := c.m.List(ctx) + results := make([]*attachResult, len(netlist)) + rc := make(chan asynchAttachResult) + + for i, n := range netlist { + wg.Add(1) + // make a copy below so that `ifname` may be appended to the duplicated slice + dopts := make([]net.AttachmentOpt, len(opts)) + copy(dopts, opts) + if ifname, ok := n.Labels()["ifname"]; ok { + dopts = append(dopts, net.WithIFName(ifname)) + } + go c.asynchAttach(ctx, i, n, &wg, rc, dopts...) 
+ } + + for range netlist { + rs := <-rc + if rs.err != nil && firstError == nil { + firstError = rs.err + } + results[rs.index] = &rs.res + } + wg.Wait() + + return results, firstError +} + +func (c *impl) SetupSerially(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) (*Result, error) { + if err := c.Status(ctx); err != nil { + return nil, err + } + opts = append(opts, net.WithContainer(id), net.WithNSPath(path)) + + var results []*attachResult + netlist := c.m.List(ctx) + + for _, n := range netlist { + // make a copy below so that `ifname` may be appended to the duplicated slice + dopts := make([]net.AttachmentOpt, len(opts)) + copy(dopts, opts) + if ifname, ok := n.Labels()["ifname"]; ok { + dopts = append(dopts, net.WithIFName(ifname)) + } + att, err := n.Attach(ctx, dopts...) + if err != nil { + return nil, err + } + results = append(results, &attachResult{r: att.Result(), l: att.GCOwnerLables()}) + } + return c.createResult(results) +} + +func (c *impl) Remove(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error { + if err := c.Status(ctx); err != nil { + return err + } + netlist := c.m.List(ctx) + + for _, n := range netlist { + attlist := n.List(ctx) + + for _, att := range attlist { + if att.Container() != id { + continue + } + if err := att.Remove(ctx); err != nil { + // Based on CNI spec v0.7.0, empty network namespace is allowed to + // do best effort cleanup. However, it is not handled consistently + // right now: + // https://github.com/containernetworking/plugins/issues/210 + // TODO(random-liu): Remove the error handling when the issue is + // fixed and the CNI spec v0.6.0 support is deprecated. + // NOTE(claudiub): Some CNIs could return a "not found" error, which could mean that + // it was already deleted. 
+ if (att.NSPath() == "" && strings.Contains(err.Error(), "no such file or directory")) || strings.Contains(err.Error(), "not found") { + continue + } + return err + } + } + } + return nil +} + +func (c *impl) Check(ctx context.Context, id string, path string, opts ...net.AttachmentOpt) error { + if err := c.Status(ctx); err != nil { + return err + } + netlist := c.m.List(ctx) + for _, n := range netlist { + attlist := n.List(ctx) + for _, att := range attlist { + if att.Container() != id { + continue + } + err := att.Check(ctx) + if err != nil { + return err + } + } + } + return nil +} + +func (c *impl) Load(ctx context.Context, opts ...LoadOpt) error { + var err error + c.Lock() + defer c.Unlock() + // Reset the net on a load operation to ensure + // config happens on a clean slate + c.reset(ctx) + + for _, o := range opts { + if err = o(ctx, c); err != nil { + return fmt.Errorf("cni config load failed: %v: %w", err, gocni.ErrLoad) + } + } + return nil +} + +func (c *impl) Status(ctx context.Context) error { + c.RLock() + defer c.RUnlock() + nets := c.m.List(ctx) + if len(nets) < c.NetworkCount { + return gocni.ErrCNINotInitialized + } + return nil +} + +func (c *impl) GetConfig(ctx context.Context) *gocni.ConfigResult { + c.RLock() + defer c.RUnlock() + r := &gocni.ConfigResult{ + PluginDirs: c.Config.PluginDirs, + PluginConfDir: c.Config.PluginConfDir, + PluginMaxConfNum: c.Config.PluginMaxConfNum, + Prefix: c.Config.Prefix, + } + for _, network := range c.m.List(ctx) { + conf := &gocni.NetworkConfList{ + Name: network.Config().Name, + CNIVersion: network.Config().CNIVersion, + Source: string(network.Config().Bytes), + } + for _, plugin := range network.Config().Plugins { + conf.Plugins = append(conf.Plugins, &gocni.NetworkConf{ + Network: plugin.Network, + Source: string(plugin.Bytes), + }) + } + r.Networks = append(r.Networks, &gocni.ConfNetwork{ + Config: conf, + IFName: network.Labels()["ifname"], + }) + } + return r +} + +func (c *impl) reset(ctx 
context.Context) { + netList := c.m.List(ctx) + for _, n := range netList { + n.Delete(ctx) + } +} diff --git a/pkg/net/compat/helper.go b/pkg/net/compat/helper.go new file mode 100644 index 000000000000..ee6c3b099507 --- /dev/null +++ b/pkg/net/compat/helper.go @@ -0,0 +1,41 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compat + +import ( + "fmt" + + types100 "github.com/containernetworking/cni/pkg/types/100" +) + +func validateInterfaceConfig(ipConf *types100.IPConfig, ifs int) error { + if ipConf == nil { + return fmt.Errorf("invalid IP configuration (nil)") + } + if ipConf.Interface != nil && *ipConf.Interface > ifs { + return fmt.Errorf("invalid IP configuration (interface number %d is > number of interfaces %d)", *ipConf.Interface, ifs) + } + return nil +} + +func getIfName(prefix string, i int) string { + return fmt.Sprintf("%s%d", prefix, i) +} + +func defaultInterface(prefix string) string { + return getIfName(prefix, 0) +} diff --git a/pkg/net/compat/opts.go b/pkg/net/compat/opts.go new file mode 100644 index 000000000000..4800e4198fbf --- /dev/null +++ b/pkg/net/compat/opts.go @@ -0,0 +1,267 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compat + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/containerd/containerd/pkg/net" + gocni "github.com/containerd/go-cni" + cnilibrary "github.com/containernetworking/cni/libcni" +) + +type Opt func(*Config) error +type LoadOpt func(context.Context, *impl) error + +// WithInterfacePrefix sets the prefix for network interfaces +// e.g. eth or wlan +func WithInterfacePrefix(prefix string) Opt { + return func(c *Config) error { + c.Prefix = prefix + return nil + } +} + +// WithPluginDir can be used to set the locations of +// the cni plugin binaries +func WithPluginDir(dirs []string) Opt { + return func(c *Config) error { + c.PluginDirs = dirs + return nil + } +} + +// WithPluginConfDir can be used to configure the +// cni configuration directory. +func WithPluginConfDir(dir string) Opt { + return func(c *Config) error { + c.PluginConfDir = dir + return nil + } +} + +// WithPluginMaxConfNum can be used to configure the +// max cni plugin config file num. +func WithPluginMaxConfNum(max int) Opt { + return func(c *Config) error { + c.PluginMaxConfNum = max + return nil + } +} + +// WithMinNetworkCount can be used to configure the +// minimum networks to be configured and initialized +// for the status to report success. By default its 1. +func WithMinNetworkCount(count int) Opt { + return func(c *Config) error { + c.NetworkCount = count + return nil + } +} + +// WithLoNetwork can be used to load the loopback +// network config. 
+func WithLoNetwork(ctx context.Context, c *impl) error { + loConfig, _ := cnilibrary.ConfListFromBytes([]byte(`{ +"cniVersion": "0.3.1", +"name": "cni-loopback", +"plugins": [{ + "type": "loopback" +}] +}`)) + + _, err := c.m.Create(ctx, + net.WithConflist(loConfig), + net.WithNetworkLabels(map[string]string{ + "ifname": "lo", + })) + return err +} + +// WithConf can be used to load config directly +// from byte. +func WithConf(bytes []byte) LoadOpt { + return WithConfIndex(bytes, 0) +} + +// WithConfIndex can be used to load config directly +// from byte and set the interface name's index. +func WithConfIndex(bytes []byte, index int) LoadOpt { + return func(ctx context.Context, c *impl) error { + conf, err := cnilibrary.ConfFromBytes(bytes) + if err != nil { + return err + } + confList, err := cnilibrary.ConfListFromConf(conf) + if err != nil { + return err + } + _, err = c.m.Create(ctx, + net.WithConflist(confList), + net.WithNetworkLabels(map[string]string{ + "ifname": getIfName(c.Prefix, index), + })) + return err + } +} + +// WithConfFile can be used to load network config +// from an .conf file. Supported with absolute fileName +// with path only. 
+func WithConfFile(fileName string) LoadOpt { + return func(ctx context.Context, c *impl) error { + conf, err := cnilibrary.ConfFromFile(fileName) + if err != nil { + return err + } + // upconvert to conf list + confList, err := cnilibrary.ConfListFromConf(conf) + if err != nil { + return err + } + _, err = c.m.Create(ctx, + net.WithConflist(confList), + net.WithNetworkLabels(map[string]string{ + "ifname": getIfName(c.Prefix, 0), + })) + return err + } +} + +// WithConfListBytes can be used to load network config list directly +// from byte +func WithConfListBytes(bytes []byte) LoadOpt { + return func(ctx context.Context, c *impl) error { + confList, err := cnilibrary.ConfListFromBytes(bytes) + if err != nil { + return err + } + i := len(c.m.List(ctx)) + _, err = c.m.Create(ctx, + net.WithConflist(confList), + net.WithNetworkLabels(map[string]string{ + "ifname": getIfName(c.Prefix, i), + })) + return err + } +} + +// WithConfListFile can be used to load network config +// from an .conflist file. Supported with absolute fileName +// with path only. +func WithConfListFile(fileName string) LoadOpt { + return func(ctx context.Context, c *impl) error { + confList, err := cnilibrary.ConfListFromFile(fileName) + if err != nil { + return err + } + i := len(c.m.List(ctx)) + _, err = c.m.Create(ctx, + net.WithConflist(confList), + net.WithNetworkLabels(map[string]string{ + "ifname": getIfName(c.Prefix, i), + })) + return err + } +} + +// WithDefaultConf can be used to detect the default network +// config file from the configured cni config directory and load +// it. +// Since the CNI spec does not specify a way to detect default networks, +// the convention chosen is - the first network configuration in the sorted +// list of network conf files as the default network. 
+func WithDefaultConf(ctx context.Context, c *impl) error { + return loadFromConfDir(ctx, c, c.PluginMaxConfNum) +} + +// WithAllConf can be used to detect all network config +// files from the configured cni config directory and load +// them. +func WithAllConf(ctx context.Context, c *impl) error { + return loadFromConfDir(ctx, c, 0) +} + +// loadFromConfDir detects network config files from the +// configured cni config directory and load them. max is +// the maximum network config to load (max i<= 0 means no limit). +func loadFromConfDir(ctx context.Context, c *impl, max int) error { + files, err := cnilibrary.ConfFiles(c.PluginConfDir, []string{".conf", ".conflist", ".json"}) + switch { + case err != nil: + return fmt.Errorf("failed to read config file: %v: %w", err, gocni.ErrRead) + case len(files) == 0: + return fmt.Errorf("no network config found in %s: %w", c.PluginConfDir, gocni.ErrCNINotInitialized) + } + + // files contains the network config files associated with cni network. + // Use lexicographical way as a defined order for network config files. + sort.Strings(files) + // Since the CNI spec does not specify a way to detect default networks, + // the convention chosen is - the first network configuration in the sorted + // list of network conf files as the default network and choose the default + // interface provided during init as the network interface for this default + // network. For every other network use a generated interface id. 
+ i := 0 + //var networks []*Network + for _, confFile := range files { + var confList *cnilibrary.NetworkConfigList + if strings.HasSuffix(confFile, ".conflist") { + confList, err = cnilibrary.ConfListFromFile(confFile) + if err != nil { + return fmt.Errorf("failed to load CNI config list file %s: %v: %w", confFile, err, gocni.ErrInvalidConfig) + } + } else { + conf, err := cnilibrary.ConfFromFile(confFile) + if err != nil { + return fmt.Errorf("failed to load CNI config file %s: %v: %w", confFile, err, gocni.ErrInvalidConfig) + } + // Ensure the config has a "type" so we know what plugin to run. + // Also catches the case where somebody put a conflist into a conf file. + if conf.Network.Type == "" { + return fmt.Errorf("network type not found in %s: %w", confFile, gocni.ErrInvalidConfig) + } + + confList, err = cnilibrary.ConfListFromConf(conf) + if err != nil { + return fmt.Errorf("failed to convert CNI config file %s to CNI config list: %v: %w", confFile, err, gocni.ErrInvalidConfig) + } + } + if len(confList.Plugins) == 0 { + return fmt.Errorf("CNI config list in config file %s has no networks, skipping: %w", confFile, gocni.ErrInvalidConfig) + + } + if _, err := c.m.Create(ctx, + net.WithConflist(confList), + net.WithNetworkLabels(map[string]string{ + "ifname": getIfName(c.Prefix, i), + })); err != nil { + return err + } + i++ + if i == max { + break + } + } + if len(c.m.List(ctx)) == 0 { + return fmt.Errorf("no valid networks found in %s: %w", c.PluginDirs, gocni.ErrCNINotInitialized) + } + return nil +} diff --git a/pkg/net/compat/result.go b/pkg/net/compat/result.go new file mode 100644 index 000000000000..efc2bc1bf4bc --- /dev/null +++ b/pkg/net/compat/result.go @@ -0,0 +1,106 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compat + +import ( + "fmt" + + gocni "github.com/containerd/go-cni" + types100 "github.com/containernetworking/cni/pkg/types/100" +) + +type Result struct { + gocni.Result + Labels map[string]string + raw []*types100.Result +} + +// Raw returns the raw CNI results of multiple networks. +func (r *Result) Raw() []*types100.Result { + return r.raw +} + +// createResult creates a Result from the given slice of types100.Result, adding +// structured data containing the interface configuration for each of the +// interfaces created in the namespace. It returns an error if validation of +// results fails, or if a network could not be found. +func (c *impl) createResult(results []*attachResult) (*Result, error) { + c.RLock() + defer c.RUnlock() + r := &Result{ + Result: gocni.Result{ + Interfaces: make(map[string]*gocni.Config), + }, + Labels: make(map[string]string), + //raw: results, + } + + // Plugins may not need to return Interfaces in result if + // there are no multiple interfaces created. 
In that case + // all configs should be applied against default interface + r.Interfaces[defaultInterface(c.Prefix)] = &gocni.Config{} + + // Walk through all the results + for _, result := range results { + // Walk through all the interface in each result + for _, intf := range result.r.Interfaces { + r.Interfaces[intf.Name] = &gocni.Config{ + Mac: intf.Mac, + Sandbox: intf.Sandbox, + } + } + // Walk through all the IPs in the result and attach it to corresponding + // interfaces + for _, ipConf := range result.r.IPs { + if err := validateInterfaceConfig(ipConf, len(result.r.Interfaces)); err != nil { + return nil, fmt.Errorf("invalid interface config: %v: %w", err, gocni.ErrInvalidResult) + } + name := c.getInterfaceName(result.r.Interfaces, ipConf) + r.Interfaces[name].IPConfigs = append(r.Interfaces[name].IPConfigs, + &gocni.IPConfig{IP: ipConf.Address.IP, Gateway: ipConf.Gateway}) + } + r.DNS = append(r.DNS, result.r.DNS) + r.Routes = append(r.Routes, result.r.Routes...) + + // Walk through all owner labels in the result + for k, v := range result.l { + r.Labels[k] = v + } + } + if _, ok := r.Interfaces[defaultInterface(c.Prefix)]; !ok { + return nil, fmt.Errorf("default network not found for: %s: %w", defaultInterface(c.Prefix), gocni.ErrNotFound) + } + return r, nil +} + +// getInterfaceName returns the interface name if the plugins +// return the result with associated interfaces. 
If interface +// is not present then default interface name is used +func (c *impl) getInterfaceName(interfaces []*types100.Interface, + ipConf *types100.IPConfig) string { + if ipConf.Interface != nil { + return interfaces[*ipConf.Interface].Name + } + return defaultInterface(c.Prefix) +} + +func WrapResult(result *gocni.Result) *Result { + return &Result{ + Result: *result, + raw: result.Raw(), + } +} diff --git a/pkg/net/gc.go b/pkg/net/gc.go new file mode 100644 index 000000000000..6ec163e50676 --- /dev/null +++ b/pkg/net/gc.go @@ -0,0 +1,260 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package net + +import ( + "context" + "fmt" + "strings" + + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/namespaces" + bolt "go.etcd.io/bbolt" +) + +const ( + GCRefPrefix string = "containerd.io/gc.ref.network" +) + +type resourceCollector struct { + api API + db DB +} + +type gcContext struct { + api API + tx *bolt.Tx + removed []gc.Node +} + +func NewResourceCollector(api API, db DB) metadata.Collector { + return &resourceCollector{ + api: api, + db: db, + } +} + +func (rc *resourceCollector) StartCollection(ctx context.Context) (metadata.CollectionContext, error) { + log.G(ctx).Tracef("[GC] start network collection") + tx, err := rc.db.Begin(true) + if err != nil { + return nil, err + } + return &gcContext{ + api: rc.api, + tx: tx, + }, nil +} + +func (rc *resourceCollector) ReferenceLabel() string { + return "network" +} + +func (c *gcContext) All(fn func(gc.Node)) { + // db version + vbkt := c.tx.Bucket(bucketKeyVersion) + if vbkt == nil { + return + } + + vbkt.ForEach(func(nKey, _ []byte) error { + // namespace + nbkt := vbkt.Bucket(nKey) + if nbkt == nil { + return nil + } + + nbkt.ForEach(func(mKey, _ []byte) error { + // manager + mbkt := nbkt.Bucket(mKey) + if mbkt == nil { + return nil + } + + // attachments + abkt := mbkt.Bucket(bucketKeyAttachments) + if abkt == nil { + return nil + } + + abkt.ForEach(func(aKey, _ []byte) error { + // attachment + fn(gcnode(metadata.ResourceNetwork, string(nKey), createKey(string(mKey), string(aKey)))) + return nil + }) + return nil + }) + return nil + }) +} + +func (c *gcContext) Active(string, func(gc.Node)) { + // noop, use the default gc that scans labels of existing containers +} + +func (c *gcContext) Leased(namespace, lease string, fn func(gc.Node)) { + // namespace + nbkt := getBucket(c.tx, []byte(namespace)) + if nbkt == nil { + return + } + + nbkt.ForEach(func(mKey, _ []byte) error { + // 
manager + mbkt := nbkt.Bucket(mKey) + if mbkt == nil { + return nil + } + + // leases + lbkt := mbkt.Bucket(bucketKeyLeases) + if lbkt == nil { + return nil + } + + // target lease + bkt := lbkt.Bucket([]byte(lease)) + if bkt == nil { + return nil + } + + bkt.ForEach(func(aKey, _ []byte) error { + // attachment + fn(gcnode(metadata.ResourceNetwork, namespace, createKey(string(mKey), string(aKey)))) + return nil + }) + + return nil + }) +} + +func (c *gcContext) Remove(n gc.Node) { + if n.Type == metadata.ResourceNetwork { + log.L.Debugf("[GC] remove %v", n) + c.removed = append(c.removed, n) + } +} + +func (c *gcContext) Cancel() error { + log.L.Tracef("[GC] cancel network collection") + c.tx.Rollback() + c.removed = nil + return nil +} + +func (c *gcContext) Finish() error { + log.L.Tracef("[GC] finish network collection") + + for _, node := range c.removed { + ctx := withTransactionContext(namespaces.WithNamespace(context.Background(), node.Namespace), c.tx) + mgr, attachID := decomposeKey(node.Key) + + m := c.api.Manager(mgr) + if m == nil { + log.L.Errorf("manager %q not found", mgr) + continue + } + + att, err := m.Attachment(ctx, attachID) + if err != nil { + log.L.Warnf("attachment %q not found", attachID) + } + + if err := att.Remove(ctx); err != nil { + log.G(ctx).WithError(err).Errorf("failed to detach %q", attachID) + } + } + + // make sure empty namespaces are deleted in db + cleanupNamespaces(c.tx) + + c.tx.Commit() + c.removed = nil + return nil +} + +func cleanupNamespaces(tx *bolt.Tx) error { + log.L.Debugf("checking and cleaning up namespaces") + // version + vbkt := tx.Bucket(bucketKeyVersion) + if vbkt == nil { + log.L.Errorf("version bucket not found") + return nil + } + + var nsl [][]byte + // iterate through namespace buckets + nc := vbkt.Cursor() + for nk, _ := nc.First(); nk != nil; nk, _ = nc.Next() { + empty := true + + // namespace + nbkt := vbkt.Bucket(nk) + if nbkt == nil { + continue + } + + // iterate through manager buckets + c := 
// decomposeKey splits a GC node key of the form "<manager>/<attachID>", as
// produced by createKey, at the first "/". The attachment ID may itself
// contain further "/" characters. A key without any "/" is returned as the
// manager with an empty attachment ID instead of panicking.
func decomposeKey(key string) (string, string) {
	idx := strings.Index(key, "/")
	if idx < 0 {
		return key, ""
	}
	return key[:idx], key[idx+1:]
}
+*/ + +package net + +import ( + "context" + "strings" + "testing" + + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/namespaces" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" +) + +func TestGC(t *testing.T) { + ctx := leases.WithLease(namespaces.WithNamespace(context.Background(), TestNamespace), TestLease) + store := testDBStore(t) + api := &mockAPI{m: testManager(t, store)} + collector := NewResourceCollector(api, store.db) + assert.Equal(t, collector.ReferenceLabel(), "network") + + manager, err := api.NewManager(TestManager) + require.NoError(t, err) + + nw, err := manager.Create(ctx, WithConflist(testNetworkConfList(t))) + require.NoError(t, err) + _, err = nw.Attach(ctx, WithContainer(TestContainer), WithNSPath(TestNSPath), WithIFName(TestInterface)) + require.NoError(t, err) + + gctx, err := collector.StartCollection(ctx) + require.NoError(t, err) + + node := gcnode(metadata.ResourceNetwork, TestNamespace, strings.Join([]string{TestManager, TestAttachment}, "/")) + found := func(n gc.Node) { + assert.Equal(t, n, node) + } + notfound := func(n gc.Node) { + assert.Fail(t, "not expected to be reached") + } + + gctx.All(found) + gctx.Active(TestNamespace, found) + gctx.Active("non-existing-ns", notfound) + gctx.Leased(TestNamespace, TestLease, found) + gctx.Leased("non-existing-ns", TestLease, notfound) + gctx.Leased(TestNamespace, "non-existing-lease", notfound) + gctx.Remove(node) + assert.NoError(t, gctx.Finish()) + + _, err = manager.Attachment(ctx, TestAttachment) + assert.Error(t, err) +} + +func TestGCAfterRemove(t *testing.T) { + ctx := leases.WithLease(namespaces.WithNamespace(context.Background(), TestNamespace), TestLease) + store := testDBStore(t) + api := &mockAPI{m: testManager(t, store)} + collector := NewResourceCollector(api, store.db) + assert.Equal(t, 
collector.ReferenceLabel(), "network") + + manager, err := api.NewManager(TestManager) + require.NoError(t, err) + + nw, err := manager.Create(ctx, WithConflist(testNetworkConfList(t))) + require.NoError(t, err) + att, err := nw.Attach(ctx, WithContainer(TestContainer), WithNSPath(TestNSPath), WithIFName(TestInterface)) + require.NoError(t, err) + require.NoError(t, att.Remove(ctx)) + + gctx, err := collector.StartCollection(ctx) + require.NoError(t, err) + + notfound := func(n gc.Node) { + assert.Fail(t, "not expected to be reached") + } + + gctx.All(notfound) + gctx.Active(TestNamespace, notfound) + gctx.Leased(TestNamespace, TestLease, notfound) + assert.NoError(t, gctx.Finish()) +} + +func TestGCNamespaceCleanup(t *testing.T) { + ctx := leases.WithLease(namespaces.WithNamespace(context.Background(), TestNamespace), TestLease) + store := testDBStore(t) + api := &mockAPI{m: testManager(t, store)} + collector := NewResourceCollector(api, store.db) + assert.Equal(t, collector.ReferenceLabel(), "network") + + manager, err := api.NewManager(TestManager) + require.NoError(t, err) + nw, err := manager.Create(ctx, WithConflist(testNetworkConfList(t))) + require.NoError(t, err) + att, err := nw.Attach(ctx, WithContainer(TestContainer), WithNSPath(TestNSPath), WithIFName(TestInterface)) + require.NoError(t, err) + require.NoError(t, att.Remove(ctx)) + require.NoError(t, nw.Delete(ctx)) + + gctx, err := collector.StartCollection(ctx) + require.NoError(t, err) + require.NoError(t, gctx.Finish()) + + store.db.View(func(tx *bolt.Tx) error { + assert.Nil(t, getBucket(tx, []byte(TestNamespace))) + return nil + }) +} diff --git a/pkg/net/manager.go b/pkg/net/manager.go new file mode 100644 index 000000000000..ccb3c1aae9e3 --- /dev/null +++ b/pkg/net/manager.go @@ -0,0 +1,133 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "context" + "os" + + "github.com/containerd/containerd/log" + "github.com/containernetworking/cni/libcni" + "github.com/containernetworking/cni/pkg/invoke" + "github.com/containernetworking/cni/pkg/version" + "github.com/sirupsen/logrus" +) + +const ( + DefaultCNIDir = "/opt/cni/bin" +) + +type manager struct { + name string + cni libcni.CNI + store Store +} + +var _ Manager = (*manager)(nil) + +func NewManager(name string, store Store, opts ...ManagerOpt) (Manager, error) { + cfg := ManagerConfig{ + PluginDirs: []string{DefaultCNIDir}, + } + for _, o := range opts { + if err := o(&cfg); err != nil { + return nil, err + } + } + + return &manager{ + name: name, + store: store, + cni: libcni.NewCNIConfig( + cfg.PluginDirs, + &invoke.DefaultExec{ + RawExec: &invoke.RawExec{Stderr: os.Stderr}, + PluginDecoder: version.PluginDecoder{}, + }, + ), + }, nil +} + +func (m *manager) Name() string { + return m.name +} + +func (m *manager) Create(ctx context.Context, opts ...NetworkOpt) (Network, error) { + c := NetworkConfig{} + + for _, o := range opts { + if err := o(&c); err != nil { + return nil, err + } + } + + if err := c.validate(); err != nil { + return nil, err + } + + log.G(ctx).WithFields(logrus.Fields{"manager": m.name, "network": c.Conflist.Name}).Debugf("create network") + + r := &networkRecord{ + manager: m.name, + config: c.Conflist, + labels: c.Labels, + } + + if err := m.store.CreateNetwork(ctx, r); err != nil { + return nil, err + } + + return networkFromRecord(r, m.cni, m.store), nil +} + +func (m *manager) Network(ctx 
context.Context, name string) (Network, error) { + log.G(ctx).WithField("manager", m.name).WithField("network", name).Debugf("get network") + + r, err := m.store.GetNetwork(ctx, m.name, name) + if err != nil { + return nil, err + } + + return networkFromRecord(r, m.cni, m.store), nil +} + +func (m *manager) List(ctx context.Context) []Network { + log.G(ctx).WithField("manager", m.name).Debugf("list networks") + + var networks []Network + + if err := m.store.WalkNetworks(ctx, m.name, func(r *networkRecord) error { + networks = append(networks, networkFromRecord(r, m.cni, m.store)) + return nil + + }); err != nil { + log.G(ctx).WithError(err).Errorf("failed to walk networks") + return nil + } + + return networks +} + +func (m *manager) Attachment(ctx context.Context, id string) (Attachment, error) { + log.G(ctx).WithFields(logrus.Fields{"manager": m.name, "attachment": id}).Debugf("get attachment") + + ar, err := m.store.GetAttachment(ctx, m.name, id) + if err != nil { + return nil, err + } + return attachmentFromRecord(ar, m.cni, m.store), nil +} diff --git a/pkg/net/manager_test.go b/pkg/net/manager_test.go new file mode 100644 index 000000000000..86473a21dfaf --- /dev/null +++ b/pkg/net/manager_test.go @@ -0,0 +1,311 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package net + +import ( + "context" + "testing" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containernetworking/cni/libcni" + "github.com/containernetworking/cni/pkg/types" + types100 "github.com/containernetworking/cni/pkg/types/100" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestManagerObject(t *testing.T) { + m := testManager(t, testMemStore(t)) + assert.Equal(t, m.Name(), TestManager) +} + +func TestManagerCreate(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + s := testMemStore(t) + m := testManager(t, s) + n, err := m.Create(ctx, WithConflist(testNetworkConfList(t))) + assert.NoError(t, err) + assert.Equal(t, len(s.nets), 1) + _, err = m.Create(ctx, WithConflist(testNetworkConfList(t))) + assert.Error(t, err) + require.NoError(t, n.Delete(ctx)) + assert.Empty(t, s.nets) +} + +func TestManagerNetwork(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + s := testMemStore(t) + m := testManager(t, s) + _, err := m.Network(ctx, TestNetwork) + assert.Error(t, err) + + net := testNetwork(t) + s.nets[TestNetwork] = &net.networkRecord + nr, err := m.Network(ctx, TestNetwork) + if assert.NoError(t, err) { + assert.Equal(t, net.networkRecord, nr.(*network).networkRecord) + } + delete(s.nets, TestNetwork) + _, err = m.Network(ctx, TestNetwork) + assert.Error(t, err) +} + +func TestManagerList(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + s := testMemStore(t) + m := testManager(t, s) + _, err := m.Network(ctx, TestNetwork) + assert.Error(t, err) + n1, err := m.Create(ctx, WithConflist(testNetworkConfList(t))) + require.NoError(t, err) + ls := m.List(ctx) + assert.Equal(t, len(ls), 1) + for _, n := range ls { + assert.Equal(t, n1, n) + } + assert.NoError(t, n1.Delete(ctx)) + ls = m.List(ctx) + assert.Empty(t, ls) +} + +func 
TestManagerAttachment(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + s := testMemStore(t) + m := testManager(t, s) + + att := testAttachment(t) + s.atts[TestAttachment] = &att.attachmentRecord + s.nets[TestNetwork] = createNetworkRecord(att.manager, att.network) + + ar, err := m.Attachment(ctx, att.ID()) + if assert.NoError(t, err) { + assert.Equal(t, att.attachmentRecord, ar.(*attachment).attachmentRecord) + assert.Equal(t, att.network, ar.(*attachment).network) + } + + delete(s.nets, att.Network()) + // we expect to use the cached network config associated with the + // attachment even if the original network definition is deleted + _, err = m.Attachment(ctx, TestAttachment) + assert.NoError(t, err) + + delete(s.atts, TestAttachment) + _, err = m.Attachment(ctx, TestAttachment) + assert.Error(t, err) +} + +type mockAPI struct { + m *manager +} + +type mockCNI struct { + err error + res *types100.Result + cl *libcni.NetworkConfigList + rt *libcni.RuntimeConf +} + +type mockStore struct { + err error + nets map[string]*networkRecord + atts map[string]*attachmentRecord +} + +var ( + _ API = (*mockAPI)(nil) + _ libcni.CNI = (*mockCNI)(nil) + _ Store = (*mockStore)(nil) +) + +func (api *mockAPI) NewManager(name string, opts ...ManagerOpt) (Manager, error) { + return api.m, nil +} + +func (api *mockAPI) Manager(name string) Manager { + return api.m +} + +func (c *mockCNI) AddNetworkList(ctx context.Context, net *libcni.NetworkConfigList, rt *libcni.RuntimeConf) (types.Result, error) { + if c.err != nil { + return nil, c.err + } + c.cl, c.rt = net, rt + return c.res, nil +} + +func (c *mockCNI) CheckNetworkList(ctx context.Context, net *libcni.NetworkConfigList, rt *libcni.RuntimeConf) error { + c.cl, c.rt = net, rt + return c.err +} + +func (c *mockCNI) DelNetworkList(ctx context.Context, net *libcni.NetworkConfigList, rt *libcni.RuntimeConf) error { + c.cl, c.rt = net, rt + return c.err +} + +func (c *mockCNI) 
GetNetworkListCachedResult(net *libcni.NetworkConfigList, rt *libcni.RuntimeConf) (types.Result, error) { + return nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) GetNetworkListCachedConfig(net *libcni.NetworkConfigList, rt *libcni.RuntimeConf) ([]byte, *libcni.RuntimeConf, error) { + return nil, nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) AddNetwork(ctx context.Context, net *libcni.NetworkConfig, rt *libcni.RuntimeConf) (types.Result, error) { + return nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) CheckNetwork(ctx context.Context, net *libcni.NetworkConfig, rt *libcni.RuntimeConf) error { + return errdefs.ErrNotImplemented +} + +func (c *mockCNI) DelNetwork(ctx context.Context, net *libcni.NetworkConfig, rt *libcni.RuntimeConf) error { + return errdefs.ErrNotImplemented +} + +func (c *mockCNI) GetNetworkCachedResult(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) (types.Result, error) { + return nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) GetNetworkCachedConfig(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) ([]byte, *libcni.RuntimeConf, error) { + return nil, nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) ValidateNetworkList(ctx context.Context, net *libcni.NetworkConfigList) ([]string, error) { + return nil, errdefs.ErrNotImplemented +} + +func (c *mockCNI) ValidateNetwork(ctx context.Context, net *libcni.NetworkConfig) ([]string, error) { + return nil, errdefs.ErrNotImplemented +} + +func (s *mockStore) CreateNetwork(ctx context.Context, r *networkRecord) error { + if _, ok := s.nets[r.config.Name]; ok { + return errdefs.ErrAlreadyExists + } + if s.err != nil { + return s.err + } + s.nets[r.config.Name] = r + return nil +} + +func (s *mockStore) UpdateNetwork(ctx context.Context, r *networkRecord) error { + if s.err != nil { + return s.err + } + s.nets[r.config.Name] = r + return nil +} + +func (s *mockStore) GetNetwork(ctx context.Context, manager, id string) (*networkRecord, error) { + if s.err != nil { + 
return nil, s.err + } + r, ok := s.nets[id] + if !ok { + return nil, errdefs.ErrNotFound + } + return r, nil +} + +func (s *mockStore) DeleteNetwork(ctx context.Context, manager, id string) error { + _, err := s.GetNetwork(ctx, manager, id) + if err != nil { + return err + } + delete(s.nets, id) + return nil +} + +func (s *mockStore) WalkNetworks(ctx context.Context, manager string, fn func(*networkRecord) error) error { + for _, v := range s.nets { + fn(v) + } + return nil +} + +func (s *mockStore) CreateAttachment(ctx context.Context, r *attachmentRecord, creator Creator, deleter Deleter) error { + if _, ok := s.atts[r.id]; ok { + return errdefs.ErrAlreadyExists + } + res, err := creator(ctx) + if err != nil { + return err + } + r.result = res + if s.err != nil { + deleter(ctx) + return s.err + } + s.atts[r.id] = r + return nil +} + +func (s *mockStore) UpdateAttachment(ctx context.Context, r *attachmentRecord, updater Updater) error { + return errdefs.ErrNotImplemented +} + +func (s *mockStore) GetAttachment(ctx context.Context, manager, id string) (*attachmentRecord, error) { + if s.err != nil { + return nil, s.err + } + att, ok := s.atts[id] + if !ok { + return nil, errdefs.ErrNotFound + } + return att, nil +} + +func (s *mockStore) DeleteAttachment(ctx context.Context, manager, id string, deleter Deleter) error { + _, err := s.GetAttachment(ctx, manager, id) + if err != nil { + return err + } + if err := deleter(ctx); err != nil { + return err + } + delete(s.atts, id) + return nil +} + +func (s *mockStore) WalkAttachments(ctx context.Context, manager, network string, fn func(*attachmentRecord) error) error { + for _, v := range s.atts { + fn(v) + } + return nil +} + +func testManager(t *testing.T, store Store) *manager { + return &manager{ + name: TestManager, + cni: testCNI(t), + store: store, + } +} + +func testCNI(t *testing.T) libcni.CNI { + return &mockCNI{res: testAttachmentResult(t)} +} + +func testMemStore(t *testing.T) *mockStore { + return 
&mockStore{ + nets: make(map[string]*networkRecord), + atts: make(map[string]*attachmentRecord), + } +} diff --git a/pkg/net/network.go b/pkg/net/network.go new file mode 100644 index 000000000000..9431d837e366 --- /dev/null +++ b/pkg/net/network.go @@ -0,0 +1,190 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "context" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/log" + "github.com/containernetworking/cni/libcni" + types100 "github.com/containernetworking/cni/pkg/types/100" + "github.com/sirupsen/logrus" +) + +// network is a stored network record bound to the CNI implementation and the +// store used to operate on it. +type network struct { + networkRecord + cni libcni.CNI + store Store +} + +var _ Network = (*network)(nil) + +// Name returns the name from the network's CNI conflist. +func (n *network) Name() string { + return n.config.Name +} + +// Manager returns the name of the manager the network belongs to. +func (n *network) Manager() string { + return n.manager +} + +// Config returns the network's CNI conflist. +func (n *network) Config() *libcni.NetworkConfigList { + return n.config +} + +// Labels returns a copy of the network's labels, so callers cannot modify +// the record through the returned map. +func (n *network) Labels() map[string]string { + r := make(map[string]string) + for k, v := range n.labels { + r[k] = v + } + return r +} + +// Update replaces the network definition with the configuration built from +// opts. It returns errdefs.ErrInvalidArgument when no conflist is supplied or +// when the conflist name differs from the current network name. +func (n *network) Update(ctx context.Context, opts ...NetworkOpt) error { + log.G(ctx).WithFields(logrus.Fields{"manager": n.manager, "network": n.config.Name}).Debugf("update network") + + c := NetworkConfig{} + for _, o := range opts { + if err := o(&c); err != nil { + return err + } + } + // Conflist stays nil when no WithConflist option was passed + if c.Conflist == nil || c.Conflist.Name != n.Name() { + return errdefs.ErrInvalidArgument + } + if 
err := c.validate(); err != nil { + return err + } + + update := networkRecord{ + manager: n.manager, + config: c.Conflist, + labels: c.Labels, + } + if err := n.store.UpdateNetwork(ctx, &update); err != nil { + return err + } + n.networkRecord = update + + return nil +} + +func (n *network) Delete(ctx context.Context) error { + log.G(ctx).WithField("manager", n.manager).WithField("network", n.config.Name).Debugf("delete network") + + return n.store.DeleteNetwork(ctx, n.manager, n.config.Name) +} + +func (n *network) Attach(ctx context.Context, opts ...AttachmentOpt) (Attachment, error) { + rec := createAttachmentRecord("", n.manager, n.config) + for _, o := range opts { + if err := o(&rec.args); err != nil { + return nil, err + } + } + + if err := rec.args.validate(); err != nil { + return nil, err + } + + rec.id = createAttachmentID(n.config.Name, rec.args.ContainerID, rec.args.IFName) + + log.G(ctx).WithFields(logrus.Fields{ + "manager": rec.manager, + "id": rec.id, + "nspath": rec.args.NSPath, + }).Debugf("attach") + + leaseID, ok := leases.FromContext(ctx) + if ok { + rec.labels["lease"] = leaseID + } + + creator := func(ctx context.Context) (*types100.Result, error) { + result, err := n.cni.AddNetworkList(ctx, n.config, rec.args.config()) + if err != nil { + return nil, err + } + return types100.NewResultFromResult(result) + } + + deleter := func(ctx context.Context) error { + if err := n.cni.DelNetworkList(ctx, n.config, rec.args.config()); err != nil { + // ignore not found error + if isNotFoundDelError(rec.args.NSPath, err) { + return nil + } + return err + } + return nil + } + + if err := n.store.CreateAttachment(ctx, rec, creator, deleter); err != nil { + return nil, err + } + + return attachmentFromRecord(rec, n.cni, n.store), nil +} + +func (n *network) List(ctx context.Context) []Attachment { + var attachList []Attachment + + if err := n.Walk(ctx, func(r *attachment) error { + attachList = append(attachList, r) + return nil + + }); err != nil { + 
return nil + } + + return attachList +} + +// Walk asks the store for the attachment records of this manager and +// network, then calls fn once per resulting attachment. It stops at, and +// returns, the first error reported by the store or by fn. +func (n *network) Walk(ctx context.Context, fn func(*attachment) error) error { + var attachList []*attachment + + if err := n.store.WalkAttachments(ctx, n.manager, n.config.Name, func(rec *attachmentRecord) error { + attachList = append(attachList, attachmentFromRecord(rec, n.cni, n.store)) + return nil + + }); err != nil { + return err + } + + for _, att := range attachList { + // propagate the callback's error instead of discarding it + if err := fn(att); err != nil { + return err + } + } + + return nil +} + +// networkFromRecord wraps a copy of r together with the given CNI +// implementation and store. +func networkFromRecord(r *networkRecord, cni libcni.CNI, store Store) *network { + return &network{ + networkRecord: *r, + cni: cni, + store: store, + } +} + +// createNetworkRecord returns a record for the given manager and conflist +// with an empty, non-nil label map. +func createNetworkRecord(manager string, conflist *libcni.NetworkConfigList) *networkRecord { + return &networkRecord{ + manager: manager, + config: conflist, + labels: make(map[string]string), + } +} diff --git a/pkg/net/network_test.go b/pkg/net/network_test.go new file mode 100644 index 000000000000..96ddd556e9f5 --- /dev/null +++ b/pkg/net/network_test.go @@ -0,0 +1,158 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package net + +import ( + "context" + "testing" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containernetworking/cni/libcni" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNetworkFromRecord(t *testing.T) { + rec := testNetworkRecord(t) + n := networkFromRecord(rec, testCNI(t), testMemStore(t)) + assert.NotNil(t, n) + assert.Equal(t, n.networkRecord, *rec) +} + +func TestNetworkObject(t *testing.T) { + obj := testNetwork(t) + assert.Equal(t, obj.Name(), TestNetwork) + assert.Equal(t, obj.Manager(), TestManager) + assert.Equal(t, obj.Config(), testNetworkConfList(t)) +} + +func TestNetworkUpdate(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + n := testNetwork(t) + c, ok := n.cni.(*mockCNI) + require.True(t, ok) + att, err := n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface)) + require.NoError(t, err) + + old := n.Config() + updated, err := libcni.ConfListFromBytes([]byte(`{ + "cniVersion": "0.3.1", + "name": "testnet", + "plugins": [{ + "type": "bridge", + "bridge": "cni0", + "ipam": { + "type": "host-local", + "subnet": "10.10.0.0/16" + } + }] + }`)) + require.NoError(t, err) + + // In case of network definition update, we expect existing + // attachments to use the original network definitions + // for check/delete operations + assert.NoError(t, n.Update(ctx, WithConflist(updated))) + assert.NoError(t, att.Remove(ctx)) + assert.Equal(t, old, c.cl) + + notfound, err := libcni.ConfListFromBytes([]byte(`{ + "cniVersion": "0.3.1", + "name": "notfound", + "plugins": [{ + "type": "bridge", + "bridge": "cni0", + "ipam": { + "type": "host-local", + "subnet": "10.10.0.0/16" + } + }] + }`)) + require.NoError(t, err) + assert.Error(t, n.Update(ctx, WithConflist(notfound))) +} + +func TestNetworkDelete(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + n := 
testNetwork(t) + + c, ok := n.cni.(*mockCNI) + require.True(t, ok) + s, ok := n.store.(*mockStore) + require.True(t, ok) + s.nets[n.Name()] = &n.networkRecord + + att, err := n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface)) + require.NoError(t, err) + assert.NoError(t, n.Delete(ctx)) + assert.NoError(t, att.Remove(ctx)) + assert.Equal(t, n.Config(), c.cl) +} + +func TestNetworkAttachment(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + n := testNetwork(t) + c, ok := n.cni.(*mockCNI) + require.True(t, ok) + s, ok := n.store.(*mockStore) + require.True(t, ok) + s.err, c.err = nil, nil + + _, err := n.Attach(ctx) + assert.Error(t, err) + _, err = n.Attach(ctx, WithContainer(TestContainer)) + assert.Error(t, err) + _, err = n.Attach(ctx, WithContainer(TestContainer), WithNSPath(TestNSPath)) + assert.Error(t, err) + att, err := n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface), WithNSPath(TestNSPath)) + assert.NoError(t, err) + require.NoError(t, att.Remove(ctx)) + + c.err = errdefs.ErrUnknown + _, err = n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface), WithNSPath(TestNSPath)) + assert.Error(t, err) + + s.err, c.err = errdefs.ErrAlreadyExists, nil + _, err = n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface), WithNSPath(TestNSPath)) + assert.Error(t, err) +} + +func TestNetworkList(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + n := testNetwork(t) + + ls := n.List(ctx) + assert.Empty(t, ls) + + att, err := n.Attach(ctx, WithContainer(TestContainer), WithIFName(TestInterface)) + require.NoError(t, err) + ls = n.List(ctx) + if assert.Equal(t, len(ls), 1) { + assert.Equal(t, ls[0], att) + } + + require.NoError(t, att.Remove(ctx)) + ls = n.List(ctx) + assert.Empty(t, ls) +} + +func testNetwork(t *testing.T) *network { + n := networkFromRecord(testNetworkRecord(t), testCNI(t), testMemStore(t)) + require.NotNil(t, 
n) + return n +} diff --git a/pkg/net/opts.go b/pkg/net/opts.go new file mode 100644 index 000000000000..59ed7cfa24d5 --- /dev/null +++ b/pkg/net/opts.go @@ -0,0 +1,206 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "fmt" + + gocni "github.com/containerd/go-cni" + "github.com/containernetworking/cni/libcni" +) + +type AttachmentOpt func(Args *AttachmentArgs) error +type ManagerOpt func(cfg *ManagerConfig) error +type NetworkOpt func(cfg *NetworkConfig) error + +type AttachmentArgs struct { + ContainerID string + NSPath string + IFName string + CapabilityArgs map[string]interface{} + PluginArgs map[string]string +} + +type ManagerConfig struct { + PluginDirs []string +} + +type NetworkConfig struct { + Conflist *libcni.NetworkConfigList + Labels map[string]string +} + +func WithPluginDirs(dirs ...string) ManagerOpt { + return func(c *ManagerConfig) error { + c.PluginDirs = append(c.PluginDirs, dirs...) 
+ return nil + } +} + +// WithConflist sets the CNI conflist of the network configuration. +func WithConflist(conflist *libcni.NetworkConfigList) NetworkOpt { + return func(c *NetworkConfig) error { + c.Conflist = conflist + return nil + } +} + +// WithNetworkLabels merges labels into the network configuration's labels, +// overwriting entries that share a key. +func WithNetworkLabels(labels map[string]string) NetworkOpt { + return func(c *NetworkConfig) error { + if c.Labels == nil { + c.Labels = make(map[string]string) + } + for k, v := range labels { + c.Labels[k] = v + } + return nil + } +} + +// WithContainer sets the container ID of the attachment. +func WithContainer(container string) AttachmentOpt { + return func(c *AttachmentArgs) error { + c.ContainerID = container + return nil + } +} + +// WithNSPath sets the network namespace path of the attachment. +func WithNSPath(nsPath string) AttachmentOpt { + return func(c *AttachmentArgs) error { + c.NSPath = nsPath + return nil + } +} + +// WithIFName sets the interface name of the attachment. +func WithIFName(ifName string) AttachmentOpt { + return func(c *AttachmentArgs) error { + c.IFName = ifName + return nil + } +} + +// WithCapabilityPortMap adds support for port mappings +func WithCapabilityPortMap(portMapping []gocni.PortMapping) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.CapabilityArgs == nil { + c.CapabilityArgs = make(map[string]interface{}) + } + c.CapabilityArgs["portMappings"] = portMapping + return nil + } +} + +// WithCapabilityIPRanges adds support for ip ranges +func WithCapabilityIPRanges(ipRanges []gocni.IPRanges) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.CapabilityArgs == nil { + c.CapabilityArgs = make(map[string]interface{}) + } + c.CapabilityArgs["ipRanges"] = ipRanges + return nil + } +} + +// WithCapabilityBandWidth adds support for bandwidth limits +func WithCapabilityBandWidth(bandWidth gocni.BandWidth) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.CapabilityArgs == nil { + c.CapabilityArgs = make(map[string]interface{}) + } + c.CapabilityArgs["bandwidth"] = bandWidth + return nil + } +} + +// WithCapabilityDNS adds support for dns +func WithCapabilityDNS(dns gocni.DNS) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.CapabilityArgs == nil { + c.CapabilityArgs = 
make(map[string]interface{}) + } + c.CapabilityArgs["dns"] = dns + return nil + } +} + +// WithCapability support well-known capabilities +// https://www.cni.dev/docs/conventions/#well-known-capabilities +func WithCapability(name string, capability interface{}) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.CapabilityArgs == nil { + c.CapabilityArgs = make(map[string]interface{}) + } + c.CapabilityArgs[name] = capability + return nil + } +} + +// WithLabels adds every entry of labels to the attachment's plugin +// arguments, overwriting entries that share a key. +func WithLabels(labels map[string]string) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.PluginArgs == nil { + c.PluginArgs = make(map[string]string) + } + for k, v := range labels { + c.PluginArgs[k] = v + } + return nil + } +} + +// WithArgs sets a single key/value pair in the attachment's plugin arguments. +func WithArgs(k, v string) AttachmentOpt { + return func(c *AttachmentArgs) error { + if c.PluginArgs == nil { + c.PluginArgs = make(map[string]string) + } + c.PluginArgs[k] = v + return nil + } +} + +// config converts the attachment arguments into a libcni runtime +// configuration. Plugin arguments are appended in map iteration order. +func (a *AttachmentArgs) config() *libcni.RuntimeConf { + c := &libcni.RuntimeConf{ + ContainerID: a.ContainerID, + NetNS: a.NSPath, + IfName: a.IFName, + } + for k, v := range a.PluginArgs { + c.Args = append(c.Args, [2]string{k, v}) + } + c.CapabilityArgs = a.CapabilityArgs + return c +} + +// validate reports an error when the container ID or the interface name is +// empty. +func (a *AttachmentArgs) validate() error { + if len(a.ContainerID) == 0 { + return fmt.Errorf("empty container ID") + } + if len(a.IFName) == 0 { + return fmt.Errorf("empty ifname") + } + return nil +} + +// validate reports an error when the conflist is missing, has an empty name +// or contains no plugins. +func (c *NetworkConfig) validate() error { + // Conflist stays nil when no WithConflist option was applied + if c.Conflist == nil { + return fmt.Errorf("missing network conflist") + } + + if len(c.Conflist.Name) == 0 { + return fmt.Errorf("empty network name %q", c.Conflist.Name) + } + + if len(c.Conflist.Plugins) == 0 { + return fmt.Errorf("zero plugins") + } + + return nil +} diff --git a/pkg/net/opts_test.go b/pkg/net/opts_test.go new file mode 100644 index 000000000000..17d584f5cffd --- /dev/null +++ b/pkg/net/opts_test.go @@ -0,0 +1,93 @@ +/* + Copyright The containerd Authors. 
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package net + +import ( + "testing" + + gocni "github.com/containerd/go-cni" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestManagerOpts(t *testing.T) { + opts := []ManagerOpt{ + WithPluginDirs("dir1", "dir2"), + } + c := ManagerConfig{} + for _, o := range opts { + require.NoError(t, o(&c)) + } + assert.Equal(t, c.PluginDirs, []string{"dir1", "dir2"}) +} + +func TestNetworkOpts(t *testing.T) { + opts := []NetworkOpt{ + WithConflist(testNetworkConfList(t)), + WithNetworkLabels(map[string]string{"lkey": "lval"}), + } + c := NetworkConfig{} + for _, o := range opts { + require.NoError(t, o(&c)) + } + assert.Equal(t, c.Labels, map[string]string{"lkey": "lval"}) + assert.Equal(t, c.Conflist, testNetworkConfList(t)) +} + +func TestAttachmentOpts(t *testing.T) { + portMaps := []gocni.PortMapping{{HostPort: 80, ContainerPort: 8080, Protocol: "tcp", HostIP: "192.168.1.1"}} + ipRanges := []gocni.IPRanges{{Subnet: "172.0.1.0/24", RangeStart: "172.0.1.10", RangeEnd: "172.0.1.128", Gateway: "172.0.1.1"}} + bandWidth := gocni.BandWidth{IngressRate: 10, IngressBurst: 50, EgressRate: 20, EgressBurst: 80} + dns := gocni.DNS{Servers: []string{"8.8.8.8"}, Searches: []string{"example.com"}, Options: []string{"option1"}} + + opts := []AttachmentOpt{ + WithContainer(TestContainer), + WithNSPath(TestNSPath), + WithIFName(TestInterface), + WithCapabilityPortMap(portMaps), + 
WithCapabilityIPRanges(ipRanges), + WithCapabilityBandWidth(bandWidth), + WithCapabilityDNS(dns), + WithCapability("testcap", "testcapvalue"), + WithLabels(map[string]string{"lkey": "lval"}), + WithArgs("testarg", "testargval"), + } + + c := AttachmentArgs{} + for _, o := range opts { + require.NoError(t, o(&c)) + } + + tgt := AttachmentArgs{ + ContainerID: TestContainer, + NSPath: TestNSPath, + IFName: TestInterface, + CapabilityArgs: map[string]interface{}{ + "portMappings": portMaps, + "ipRanges": ipRanges, + "bandwidth": bandWidth, + "dns": dns, + "testcap": "testcapvalue", + }, + PluginArgs: map[string]string{ + "lkey": "lval", + "testarg": "testargval", + }, + } + + assert.Equal(t, c, tgt) +} diff --git a/pkg/net/plugin/plugin.go b/pkg/net/plugin/plugin.go new file mode 100644 index 000000000000..971542792ca4 --- /dev/null +++ b/pkg/net/plugin/plugin.go @@ -0,0 +1,192 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package plugin + +import ( + "os" + "path/filepath" + "sync" + "time" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/pkg/net" + "github.com/containerd/containerd/pkg/net/compat" + "github.com/containerd/containerd/plugin" + bolt "go.etcd.io/bbolt" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.NetworkPlugin, + ID: "cni", + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: initFunc, + }) +} + +func initFunc(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + + db, err := initDB(ic) + if err != nil { + return nil, err + } + + i := &impl{ + db: db, + mgrs: make(map[string]net.Manager), + cnis: make(map[string]compat.CNI), + } + + collector := net.NewResourceCollector(i, db) + m.(*metadata.DB).RegisterCollectibleResource(metadata.ResourceNetwork, collector) + + return i, nil +} + +// impl models both network plugin and go-cni style interfaces +// client can freely choose any interface that's better suits +// the use cases. +type impl struct { + db net.DB + mgrs map[string]net.Manager + cnis map[string]compat.CNI + lock sync.Mutex +} + +// impl should implement the new network API, and also be compabitable with the gocni API +var _ net.API = (*impl)(nil) +var _ compat.API = (*impl)(nil) + +func (i *impl) NewManager(name string, opts ...net.ManagerOpt) (net.Manager, error) { + i.lock.Lock() + defer i.lock.Unlock() + + if _, ok := i.mgrs[name]; ok { + return nil, errdefs.ErrAlreadyExists + } + + store, err := net.NewStore(i.db) + if err != nil { + return nil, err + } + + m, err := net.NewManager(name, store, opts...) 
+ if err != nil { + return nil, err + } + + i.mgrs[name] = m + return m, nil +} + +func (i *impl) Manager(name string) net.Manager { + i.lock.Lock() + defer i.lock.Unlock() + + if m, ok := i.mgrs[name]; ok { + return m + } + return nil +} + +func (i *impl) NewCNI(name string, opts ...compat.Opt) (compat.CNI, error) { + i.lock.Lock() + defer i.lock.Unlock() + + if _, ok := i.cnis[name]; ok { + return nil, errdefs.ErrAlreadyExists + } + if _, ok := i.mgrs[name]; ok { + return nil, errdefs.ErrAlreadyExists + } + + c := compat.Config{} + for _, o := range opts { + if err := o(&c); err != nil { + return nil, err + } + } + + store, err := net.NewStore(i.db) + if err != nil { + return nil, err + } + + m, err := net.NewManager(name, store, net.WithPluginDirs(c.PluginDirs...)) + if err != nil { + return nil, err + } + + cni, err := compat.New(m, opts...) + if err != nil { + return nil, err + } + + i.cnis[name] = cni + i.mgrs[name] = m + + return cni, nil +} + +func (i *impl) CNI(name string) compat.CNI { + i.lock.Lock() + defer i.lock.Unlock() + + if cni, ok := i.cnis[name]; ok { + return cni + } + return nil +} + +func initDB(ic *plugin.InitContext) (*bolt.DB, error) { + if err := os.MkdirAll(ic.State, 0711); err != nil { + return nil, err + } + path := filepath.Join(ic.State, "net.db") + options := *bolt.DefaultOptions + // Reading bbolt's freelist sometimes fails when the file has a data corruption. + // Disabling freelist sync reduces the chance of the breakage. + // https://github.com/etcd-io/bbolt/pull/1 + // https://github.com/etcd-io/bbolt/pull/6 + options.NoFreelistSync = true + // Without the timeout, bbolt.Open would block indefinitely due to flock(2). 
+ options.Timeout = 1 + + doneCh := make(chan struct{}) + go func() { + t := time.NewTimer(10 * time.Second) + defer t.Stop() + select { + case <-t.C: + log.G(ic.Context).WithField("plugin", "bolt").Warn("waiting for response from boltdb open") + case <-doneCh: + return + } + }() + + db, err := bolt.Open(path, 0644, &options) + close(doneCh) + return db, err +} diff --git a/pkg/net/store.go b/pkg/net/store.go new file mode 100644 index 000000000000..018bb9e412fe --- /dev/null +++ b/pkg/net/store.go @@ -0,0 +1,788 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package net + +/* Schema +v1 +|--- + |--- + |--- networks + |--- + |--- conflist: + |--- labels + |--- : + |--- createdat: + |--- updatedat: + |--- attachments + |--- // + |--- args: + |--- conflist: + |--- labels + |--- lease: + |--- result: + |--- createdat: + |--- updatedat: + |--- leases + |--- + |--- +*/ + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata/boltutil" + "github.com/containerd/containerd/namespaces" + "github.com/containernetworking/cni/libcni" + types100 "github.com/containernetworking/cni/pkg/types/100" + "github.com/sirupsen/logrus" + bolt "go.etcd.io/bbolt" +) + +const ( + schemaVersion = "v1" +) + +var ( + bucketKeyVersion = []byte(schemaVersion) + bucketKeyNetworks = []byte("networks") + bucketKeyAttachments = []byte("attachments") + bucketKeyLeases = []byte("leases") + bucketKeyLabels = []byte("labels") + bucketKeyArgs = []byte("args") + bucketKeyConflist = []byte("conflist") + bucketKeyResult = []byte("result") +) + +type transactionKey struct{} + +type DB interface { + View(func(tx *bolt.Tx) error) error + Update(func(tx *bolt.Tx) error) error + Begin(writable bool) (*bolt.Tx, error) +} + +type Store interface { + CreateNetwork(ctx context.Context, r *networkRecord) error + UpdateNetwork(ctx context.Context, r *networkRecord) error + GetNetwork(ctx context.Context, manager, id string) (*networkRecord, error) + DeleteNetwork(ctx context.Context, manager, id string) error + WalkNetworks(ctx context.Context, manager string, fn func(*networkRecord) error) error + CreateAttachment(ctx context.Context, r *attachmentRecord, creator Creator, deleter Deleter) error + UpdateAttachment(ctx context.Context, r *attachmentRecord, updater Updater) error + GetAttachment(ctx context.Context, manager, id string) (*attachmentRecord, error) + DeleteAttachment(ctx context.Context, manager, id string, 
deleter Deleter) error + WalkAttachments(ctx context.Context, manager, network string, fn func(*attachmentRecord) error) error +} + +type networkRecord struct { + manager string + config *libcni.NetworkConfigList + labels map[string]string + createdAt time.Time + updatedAt time.Time +} + +type attachmentRecord struct { + id string + manager string + network *libcni.NetworkConfigList + result *types100.Result + labels map[string]string + args AttachmentArgs + createdAt time.Time + updatedAt time.Time +} + +// resource mutation callbacks +type Creator func(context.Context) (*types100.Result, error) +type Updater func(context.Context) error +type Deleter func(context.Context) error + +func NewStore(db DB) (Store, error) { + s := &store{ + db: db, + } + if err := s.Init(); err != nil { + return nil, err + } + return s, nil +} + +type store struct { + db DB +} + +var _ Store = (*store)(nil) + +func (s *store) Init() error { + return s.db.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists(bucketKeyVersion); err != nil { + return err + } + return nil + }) +} + +func (s *store) CreateNetwork(ctx context.Context, r *networkRecord) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt, err := createNetworksBucket(tx, namespace, r.manager) + if err != nil { + return err + } + + nbkt, err := bkt.CreateBucket([]byte(r.config.Name)) + if err != nil { + if err == bolt.ErrBucketExists { + err = errdefs.ErrAlreadyExists + } + return err + } + + r.createdAt = time.Now().UTC() + r.updatedAt = r.createdAt + + if err := writeNetwork(nbkt, r); err != nil { + log.G(ctx).WithFields(logrus.Fields{ + "namespace": namespace, + "manager": r.manager, + "network": r.config.Name, + }).WithError(err).Debugf("network creation failed") + + return err + } + + return nil + }) +} + +func (s *store) UpdateNetwork(ctx context.Context, r *networkRecord) error { + namespace, err 
:= namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + var updated networkRecord + + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getNetworksBucket(tx, namespace, r.manager) + if bkt == nil { + return errdefs.ErrNotFound + } + + nbkt := bkt.Bucket([]byte(r.config.Name)) + if nbkt == nil { + return errdefs.ErrNotFound + } + + if err := readNetwork(&updated, nbkt); err != nil { + return err + } + + updated.updatedAt = time.Now().UTC() + updated.config = r.config + + return writeNetwork(nbkt, &updated) + }) +} + +func (s *store) GetNetwork(ctx context.Context, manager, id string) (*networkRecord, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + r := createNetworkRecord(manager, nil) + + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getNetworksBucket(tx, namespace, manager) + if bkt == nil { + return errdefs.ErrNotFound + } + + nbkt := bkt.Bucket([]byte(id)) + if nbkt == nil { + return errdefs.ErrNotFound + } + + return readNetwork(r, nbkt) + + }); err != nil { + return nil, err + } + + return r, nil +} + +func (s *store) DeleteNetwork(ctx context.Context, manager, id string) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getNetworksBucket(tx, namespace, manager) + if bkt == nil { + return errdefs.ErrNotFound + } + + if err := bkt.DeleteBucket([]byte(id)); err != nil { + if err == bolt.ErrBucketNotFound { + err = errdefs.ErrNotFound + } + return err + } + + return nil + }) +} + +func (s *store) WalkNetworks(ctx context.Context, manager string, fn func(*networkRecord) error) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getNetworksBucket(tx, namespace, manager) + + if bkt == nil { + log.G(ctx).WithFields(logrus.Fields{ + "namespace": 
namespace, + "manager": manager, + }).Debugf("manager bucket not found") + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + nbkt := bkt.Bucket(k) + if nbkt == nil { + return nil + } + + r := createNetworkRecord(manager, nil) + + if err := readNetwork(r, nbkt); err != nil { + return err + } + + fn(r) + + return nil + }) + }) +} + +func (s *store) CreateAttachment(ctx context.Context, r *attachmentRecord, creator Creator, deleter Deleter) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, s.db, func(tx *bolt.Tx) error { + var ( + err error + bkt, abkt, lbkt *bolt.Bucket + res *types100.Result + ) + ctx := withTransactionContext(ctx, tx) + + bkt, err = createAttachmentsBucket(tx, namespace, r.manager) + if err != nil { + return err + } + + abkt, err = bkt.CreateBucket([]byte(r.id)) + if err != nil { + if err == bolt.ErrBucketExists { + err = errdefs.ErrAlreadyExists + } + return err + } + + res, err = creator(ctx) + if err != nil { + return err + } + defer func() { + if err != nil { + if derr := deleter(ctx); derr != nil { + log.G(ctx).WithError(derr).Errorf("failed to clean up attachment %q", r.id) + } + } + }() + + r.createdAt = time.Now().UTC() + r.updatedAt = r.createdAt + r.result = res + + if err = writeAttachment(abkt, r); err != nil { + return err + } + + if leaseID, ok := r.labels["lease"]; ok { + lbkt, err = createLeasesBucket(tx, namespace, r.manager) + if err != nil { + return err + } + + if err = writeLease(lbkt, leaseID, r.id); err != nil { + return err + } + } + + return nil + }) +} + +func (s *store) UpdateAttachment(ctx context.Context, r *attachmentRecord, updater Updater) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + updated := createAttachmentRecord(r.id, r.manager, r.network) + + return update(ctx, s.db, func(tx *bolt.Tx) error { + ctx := withTransactionContext(ctx, tx) + + bkt := getAttachmentsBucket(tx, 
namespace, r.manager) + if bkt == nil { + return errdefs.ErrNotFound + } + + abkt := bkt.Bucket([]byte(r.id)) + if abkt == nil { + return errdefs.ErrNotFound + } + + if err := readAttachment(updated, abkt); err != nil { + return err + } + + if err := validateAttachmentUpdate(updated, r); err != nil { + return err + } + + updated.result = r.result + updated.updatedAt = time.Now().UTC() + + if err := writeAttachment(abkt, updated); err != nil { + return err + } + + return updater(ctx) + }) +} + +func (s *store) GetAttachment(ctx context.Context, manager, id string) (*attachmentRecord, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + r := createAttachmentRecord(id, manager, nil) + + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getAttachmentsBucket(tx, namespace, manager) + + if bkt == nil { + return errdefs.ErrNotFound + } + + abkt := bkt.Bucket([]byte(id)) + if abkt == nil { + return errdefs.ErrNotFound + } + + return readAttachment(r, abkt) + + }); err != nil { + return nil, err + } + + return r, nil +} + +func (s *store) DeleteAttachment(ctx context.Context, manager, id string, deleter Deleter) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return update(ctx, s.db, func(tx *bolt.Tx) error { + ctx := withTransactionContext(ctx, tx) + + bkt := getAttachmentsBucket(tx, namespace, manager) + if bkt == nil { + return errdefs.ErrNotFound + } + + if leaseID := queryLease(bkt, id); len(leaseID) > 0 { + if lbkt := getLeasesBucket(tx, namespace, manager); lbkt != nil { + removeLease(lbkt, leaseID, id) + } + } + + if err := bkt.DeleteBucket([]byte(id)); err != nil { + if err == bolt.ErrBucketNotFound { + err = errdefs.ErrNotFound + } + return err + } + + return deleter(ctx) + }) +} + +func (s *store) WalkAttachments(ctx context.Context, manager, network string, fn func(*attachmentRecord) error) error { + namespace, err := 
namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getAttachmentsBucket(tx, namespace, manager) + if bkt == nil { + return nil + } + + prefix := []byte(network + "/") + c := bkt.Cursor() + + for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() { + abkt := bkt.Bucket(k) + if abkt == nil { + continue + } + + r := createAttachmentRecord(string(k), manager, nil) + + if err := readAttachment(r, abkt); err != nil { + return err + } + + fn(r) + } + + return nil + }) +} + +func writeNetwork(bkt *bolt.Bucket, r *networkRecord) error { + if err := boltutil.WriteTimestamps(bkt, r.createdAt, r.updatedAt); err != nil { + return err + } + + if err := boltutil.WriteLabels(bkt, r.labels); err != nil { + return err + } + + if r.config == nil { + return fmt.Errorf("conflist is nil") + } + if err := bkt.Put(bucketKeyConflist, r.config.Bytes); err != nil { + return err + } + + return nil +} + +func readNetwork(r *networkRecord, bkt *bolt.Bucket) error { + if err := boltutil.ReadTimestamps(bkt, &r.createdAt, &r.updatedAt); err != nil { + return err + } + + labels, err := boltutil.ReadLabels(bkt) + if err != nil { + return err + } + r.labels = labels + + buf := bkt.Get(bucketKeyConflist) + if buf == nil { + return fmt.Errorf("conflist not found") + } + + cbuf := make([]byte, len(buf)) + copy(cbuf, buf) + conflist, err := libcni.ConfListFromBytes(cbuf) + if err != nil { + return err + } + r.config = conflist + + return err +} + +func readAttachment(r *attachmentRecord, bkt *bolt.Bucket) error { + // attachment args + if abuf := bkt.Get(bucketKeyArgs); abuf != nil { + if r.args.CapabilityArgs == nil { + r.args.CapabilityArgs = make(map[string]interface{}) + } + if r.args.PluginArgs == nil { + r.args.PluginArgs = make(map[string]string) + } + if err := json.Unmarshal(abuf, &r.args); err != nil { + return fmt.Errorf("failed to unmarshal attachment args: %w", err) + } + } else { + 
return fmt.Errorf("attachment args key not found") + } + + // network + if cbuf := bkt.Get(bucketKeyConflist); cbuf != nil { + tbuf := make([]byte, len(cbuf)) + copy(tbuf, cbuf) + conflist, err := libcni.ConfListFromBytes(tbuf) + if err != nil { + return fmt.Errorf("failed to load conflist: %w", err) + } + r.network = conflist + } else { + return fmt.Errorf("network conflist key not found") + } + + // attachment result + if rbuf := bkt.Get(bucketKeyResult); rbuf != nil { + result := types100.Result{} + if err := json.Unmarshal(rbuf, &result); err != nil { + return fmt.Errorf("failed to unmarshal attachment result: %w", err) + } + r.result = &result + } + + // labels + labels, err := boltutil.ReadLabels(bkt) + if err != nil { + return err + } + r.labels = labels + + // timestamps + if err := boltutil.ReadTimestamps(bkt, &r.createdAt, &r.updatedAt); err != nil { + return err + } + + return nil +} + +func writeAttachment(bkt *bolt.Bucket, r *attachmentRecord) error { + // attachment args + abuf, err := json.Marshal(&r.args) + if err != nil { + return fmt.Errorf("failed to marshal attachment args: %w", err) + } + if err := bkt.Put(bucketKeyArgs, abuf); err != nil { + return fmt.Errorf("failed to write args, %w", err) + } + + // network conflist + if err := bkt.Put(bucketKeyConflist, r.network.Bytes); err != nil { + return fmt.Errorf("failed to write network, %w", err) + } + + // attachment result + if r.result != nil { + rbuf, err := json.Marshal(r.result) + if err != nil { + return fmt.Errorf("failed to marshal attachment result %w", err) + } + if err := bkt.Put(bucketKeyResult, rbuf); err != nil { + return fmt.Errorf("failed to write result %w", err) + } + } + + // labels + if err := boltutil.WriteLabels(bkt, r.labels); err != nil { + return err + } + + // timestamps + if err := boltutil.WriteTimestamps(bkt, r.createdAt, r.updatedAt); err != nil { + return err + } + + return nil +} + +func validateAttachmentUpdate(oldrc, newrc *attachmentRecord) error { + if oldrc.id 
!= newrc.id { + return fmt.Errorf("ID changed: %w", errdefs.ErrInvalidArgument) + } + + if oldrc.createdAt != newrc.createdAt { + return fmt.Errorf("created changed: %w", errdefs.ErrInvalidArgument) + } + + return nil +} + +func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) { + bkt, err := tx.CreateBucketIfNotExists(keys[0]) + if err != nil { + return nil, err + } + + for _, key := range keys[1:] { + bkt, err = bkt.CreateBucketIfNotExists(key) + if err != nil { + return nil, err + } + } + + return bkt, nil +} + +func getBucket(tx *bolt.Tx, names ...[]byte) *bolt.Bucket { + bkt := tx.Bucket(bucketKeyVersion) + if bkt == nil { + return nil + } + + for _, name := range names { + bkt = bkt.Bucket(name) + if bkt == nil { + return nil + } + } + + return bkt +} + +func bucketEmpty(bkt *bolt.Bucket) bool { + if bkt == nil { + return true + } + + c := bkt.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + return false + } + + return true +} + +func createNetworksBucket(tx *bolt.Tx, namespace, manager string) (*bolt.Bucket, error) { + return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), []byte(manager), bucketKeyNetworks) +} + +func createAttachmentsBucket(tx *bolt.Tx, namespace, manager string) (*bolt.Bucket, error) { + return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), []byte(manager), bucketKeyAttachments) +} + +func createLeasesBucket(tx *bolt.Tx, namespace, manager string) (*bolt.Bucket, error) { + return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), []byte(manager), bucketKeyLeases) +} + +func getNetworksBucket(tx *bolt.Tx, namespace, manager string) *bolt.Bucket { + return getBucket(tx, []byte(namespace), []byte(manager), bucketKeyNetworks) +} + +func getAttachmentsBucket(tx *bolt.Tx, namespace, manager string) *bolt.Bucket { + return getBucket(tx, []byte(namespace), []byte(manager), bucketKeyAttachments) +} + +func getLeasesBucket(tx *bolt.Tx, namespace, manager 
string) *bolt.Bucket { + return getBucket(tx, []byte(namespace), []byte(manager), bucketKeyLeases) +} + +func writeLease(bkt *bolt.Bucket, leaseID string, attachID string) error { + lbkt, err := bkt.CreateBucketIfNotExists([]byte(leaseID)) + if err != nil { + return err + } + if val := lbkt.Get([]byte(attachID)); val != nil { + return errdefs.ErrAlreadyExists + } + return lbkt.Put([]byte(attachID), []byte("")) +} + +func removeLease(bkt *bolt.Bucket, leaseID string, attachID string) error { + lbkt := bkt.Bucket([]byte(leaseID)) + if lbkt == nil { + return nil + } + + if err := lbkt.Delete([]byte(attachID)); err != nil { + return err + } + + if bucketEmpty(lbkt) { + err := bkt.DeleteBucket([]byte(leaseID)) + if err != nil { + return err + } + } + + return nil +} + +func queryLease(bkt *bolt.Bucket, attachID string) string { + abkt := bkt.Bucket([]byte(attachID)) + if abkt == nil { + return "" + } + + lbkt := abkt.Bucket(bucketKeyLabels) + if lbkt == nil { + return "" + } + + v := lbkt.Get([]byte("lease")) + if v == nil { + return "" + } + + return string(v) +} + +func withTransactionContext(ctx context.Context, tx *bolt.Tx) context.Context { + return context.WithValue(ctx, transactionKey{}, tx) +} + +// view gets a bolt db transaction either from the context +// or starts a new one with the provided bolt database. +func view(ctx context.Context, db DB, fn func(*bolt.Tx) error) error { + tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx) + if !ok { + return db.View(fn) + } + return fn(tx) +} + +// update gets a writable bolt db transaction either from the context +// or starts a new one with the provided bolt database. 
+func update(ctx context.Context, db DB, fn func(*bolt.Tx) error) error { + tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx) + if !ok { + return db.Update(fn) + } else if !tx.Writable() { + return fmt.Errorf("unable to use transaction from context: %w", bolt.ErrTxNotWritable) + } + return fn(tx) +} diff --git a/pkg/net/store_test.go b/pkg/net/store_test.go new file mode 100644 index 000000000000..e72e4cff394b --- /dev/null +++ b/pkg/net/store_test.go @@ -0,0 +1,448 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package net + +import ( + "bytes" + "context" + "net" + "path/filepath" + "testing" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containernetworking/cni/libcni" + "github.com/containernetworking/cni/pkg/types" + types100 "github.com/containernetworking/cni/pkg/types/100" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" +) + +const ( + TestNamespace string = "testns" + TestManager string = "testmgr" + TestNetwork string = "testnet" + TestLease string = "testlease" + TestContainer string = "testcontainer" + TestInterface string = "eth0" + TestNSPath string = "/test/nspath" + TestAttachment string = "testnet/testcontainer/eth0" + TestGCLBKey string = "containerd.io/gc.ref.network.testnet/eth0" + TestGCLBVal string = "testmgr/testnet/testcontainer/eth0" +) + +func TestCreateNetwork(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + store := testDBStore(t) + rec := testNetworkRecord(t) + err := store.CreateNetwork(ctx, rec) + assert.NoError(t, err) + err = store.CreateNetwork(ctx, rec) + assert.Error(t, err) +} + +func TestUpdateNetwork(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + store := testDBStore(t) + rec := testNetworkRecord(t) + err := store.CreateNetwork(ctx, rec) + require.NoError(t, err) + + err = store.UpdateNetwork(ctx, rec) + assert.NoError(t, err) + + conflist, err := libcni.ConfListFromBytes([]byte(`{ + "cniVersion": "0.3.1", + "name": "notfound", + "plugins": [{ + "type": "bridge", + "bridge": "cni0", + "ipam": { + "type": "host-local", + "subnet": "10.1.0.0/16" + } + }] + }`)) + require.NoError(t, err) + notfound := &networkRecord{ + manager: TestManager, + config: conflist, + } + err = store.UpdateNetwork(ctx, notfound) + assert.Error(t, err) +} + +func TestGetNetwork(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), 
TestNamespace) + store := testDBStore(t) + rec := testNetworkRecord(t) + err := store.CreateNetwork(ctx, rec) + require.NoError(t, err) + + rec2, err := store.GetNetwork(ctx, rec.manager, rec.config.Name) + assert.NoError(t, err) + assert.Equal(t, rec.manager, rec2.manager) + assert.Equal(t, rec.config.Bytes, rec2.config.Bytes) + + rec3, err := store.GetNetwork(ctx, rec.manager, "notfound") + assert.Error(t, err) + assert.Nil(t, rec3) +} + +func TestDeleteNetwork(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + store := testDBStore(t) + rec := testNetworkRecord(t) + err := store.CreateNetwork(ctx, rec) + require.NoError(t, err) + + err = store.DeleteNetwork(ctx, rec.manager, rec.config.Name) + assert.NoError(t, err) + _, err = store.GetNetwork(ctx, rec.manager, rec.config.Name) + assert.Error(t, err) + err = store.DeleteNetwork(ctx, rec.manager, rec.config.Name) + assert.Error(t, err) +} + +func TestWalkNetworks(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), TestNamespace) + store := testDBStore(t) + + desc := []string{ + `{ + "cniVersion": "0.3.1", + "name": "net1", + "plugins": [{ + "type": "loopback" + }] + }`, + `{ + "cniVersion": "0.3.1", + "name": "net2", + "plugins": [{ + "type": "loopback" + }] + }`, + } + + nets := make(map[string]*libcni.NetworkConfigList) + for _, d := range desc { + c, err := libcni.ConfListFromBytes([]byte(d)) + require.NoError(t, err) + nets[c.Name] = c + nr := &networkRecord{ + manager: TestManager, + config: c, + } + err = store.CreateNetwork(ctx, nr) + require.NoError(t, err) + } + + nc := 0 + err := store.WalkNetworks(ctx, TestManager, func(nr *networkRecord) error { + nc++ + ns, ok := nets[nr.config.Name] + if assert.True(t, ok) { + assert.Equal(t, ns.Bytes, nr.config.Bytes) + } + return nil + }) + + assert.Equal(t, nc, len(nets)) + assert.NoError(t, err) +} + +func TestCreateAttachment(t *testing.T) { + var err error + ctx := 
namespaces.WithNamespace(context.Background(), TestNamespace)
+	store := testDBStore(t)
+	rec := testAttachmentRecord(t)
+	deleted := false
+
+	creators := []Creator{
+		func(ctx context.Context) (*types100.Result, error) {
+			return testAttachmentResult(t), nil
+		},
+		func(ctx context.Context) (*types100.Result, error) {
+			return nil, errdefs.ErrAlreadyExists
+		},
+		func(ctx context.Context) (*types100.Result, error) {
+			return nil, errdefs.ErrInvalidArgument
+		},
+	}
+
+	deleter := func(ctx context.Context) error {
+		deleted = true
+		return nil
+	}
+
+	// creator failure
+	err = store.CreateAttachment(ctx, rec, creators[1], deleter)
+	assert.Error(t, err)
+	assert.False(t, deleted)
+
+	// creator failure
+	err = store.CreateAttachment(ctx, rec, creators[2], deleter)
+	assert.Error(t, err)
+	assert.False(t, deleted)
+
+	// lease already exists
+	err = store.db.Update(func(tx *bolt.Tx) error {
+		lbkt, e := createLeasesBucket(tx, TestNamespace, TestManager)
+		require.NoError(t, e)
+		e = writeLease(lbkt, TestLease, rec.id)
+		require.NoError(t, e)
+		return nil
+	})
+	require.NoError(t, err)
+	err = store.CreateAttachment(ctx, rec, creators[0], deleter)
+	assert.Error(t, err)
+	assert.True(t, deleted)
+	deleted = false
+
+	// success
+	err = store.db.Update(func(tx *bolt.Tx) error {
+		lbkt := getLeasesBucket(tx, TestNamespace, TestManager)
+		require.NotNil(t, lbkt)
+		e := lbkt.DeleteBucket([]byte(TestLease))
+		require.NoError(t, e)
+		return nil
+	})
+	require.NoError(t, err)
+	err = store.CreateAttachment(ctx, rec, creators[0], deleter)
+	assert.NoError(t, err)
+	assert.False(t, leaseExists(store.db, TestLease))
+
+	// duplicate
+	err = store.CreateAttachment(ctx, rec, creators[0], deleter)
+	assert.Error(t, err)
+}
+
+// TestUpdateAttachment creates one attachment, expects UpdateAttachment to
+// succeed for it, and expects an error once the record's id is changed to
+// "not-found".
+func TestUpdateAttachment(t *testing.T) {
+	ctx := namespaces.WithNamespace(context.Background(), TestNamespace)
+	store := testDBStore(t)
+	rec := testAttachmentRecord(t)
+
+	err := store.CreateAttachment(ctx, rec,
+		func(ctx context.Context) (*types100.Result, error) {
+			return testAttachmentResult(t), nil
+		},
+		func(ctx context.Context) error {
+			return nil
+		},
+	)
+	require.NoError(t, err)
+
+	updater := func(ctx context.Context) error {
+		return nil
+	}
+
+	err = store.UpdateAttachment(ctx, rec, updater)
+	assert.NoError(t, err)
+
+	rec.id = "not-found"
+	err = store.UpdateAttachment(ctx, rec, updater)
+	assert.Error(t, err)
+}
+
+// TestGetAttachment creates one attachment, reads it back with GetAttachment
+// and compares it with the record that was stored. It also expects an error
+// when the id "not-found" is looked up.
+func TestGetAttachment(t *testing.T) {
+	ctx := namespaces.WithNamespace(context.Background(), TestNamespace)
+	store := testDBStore(t)
+	rec := testAttachmentRecord(t)
+
+	err := store.CreateAttachment(ctx, rec,
+		func(ctx context.Context) (*types100.Result, error) {
+			return testAttachmentResult(t), nil
+		},
+		func(ctx context.Context) error {
+			return nil
+		},
+	)
+	require.NoError(t, err)
+
+	arec, err := store.GetAttachment(ctx, rec.manager, rec.id)
+	// require, not assert: arec is dereferenced just below, so the test must
+	// stop here if the lookup failed instead of panicking on a nil record.
+	require.NoError(t, err)
+
+	// The results are detached and compared separately through
+	// attachmentResultEqual; the remaining fields are compared with Equal.
+	res1, res2 := rec.result, arec.result
+	rec.result, arec.result = nil, nil
+	assert.Equal(t, rec, arec)
+	assert.True(t, attachmentResultEqual(res1, res2))
+
+	_, err = store.GetAttachment(ctx, rec.manager, "not-found")
+	assert.Error(t, err)
+}
+
+// TestDeleteAttachment checks that a delete whose deleter returns an error
+// leaves the attachment retrievable, and that a delete whose deleter succeeds
+// makes a subsequent GetAttachment fail.
+func TestDeleteAttachment(t *testing.T) {
+	ctx := namespaces.WithNamespace(context.Background(), TestNamespace)
+	store := testDBStore(t)
+	rec := testAttachmentRecord(t)
+
+	creator := func(ctx context.Context) (*types100.Result, error) {
+		return testAttachmentResult(t), nil
+	}
+	// deleters[0] succeeds, deleters[1] fails.
+	deleters := []Deleter{
+		func(ctx context.Context) error {
+			return nil
+		},
+		func(ctx context.Context) error {
+			return errdefs.ErrInvalidArgument
+		},
+	}
+
+	err := store.CreateAttachment(ctx, rec, creator, deleters[0])
+	require.NoError(t, err)
+	err = store.DeleteAttachment(ctx, rec.manager, rec.id, deleters[1])
+	assert.Error(t, err)
+	_, err = store.GetAttachment(ctx, rec.manager, rec.id)
+	assert.NoError(t, err)
+	err = store.DeleteAttachment(ctx, rec.manager, rec.id, deleters[0])
+	assert.NoError(t, err)
+	_, err = store.GetAttachment(ctx, rec.manager, rec.id)
+	assert.Error(t, err)
+	assert.False(t, leaseExists(store.db, TestLease))
+}
+
+// TestWalkAttachments creates one attachment and expects WalkAttachments to
+// invoke the callback exactly once, with a record matching the one stored.
+func TestWalkAttachments(t *testing.T) {
+	ctx := namespaces.WithNamespace(context.Background(), TestNamespace)
+	store := testDBStore(t)
+	rec := testAttachmentRecord(t)
+
+	err := store.CreateAttachment(ctx, rec,
+		func(ctx context.Context) (*types100.Result, error) {
+			return testAttachmentResult(t), nil
+		},
+		func(ctx context.Context) error {
+			return nil
+		},
+	)
+	require.NoError(t, err)
+
+	cnt := 0
+	// The result is detached and compared separately through
+	// attachmentResultEqual; the remaining fields are compared with Equal.
+	res1 := rec.result
+	rec.result = nil
+	err = store.WalkAttachments(ctx, TestManager, rec.network.Name,
+		func(att *attachmentRecord) error {
+			cnt++
+			res2 := att.result
+			att.result = nil
+			assert.Equal(t, rec, att)
+			assert.True(t, attachmentResultEqual(res1, res2))
+			return nil
+		},
+	)
+
+	assert.NoError(t, err)
+	// testify takes (expected, actual); this order keeps failure output labelled correctly.
+	assert.Equal(t, 1, cnt)
+}
+
+// testDB opens a bolt database file named "testnet.db" inside t.TempDir() and
+// registers a cleanup that closes it.
+func testDB(t *testing.T) DB {
+	db, err := bolt.Open(filepath.Join(t.TempDir(), "testnet.db"), 0644, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		assert.NoError(t, db.Close())
+	})
+	return db
+}
+
+// testDBStore builds a store on top of testDB via NewStore and returns it as
+// the concrete *store type so tests can reach its db field.
+func testDBStore(t *testing.T) *store {
+	s, err := NewStore(testDB(t))
+	require.NoError(t, err)
+	r, ok := s.(*store)
+	require.True(t, ok)
+	return r
+}
+
+// testNetworkConfList parses a fixed CNI configuration list named "testnet"
+// with a single bridge plugin entry.
+func testNetworkConfList(t *testing.T) *libcni.NetworkConfigList {
+	conflist, err := libcni.ConfListFromBytes([]byte(`{
+		"cniVersion": "0.3.1",
+		"name": "testnet",
+		"plugins": [{
+			"type": "bridge",
+			"bridge": "cni0",
+			"ipam": {
+				"type": "host-local",
+				"subnet": "10.1.0.0/16"
+			}
+		}]
+	}`))
+	require.NoError(t, err)
+	return conflist
+}
+
+// testNetworkRecord returns a networkRecord fixture for TestManager that uses
+// the configuration from testNetworkConfList.
+func testNetworkRecord(t *testing.T) *networkRecord {
+	return &networkRecord{
+		manager: TestManager,
+		config:  testNetworkConfList(t),
+	}
+}
+
+// testAttachmentRecord returns an attachmentRecord fixture built from the
+// Test* constants; its labels carry TestLease under the "lease" key.
+func testAttachmentRecord(t *testing.T) *attachmentRecord {
+	return &attachmentRecord{
+		id:      TestAttachment,
+		manager: TestManager,
+		network: testNetworkConfList(t),
+		labels: map[string]string{
+			"lease": TestLease,
+		},
+		args: AttachmentArgs{
+			ContainerID: TestContainer,
+			NSPath:      TestNSPath,
+			IFName:      TestInterface,
+			CapabilityArgs: map[string]interface{}{
+				"ca": "ca value",
+			},
+			PluginArgs: map[string]string{
+				"pa": "pa value",
+			},
+		},
+	}
+}
+
+// testAttachmentResult returns a CNI result fixture with one interface, one
+// IP configuration, one route and a DNS section, all derived from the CIDR
+// 127.0.0.1/24.
+func testAttachmentResult(t *testing.T) *types100.Result {
+	zero := 0
+	ip, ipnet, err := net.ParseCIDR("127.0.0.1/24")
+	require.NoError(t, err)
+	return &types100.Result{
+		CNIVersion: "1.0.0",
+		Interfaces: []*types100.Interface{{Name: "lo", Mac: "mac", Sandbox: "sandbox"}},
+		IPs:        []*types100.IPConfig{{Interface: &zero, Address: *ipnet, Gateway: ip}},
+		Routes:     []*types.Route{{Dst: *ipnet, GW: ip}},
+		DNS:        types.DNS{Nameservers: []string{"ns1"}, Domain: "abc.com", Search: []string{"search1"}, Options: []string{"option1"}},
+	}
+}
+
+// attachmentResultEqual reports whether two results produce identical bytes
+// when written with PrintTo. Return values of PrintTo are not inspected.
+func attachmentResultEqual(r1, r2 *types100.Result) bool {
+	var buf1, buf2 bytes.Buffer
+	r1.PrintTo(&buf1)
+	r2.PrintTo(&buf2)
+	return bytes.Equal(buf1.Bytes(), buf2.Bytes())
+}
+
+// leaseExists reports whether Get on the leases bucket for
+// TestNamespace/TestManager returns a non-nil value for the given lease key.
+// The error returned by db.View is not inspected; the callback itself always
+// returns nil.
+//
+// NOTE(review): TestCreateAttachment removes the lease with DeleteBucket,
+// which suggests leases are stored as nested buckets. bbolt's Bucket.Get
+// returns nil for keys that are nested buckets, so this helper may always
+// return false - confirm against writeLease and consider bkt.Bucket instead.
+func leaseExists(db DB, lease string) bool {
+	res := false
+	db.View(func(tx *bolt.Tx) error {
+		bkt := getLeasesBucket(tx, TestNamespace, TestManager)
+		if bkt != nil {
+			if bkt.Get([]byte(lease)) != nil {
+				res = true
+			}
+		}
+		return nil
+	})
+	return res
+}
diff --git a/plugin/plugin.go b/plugin/plugin.go
index 0bc59d76f954..50ecd6737728 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -88,6 +88,8 @@ const (
 	TransferPlugin Type = "io.containerd.transfer.v1"
 	// SandboxStorePlugin implements a sandbox store
 	SandboxStorePlugin Type = "io.containerd.sandbox.store.v1"
+	// NetworkPlugin implements a networks service
+	NetworkPlugin Type = "io.containerd.network.v1"
 )
 
 const (