
Commit

clusterCache: add RWMutex for namespacedResources
namespacedResources is accessed by multiple goroutines simultaneously
and therefore must be guarded by a mutex. Because this field is read
significantly more often than it's written, use an RWMutex.
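
The locking pattern is the standard read-mostly guard: lookups such as IsNamespaced take the read lock, while the paths that replace the map (sync, startMissingWatches, Invalidate) take the write lock. A minimal standalone sketch of that shape, with illustrative names rather than the actual clusterCache code:

package example

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// namespacedCache is an illustrative stand-in for the guarded part of clusterCache.
type namespacedCache struct {
	mu                  sync.RWMutex
	namespacedResources map[schema.GroupKind]bool
}

// isNamespaced is the read path: many concurrent readers, so a shared RLock suffices.
func (c *namespacedCache) isNamespaced(gk schema.GroupKind) (bool, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	namespaced, ok := c.namespacedResources[gk]
	return namespaced, ok
}

// replaceAll is the write path: the map is swapped wholesale, as sync() and
// Invalidate() do in the diff below, so the exclusive lock is taken.
func (c *namespacedCache) replaceAll(resources map[schema.GroupKind]bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.namespacedResources = resources
}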
ncdc committed Jul 1, 2024
1 parent 7c2ee16 commit fa96340
Showing 2 changed files with 70 additions and 20 deletions.
17 changes: 17 additions & 0 deletions pkg/cache/cluster.go
@@ -183,6 +183,8 @@ type clusterCache struct {
apisMeta map[schema.GroupKind]*apiMeta
serverVersion string
apiResources []kube.APIResourceInfo

namespacedResourcesLock sync.RWMutex
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool

@@ -436,7 +438,11 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
opts[i](c)
}
c.apisMeta = nil

c.namespacedResourcesLock.Lock()
c.namespacedResources = nil
c.namespacedResourcesLock.Unlock()

c.log.Info("Invalidated cluster")
}

@@ -516,7 +522,11 @@ func (c *clusterCache) startMissingWatches() error {
}
}
}

c.namespacedResourcesLock.Lock()
c.namespacedResources = namespacedResources
c.namespacedResourcesLock.Unlock()

return nil
}

@@ -801,7 +811,11 @@ func (c *clusterCache) sync() error {
}
c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)

c.namespacedResourcesLock.Lock()
defer c.namespacedResourcesLock.Unlock()
c.namespacedResources = make(map[schema.GroupKind]bool)

config := c.config
version, err := c.kubectl.GetServerVersion(config)

@@ -1002,6 +1016,9 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour

// IsNamespaced answers if specified group/kind is a namespaced resource API or not
func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
c.namespacedResourcesLock.RLock()
defer c.namespacedResourcesLock.RUnlock()

if isNamespaced, ok := c.namespacedResources[gk]; ok {
return isNamespaced, nil
}
73 changes: 53 additions & 20 deletions pkg/cache/cluster_test.go
@@ -3,12 +3,13 @@ package cache
import (
"context"
"fmt"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"sort"
"strings"
"testing"
"time"

"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
@@ -84,6 +85,10 @@ func newCluster(t *testing.T, objs ...runtime.Object) *clusterCache {
})

apiResources := []kube.APIResourceInfo{{
GroupKind: schema.GroupKind{Group: "", Kind: "Namespace"},
GroupVersionResource: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"},
Meta: metav1.APIResource{Namespaced: false},
}, {
GroupKind: schema.GroupKind{Group: "", Kind: "Pod"},
GroupVersionResource: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"},
Meta: metav1.APIResource{Namespaced: true},
@@ -492,23 +497,23 @@ metadata:
func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
cronTabGroup := "stable.example.com"

testCases := []struct{
name string
localConvertFails bool
testCases := []struct {
name string
localConvertFails bool
expectConvertToVersionCalled bool
expectGetResourceCalled bool
expectGetResourceCalled bool
}{
{
name: "local convert fails, so GetResource is called",
localConvertFails: true,
name: "local convert fails, so GetResource is called",
localConvertFails: true,
expectConvertToVersionCalled: true,
expectGetResourceCalled: true,
expectGetResourceCalled: true,
},
{
name: "local convert succeeds, so GetResource is not called",
localConvertFails: false,
name: "local convert succeeds, so GetResource is not called",
localConvertFails: false,
expectConvertToVersionCalled: true,
expectGetResourceCalled: false,
expectGetResourceCalled: false,
},
}

@@ -557,7 +562,6 @@ metadata:
return testCronTab(), nil
})


managedObjs, err := cluster.GetManagedLiveObjs([]*unstructured.Unstructured{targetDeploy}, func(r *Resource) bool {
return true
})
@@ -816,25 +820,25 @@ func testPod() *corev1.Pod {

func testCRD() *apiextensions.CustomResourceDefinition {
return &apiextensions.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "crontabs.stable.example.com",
},
Spec: apiextensions.CustomResourceDefinitionSpec{
Spec: apiextensions.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Versions: []apiextensions.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensions.CustomResourceValidation{
OpenAPIV3Schema: &apiextensions.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensions.JSONSchemaProps{
"cronSpec": {Type: "string"},
"image": {Type: "string"},
"image": {Type: "string"},
"replicas": {Type: "integer"},
},
},
@@ -855,14 +859,14 @@ func testCRD() *apiextensions.CustomResourceDefinition {
func testCronTab() *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "test-crontab",
"name": "test-crontab",
"namespace": "default",
},
"spec": map[string]interface{}{
"cronSpec": "* * * * */5",
"image": "my-awesome-cron-image",
"image": "my-awesome-cron-image",
},
}}
}
@@ -1006,3 +1010,32 @@ func TestIterateHierachy(t *testing.T) {
keys)
})
}

func TestIsNamespaced_Sync_RaceCondition(t *testing.T) {
cc := newCluster(t)

t.Log("Syncing cluster cache to prime namespacedResources map")
err := cc.sync()
require.NoError(t, err)

t.Log("Starting infinite sync loop goroutine")
go func() {
for {
cc.lock.Lock()
cc.syncStatus.lock.Lock()

err := cc.sync()

cc.syncStatus.lock.Unlock()
cc.lock.Unlock()

require.NoError(t, err)
}
}()

t.Log("Repeatedly checking if a Namespace is NamespacedOrUnknown")
namespaceGK := schema.GroupKind{Group: "", Kind: "Namespace"}
for i := 0; i < 1000; i++ {
require.False(t, kube.IsNamespacedOrUnknown(cc, namespaceGK))
}
}
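
The race this test guards against only manifests reliably when the package tests are run with Go's race detector (go test -race); without it, the loop can pass even on unguarded code. A hypothetical extension that also fans the read side out across several goroutines is sketched below; it is not part of this commit, assumes the cache has already been synced, assumes a sync import is added to the test file, and uses assert rather than require because the assertions run outside the test goroutine:

// isNamespacedConcurrently is a hypothetical helper, not part of this commit.
// It hammers the read path from several goroutines at once.
func isNamespacedConcurrently(t *testing.T, cc *clusterCache, readers int) {
	var wg sync.WaitGroup
	namespaceGK := schema.GroupKind{Group: "", Kind: "Namespace"}
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// Namespaces are cluster-scoped, so a synced cache should report false.
				namespaced, err := cc.IsNamespaced(namespaceGK)
				assert.NoError(t, err)
				assert.False(t, namespaced)
			}
		}()
	}
	wg.Wait()
}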
