Navigation Menu

Skip to content

Commit

Permalink
Use profile from configmap
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumerose committed Jul 2, 2020
1 parent 9b26547 commit 93eeecf
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 20 deletions.
2 changes: 1 addition & 1 deletion hack/cluster-version-util/task_graph.go
Expand Up @@ -30,7 +30,7 @@ func newTaskGraphCmd() *cobra.Command {

func runTaskGraphCmd(cmd *cobra.Command, args []string) error {
manifestDir := args[0]
release, err := payload.LoadUpdate(manifestDir, "", "")
release, err := payload.LoadUpdate(manifestDir, "", "", "")
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cvo/cvo.go
Expand Up @@ -238,7 +238,7 @@ func (vcb *verifyClientBuilder) HTTPClient() (*http.Client, error) {
// controller that loads and applies content to the cluster. It returns an error if the payload appears to
// be in error rather than continuing.
func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestConfig *rest.Config) error {
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude)
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude, "")
if err != nil {
return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err)
}
Expand Down Expand Up @@ -284,6 +284,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
Steps: 3,
},
optr.exclude,
optr.defaultClusterProfileRetriever(),
)

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/cvo/cvo_scenarios_test.go
Expand Up @@ -92,6 +92,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
Steps: 1,
},
"exclude-test",
&fakeClusterProfileRetriever{},
)
o.configSync = worker

Expand Down
6 changes: 6 additions & 0 deletions pkg/cvo/sync_test.go
Expand Up @@ -424,6 +424,12 @@ func (r *fakeDirectoryRetriever) RetrievePayload(ctx context.Context, update con
return r.Info, r.Err
}

// fakeClusterProfileRetriever is a test double for ClusterProfileRetriever
// that always reports the "default" profile and never fails.
type fakeClusterProfileRetriever struct{}

// RetrieveClusterProfile implements ClusterProfileRetriever; the context is
// unused.
func (fakeClusterProfileRetriever) RetrieveClusterProfile(_ context.Context) (string, error) {
	return "default", nil
}

// testResourceBuilder uses a fake dynamic client to exercise the generic builder in tests.
type testResourceBuilder struct {
client *dynamicfake.FakeDynamicClient
Expand Down
21 changes: 17 additions & 4 deletions pkg/cvo/sync_worker.go
Expand Up @@ -56,6 +56,10 @@ type PayloadRetriever interface {
RetrievePayload(ctx context.Context, desired configv1.Update) (PayloadInfo, error)
}

// ClusterProfileRetriever abstracts how the active cluster profile name is
// looked up, so tests can substitute a fake implementation.
type ClusterProfileRetriever interface {
	// RetrieveClusterProfile returns the name of the cluster profile used to
	// filter payload manifests (e.g. "default"), or an error if it cannot be
	// determined.
	RetrieveClusterProfile(ctx context.Context) (string, error)
}

// StatusReporter abstracts how status is reported by the worker run method. Introduced for testing.
type StatusReporter interface {
Report(status SyncWorkerStatus)
Expand Down Expand Up @@ -152,11 +156,13 @@ type SyncWorker struct {
// manifests should be excluded based on an annotation
// of the form exclude.release.openshift.io/<identifier>=true
exclude string

clusterProfileRetriever ClusterProfileRetriever
}

// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, clusterProfileRetriever ClusterProfileRetriever) *SyncWorker {
return &SyncWorker{
retriever: retriever,
builder: builder,
Expand All @@ -171,14 +177,16 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
report: make(chan SyncWorkerStatus, 500),

exclude: exclude,

clusterProfileRetriever: clusterProfileRetriever,
}
}

// NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
// It allows providing preconditions for loading payload.
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude)
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, clusterProfileRetriever ClusterProfileRetriever) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, clusterProfileRetriever)
worker.preconditions = preconditions
return worker
}
Expand Down Expand Up @@ -494,7 +502,12 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
return err
}

payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
profile, err := w.clusterProfileRetriever.RetrieveClusterProfile(ctx)
if err != nil {
return err
}

payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude, profile)
if err != nil {
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Expand Down
@@ -0,0 +1,6 @@
# Test fixture: a manifest that opts into the "edge" cluster profile via the
# include.release.openshift.io/<profile> annotation, so it is applied only
# when the active profile is "edge".
kind: Test
apiVersion: v1
metadata:
  name: file-30-yml
  annotations:
    include.release.openshift.io/edge: "true"
20 changes: 20 additions & 0 deletions pkg/cvo/updatepayload.go
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/md5"
"encoding/base64"
"fmt"
"github.com/openshift/cluster-version-operator/pkg/internal"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -315,3 +317,21 @@ func findUpdateFromConfigVersion(config *configv1.ClusterVersion, version string
}
return configv1.Update{}, false
}

// defaultClusterProfileRetriever returns the production ClusterProfileRetriever,
// backed by the operator's config-namespace ConfigMap lister.
func (optr *Operator) defaultClusterProfileRetriever() ClusterProfileRetriever {
	r := &clusterProfileRetriever{cmConfigLister: optr.cmConfigLister}
	return r
}

// clusterProfileRetriever resolves the active cluster profile from the
// cluster-profile ConfigMap via a namespace-scoped lister.
type clusterProfileRetriever struct {
	cmConfigLister listerscorev1.ConfigMapNamespaceLister
}

// RetrieveClusterProfile implements ClusterProfileRetriever. It returns the
// "profile" key of the cluster-profile ConfigMap; the context is unused.
func (r *clusterProfileRetriever) RetrieveClusterProfile(_ context.Context) (string, error) {
	cm, err := r.cmConfigLister.Get(internal.ClusterProfileConfigMap)
	if err != nil {
		return "", err
	}
	// A missing "profile" key yields the empty string, which downstream
	// disables profile-based manifest filtering.
	return cm.Data["profile"], nil
}
7 changes: 4 additions & 3 deletions pkg/internal/constants.go
@@ -1,7 +1,8 @@
package internal

// Names of well-known configuration objects read by the cluster-version
// operator from the openshift-config namespace.
const (
	// ConfigNamespace is the namespace that holds the operator's
	// configuration objects.
	ConfigNamespace = "openshift-config"
	InstallerConfigMap = "openshift-install"
	ManifestsConfigMap = "openshift-install-manifests"
	// ClusterProfileConfigMap names the ConfigMap whose "profile" data key
	// selects the active cluster profile.
	ClusterProfileConfigMap = "cluster-profile"
)
39 changes: 34 additions & 5 deletions pkg/payload/payload.go
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -107,7 +108,7 @@ type Update struct {
Manifests []lib.Manifest
}

