Skip to content

Commit

Permalink
support: nodeselector in apiserversource (knative#7584)
Browse files Browse the repository at this point in the history
* support: nodeselector in apiserversource

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* fix

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* minorfix

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* multiple values check

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* test: nodeselector reconcile

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* undo format

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* lintfix

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* newline config-features

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

* correct space

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>

---------

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
  • Loading branch information
sadath-12 committed Jan 24, 2024
1 parent cdb8638 commit ab47824
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 27 deletions.
21 changes: 21 additions & 0 deletions pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ func (e Flags) String() string {
return fmt.Sprintf("%+v", map[string]Flag(e))
}

func (e Flags) NodeSelector() map[string]string {
// Check if NodeSelector is not nil
if e == nil {
return map[string]string{}
}

nodeSelectorMap := make(map[string]string)

for k, v := range e {
if strings.Contains(k, NodeSelectorLabel) {
key := strings.TrimPrefix(k, NodeSelectorLabel)
value := strings.TrimSpace(string(v))
nodeSelectorMap[key] = value
}
}

return nodeSelectorMap
}

// NewFlagsConfigFromMap creates a Flags from the supplied Map
func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags := newDefaults()
Expand All @@ -122,6 +141,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags[sanitizedKey] = Permissive
} else if k == TransportEncryption && strings.EqualFold(v, string(Strict)) {
flags[sanitizedKey] = Strict
} else if strings.Contains(k, NodeSelectorLabel) {
flags[sanitizedKey] = Flag(v)
} else {
return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/feature/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ func TestGetFlags(t *testing.T) {
require.True(t, flags.IsAllowed("my-enabled-flag"))
require.True(t, flags.IsAllowed("my-allowed-flag"))
require.False(t, flags.IsAllowed("non-disabled-flag"))

nodeSelector := flags.NodeSelector()
expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"}
require.Equal(t, expectedNodeSelector, nodeSelector)
}

func TestShouldNotOverrideDefaults(t *testing.T) {

f, err := NewFlagsConfigFromMap(map[string]string{})
require.Nil(t, err)
require.NotNil(t, f)
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ const (
TransportEncryption = "transport-encryption"
EvenTypeAutoCreate = "eventtype-auto-create"
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources.nodeselector."
)
3 changes: 3 additions & 0 deletions pkg/apis/feature/testdata/config-features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ data:
my-enabled-flag: "enabled"
my-disabled-flag: "disabled"
my-allowed-flag: "allowed"
apiserversources.nodeselector.testkey: testvalue
apiserversources.nodeselector.testkey1: testvalue1
apiserversources.nodeselector.testkey2: testvalue2
9 changes: 4 additions & 5 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
// no Namespace defined in dest.Ref, we will use the Namespace of the source
// as the Namespace of dest.Ref.
if dest.Ref.Namespace == "" {
//TODO how does this work with deprecated fields
// TODO how does this work with deprecated fields
dest.Ref.Namespace = source.GetNamespace()
}
}
Expand All @@ -118,7 +118,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
if featureFlags.IsOIDCAuthentication() {
// Create the role
err := r.createOIDCRole(ctx, source)

if err != nil {
logging.FromContext(ctx).Errorw("Failed when creating the OIDC Role for ApiServerSource", zap.Error(err))
return err
Expand Down Expand Up @@ -225,6 +224,8 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
// return nil, err
// }

featureFlags := feature.FromContext(ctx)

adapterArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Expand All @@ -235,6 +236,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
Configs: r.configs,
Namespaces: namespaces,
AllNamespaces: allNamespaces,
NodeSelector: featureFlags.NodeSelector(),
}

expected, err := resources.MakeReceiveAdapter(&adapterArgs)
Expand Down Expand Up @@ -357,7 +359,6 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource

src.Status.MarkNoSufficientPermissions(lastReason, "User %s cannot %s", user, missing)
return fmt.Errorf("insufficient permissions: User %s cannot %s", user, missing)

}

func (r *Reconciler) createCloudEventAttributes(src *v1.ApiServerSource) ([]duckv1.CloudEventAttributes, error) {
Expand Down Expand Up @@ -385,7 +386,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou
roleName := resources.GetOIDCTokenRoleName(source.Name)

expected, err := resources.MakeOIDCRole(source)

if err != nil {
return fmt.Errorf("Cannot create OIDC role for ApiServerSource %s/%s: %w", source.GetName(), source.GetNamespace(), err)
}
Expand Down Expand Up @@ -417,7 +417,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou
}

return nil

}

