Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1668 from 2opremio/1471-extend-ns-filtering
Browse files Browse the repository at this point in the history
Extend namespace filtering to all operations on namespaced resources
  • Loading branch information
Alfonso Acosta committed Mar 27, 2019
2 parents ed186fa + 0aaca0b commit 0e601a6
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 109 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
Expand Up @@ -27,6 +27,7 @@ type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllWorkloads(maybeNamespace string) ([]Workload, error)
SomeWorkloads([]flux.ResourceID) ([]Workload, error)
IsAllowedResource(flux.ResourceID) bool
Ping() error
Export() ([]byte, error)
Sync(SyncSet) error
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/images.go
Expand Up @@ -123,7 +123,7 @@ func mergeCredentials(log func(...interface{}) error,
func (c *Cluster) ImagesToFetch() registry.ImageCreds {
allImageCreds := make(registry.ImageCreds)

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
return allImageCreds
Expand Down
76 changes: 53 additions & 23 deletions cluster/kubernetes/kubernetes.go
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes/resource"
fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
"github.com/weaveworks/flux/ssh"
)
Expand Down Expand Up @@ -95,22 +96,22 @@ type Cluster struct {
syncErrors map[flux.ResourceID]error
muSyncErrors sync.RWMutex

nsWhitelist []string
nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns
allowedNamespaces []string
loggedAllowedNS map[string]bool // to keep track of whether we've logged a problem with seeing an allowed namespace

imageExcludeList []string
mu sync.Mutex
}

// NewCluster returns a usable cluster.
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, nsWhitelist []string, imageExcludeList []string) *Cluster {
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, allowedNamespaces []string, imageExcludeList []string) *Cluster {
c := &Cluster{
client: client,
applier: applier,
logger: logger,
sshKeyRing: sshKeyRing,
nsWhitelist: nsWhitelist,
nsWhitelistLogged: map[string]bool{},
allowedNamespaces: allowedNamespaces,
loggedAllowedNS: map[string]bool{},
imageExcludeList: imageExcludeList,
}

Expand All @@ -119,12 +120,15 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,

// --- cluster.Cluster

// SomeWorkloads returns the workloads named, missing out any that aren't
// accessible in the cluster. They do not necessarily have to be returned
// in the order requested.
// SomeWorkloads returns the workloads named, missing out any that don't
// exist in the cluster or aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
var workloads []cluster.Workload
for _, id := range ids {
if !c.IsAllowedResource(id) {
continue
}
ns, kind, name := id.Components()

resourceKind, ok := resourceKinds[kind]
Expand All @@ -150,10 +154,10 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
return workloads, nil
}

// AllWorkloads returns all workloads matching the criteria; that is, in
// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand Down Expand Up @@ -213,7 +217,7 @@ func (c *Cluster) Ping() error {
func (c *Cluster) Export() ([]byte, error) {
var config bytes.Buffer

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand Down Expand Up @@ -262,24 +266,24 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
return publicKey, nil
}

// getAllowedNamespaces returns a list of namespaces that the Flux instance is expected
// to have access to and can look for resources inside of.
// It returns a list of all namespaces unless a namespace whitelist has been set on the Cluster
// instance, in which case it returns a list containing the namespaces from the whitelist
// that exist in the cluster.
func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
if len(c.nsWhitelist) > 0 {
// getAllowedAndExistingNamespaces returns a list of existing namespaces that
// the Flux instance is expected to have access to and can look for resources inside of.
// It returns a list of all namespaces unless an explicit list of allowed namespaces
// has been set on the Cluster instance.
func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
if len(c.allowedNamespaces) > 0 {
nsList := []apiv1.Namespace{}
for _, name := range c.nsWhitelist {
for _, name := range c.allowedNamespaces {
ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
switch {
case err == nil:
c.nsWhitelistLogged[name] = false // reset, so if the namespace goes away we'll log it again
c.loggedAllowedNS[name] = false // reset, so if the namespace goes away we'll log it again
nsList = append(nsList, *ns)
case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err):
if !c.nsWhitelistLogged[name] {
c.logger.Log("warning", "whitelisted namespace inaccessible", "namespace", name, "err", err)
c.nsWhitelistLogged[name] = true
if !c.loggedAllowedNS[name] {
c.logger.Log("warning", "cannot access allowed namespace",
"namespace", name, "err", err)
c.loggedAllowedNS[name] = true
}
default:
return nil, err
Expand All @@ -295,6 +299,32 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
return namespaces.Items, nil
}

func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
if len(c.allowedNamespaces) == 0 {
// All resources are allowed when all namespaces are allowed
return true
}

namespace, kind, name := id.Components()
namespaceToCheck := namespace

if namespace == resource.ClusterScope {
// All cluster-scoped resources (not namespaced) are allowed ...
if kind != "namespace" {
return true
}
// ... except namespaces themselves, whose name needs to be explicitly allowed
namespaceToCheck = name
}

for _, allowedNS := range c.allowedNamespaces {
if namespaceToCheck == allowedNS {
return true
}
}
return false
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/kubernetes_test.go
Expand Up @@ -28,7 +28,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
client := ExtendedClient{coreClient: clientset}
c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
t.Errorf("The error should be nil, not: %s", err)
}
Expand Down
64 changes: 48 additions & 16 deletions cluster/kubernetes/sync.go
Expand Up @@ -54,15 +54,19 @@ func (c *Cluster) Sync(syncSet cluster.SyncSet) error {

// NB we get all resources, since we care about leaving unsynced,
// _ignored_ resources alone.
clusterResources, err := c.getResourcesBySelector("")
clusterResources, err := c.getAllowedResourcesBySelector("")
if err != nil {
return errors.Wrap(err, "collating resources in cluster for sync")
}

cs := makeChangeSet()
var errs cluster.SyncError
for _, res := range syncSet.Resources {
id := res.ResourceID().String()
resID := res.ResourceID()
if !c.IsAllowedResource(resID) {
continue
}
id := resID.String()
// make a record of the checksum, whether we stage it to
// be applied or not, so that we don't delete it later.
csum := sha1.Sum(res.Bytes())
Expand Down Expand Up @@ -122,7 +126,7 @@ func (c *Cluster) collectGarbage(

orphanedResources := makeChangeSet()

clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name)
clusterResources, err := c.getAllowedGCMarkedResourcesInSyncSet(syncSet.Name)
if err != nil {
return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
}
Expand Down Expand Up @@ -188,7 +192,7 @@ func (r *kuberesource) GetGCMark() string {
return r.obj.GetLabels()[gcMarkLabel]
}

func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
listOptions.LabelSelector = selector
Expand Down Expand Up @@ -216,14 +220,12 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
if !contains(verbs, "list") {
continue
}

groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion)
if err != nil {
return nil, err
}

resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name))
data, err := resourceClient.List(listOptions)
gvr := groupVersion.WithResource(apiResource.Name)
list, err := c.listAllowedResources(apiResource.Namespaced, gvr, listOptions)
if err != nil {
if apierrors.IsForbidden(err) {
// we are not allowed to list this resource but
Expand All @@ -233,7 +235,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return nil, err
}

for i, item := range data.Items {
for i, item := range list {
apiVersion := item.GetAPIVersion()
kind := item.GetKind()

Expand All @@ -244,7 +246,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
}
// TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?)

res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced}
res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced}
result[res.ResourceID().String()] = res
}
}
Expand All @@ -253,18 +255,48 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return result, nil
}

