diff --git a/pkg/apis/feature/features.go b/pkg/apis/feature/features.go index a93a9a8d105..313f4dbcee5 100644 --- a/pkg/apis/feature/features.go +++ b/pkg/apis/feature/features.go @@ -102,6 +102,25 @@ func (e Flags) String() string { return fmt.Sprintf("%+v", map[string]Flag(e)) } +func (e Flags) NodeSelector() map[string]string { + // Check if NodeSelector is not nil + if e == nil { + return map[string]string{} + } + + nodeSelectorMap := make(map[string]string) + + for k, v := range e { + if strings.Contains(k, NodeSelectorLabel) { + key := strings.TrimPrefix(k, NodeSelectorLabel) + value := strings.TrimSpace(string(v)) + nodeSelectorMap[key] = value + } + } + + return nodeSelectorMap +} + // NewFlagsConfigFromMap creates a Flags from the supplied Map func NewFlagsConfigFromMap(data map[string]string) (Flags, error) { flags := newDefaults() @@ -122,6 +141,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) { flags[sanitizedKey] = Permissive } else if k == TransportEncryption && strings.EqualFold(v, string(Strict)) { flags[sanitizedKey] = Strict + } else if strings.Contains(k, NodeSelectorLabel) { + flags[sanitizedKey] = Flag(v) } else { return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v) } diff --git a/pkg/apis/feature/features_test.go b/pkg/apis/feature/features_test.go index c5d2386f28b..9b599e5049f 100644 --- a/pkg/apis/feature/features_test.go +++ b/pkg/apis/feature/features_test.go @@ -56,10 +56,13 @@ func TestGetFlags(t *testing.T) { require.True(t, flags.IsAllowed("my-enabled-flag")) require.True(t, flags.IsAllowed("my-allowed-flag")) require.False(t, flags.IsAllowed("non-disabled-flag")) + + nodeSelector := flags.NodeSelector() + expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"} + require.Equal(t, expectedNodeSelector, nodeSelector) } func TestShouldNotOverrideDefaults(t *testing.T) { - f, err := NewFlagsConfigFromMap(map[string]string{}) require.Nil(t, err) require.NotNil(t, f) diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 189c0d2b777..454cadc6f48 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -25,4 +25,5 @@ const ( TransportEncryption = "transport-encryption" EvenTypeAutoCreate = "eventtype-auto-create" OIDCAuthentication = "authentication-oidc" + NodeSelectorLabel = "apiserversources.nodeselector." ) diff --git a/pkg/apis/feature/testdata/config-features.yaml b/pkg/apis/feature/testdata/config-features.yaml index 565352a1cc9..31a747cf484 100644 --- a/pkg/apis/feature/testdata/config-features.yaml +++ b/pkg/apis/feature/testdata/config-features.yaml @@ -25,3 +25,6 @@ data: my-enabled-flag: "enabled" my-disabled-flag: "disabled" my-allowed-flag: "allowed" + apiserversources.nodeselector.testkey: testvalue + apiserversources.nodeselector.testkey1: testvalue1 + apiserversources.nodeselector.testkey2: testvalue2 diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index f6a95d3aa2f..a4051f378ab 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -102,7 +102,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour // no Namespace defined in dest.Ref, we will use the Namespace of the source // as the Namespace of dest.Ref. if dest.Ref.Namespace == "" { - //TODO how does this work with deprecated fields + // TODO how does this work with deprecated fields dest.Ref.Namespace = source.GetNamespace() } } @@ -118,7 +118,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour if featureFlags.IsOIDCAuthentication() { // Create the role err := r.createOIDCRole(ctx, source) - if err != nil { logging.FromContext(ctx).Errorw("Failed when creating the OIDC Role for ApiServerSource", zap.Error(err)) return err @@ -225,6 +224,8 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer // return nil, err // } + featureFlags := feature.FromContext(ctx) + adapterArgs := resources.ReceiveAdapterArgs{ Image: r.receiveAdapterImage, Source: src, @@ -235,6 +236,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer Configs: r.configs, Namespaces: namespaces, AllNamespaces: allNamespaces, + NodeSelector: featureFlags.NodeSelector(), } expected, err := resources.MakeReceiveAdapter(&adapterArgs) @@ -357,7 +359,6 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource src.Status.MarkNoSufficientPermissions(lastReason, "User %s cannot %s", user, missing) return fmt.Errorf("insufficient permissions: User %s cannot %s", user, missing) - } func (r *Reconciler) createCloudEventAttributes(src *v1.ApiServerSource) ([]duckv1.CloudEventAttributes, error) { @@ -385,7 +386,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou roleName := resources.GetOIDCTokenRoleName(source.Name) expected, err := resources.MakeOIDCRole(source) - if err != nil { return fmt.Errorf("Cannot create OIDC role for ApiServerSource %s/%s: %w", source.GetName(), source.GetNamespace(), err) } @@ -417,7 +417,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou } return nil - } // createOIDCRoleBinding: this function will call resources package to get the rolebinding object diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go index 30612fcd43e..d53dd96ea1e 100644 --- a/pkg/reconciler/apiserversource/apiserversource_test.go +++ b/pkg/reconciler/apiserversource/apiserversource_test.go @@ -1087,6 +1087,75 @@ func TestReconcile(t *testing.T) { }, WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. + }, { + Name: "Valid with nodeSelector", + + Ctx: feature.ToContext(context.Background(), feature.Flags{ + "apiserversources.nodeselector.testkey1": "testvalue1", + "apiserversources.nodeselector.testkey2": "testvalue2", + }), + Objects: []runtime.Object{ + rttestingv1.NewApiServerSource(sourceName, testNS, + rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ + Resources: []sourcesv1.APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Namespace", + }}, + SourceSpec: duckv1.SourceSpec{Sink: sinkDest}, + }), + rttestingv1.WithApiServerSourceUID(sourceUID), + rttestingv1.WithApiServerSourceObjectMetaGeneration(generation), + ), + rttestingv1.NewChannel(sinkName, testNS, + rttestingv1.WithInitChannelConditions, + rttestingv1.WithChannelAddress(sinkAddressable), + ), + makeAvailableReceiveAdapter(t), + }, + Key: testNS + "/" + sourceName, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: rttestingv1.NewApiServerSource(sourceName, testNS, + rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ + Resources: []sourcesv1.APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Namespace", + }}, + SourceSpec: duckv1.SourceSpec{Sink: sinkDest}, + }), + rttestingv1.WithApiServerSourceUID(sourceUID), + rttestingv1.WithApiServerSourceObjectMetaGeneration(generation), + // Status Update: + rttestingv1.WithInitApiServerSourceConditions, + rttestingv1.WithApiServerSourceDeployed, + rttestingv1.WithApiServerSourceSink(sinkURI), + rttestingv1.WithApiServerSourceSufficientPermissions, + rttestingv1.WithApiServerSourceReferenceModeEventTypes(source), + rttestingv1.WithApiServerSourceStatusObservedGeneration(generation), + rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}), + rttestingv1.WithApiServerSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), + ), + }}, + WantCreates: []runtime.Object{ + makeSubjectAccessReview("namespaces", "get", "default"), + makeSubjectAccessReview("namespaces", "list", "default"), + makeSubjectAccessReview("namespaces", "watch", "default"), + }, + + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeAvailableReceiveAdapterWithNodeSelector(t, map[string]string{ + "testkey1": "testvalue1", + "testkey2": "testvalue2", + }), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), + Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(sourceName, testNS), + }, + WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)}, + SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped. }, } @@ -1445,3 +1514,38 @@ func makeApiServerSourceOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccou return sa } + +func makeAvailableReceiveAdapterWithNodeSelector(t *testing.T, selector map[string]string) *appsv1.Deployment { + t.Helper() + + src := rttestingv1.NewApiServerSource(sourceName, testNS, + rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{ + Resources: []sourcesv1.APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Namespace", + }}, + SourceSpec: duckv1.SourceSpec{Sink: sinkDest}, + }), + rttestingv1.WithApiServerSourceUID(sourceUID), + // Status Update: + rttestingv1.WithInitApiServerSourceConditions, + rttestingv1.WithApiServerSourceDeployed, + rttestingv1.WithApiServerSourceSink(sinkURI), + ) + + args := resources.ReceiveAdapterArgs{ + Image: image, + Source: src, + Labels: resources.Labels(sourceName), + SinkURI: sinkURI.String(), + Configs: &reconcilersource.EmptyVarsGenerator{}, + NodeSelector: selector, + Namespaces: []string{testNS}, + } + + ra, err := resources.MakeReceiveAdapter(&args) + require.NoError(t, err) + + rttesting.WithDeploymentAvailable()(ra) + return ra +} diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 4f05730f2ca..6b3bfe0827b 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -48,6 +48,7 @@ type ReceiveAdapterArgs struct { Configs reconcilersource.ConfigAccessor Namespaces []string AllNamespaces bool + NodeSelector map[string]string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -82,6 +83,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) { Labels: args.Labels, }, Spec: corev1.PodSpec{ + NodeSelector: args.NodeSelector, ServiceAccountName: args.Source.Spec.ServiceAccountName, EnableServiceLinks: ptr.Bool(false), Containers: []corev1.Container{ @@ -149,29 +151,30 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) { config = string(b) } - envs := []corev1.EnvVar{{ - Name: adapter.EnvConfigSink, - Value: args.SinkURI, - }, { - Name: "K_SOURCE_CONFIG", - Value: config, - }, { - Name: "SYSTEM_NAMESPACE", - Value: system.Namespace(), - }, { - Name: adapter.EnvConfigNamespace, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", + envs := []corev1.EnvVar{ + { + Name: adapter.EnvConfigSink, + Value: args.SinkURI, + }, { + Name: "K_SOURCE_CONFIG", + Value: config, + }, { + Name: "SYSTEM_NAMESPACE", + Value: system.Namespace(), + }, { + Name: adapter.EnvConfigNamespace, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, }, + }, { + Name: adapter.EnvConfigName, + Value: args.Source.Name, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", }, - }, { - Name: adapter.EnvConfigName, - Value: args.Source.Name, - }, { - Name: "METRICS_DOMAIN", - Value: "knative.dev/eventing", - }, } if args.CACerts != nil {