func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) {
func LoadUpdate(dir, releaseImage, excludeIdentifier, profile string) (*Update, error) {
payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage)
if err != nil {
return nil, err
Expand Down Expand Up @@ -157,7 +158,7 @@ func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) {
// Filter out manifests that should be excluded based on annotation
filteredMs := []lib.Manifest{}
for _, manifest := range ms {
if shouldExclude(excludeIdentifier, &manifest) {
if shouldExclude(excludeIdentifier, profile, &manifest) {
continue
}
filteredMs = append(filteredMs, manifest)
Expand Down Expand Up @@ -188,10 +189,38 @@ func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) {
return payload, nil
}

func shouldExclude(excludeIdentifier string, manifest *lib.Manifest) bool {
excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
func shouldExclude(excludeIdentifier, profile string, manifest *lib.Manifest) bool {
annotations := manifest.Object().GetAnnotations()
return annotations != nil && annotations[excludeAnnotation] == "true"
if annotations == nil {
return false
}

excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
if annotations[excludeAnnotation] == "true" {
return true
}

if profile == "" {
return false
}

shouldCheckProfile := false
for k, _ := range annotations {
if strings.HasPrefix(k, "include.release.openshift.io/") {
shouldCheckProfile = true
break
}
}

if shouldCheckProfile {
profileAnnotation := fmt.Sprintf("include.release.openshift.io/%s", profile)
if val, ok := annotations[profileAnnotation]; ok && val == "true" {
return false
}
return true
}

return profile != "default"
}

// ValidateDirectory checks if a directory can be a candidate update by
Expand Down
73 changes: 72 additions & 1 deletion pkg/payload/payload_test.go
Expand Up @@ -104,7 +104,7 @@ func Test_loadUpdatePayload(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test")
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test", "default")
if (err != nil) != tt.wantErr {
t.Errorf("loadUpdatePayload() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -123,3 +123,74 @@ func mustRead(path string) []byte {
}
return data
}

func TestExclude(t *testing.T) {
tests := []struct {
exclude string
profile string
annotations map[string]interface{}

isExcluded bool
}{
{
annotations: map[string]interface{}{},
},
{
exclude: "",
annotations: map[string]interface{}{"exclude.release.openshift.io/identifier": "true"},
},
{
exclude: "identifier",
annotations: map[string]interface{}{"exclude.release.openshift.io/identifier": "true"},
isExcluded: true,
},
{
profile: "",
annotations: map[string]interface{}{"include.release.openshift.io/edge": "true"},
},
{
profile: "default",
annotations: map[string]interface{}{"include.release.openshift.io/edge": "true"},
isExcluded: true,
},
{
profile: "edge",
annotations: map[string]interface{}{"include.release.openshift.io/edge": "true"},
},
{
profile: "",
annotations: map[string]interface{}{"include.release.openshift.io/default": "true"},
},
{
profile: "default",
annotations: map[string]interface{}{"include.release.openshift.io/default": "true"},
},
{
profile: "edge",
annotations: map[string]interface{}{"include.release.openshift.io/default": "true"},
isExcluded: true,
},
{
profile: "default",
},
{
profile: "edge",
isExcluded: true,
},
}
for _, tt := range tests {
ret := shouldExclude(tt.exclude, tt.profile, &lib.Manifest{
Obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": tt.annotations,
},
},
},
})
if ret != tt.isExcluded {
t.Errorf("(exclude: %v, profile: %v, annotations: %v) %v != %v", tt.exclude, tt.profile, tt.annotations, tt.isExcluded, ret)
}
}

}
2 changes: 1 addition & 1 deletion pkg/payload/task_graph_test.go
Expand Up @@ -487,7 +487,7 @@ func Test_TaskGraph_real(t *testing.T) {
if len(path) == 0 {
t.Skip("TEST_GRAPH_PATH unset")
}
p, err := LoadUpdate(path, "arbitrary/image:1", "")
p, err := LoadUpdate(path, "arbitrary/image:1", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/start/start_integration_test.go
Expand Up @@ -241,7 +241,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", &fakeClusterProfileRetriever{})
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
options.ResyncInterval = 3 * time.Second
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", &fakeClusterProfileRetriever{})
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", &fakeClusterProfileRetriever{})
controllers.CVO.SetSyncWorkerForTesting(worker)

lock, err := createResourceLock(cb, ns, ns)
Expand Down Expand Up @@ -673,7 +673,7 @@ metadata:
t.Fatal(err)
}

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", &fakeClusterProfileRetriever{})
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1119,3 +1119,9 @@ func (r *mapPayloadRetriever) RetrievePayload(ctx context.Context, update config
Directory: path,
}, nil
}

// fakeClusterProfileRetriever stubs ClusterProfileRetriever for integration
// tests, unconditionally reporting the "default" profile.
type fakeClusterProfileRetriever struct{}

// RetrieveClusterProfile implements cvo.ClusterProfileRetriever; the context
// is unused.
func (fakeClusterProfileRetriever) RetrieveClusterProfile(_ context.Context) (string, error) {
	return "default", nil
}

0 comments on commit 93eeecf

Please sign in to comment.