Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ gitlab/

# go build output (from go build ./cmd/crank etc)
/crossplane-diff
/diff

# ignore the cluster dir since it's pulled from crossplane/crossplane by earthly
cluster/
283 changes: 265 additions & 18 deletions cmd/diff/client/kubernetes/schema_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,36 @@ import (
"sync"

"github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/core"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
un "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

"github.com/crossplane/crossplane-runtime/v2/pkg/errors"
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"

xpextv1 "github.com/crossplane/crossplane/v2/apis/apiextensions/v1"
xpextv2 "github.com/crossplane/crossplane/v2/apis/apiextensions/v2"
)

// SchemaClient handles operations related to Kubernetes schemas and CRDs.
type SchemaClient interface {
// GetCRD gets the CustomResourceDefinition for a given GVK
GetCRD(ctx context.Context, gvk schema.GroupVersionKind) (*un.Unstructured, error)
GetCRD(ctx context.Context, gvk schema.GroupVersionKind) (*extv1.CustomResourceDefinition, error)

// GetCRDByName gets the CustomResourceDefinition by its name
GetCRDByName(name string) (*extv1.CustomResourceDefinition, error)

// IsCRDRequired checks if a GVK requires a CRD
IsCRDRequired(ctx context.Context, gvk schema.GroupVersionKind) bool

// ValidateResource validates a resource against its schema
ValidateResource(ctx context.Context, resource *un.Unstructured) error
// LoadCRDsFromXRDs converts XRDs to CRDs and caches them
LoadCRDsFromXRDs(ctx context.Context, xrds []*un.Unstructured) error

// GetAllCRDs returns all cached CRDs (needed for external validation library)
GetAllCRDs() []*extv1.CustomResourceDefinition
}

// DefaultSchemaClient implements SchemaClient.
Expand All @@ -37,6 +48,12 @@ type DefaultSchemaClient struct {
// Resource type caching
resourceTypeMap map[schema.GroupVersionKind]bool
resourceMapMu sync.RWMutex

// CRD caching - consolidated from SchemaValidator
crds []*extv1.CustomResourceDefinition
crdsMu sync.RWMutex
crdByName map[string]*extv1.CustomResourceDefinition // for fast lookup by name
xrdToCRDName map[string]string // maps XRD name to CRD name
}

// NewSchemaClient creates a new DefaultSchemaClient.
Expand All @@ -46,39 +63,64 @@ func NewSchemaClient(clients *core.Clients, typeConverter TypeConverter, logger
typeConverter: typeConverter,
logger: logger,
resourceTypeMap: make(map[schema.GroupVersionKind]bool),
crds: []*extv1.CustomResourceDefinition{},
crdByName: make(map[string]*extv1.CustomResourceDefinition),
xrdToCRDName: make(map[string]string),
}
}