// createOIDCRoleBinding: this function will call resources package to get the rolebinding object
Expand Down
104 changes: 104 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,75 @@ func TestReconcile(t *testing.T) {
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Name: "Valid with nodeSelector",

Ctx: feature.ToContext(context.Background(), feature.Flags{
"apiserversources.nodeselector.testkey1": "testvalue1",
"apiserversources.nodeselector.testkey2": "testvalue2",
}),
Objects: []runtime.Object{
rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
),
rttestingv1.NewChannel(sinkName, testNS,
rttestingv1.WithInitChannelConditions,
rttestingv1.WithChannelAddress(sinkAddressable),
),
makeAvailableReceiveAdapter(t),
},
Key: testNS + "/" + sourceName,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
rttestingv1.WithApiServerSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(),
),
}},
WantCreates: []runtime.Object{
makeSubjectAccessReview("namespaces", "get", "default"),
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},

WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: makeAvailableReceiveAdapterWithNodeSelector(t, map[string]string{
"testkey1": "testvalue1",
"testkey2": "testvalue2",
}),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
},
}

Expand Down Expand Up @@ -1445,3 +1514,38 @@ func makeApiServerSourceOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccou

return sa
}

func makeAvailableReceiveAdapterWithNodeSelector(t *testing.T, selector map[string]string) *appsv1.Deployment {
t.Helper()

src := rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
)

args := resources.ReceiveAdapterArgs{
Image: image,
Source: src,
Labels: resources.Labels(sourceName),
SinkURI: sinkURI.String(),
Configs: &reconcilersource.EmptyVarsGenerator{},
NodeSelector: selector,
Namespaces: []string{testNS},
}

ra, err := resources.MakeReceiveAdapter(&args)
require.NoError(t, err)

rttesting.WithDeploymentAvailable()(ra)
return ra
}
45 changes: 24 additions & 21 deletions pkg/reconciler/apiserversource/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ReceiveAdapterArgs struct {
Configs reconcilersource.ConfigAccessor
Namespaces []string
AllNamespaces bool
NodeSelector map[string]string
}

// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for
Expand Down Expand Up @@ -82,6 +83,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) {
Labels: args.Labels,
},
Spec: corev1.PodSpec{
NodeSelector: args.NodeSelector,
ServiceAccountName: args.Source.Spec.ServiceAccountName,
EnableServiceLinks: ptr.Bool(false),
Containers: []corev1.Container{
Expand Down Expand Up @@ -149,29 +151,30 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
config = string(b)
}

envs := []corev1.EnvVar{{
Name: adapter.EnvConfigSink,
Value: args.SinkURI,
}, {
Name: "K_SOURCE_CONFIG",
Value: config,
}, {
Name: "SYSTEM_NAMESPACE",
Value: system.Namespace(),
}, {
Name: adapter.EnvConfigNamespace,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
envs := []corev1.EnvVar{
{
Name: adapter.EnvConfigSink,
Value: args.SinkURI,
}, {
Name: "K_SOURCE_CONFIG",
Value: config,
}, {
Name: "SYSTEM_NAMESPACE",
Value: system.Namespace(),
}, {
Name: adapter.EnvConfigNamespace,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
}, {
Name: adapter.EnvConfigName,
Value: args.Source.Name,
}, {
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}, {
Name: adapter.EnvConfigName,
Value: args.Source.Name,
}, {
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}

if args.CACerts != nil {
Expand Down

0 comments on commit ab47824

Please sign in to comment.