func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
func (c *Cluster) listAllowedResources(
namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) {
if !namespaced {
// The resource is not namespaced, everything is allowed
resourceClient := c.client.dynamicClient.Resource(gvr)
data, err := resourceClient.List(options)
if err != nil {
return nil, err
}
return data.Items, nil
}

// List resources only from the allowed namespaces
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, err
}
var result []unstructured.Unstructured
for _, ns := range namespaces {
data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns.Name).List(options)
if err != nil {
return result, err
}
result = append(result, data.Items...)
}
return result, nil
}

func (c *Cluster) getAllowedGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
allGCMarkedResources, err := c.getAllowedResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
if err != nil {
return nil, err
}
syncSetGCMarkedResources := map[string]*kuberesource{}
allowedSyncSetGCMarkedResources := map[string]*kuberesource{}
for resID, kres := range allGCMarkedResources {
if kres.GetGCMark() == makeGCMark(syncSetName, resID) {
syncSetGCMarkedResources[resID] = kres
// Discard resources whose mark doesn't match their resource ID
if kres.GetGCMark() != makeGCMark(syncSetName, resID) {
continue
}
allowedSyncSetGCMarkedResources[resID] = kres
}
return syncSetGCMarkedResources, nil
return allowedSyncSetGCMarkedResources, nil
}

func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) {
Expand Down

0 comments on commit 0e601a6

Please sign in to comment.