From 8320236ef4dad5934b19cada73fcacd454f33121 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 16 Jan 2019 17:38:41 +0100 Subject: [PATCH 1/4] Replace k8s-namespace-whitelist by k8s-allow-namespace --- cluster/kubernetes/kubernetes.go | 28 ++++++++++++++-------------- cmd/fluxd/main.go | 9 ++++++--- site/daemon.md | 2 +- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 3ee342390..61f706b88 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -95,22 +95,22 @@ type Cluster struct { syncErrors map[flux.ResourceID]error muSyncErrors sync.RWMutex - nsWhitelist []string - nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns + allowedNamespaces []string + loggedAllowedNS map[string]bool // to keep track of whether we've logged a problem with seeing an allowed namespace imageExcludeList []string mu sync.Mutex } // NewCluster returns a usable cluster. -func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, nsWhitelist []string, imageExcludeList []string) *Cluster { +func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, allowedNamespaces []string, imageExcludeList []string) *Cluster { c := &Cluster{ client: client, applier: applier, logger: logger, sshKeyRing: sshKeyRing, - nsWhitelist: nsWhitelist, - nsWhitelistLogged: map[string]bool{}, + allowedNamespaces: allowedNamespaces, + loggedAllowedNS: map[string]bool{}, imageExcludeList: imageExcludeList, } @@ -264,22 +264,22 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { // getAllowedNamespaces returns a list of namespaces that the Flux instance is expected // to have access to and can look for resources inside of. -// It returns a list of all namespaces unless a namespace whitelist has been set on the Cluster -// instance, in which case it returns a list containing the namespaces from the whitelist -// that exist in the cluster. +// It returns a list of all namespaces unless an explicit list of allowed namespaces +// has been set on the Cluster instance. 
func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) { - if len(c.nsWhitelist) > 0 { + if len(c.allowedNamespaces) > 0 { nsList := []apiv1.Namespace{} - for _, name := range c.nsWhitelist { + for _, name := range c.allowedNamespaces { ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{}) switch { case err == nil: - c.nsWhitelistLogged[name] = false // reset, so if the namespace goes away we'll log it again + c.loggedAllowedNS[name] = false // reset, so if the namespace goes away we'll log it again nsList = append(nsList, *ns) case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err): - if !c.nsWhitelistLogged[name] { - c.logger.Log("warning", "whitelisted namespace inaccessible", "namespace", name, "err", err) - c.nsWhitelistLogged[name] = true + if !c.loggedAllowedNS[name] { + c.logger.Log("warning", "cannot access namespace set as allowed", + "namespace", name, "err", err) + c.loggedAllowedNS[name] = true } default: return nil, err diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 26062515d..a4bf064a4 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -129,7 +129,8 @@ func main() { k8sSecretName = fs.String("k8s-secret-name", "flux-git-deploy", "name of the k8s secret used to store the private SSH key") k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "mount location of the k8s secret storing the private SSH key") k8sSecretDataKey = fs.String("k8s-secret-data-key", "identity", "data key holding the private SSH key within the k8s secret") - k8sNamespaceWhitelist = fs.StringSlice("k8s-namespace-whitelist", []string{}, "experimental, optional: restrict the view of the cluster to the namespaces listed. All namespaces are included if this is not set.") + k8sNamespaceWhitelist = fs.StringSlice("k8s-namespace-whitelist", []string{}, "experimental, optional: restrict the view of the cluster to the namespaces listed. All namespaces are included if this is not set") + k8sAllowNamespace = fs.StringSlice("k8s-allow-namespace", []string{}, "experimental: restrict all operations to the provided namespaces") // SSH key generation sshKeyBits = optionalVar(fs, &ssh.KeyBitsValue{}, "ssh-keygen-bits", "-b argument to ssh-keygen (default unspecified)") sshKeyType = optionalVar(fs, &ssh.KeyTypeValue{}, "ssh-keygen-type", "-t argument to ssh-keygen (default unspecified)") @@ -143,6 +144,7 @@ func main() { _ = fs.Duration("registry-cache-expiry", 0, "") ) fs.MarkDeprecated("registry-cache-expiry", "no longer used; cache entries are expired adaptively according to how often they change") + fs.MarkDeprecated("k8s-namespace-whitelist", "changed to --k8s-allow-namespace, use that instead") err := fs.Parse(os.Args[1:]) switch { @@ -323,9 +325,10 @@ func main() { } logger.Log("kubectl", kubectl) - kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig) client := kubernetes.MakeClusterClientset(clientset, dynamicClientset, integrationsClientset, discoClientset) - k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, *k8sNamespaceWhitelist, *registryExcludeImage) + kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig) + allowedNamespaces := append(*k8sNamespaceWhitelist, *k8sAllowNamespace...) 
+ k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, allowedNamespaces, *registryExcludeImage) k8sInst.GC = *syncGC if err := k8sInst.Ping(); err != nil { diff --git a/site/daemon.md b/site/daemon.md index a7a21ee23..70dbb3d0e 100644 --- a/site/daemon.md +++ b/site/daemon.md @@ -86,7 +86,7 @@ fluxd requires setup and offers customization though a multitude of flags. | --k8s-secret-volume-mount-path | `/etc/fluxd/ssh` | mount location of the k8s secret storing the private SSH key | --k8s-secret-data-key | `identity` | data key holding the private SSH key within the k8s secret | **k8s configuration** -| --k8s-namespace-whitelist | | Experimental, optional: restrict the view of the cluster to the namespaces listed. All namespaces are included if this is not set. +| --k8s-allow-namespace | | experimental: restrict all operations to the provided namespaces | **upstream service** | --connect | | connect to an upstream service e.g., Weave Cloud, at this base address | --token | | authentication token for upstream service From 1c815318e99fbdb52ea1e7623c18a97926cbbc05 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 17 Jan 2019 03:33:26 +0100 Subject: [PATCH 2/4] Honor allowed namespaces in all cluster/git operations --- cluster/cluster.go | 1 + cluster/kubernetes/kubernetes.go | 40 +++++++++++++++++--- cluster/kubernetes/sync.go | 64 ++++++++++++++++++++++++-------- cluster/kubernetes/sync_test.go | 6 +-- cluster/mock.go | 23 +++++++----- daemon/daemon.go | 9 ++++- daemon/daemon_test.go | 1 + release/context.go | 4 +- release/releaser_test.go | 24 ++++++------ update/filter.go | 26 ++++++------- 10 files changed, 136 insertions(+), 62 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 5e427d963..e24883eb9 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -27,6 +27,7 @@ type Cluster interface { // Get all of the services (optionally, from a specific namespace), excluding those AllWorkloads(maybeNamespace string) ([]Workload, error) SomeWorkloads([]flux.ResourceID) ([]Workload, error) + IsAllowedResource(flux.ResourceID) bool Ping() error Export() ([]byte, error) Sync(SyncSet) error diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 61f706b88..9a577af6b 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -17,6 +17,7 @@ import ( "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" + "github.com/weaveworks/flux/cluster/kubernetes/resource" fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned" "github.com/weaveworks/flux/ssh" ) @@ -119,12 +120,15 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, // --- cluster.Cluster -// SomeWorkloads returns the workloads named, missing out any that aren't -// accessible in the cluster. They do not necessarily have to be returned -// in the order requested. +// SomeWorkloads returns the workloads named, missing out any that don't +// exist in the cluster or aren't in an allowed namespace. +// They do not necessarily have to be returned in the order requested. 
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) { var workloads []cluster.Workload for _, id := range ids { + if !c.IsAllowedResource(id) { + continue + } ns, kind, name := id.Components() resourceKind, ok := resourceKinds[kind] @@ -150,7 +154,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, return workloads, nil } -// AllWorkloads returns all workloads matching the criteria; that is, in +// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in // the namespace (or any namespace if that argument is empty) func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) { namespaces, err := c.getAllowedNamespaces() @@ -277,7 +281,7 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) { nsList = append(nsList, *ns) case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err): if !c.loggedAllowedNS[name] { - c.logger.Log("warning", "cannot access namespace set as allowed", + c.logger.Log("warning", "cannot access allowed namespace", "namespace", name, "err", err) c.loggedAllowedNS[name] = true } @@ -295,6 +299,32 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) { return namespaces.Items, nil } +func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool { + if len(c.allowedNamespaces) == 0 { + // All resources are allowed when all namespaces are allowed + return true + } + + namespace, kind, name := id.Components() + namespaceToCheck := namespace + + if namespace == resource.ClusterScope { + // All cluster-scoped resources (not namespaced) are allowed ... + if kind != "namespace" { + return true + } + // ... except namespaces themselves, whose name needs to be explicitly allowed + namespaceToCheck = name + } + + for _, allowedNS := range c.allowedNamespaces { + if namespaceToCheck == allowedNS { + return true + } + } + return false +} + // kind & apiVersion must be passed separately as the object's TypeMeta is not populated func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error { yamlBytes, err := k8syaml.Marshal(object) diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index 06a73ee83..b412e62c6 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -54,7 +54,7 @@ func (c *Cluster) Sync(syncSet cluster.SyncSet) error { // NB we get all resources, since we care about leaving unsynced, // _ignored_ resources alone. - clusterResources, err := c.getResourcesBySelector("") + clusterResources, err := c.getAllowedResourcesBySelector("") if err != nil { return errors.Wrap(err, "collating resources in cluster for sync") } @@ -62,7 +62,11 @@ func (c *Cluster) Sync(syncSet cluster.SyncSet) error { cs := makeChangeSet() var errs cluster.SyncError for _, res := range syncSet.Resources { - id := res.ResourceID().String() + resID := res.ResourceID() + if !c.IsAllowedResource(resID) { + continue + } + id := resID.String() // make a record of the checksum, whether we stage it to // be applied or not, so that we don't delete it later. 
csum := sha1.Sum(res.Bytes()) @@ -122,7 +126,7 @@ func (c *Cluster) collectGarbage( orphanedResources := makeChangeSet() - clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name) + clusterResources, err := c.getAllowedGCMarkedResourcesInSyncSet(syncSet.Name) if err != nil { return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection") } @@ -188,7 +192,7 @@ func (r *kuberesource) GetGCMark() string { return r.obj.GetLabels()[gcMarkLabel] } -func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) { +func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) { listOptions := meta_v1.ListOptions{} if selector != "" { listOptions.LabelSelector = selector @@ -216,14 +220,12 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou if !contains(verbs, "list") { continue } - groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion) if err != nil { return nil, err } - - resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name)) - data, err := resourceClient.List(listOptions) + gvr := groupVersion.WithResource(apiResource.Name) + list, err := c.listAllowedResources(apiResource.Namespaced, gvr, listOptions) if err != nil { if apierrors.IsForbidden(err) { // we are not allowed to list this resource but @@ -233,7 +235,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou return nil, err } - for i, item := range data.Items { + for i, item := range list { apiVersion := item.GetAPIVersion() kind := item.GetKind() @@ -244,7 +246,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou } // TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?) - res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced} + res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced} result[res.ResourceID().String()] = res } } @@ -253,18 +255,48 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou return result, nil } -func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) { - allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists" +func (c *Cluster) listAllowedResources( + namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) { + if !namespaced || len(c.allowedNamespaces) == 0 { + // The resource is not namespaced or all the namespaces are allowed + resourceClient := c.client.dynamicClient.Resource(gvr) + data, err := resourceClient.List(options) + if err != nil { + return nil, err + } + return data.Items, nil + } + + // List resources only from the allowed namespaces + var result []unstructured.Unstructured + for _, ns := range c.allowedNamespaces { + data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(options) + if err != nil { + return result, err + } + result = append(result, data.Items...) 
+ } + return result, nil +} + +func (c *Cluster) getAllowedGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) { + allGCMarkedResources, err := c.getAllowedResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists" if err != nil { return nil, err } - syncSetGCMarkedResources := map[string]*kuberesource{} + allowedSyncSetGCMarkedResources := map[string]*kuberesource{} for resID, kres := range allGCMarkedResources { - if kres.GetGCMark() == makeGCMark(syncSetName, resID) { - syncSetGCMarkedResources[resID] = kres + // Discard disallowed resources + if !c.IsAllowedResource(kres.ResourceID()) { + continue + } + // Discard resources out of the Sync Set + if kres.GetGCMark() != makeGCMark(syncSetName, resID) { + continue } + allowedSyncSetGCMarkedResources[resID] = kres } - return syncSetGCMarkedResources, nil + return allowedSyncSetGCMarkedResources, nil } func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) { diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index ba6af3539..7c4a055df 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -290,8 +290,8 @@ metadata: panic(err) } - // Now check what resources remain in the sync set - actual, err := kube.getGCMarkedResourcesInSyncSet("testset") + // Now check that the resources were created + actual, err := kube.getAllowedGCMarkedResourcesInSyncSet("testset") if err != nil { t.Fatal(err) } @@ -553,7 +553,7 @@ spec: assert.NoError(t, err) // Check that our resource-getting also sees the pre-existing resource - resources, err := kube.getResourcesBySelector("") + resources, err := kube.getAllowedResourcesBySelector("") assert.NoError(t, err) assert.Contains(t, resources, "foobar:deployment/dep1") diff --git a/cluster/mock.go b/cluster/mock.go index cdce3db5f..bba43c792 100644 --- a/cluster/mock.go +++ b/cluster/mock.go @@ -10,15 +10,16 @@ import ( // Doubles as a cluster.Cluster and cluster.Manifests implementation type Mock struct { - AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error) - SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error) - PingFunc func() error - ExportFunc func() ([]byte, error) - SyncFunc func(SyncSet) error - PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) - UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) - LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error) - UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error) + AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error) + SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error) + IsAllowedResourceFunc func(flux.ResourceID) bool + PingFunc func() error + ExportFunc func() ([]byte, error) + SyncFunc func(SyncSet) error + PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) + UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) + LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error) + UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error) } func (m *Mock) AllWorkloads(maybeNamespace string) ([]Workload, error) { @@ -29,6 +30,10 @@ func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]Workload, error) { return m.SomeWorkloadsFunc(s) } +func (m *Mock) IsAllowedResource(id flux.ResourceID) bool { + return m.IsAllowedResourceFunc(id) +} + func (m *Mock) Ping() 
error { return m.PingFunc() } diff --git a/daemon/daemon.go b/daemon/daemon.go index a0bdebeef..4d9425eff 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -377,6 +377,11 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu var anythingAutomated bool for workloadID, u := range updates { + if d.Cluster.IsAllowedResource(workloadID) { + result.Result[workloadID] = update.WorkloadResult{ + Status: update.ReleaseStatusSkipped, + } + } if policy.Set(u.Add).Has(policy.Automated) { anythingAutomated = true } @@ -707,7 +712,7 @@ func policyEvents(us policy.Updates, now time.Time) map[string]event.Event { // policyEventTypes is a deduped list of all event types this update contains func policyEventTypes(u policy.Update) []string { types := map[string]struct{}{} - for p, _ := range u.Add { + for p := range u.Add { switch { case p == policy.Automated: types[event.EventAutomate] = struct{}{} @@ -718,7 +723,7 @@ func policyEventTypes(u policy.Update) []string { } } - for p, _ := range u.Remove { + for p := range u.Remove { switch { case p == policy.Automated: types[event.EventDeautomate] = struct{}{} diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 4058c8611..d2e040699 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -693,6 +693,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven } return []cluster.Workload{}, nil } + k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true } k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil } k8s.PingFunc = func() error { return nil } k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) { diff --git a/release/context.go b/release/context.go index 630075876..c8f59b7ec 100644 --- a/release/context.go +++ b/release/context.go @@ -82,11 +82,11 @@ func (rc *ReleaseContext) SelectWorkloads(results update.Result, prefilters, pos for _, s := range allDefined { res := s.Filter(prefilters...) if res.Error == "" { - // Give these a default value, in case we don't find them + // Give these a default value, in case we cannot access them // in the cluster. 
results[s.ResourceID] = update.WorkloadResult{ Status: update.ReleaseStatusSkipped, - Error: update.NotInCluster, + Error: update.NotAccessibleInCluster, } toAskClusterAbout = append(toAskClusterAbout, s.ResourceID) } else { diff --git a/release/releaser_test.go b/release/releaser_test.go index 4368318ce..5a0fe5989 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -180,7 +180,7 @@ var ignoredNotInRepo = update.WorkloadResult{ var ignoredNotInCluster = update.WorkloadResult{ Status: update.ReleaseStatusIgnored, - Error: update.NotInCluster, + Error: update.NotAccessibleInCluster, } var skippedLocked = update.WorkloadResult{ @@ -190,7 +190,7 @@ var skippedLocked = update.WorkloadResult{ var skippedNotInCluster = update.WorkloadResult{ Status: update.ReleaseStatusSkipped, - Error: update.NotInCluster, + Error: update.NotAccessibleInCluster, } var skippedNotInRepo = update.WorkloadResult{ @@ -238,7 +238,7 @@ func Test_InitContainer(t *testing.T) { initWorkloadID: update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, @@ -292,12 +292,12 @@ func Test_FilterLogic(t *testing.T) { flux.MustParseResourceID("default:deployment/helloworld"): update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, }, - update.ContainerUpdate{ + { Container: sidecarContainer, Current: sidecarRef, Target: newSidecarRef, @@ -320,12 +320,12 @@ func Test_FilterLogic(t *testing.T) { flux.MustParseResourceID("default:deployment/helloworld"): update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, }, - update.ContainerUpdate{ + { Container: sidecarContainer, Current: sidecarRef, Target: newSidecarRef, @@ -352,7 +352,7 @@ func Test_FilterLogic(t *testing.T) { flux.MustParseResourceID("default:deployment/helloworld"): update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, @@ -381,12 +381,12 @@ func Test_FilterLogic(t *testing.T) { flux.MustParseResourceID("default:deployment/helloworld"): update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, }, - update.ContainerUpdate{ + { Container: sidecarContainer, Current: sidecarRef, Target: newSidecarRef, @@ -413,12 +413,12 @@ func Test_FilterLogic(t *testing.T) { flux.MustParseResourceID("default:deployment/helloworld"): update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ - update.ContainerUpdate{ + { Container: helloContainer, Current: oldRef, Target: newHwRef, }, - update.ContainerUpdate{ + { Container: sidecarContainer, Current: sidecarRef, Target: newSidecarRef, diff --git a/update/filter.go b/update/filter.go index 21717d5db..0482f90fe 100644 --- a/update/filter.go +++ b/update/filter.go @@ -7,18 +7,18 @@ import ( ) const ( - Locked = "locked" - Ignore = "ignore" - NotIncluded = "not included" - Excluded = "excluded" - DifferentImage = "a different image" - NotInCluster = "not running in cluster" - NotInRepo = "not found in repository" - 
ImageNotFound = "cannot find one or more images" - ImageUpToDate = "image(s) up to date" - DoesNotUseImage = "does not use image(s)" - ContainerNotFound = "container(s) not found: %s" - ContainerTagMismatch = "container(s) tag mismatch: %s" + Locked = "locked" + Ignore = "ignore" + NotIncluded = "not included" + Excluded = "excluded" + DifferentImage = "a different image" + NotAccessibleInCluster = "not accessible in cluster" + NotInRepo = "not found in repository" + ImageNotFound = "cannot find one or more images" + ImageUpToDate = "image(s) up to date" + DoesNotUseImage = "does not use image(s)" + ContainerNotFound = "container(s) not found: %s" + ContainerTagMismatch = "container(s) tag mismatch: %s" ) type SpecificImageFilter struct { @@ -30,7 +30,7 @@ func (f *SpecificImageFilter) Filter(u WorkloadUpdate) WorkloadResult { if len(u.Workload.Containers.Containers) == 0 { return WorkloadResult{ Status: ReleaseStatusIgnored, - Error: NotInCluster, + Error: NotAccessibleInCluster, } } // For each container in update From a61e22871c5e4fb55212cfdf76f8905b2845170f Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 22 Mar 2019 14:39:23 +0100 Subject: [PATCH 3/4] Fix multiple namespace filtering issues * Rename `getAllowedNamespaces()` to `getAllowedAndExistingNamespaces()` * Remove redundant namespace check * Check for namespace existence when syncing --- cluster/kubernetes/images.go | 2 +- cluster/kubernetes/kubernetes.go | 10 +++++----- cluster/kubernetes/kubernetes_test.go | 2 +- cluster/kubernetes/sync.go | 18 +++++++++--------- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cluster/kubernetes/images.go b/cluster/kubernetes/images.go index b4c71b215..1f8190d19 100644 --- a/cluster/kubernetes/images.go +++ b/cluster/kubernetes/images.go @@ -123,7 +123,7 @@ func mergeCredentials(log func(...interface{}) error, func (c *Cluster) ImagesToFetch() registry.ImageCreds { allImageCreds := make(registry.ImageCreds) - namespaces, err := c.getAllowedNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces() if err != nil { c.logger.Log("err", errors.Wrap(err, "getting namespaces")) return allImageCreds diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 9a577af6b..0d3d8fb0a 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -157,7 +157,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, // AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in // the namespace (or any namespace if that argument is empty) func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) { - namespaces, err := c.getAllowedNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces() if err != nil { return nil, errors.Wrap(err, "getting namespaces") } @@ -217,7 +217,7 @@ func (c *Cluster) Ping() error { func (c *Cluster) Export() ([]byte, error) { var config bytes.Buffer - namespaces, err := c.getAllowedNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces() if err != nil { return nil, errors.Wrap(err, "getting namespaces") } @@ -266,11 +266,11 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { return publicKey, nil } -// getAllowedNamespaces returns a list of namespaces that the Flux instance is expected -// to have access to and can look for resources inside of. 
+// getAllowedAndExistingNamespaces returns a list of existing namespaces that +// the Flux instance is expected to have access to and can look for resources inside of. // It returns a list of all namespaces unless an explicit list of allowed namespaces // has been set on the Cluster instance. -func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) { +func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) { if len(c.allowedNamespaces) > 0 { nsList := []apiv1.Namespace{} for _, name := range c.allowedNamespaces { diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index 0d80fd56a..4e16c6c29 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -28,7 +28,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin client := ExtendedClient{coreClient: clientset} c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{}) - namespaces, err := c.getAllowedNamespaces() + namespaces, err := c.getAllowedAndExistingNamespaces() if err != nil { t.Errorf("The error should be nil, not: %s", err) } diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index b412e62c6..83897ee87 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -257,8 +257,8 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku func (c *Cluster) listAllowedResources( namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) { - if !namespaced || len(c.allowedNamespaces) == 0 { - // The resource is not namespaced or all the namespaces are allowed + if !namespaced { + // The resource is not namespaced resourceClient := c.client.dynamicClient.Resource(gvr) data, err := resourceClient.List(options) if err != nil { @@ -268,9 +268,13 @@ func (c *Cluster) listAllowedResources( } // List resources only from the allowed namespaces + namespaces, err := c.getAllowedAndExistingNamespaces() + if err != nil { + return nil, err + } var result []unstructured.Unstructured - for _, ns := range c.allowedNamespaces { - data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(options) + for _, ns := range namespaces { + data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns.Name).List(options) if err != nil { return result, err } @@ -286,11 +290,7 @@ func (c *Cluster) getAllowedGCMarkedResourcesInSyncSet(syncSetName string) (map[ } allowedSyncSetGCMarkedResources := map[string]*kuberesource{} for resID, kres := range allGCMarkedResources { - // Discard disallowed resources - if !c.IsAllowedResource(kres.ResourceID()) { - continue - } - // Discard resources out of the Sync Set + // Discard resources whose mark doesn't match their resource ID if kres.GetGCMark() != makeGCMark(syncSetName, resID) { continue } From 0aaca0b2a301812506bdcc7f712bab9f5789ca9f Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 26 Mar 2019 18:35:18 +0100 Subject: [PATCH 4/4] Fix tests --- cluster/kubernetes/sync.go | 2 +- cluster/kubernetes/sync_test.go | 94 +++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index 83897ee87..9d4cda88c 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -258,7 +258,7 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku func (c *Cluster) listAllowedResources( namespaced bool, gvr 
schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) { if !namespaced { - // The resource is not namespaced + // The resource is not namespaced, everything is allowed resourceClient := c.client.dynamicClient.Resource(gvr) data, err := resourceClient.List(options) if err != nil { diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index 7c4a055df..12f042e79 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -21,6 +21,7 @@ import ( // k8sclient "k8s.io/client-go/kubernetes" "github.com/stretchr/testify/assert" "k8s.io/client-go/discovery" + k8sclient "k8s.io/client-go/kubernetes" corefake "k8s.io/client-go/kubernetes/fake" k8s_testing "k8s.io/client-go/testing" @@ -95,10 +96,10 @@ func fakeClients() ExtendedClient { // enough for checking whether sync operations succeeded and had the // correct effect, which is either to "upsert", or delete, resources. type fakeApplier struct { - client dynamic.Interface - discovery discovery.DiscoveryInterface - defaultNS string - commandRun bool + dynamicClient dynamic.Interface + coreClient k8sclient.Interface + defaultNS string + commandRun bool } func groupVersionResource(res *unstructured.Unstructured) schema.GroupVersionResource { @@ -126,12 +127,12 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[flux.Resource } gvr := groupVersionResource(res) - c := a.client.Resource(gvr) + c := a.dynamicClient.Resource(gvr) // This is an approximation to what `kubectl` does in filling // in the fallback namespace (from config). In the case of // non-namespaced entities, it will be ignored by the fake // client (FIXME: make sure of this). - apiRes := findAPIResource(gvr, a.discovery) + apiRes := findAPIResource(gvr, a.coreClient.Discovery()) if apiRes == nil { panic("no APIResource found for " + gvr.String()) } @@ -159,11 +160,40 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[flux.Resource errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) return } + if res.GetKind() == "Namespace" { + // We also create namespaces in the core fake client since the dynamic client + // and core clients don't share resources + var ns corev1.Namespace + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct, &ns); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + _, err := a.coreClient.CoreV1().Namespaces().Get(ns.Name, metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): + _, err = a.coreClient.CoreV1().Namespaces().Create(&ns) + case err == nil: + _, err = a.coreClient.CoreV1().Namespaces().Update(&ns) + } + if err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } + } else if cmd == "delete" { if err := dc.Delete(name, &metav1.DeleteOptions{}); err != nil { errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) return } + if res.GetKind() == "Namespace" { + // We also create namespaces in the core fake client since the dynamic client + // and core clients don't share resources + if err := a.coreClient.CoreV1().Namespaces().Delete(res.GetName(), &metav1.DeleteOptions{}); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } } else { panic("unknown action: " + cmd) } @@ -202,7 +232,7 @@ func findAPIResource(gvr schema.GroupVersionResource, disco discovery.DiscoveryI func setup(t *testing.T) (*Cluster, 
*fakeApplier) { clients := fakeClients() - applier := &fakeApplier{client: clients.dynamicClient, discovery: clients.coreClient.Discovery(), defaultNS: defaultTestNamespace} + applier := &fakeApplier{dynamicClient: clients.dynamicClient, coreClient: clients.coreClient, defaultNS: defaultTestNamespace} kube := &Cluster{ applier: applier, client: clients, @@ -222,6 +252,13 @@ func TestSyncNop(t *testing.T) { } func TestSync(t *testing.T) { + const ns1 = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: foobar +` + const defs1 = `--- apiVersion: apps/v1 kind: Deployment @@ -238,6 +275,13 @@ metadata: namespace: foobar ` + const ns3 = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: other +` + const defs3 = `--- apiVersion: apps/v1 kind: Deployment @@ -245,7 +289,6 @@ metadata: name: dep3 namespace: other ` - // checkSame is a check that a result returned from the cluster is // the same as an expected. labels and annotations may be altered // by the sync process; we'll look at the "spec" field as an @@ -320,15 +363,15 @@ metadata: // without GC on, resources persist if they are not mentioned in subsequent syncs. test(t, kube, "", "", false) - test(t, kube, defs1, defs1, false) - test(t, kube, defs1+defs2, defs1+defs2, false) - test(t, kube, defs3, defs1+defs2+defs3, false) + test(t, kube, ns1+defs1, ns1+defs1, false) + test(t, kube, ns1+defs1+defs2, ns1+defs1+defs2, false) + test(t, kube, ns3+defs3, ns1+defs1+defs2+ns3+defs3, false) // Now with GC switched on. That means if we don't include a // resource in a sync, it should be deleted. kube.GC = true - test(t, kube, defs2+defs3, defs3+defs2, false) - test(t, kube, defs1+defs2, defs1+defs2, false) + test(t, kube, ns1+defs2+ns3+defs3, ns1+defs2+ns3+defs3, false) + test(t, kube, ns1+defs1+defs2, ns1+defs1+defs2, false) test(t, kube, "", "", false) }) @@ -385,7 +428,7 @@ metadata: `, depName, depNS) // Add dep to the cluster through syncing - test(t, kube, dep, dep, false) + test(t, kube, ns1+dep, ns1+dep, false) // Add a copy of dep (including the GCmark label) with different name directly to the cluster gvr := schema.GroupVersionResource{ @@ -421,7 +464,7 @@ metadata: kube, _ := setup(t) kube.GC = true - const defs1invalid = ` + const defs1invalid = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -430,8 +473,8 @@ metadata: annotations: error: fail to apply this ` - test(t, kube, defs1, defs1, false) - test(t, kube, defs1invalid, defs1, true) + test(t, kube, ns1+defs1, ns1+defs1, false) + test(t, kube, ns1+defs1invalid, ns1+defs1invalid, true) }) t.Run("sync doesn't apply or delete manifests marked with ignore", func(t *testing.T) { @@ -459,7 +502,7 @@ metadata: ` // dep1 is created, but dep2 is ignored - test(t, kube, dep1+dep2, dep1, false) + test(t, kube, ns1+dep1+dep2, ns1+dep1, false) const dep1ignored = `--- apiVersion: apps/v1 @@ -474,11 +517,11 @@ spec: labels: {app: bar} ` // dep1 is not updated, but neither is it deleted - test(t, kube, dep1ignored+dep2, dep1, false) + test(t, kube, ns1+dep1ignored+dep2, ns1+dep1, false) }) t.Run("sync doesn't update a cluster resource marked with ignore", func(t *testing.T) { - const dep1 = ` + const dep1 = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -491,7 +534,7 @@ spec: ` kube, _ := setup(t) // This just checks the starting assumption: dep1 exists in the cluster - test(t, kube, dep1, dep1, false) + test(t, kube, ns1+dep1, ns1+dep1, false) // Now we'll mark it as ignored _in the cluster_ (i.e., the // equivalent of `kubectl annotate`) @@ -512,7 +555,7 @@ spec: 
t.Fatal(err) } - const mod1 = ` + const mod1 = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -525,7 +568,7 @@ spec: ` // Check that dep1, which is marked ignore in the cluster, is // neither updated or deleted - test(t, kube, mod1, dep1, false) + test(t, kube, ns1+mod1, ns1+dep1, false) }) t.Run("sync doesn't update or delete a pre-existing resource marked with ignore", func(t *testing.T) { @@ -547,7 +590,12 @@ spec: assert.NoError(t, err) dep1res := &unstructured.Unstructured{Object: dep1obj} gvr := groupVersionResource(dep1res) + var ns1obj corev1.Namespace + err = yaml.Unmarshal([]byte(ns1), &ns1obj) + assert.NoError(t, err) // Put the pre-existing resource in the cluster + _, err = kube.client.coreClient.CoreV1().Namespaces().Create(&ns1obj) + assert.NoError(t, err) dc := kube.client.dynamicClient.Resource(gvr).Namespace(dep1res.GetNamespace()) _, err = dc.Create(dep1res) assert.NoError(t, err)
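
The allow-list rule that ties these patches together is compact enough to show in isolation. Below is a minimal, self-contained sketch of that check — a simplified stand-in for Cluster.IsAllowedResource, using plain strings instead of flux.ResourceID and a placeholder constant in place of resource.ClusterScope — intended only to illustrate the semantics the patches implement, not the exact code:

// allowlist_sketch.go — illustrative only; the real check lives in
// cluster/kubernetes/kubernetes.go as Cluster.IsAllowedResource.
package main

import "fmt"

// clusterScope is a stand-in for resource.ClusterScope, the sentinel
// namespace value used for cluster-scoped (non-namespaced) resources.
const clusterScope = "<cluster>"

// isAllowed reports whether a resource, identified by namespace, kind and
// name, falls inside the allowed namespaces. An empty allow list means
// every resource is allowed.
func isAllowed(allowedNamespaces []string, namespace, kind, name string) bool {
	if len(allowedNamespaces) == 0 {
		return true
	}
	nsToCheck := namespace
	if namespace == clusterScope {
		// Cluster-scoped resources are allowed, except namespaces
		// themselves, which must be explicitly listed by name.
		if kind != "namespace" {
			return true
		}
		nsToCheck = name
	}
	for _, ns := range allowedNamespaces {
		if nsToCheck == ns {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"team-a", "team-b"}
	fmt.Println(isAllowed(allowed, "team-a", "deployment", "helloworld"))  // true: namespace is listed
	fmt.Println(isAllowed(allowed, "kube-system", "deployment", "dns"))    // false: namespace not listed
	fmt.Println(isAllowed(allowed, clusterScope, "clusterrole", "admin"))  // true: cluster-scoped resource
	fmt.Println(isAllowed(allowed, clusterScope, "namespace", "team-b"))   // true: namespace listed by name
	fmt.Println(isAllowed(allowed, clusterScope, "namespace", "other"))    // false: namespace not listed
}

An empty allow list preserves the old behaviour and admits everything; otherwise namespaced resources must sit in a listed namespace, other cluster-scoped resources pass through, and Namespace objects themselves must be listed by name. On the daemon side, main.go simply appends the values of the deprecated --k8s-namespace-whitelist flag to those of --k8s-allow-namespace and hands the combined slice to NewCluster, so either flag feeds the same list.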