// GetCRD gets the CustomResourceDefinition for a given GVK.
func (c *DefaultSchemaClient) GetCRD(ctx context.Context, gvk schema.GroupVersionKind) (*un.Unstructured, error) {
// Get the pluralized resource name
func (c *DefaultSchemaClient) GetCRD(ctx context.Context, gvk schema.GroupVersionKind) (*extv1.CustomResourceDefinition, error) {
// Get the pluralized resource name to construct CRD name
resourceName, err := c.typeConverter.GetResourceNameForGVK(ctx, gvk)
if err != nil {
return nil, errors.Wrapf(err, "cannot determine CRD name for %s", gvk.String())
}

c.logger.Debug("Looking up CRD", "gvk", gvk.String(), "crdName", resourceName)

// Construct the CRD name using the resource name and group
// Construct the full CRD name
crdName := fmt.Sprintf("%s.%s", resourceName, gvk.Group)

// Check cache first
c.crdsMu.RLock()

if cached, ok := c.crdByName[crdName]; ok {
c.crdsMu.RUnlock()
c.logger.Debug("Using cached CRD", "gvk", gvk.String(), "crdName", crdName)

return cached, nil
}

c.crdsMu.RUnlock()

c.logger.Debug("Looking up CRD", "gvk", gvk.String(), "crdName", resourceName)

// Define the CRD GVR directly to avoid recursion
crdGVR := schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}

// Fetch the CRD
crd, err := c.dynamicClient.Resource(crdGVR).Get(ctx, crdName, metav1.GetOptions{})
// Fetch the CRD from cluster
crdObj, err := c.dynamicClient.Resource(crdGVR).Get(ctx, crdName, metav1.GetOptions{})
if err != nil {
c.logger.Debug("Failed to get CRD", "gvk", gvk.String(), "crdName", crdName, "error", err)
return nil, errors.Wrapf(err, "cannot get CRD %s for %s", crdName, gvk.String())
}

c.logger.Debug("Successfully retrieved CRD", "gvk", gvk.String(), "crdName", resourceName)

return crd, nil
// Convert to typed CRD
crdTyped := &extv1.CustomResourceDefinition{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(crdObj.Object, crdTyped); err != nil {
c.logger.Debug("Error converting CRD", "gvk", gvk.String(), "crdName", crdName, "error", err)
return nil, errors.Wrapf(err, "cannot convert CRD %s to typed", crdName)
}

// Add to cache
c.addCRD(crdTyped)

return crdTyped, nil
}

// IsCRDRequired checks if a GVK requires a CRD.
Expand Down Expand Up @@ -135,17 +177,222 @@ func (c *DefaultSchemaClient) IsCRDRequired(ctx context.Context, gvk schema.Grou
return true
}

// ValidateResource validates a resource against its schema.
func (c *DefaultSchemaClient) ValidateResource(_ context.Context, resource *un.Unstructured) error {
// This would use OpenAPI validation - simplified for now
c.logger.Debug("Validating resource", "kind", resource.GetKind(), "name", resource.GetName())
return nil
}

// Helper to cache resource type requirements.
func (c *DefaultSchemaClient) cacheResourceType(gvk schema.GroupVersionKind, requiresCRD bool) {
c.resourceMapMu.Lock()
defer c.resourceMapMu.Unlock()

c.resourceTypeMap[gvk] = requiresCRD
}

// extractGVKsFromXRDs extracts GVKs from multiple XRDs. This is a pure function with no side effects.
func extractGVKsFromXRDs(xrds []*un.Unstructured) ([]schema.GroupVersionKind, error) {
var allGVKs []schema.GroupVersionKind

for _, xrd := range xrds {
gvks, err := extractGVKsFromXRD(xrd)
if err != nil {
return nil, errors.Wrapf(err, "failed to extract GVKs from XRD %s", xrd.GetName())
}

allGVKs = append(allGVKs, gvks...)
}

return allGVKs, nil
}

// extractGVKsFromXRD extracts all GroupVersionKinds from an XRD using strongly-typed conversion.
// This method handles both v1 and v2 XRDs and leverages Kubernetes runtime conversion.
func extractGVKsFromXRD(xrd *un.Unstructured) ([]schema.GroupVersionKind, error) {
apiVersion := xrd.GetAPIVersion()

switch apiVersion {
case "apiextensions.crossplane.io/v1":
return extractGVKsFromV1XRD(xrd)
case "apiextensions.crossplane.io/v2":
return extractGVKsFromV2XRD(xrd)
default:
return nil, errors.Errorf("unsupported XRD apiVersion %s in XRD %s", apiVersion, xrd.GetName())
}
}

// extractGVKsFromV1XRD extracts GVKs from a v1 XRD using strongly-typed conversion.
func extractGVKsFromV1XRD(xrd *un.Unstructured) ([]schema.GroupVersionKind, error) {
typedXRD := &xpextv1.CompositeResourceDefinition{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(xrd.Object, typedXRD); err != nil {
return nil, errors.Wrapf(err, "cannot convert XRD %s to v1 typed object", xrd.GetName())
}

// Extract GVKs for each version - no validation needed since XRDs from server are guaranteed valid
gvks := make([]schema.GroupVersionKind, 0, len(typedXRD.Spec.Versions))
for _, version := range typedXRD.Spec.Versions {
gvks = append(gvks, schema.GroupVersionKind{
Group: typedXRD.Spec.Group,
Version: version.Name,
Kind: typedXRD.Spec.Names.Kind,
})
}

return gvks, nil
}

// extractGVKsFromV2XRD extracts GVKs from a v2 XRD using strongly-typed conversion.
func extractGVKsFromV2XRD(xrd *un.Unstructured) ([]schema.GroupVersionKind, error) {
typedXRD := &xpextv2.CompositeResourceDefinition{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(xrd.Object, typedXRD); err != nil {
return nil, errors.Wrapf(err, "cannot convert XRD %s to v2 typed object", xrd.GetName())
}

// Extract GVKs for each version - no validation needed since XRDs from server are guaranteed valid
gvks := make([]schema.GroupVersionKind, 0, len(typedXRD.Spec.Versions))
for _, version := range typedXRD.Spec.Versions {
gvks = append(gvks, schema.GroupVersionKind{
Group: typedXRD.Spec.Group,
Version: version.Name,
Kind: typedXRD.Spec.Names.Kind,
})
}

return gvks, nil
}

// LoadCRDsFromXRDs fetches corresponding CRDs from the cluster for the given XRDs and caches them.
// Instead of converting XRDs to CRDs, this method fetches the actual CRDs that should already
// exist in the cluster since the Crossplane control plane manages both XRDs and their corresponding CRDs.
func (c *DefaultSchemaClient) LoadCRDsFromXRDs(ctx context.Context, xrds []*un.Unstructured) error {
c.logger.Debug("Loading CRDs from cluster for XRDs", "xrdCount", len(xrds))

if len(xrds) == 0 {
c.logger.Debug("No XRDs provided, nothing to load")
return nil
}

// Extract GVKs from XRDs using the pure function
gvks, err := extractGVKsFromXRDs(xrds)
if err != nil {
return err // Error already wrapped with context from ExtractGVKsFromXRDs
}

// Build XRD-to-CRD name mappings for later use
xrdToCRDMappings := make(map[string]string) // XRD name -> CRD name

for _, xrd := range xrds {
// Extract the CRD name from XRD spec (format: {plural}.{group})
group, _, _ := un.NestedString(xrd.Object, "spec", "group")

plural, _, _ := un.NestedString(xrd.Object, "spec", "names", "plural")
if group != "" && plural != "" {
crdName := plural + "." + group
xrdName := xrd.GetName()
xrdToCRDMappings[xrdName] = crdName
c.logger.Debug("Mapped XRD to CRD", "xrdName", xrdName, "crdName", crdName)
}
}

// Load CRDs from the extracted GVKs
err = c.loadCRDsFromGVKs(ctx, gvks)
if err != nil {
return err // Error already wrapped with context from LoadCRDsFromGVKs
}

// Store XRD-to-CRD name mappings
c.crdsMu.Lock()

for xrdName, crdName := range xrdToCRDMappings {
c.xrdToCRDName[xrdName] = crdName
}

c.crdsMu.Unlock()

c.logger.Debug("Successfully stored XRD-to-CRD mappings", "count", len(xrdToCRDMappings))

return nil
}

// loadCRDsFromGVKs fetches CRDs from the cluster for the given GVKs and caches them.
// This method fetches the actual CRDs from the cluster for each provided GVK.
func (c *DefaultSchemaClient) loadCRDsFromGVKs(ctx context.Context, gvks []schema.GroupVersionKind) error {
c.logger.Debug("Loading CRDs from cluster for GVKs", "gvkCount", len(gvks))

if len(gvks) == 0 {
c.logger.Debug("No GVKs provided, nothing to load")
return nil
}

// TODO: Consider parallel fetching of CRDs to improve performance for large numbers of GVKs.
// This could significantly speed up initialization when dealing with many XRDs.
// For now, we fetch sequentially to keep the implementation simple.

// Fetch CRDs from cluster for each GVK - fail fast if any CRD is missing
// Per repository guidelines: never continue in a degraded state
fetchedCRDs := make([]*extv1.CustomResourceDefinition, 0, len(gvks))

for _, gvk := range gvks {
crd, err := c.GetCRD(ctx, gvk)
if err != nil {
c.logger.Debug("Failed to fetch required CRD for GVK", "gvk", gvk.String(), "error", err)
return errors.Wrapf(err, "cannot fetch required CRD for %s", gvk.String())
}

fetchedCRDs = append(fetchedCRDs, crd)
}

c.logger.Debug("Successfully fetched all required CRDs from cluster", "count", len(fetchedCRDs))

return nil
}

// GetCRDByName gets a CRD by its name from the cache.
// If the name is not found directly, it will also check if it's an XRD name
// that maps to a different CRD name (e.g., claim XRDs).
func (c *DefaultSchemaClient) GetCRDByName(name string) (*extv1.CustomResourceDefinition, error) {
c.crdsMu.RLock()
defer c.crdsMu.RUnlock()

// First, try direct lookup by CRD name
if crd, exists := c.crdByName[name]; exists {
return crd, nil
}

// If not found, check if this is an XRD name that maps to a different CRD name
if crdName, exists := c.xrdToCRDName[name]; exists {
if crd, exists := c.crdByName[crdName]; exists {
c.logger.Debug("Found CRD for XRD via name mapping", "xrdName", name, "crdName", crdName)
return crd, nil
}
}

return nil, errors.Errorf("CRD with name %s not found in cache", name)
}

// GetAllCRDs returns all cached CRDs.
func (c *DefaultSchemaClient) GetAllCRDs() []*extv1.CustomResourceDefinition {
c.crdsMu.RLock()
defer c.crdsMu.RUnlock()

// Return a copy to prevent external modification
result := make([]*extv1.CustomResourceDefinition, len(c.crds))
copy(result, c.crds)

return result
}

// addCRD adds a CRD to the cache.
func (c *DefaultSchemaClient) addCRD(crd *extv1.CustomResourceDefinition) {
c.crdsMu.Lock()
defer c.crdsMu.Unlock()

// Check if already cached to avoid duplicates
if _, exists := c.crdByName[crd.Name]; exists {
c.logger.Debug("CRD already in cache, skipping", "crdName", crd.Name)
return
}

// Add to slice
c.crds = append(c.crds, crd)

// Add to name lookup map
c.crdByName[crd.Name] = crd

c.logger.Debug("Added CRD to cache", "crdName", crd.Name)
}
Loading
Loading