Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] Fix parsing Kubernetes Resources without GroupVersionKind #39168

Merged
merged 2 commits into from Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kube/proxy/auth_test.go
Expand Up @@ -325,7 +325,7 @@ current-context: foo
require.Empty(t, cmp.Diff(fwd.clusterDetails, tt.want,
cmp.AllowUnexported(staticKubeCreds{}),
cmp.AllowUnexported(kubeDetails{}),
cmpopts.IgnoreFields(kubeDetails{}, "rwMu", "kubeCodecs", "wg", "cancelFunc"),
cmpopts.IgnoreFields(kubeDetails{}, "rwMu", "kubeCodecs", "wg", "cancelFunc", "gvkSupportedResources"),
cmp.Comparer(func(a, b *transport.Config) bool { return (a == nil) == (b == nil) }),
cmp.Comparer(func(a, b *tls.Config) bool { return true }),
cmp.Comparer(func(a, b *kubernetes.Clientset) bool { return (a == nil) == (b == nil) }),
Expand Down
42 changes: 32 additions & 10 deletions lib/kube/proxy/cluster_details.go
Expand Up @@ -21,6 +21,7 @@ package proxy
import (
"context"
"encoding/base64"
"strings"
"sync"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -52,7 +54,7 @@ type kubeDetails struct {
// kubeCluster is the dynamic kube_cluster or a static generated from kubeconfig and that only has the name populated.
kubeCluster types.KubeCluster

// rwMu is the mutex to protect the kubeCodecs and rbacSupportedTypes.
// rwMu is the mutex to protect the kubeCodecs, gvkSupportedResources, and rbacSupportedTypes.
rwMu sync.RWMutex
// kubeCodecs is the codec factory for the cluster resources.
// The codec factory includes the default resources and the namespaced resources
Expand All @@ -64,6 +66,9 @@ type kubeDetails struct {
// The list is updated periodically to include the latest custom resources
// that are added to the cluster.
rbacSupportedTypes rbacSupportedResources
// gvkSupportedResources is the list of registered API path resources and their
// GVK definition.
gvkSupportedResources gvkSupportedResources
// isClusterOffline is true if the cluster is offline.
// An offline cluster will not be able to serve any requests until it comes back online.
// The cluster is marked as offline if the cluster schema cannot be created
Expand Down Expand Up @@ -121,7 +126,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
}
var isClusterOffline bool
// Create the codec factory and the list of supported types for RBAC.
codecFactory, rbacSupportedTypes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.")
// If the cluster is offline, we will not be able to create the codec factory
Expand All @@ -133,13 +138,14 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe

ctx, cancel := context.WithCancel(ctx)
k := &kubeDetails{
kubeCreds: creds,
dynamicLabels: dynLabels,
kubeCluster: cfg.cluster,
kubeCodecs: codecFactory,
rbacSupportedTypes: rbacSupportedTypes,
cancelFunc: cancel,
isClusterOffline: isClusterOffline,
kubeCreds: creds,
dynamicLabels: dynLabels,
kubeCluster: cfg.cluster,
kubeCodecs: codecFactory,
rbacSupportedTypes: rbacSupportedTypes,
cancelFunc: cancel,
isClusterOffline: isClusterOffline,
gvkSupportedResources: gvkSupportedRes,
}

k.wg.Add(1)
Expand All @@ -153,7 +159,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
case <-ctx.Done():
return
case <-ticker.Chan():
codecFactory, rbacSupportedTypes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
codecFactory, rbacSupportedTypes, gvkSupportedResources, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Error("Failed to update cluster schema")
continue
Expand All @@ -162,6 +168,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
k.rwMu.Lock()
k.kubeCodecs = codecFactory
k.rbacSupportedTypes = rbacSupportedTypes
k.gvkSupportedResources = gvkSupportedResources
k.isClusterOffline = false
k.rwMu.Unlock()
}
Expand Down Expand Up @@ -193,6 +200,21 @@ func (k *kubeDetails) getClusterSupportedResources() (*serializer.CodecFactory,
return &(k.kubeCodecs), k.rbacSupportedTypes, nil
}

// getObjectGVK returns the default GVK (if any) registered for the specified request path.
func (k *kubeDetails) getObjectGVK(resource apiResource) *schema.GroupVersionKind {
k.rwMu.RLock()
defer k.rwMu.RUnlock()
// kube doesn't use core but teleport does.
if resource.apiGroup == "core" {
resource.apiGroup = ""
}
return k.gvkSupportedResources[gvkSupportedResourcesKey{
name: strings.Split(resource.resourceKind, "/")[0],
apiGroup: resource.apiGroup,
version: resource.apiGroupVersion,
}]
}

// getKubeClusterCredentials generates kube credentials for dynamic clusters.
func getKubeClusterCredentials(ctx context.Context, cfg clusterDetailsConfig) (kubeCreds, error) {
dynCredsCfg := dynamicCredsConfig{kubeCluster: cfg.cluster, log: cfg.log, checker: cfg.checker, resourceMatchers: cfg.resourceMatchers, clock: cfg.clock, component: cfg.component}
Expand Down
4 changes: 3 additions & 1 deletion lib/kube/proxy/resource_deletecollection.go
Expand Up @@ -139,7 +139,9 @@ func (f *Forwarder) handleDeleteCollectionReq(req *http.Request, sess *clusterSe
req.Body.Close()

// decode memory rw body.
obj, err := decodeAndSetGVK(decoder, memWriter.Buffer().Bytes())
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
obj, err := decodeAndSetGVK(decoder, memWriter.Buffer().Bytes(), nil /* defaults GVK */)
if err != nil {
return internalErrStatus, trace.Wrap(err)
}
Expand Down
15 changes: 11 additions & 4 deletions lib/kube/proxy/resource_filters.go
Expand Up @@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -602,7 +603,9 @@ func (d *resourceFilterer) decode(buffer []byte) (runtime.Object, []byte, error)
// Logic from: https://github.com/kubernetes/client-go/blob/58ff029093df37cad9fa28778a37f11fa495d9cf/rest/request.go#L1040
return nil, buffer, nil
default:
out, err := decodeAndSetGVK(d.decoder, buffer)
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
out, err := decodeAndSetGVK(d.decoder, buffer, nil /* defaults GVK */)
return out, nil, trace.Wrap(err)
}
}
Expand All @@ -616,7 +619,9 @@ func (d *resourceFilterer) decodePartialObjectMetadata(row *metav1.TableRow) err
}
var err error
// decode only if row.Object.Object was not decoded before.
row.Object.Object, err = decodeAndSetGVK(d.decoder, row.Object.Raw)
// We are reading an API request and API honors the GVK in the request so we don't
// need to set it.
row.Object.Object, err = decodeAndSetGVK(d.decoder, row.Object.Raw, nil /* defaults GVK */)
return trace.Wrap(err)
}

Expand Down Expand Up @@ -729,8 +734,10 @@ func newEncoderAndDecoderForContentType(contentType string, negotiator runtime.C

// decodeAndSetGVK decodes the payload into the appropriate type using the decoder
// provider and sets the GVK if available.
func decodeAndSetGVK(decoder runtime.Decoder, payload []byte) (runtime.Object, error) {
obj, gvk, err := decoder.Decode(payload, nil, nil)
// defaults is the fallback GVK used by the decoder if the payload doesn't set their
// own GVK.
func decodeAndSetGVK(decoder runtime.Decoder, payload []byte, defaults *schema.GroupVersionKind) (runtime.Object, error) {
obj, gvk, err := decoder.Decode(payload, defaults, nil)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_filters_test.go
Expand Up @@ -211,7 +211,7 @@ func Test_filterBuffer(t *testing.T) {
if row.Object.Object == nil {
var err error
// decode only if row.Object.Object was not decoded before.
row.Object.Object, err = decodeAndSetGVK(decoder, row.Object.Raw)
row.Object.Object, err = decodeAndSetGVK(decoder, row.Object.Raw, nil)
require.NoError(t, err)
}

Expand Down
36 changes: 30 additions & 6 deletions lib/kube/proxy/scheme.go
Expand Up @@ -98,17 +98,28 @@ func newClientNegotiator(codecFactory *serializer.CodecFactory) runtime.ClientNe
)
}

// gvkSupportedResourcesKey is the key used in gvkSupportedResources
// to map from a parsed API path to the corresponding resource GVK.
type gvkSupportedResourcesKey struct {
name string
apiGroup string
version string
}

// gvkSupportedResources maps a parsed API path to the corresponding resource GVK.
type gvkSupportedResources map[gvkSupportedResourcesKey]*schema.GroupVersionKind

// newClusterSchemaBuilder creates a new schema builder for the given cluster.
// This schema includes all well-known Kubernetes types and all namespaced
// custom resources.
// It also returns a map of resources that we support RBAC restrictions for.
func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface) (serializer.CodecFactory, rbacSupportedResources, error) {
func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface) (serializer.CodecFactory, rbacSupportedResources, gvkSupportedResources, error) {
kubeScheme := runtime.NewScheme()
kubeCodecs := serializer.NewCodecFactory(kubeScheme)
supportedResources := maps.Clone(defaultRBACResources)

gvkSupportedRes := make(gvkSupportedResources)
if err := registerDefaultKubeTypes(kubeScheme); err != nil {
return serializer.CodecFactory{}, nil, trace.Wrap(err)
return serializer.CodecFactory{}, nil, nil, trace.Wrap(err)
}
// discoveryErr is returned when the discovery of one or more API groups fails.
var discoveryErr *discovery.ErrGroupDiscoveryFailed
Expand All @@ -126,17 +137,30 @@ func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface
// available in the cluster.
log.WithError(err).Debugf("Failed to discover some API groups: %v", maps.Keys(discoveryErr.Groups))
case err != nil:
return serializer.CodecFactory{}, nil, trace.Wrap(err)
return serializer.CodecFactory{}, nil, nil, trace.Wrap(err)
}

for _, apiGroup := range apiGroups {
group, version := getKubeAPIGroupAndVersion(apiGroup.GroupVersion)

for _, apiResource := range apiGroup.APIResources {
// register all types
gvkSupportedRes[gvkSupportedResourcesKey{
name: apiResource.Name, /* pods, configmaps, ... */
apiGroup: group,
version: version,
}] = &schema.GroupVersionKind{
Group: group,
Version: version,
Kind: apiResource.Kind, /* Pod, ConfigMap ...*/
}
}

// Skip well-known Kubernetes API groups because they are already registered
// in the scheme.
if _, ok := knownKubernetesGroups[group]; ok {
continue
}

groupVersion := schema.GroupVersion{Group: group, Version: version}
for _, apiResource := range apiGroup.APIResources {
// Skip cluster-scoped resources because we don't support RBAC restrictions
Expand Down Expand Up @@ -177,7 +201,7 @@ func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface
}
}

return kubeCodecs, supportedResources, nil
return kubeCodecs, supportedResources, gvkSupportedRes, nil
}

// getKubeAPIGroupAndVersion returns the API group and version from the given
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/scheme_test.go
Expand Up @@ -31,7 +31,7 @@ import (
// TestNewClusterSchemaBuilder tests that newClusterSchemaBuilder doesn't panic
// when it's given types already registered in the global scheme.
func Test_newClusterSchemaBuilder(t *testing.T) {
_, _, err := newClusterSchemaBuilder(logrus.StandardLogger(), &clientSet{})
_, _, _, err := newClusterSchemaBuilder(logrus.StandardLogger(), &clientSet{})
require.NoError(t, err)
}

Expand Down
3 changes: 2 additions & 1 deletion lib/kube/proxy/self_subject_reviews.go
Expand Up @@ -241,7 +241,8 @@ func parseSelfSubjectAccessReviewRequest(decoder runtime.Decoder, req *http.Requ
req.Body.Close()

req.Body = io.NopCloser(bytes.NewReader(payload))
obj, err := decodeAndSetGVK(decoder, payload)
gvk := authv1.SchemeGroupVersion.WithKind("SelfSubjectAccessReview")
obj, err := decodeAndSetGVK(decoder, payload, &gvk)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
11 changes: 8 additions & 3 deletions lib/kube/proxy/url.go
Expand Up @@ -26,6 +26,7 @@ import (
"strings"

"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -238,7 +239,7 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types
case apiResource.resourceName == "" && verb == types.KubeVerbCreate:
// If the request is a create request, extract the resource name from the request body.
var err error
if apiResource.resourceName, err = extractResourceNameFromPostRequest(req, codecFactory); err != nil {
if apiResource.resourceName, err = extractResourceNameFromPostRequest(req, codecFactory, kubeDetails.getObjectGVK(apiResource)); err != nil {
return nil, apiResource, trace.Wrap(err)
}
}
Expand All @@ -256,7 +257,11 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types
// and decodes it into a Kubernetes object. It then extracts the resource name
// from the object.
// The body is then reset to the original request body using a new buffer.
func extractResourceNameFromPostRequest(req *http.Request, codecs *serializer.CodecFactory) (string, error) {
func extractResourceNameFromPostRequest(
req *http.Request,
codecs *serializer.CodecFactory,
defaults *schema.GroupVersionKind,
) (string, error) {
if req.Body == nil {
return "", trace.BadParameter("request body is empty")
}
Expand All @@ -279,7 +284,7 @@ func extractResourceNameFromPostRequest(req *http.Request, codecs *serializer.Co
}
req.Body = io.NopCloser(newBody)
// decode memory rw body.
obj, err := decodeAndSetGVK(decoder, newBody.Bytes())
obj, err := decodeAndSetGVK(decoder, newBody.Bytes(), defaults)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down