diff --git a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml index f14c64d2ec..ba1d7ea832 100644 --- a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml +++ b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml @@ -155,8 +155,28 @@ spec: required: - url properties: + name: + type: string url: type: string + CACerts: + type: string + audience: + type: string + addresses: + description: Kafka Sink is Addressable. It exposes the endpoints as URIs to get events delivered into the Kafka topic. + type: array + items: + type: object + properties: + name: + type: string + url: + type: string + CACerts: + type: string + audience: + type: string annotations: description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards. type: object diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 9952cd10c9..a638cb1993 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -319,7 +319,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta } httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -337,7 +337,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return err } - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} addressableStatus.Address = &httpsAddress } else { @@ -672,6 +672,7 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin Ingress: &contract.Ingress{ Host: receiver.Host(channel.GetNamespace(), channel.GetName()), EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + Path: receiver.Path(channel.GetNamespace(), channel.GetName()), }, BootstrapServers: config.GetBootstrapServers(), Reference: &contract.Reference{ diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 6e72a56b39..9a4c901dcd 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -188,6 +188,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -262,6 +263,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -338,6 +340,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -408,6 +411,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -482,6 +486,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{}, @@ -554,6 +559,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -631,6 +637,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -713,6 +720,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -795,6 +803,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -883,6 +892,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1173,6 +1183,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1270,6 +1281,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1376,6 +1388,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1479,6 +1492,7 @@ func TestReconcileKind(t *testing.T) { }, }, Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, Egresses: []*contract.Egress{{ @@ -1557,6 +1571,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1626,6 +1641,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1738,6 +1754,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, @@ -1823,6 +1840,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -1864,7 +1882,7 @@ func TestReconcileKind(t *testing.T) { WithChannelAddresses([]duckv1.Addressable{ { Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }, { @@ -1925,6 +1943,7 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, EgressConfig: &contract.EgressConfig{ @@ -1966,13 +1985,13 @@ func TestReconcileKind(t *testing.T) { WithChannelAddresses([]duckv1.Addressable{ { Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }, }), WithChannelAddress(duckv1.Addressable{ Name: pointer.String("https"), - URL: httpsURL(ChannelServiceName, ChannelNamespace), + URL: httpsURL(ChannelName, ChannelNamespace), CACerts: pointer.String(testCaCerts), }), WithChannelAddessable(), @@ -2014,6 +2033,7 @@ func TestFinalizeKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), Host: receiver.Host(ChannelNamespace, ChannelName), }, }, diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 1a38d96ae3..2f801fb32b 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -81,6 +81,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf logger := logging.FromContext(ctx) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(watcher) + _, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx) if err != nil { logger.Fatal("Failed to get or create data plane config map", @@ -96,7 +99,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) } - impl := kafkachannelreconciler.NewImpl(ctx, reconciler) + impl := kafkachannelreconciler.NewImpl(ctx, reconciler, + func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) IPsLister := prober.IdentityIPsLister() reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts) diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index 297c9b352e..5325324b9a 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -86,8 +86,12 @@ func TestNewController(t *testing.T) { configmap.NewStaticWatcher(&corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: apisconfig.FlagsConfigName, + }}, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", }, }), + configs, ) if controller == nil { diff --git a/control-plane/pkg/reconciler/channel/resources/service.go b/control-plane/pkg/reconciler/channel/resources/service.go index 6b9d1410e2..002b2080e9 100644 --- a/control-plane/pkg/reconciler/channel/resources/service.go +++ b/control-plane/pkg/reconciler/channel/resources/service.go @@ -28,8 +28,12 @@ import ( ) const ( - portName = "http" - portNumber = 80 + portName = "http" + portNumber = 80 + + tlsPortName = "https" + tlsPortNumber = 443 + MessagingRoleLabel = "messaging.knative.dev/role" MessagingRole = "kafka-channel" @@ -86,6 +90,11 @@ func MakeK8sService(kc *v1beta1.KafkaChannel, opts ...ServiceOption) (*corev1.Se Protocol: corev1.ProtocolTCP, Port: portNumber, }, + { + Name: tlsPortName, + Protocol: corev1.ProtocolTCP, + Port: tlsPortNumber, + }, }, }, } diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 026f6220d9..b59b6c1875 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -393,6 +393,11 @@ func NewPerChannelService(env *config.Env) *corev1.Service { Protocol: corev1.ProtocolTCP, Port: 80, }, + { + Name: "https", + Protocol: corev1.ProtocolTCP, + Port: 443, + }, }, }, } diff --git a/test/e2e_new/channel_eventing_tls_test.go b/test/e2e_new/channel_eventing_tls_test.go new file mode 100644 index 0000000000..4e0440f4a3 --- /dev/null +++ b/test/e2e_new/channel_eventing_tls_test.go @@ -0,0 +1,48 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "testing" + "time" + + "knative.dev/eventing-kafka-broker/test/rekt/features" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestChannelTLSCARotation(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.Test(ctx, t, features.RotateChannelTLSCertificates()) +} diff --git a/test/rekt/features/channel_tls.go b/test/rekt/features/channel_tls.go new file mode 100644 index 0000000000..d8cb4f7514 --- /dev/null +++ b/test/rekt/features/channel_tls.go @@ -0,0 +1,105 @@ +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + "context" + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/resources/service" + "knative.dev/reconciler-test/resources/certificate" +) + +func RotateChannelTLSCertificates() *feature.Feature { + // + ingressCertificateName := "kafka-channel-ingress-server-tls" + ingressSecretName := "kafka-channel-ingress-server-tls" + + channelName := feature.MakeRandomK8sName("channel") + subscriptionName := feature.MakeRandomK8sName("subscription") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + f := feature.NewFeatureNamed("Rotate Kafka Channel TLS certificate") + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{ + Certificate: types.NamespacedName{ + Namespace: system.Namespace(), + Name: ingressCertificateName, + }, + })) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + f.Setup("install channel", kafkachannel.Install(channelName, + kafkachannel.WithNumPartitions("3"), + kafkachannel.WithReplicationFactor("1"), + kafkachannel.WithRetentionDuration("P1D"), + )) + f.Setup("channel is ready", kafkachannel.IsReady(channelName)) + + f.Setup("install subscription", func(ctx context.Context, t feature.T) { + d := service.AsDestinationRef(sink) + subscription.Install(subscriptionName, + subscription.WithChannel(&duckv1.KReference{ + Kind: "KafkaChannel", + Name: channelName, + APIVersion: kafkachannel.GVR().GroupVersion().String(), + }), + subscription.WithSubscriberFromDestination(d))(ctx, t) + }) + + f.Setup("subscription is ready", subscription.IsReady(subscriptionName)) + + f.Setup("Channel has HTTPS address", kafkachannel.ValidateAddress(channelName, addressable.AssertHTTPSAddress)) + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Requirement("install source", eventshub.Install(source, + eventshub.StartSenderToResourceTLS(kafkachannel.GVR(), channelName, nil), + eventshub.InputEvent(event), + // Send multiple events so that we take into account that the certificate rotation might + // be detected by the server after some time. + eventshub.SendMultipleEvents(100, 3*time.Second), + )) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Source match updated peer certificate", assert.OnStore(source). + MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressSecretName, "tls.crt")). + AtLeast(1), + ) + + return f +} diff --git a/test/rekt/resources/kafkachannel/kafkachannel.go b/test/rekt/resources/kafkachannel/kafkachannel.go index 5ec67d4537..e49419946c 100644 --- a/test/rekt/resources/kafkachannel/kafkachannel.go +++ b/test/rekt/resources/kafkachannel/kafkachannel.go @@ -21,6 +21,9 @@ import ( "embed" "time" + "knative.dev/eventing/test/rekt/resources/addressable" + duckv1 "knative.dev/pkg/apis/duck/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" @@ -34,6 +37,17 @@ func GVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1beta1", Resource: "kafkachannels"} } +func GVK() schema.GroupVersionKind { + return schema.ParseGroupKind(EnvCfg.ChannelGK).WithVersion(EnvCfg.ChannelV) +} + +var EnvCfg EnvConfig + +type EnvConfig struct { + ChannelGK string `envconfig:"CHANNEL_GROUP_KIND" default:"KafkaChannel.messaging.knative.dev" required:"true"` + ChannelV string `envconfig:"CHANNEL_VERSION" default:"v1beta1" required:"true"` +} + // Install will create a KafkaChannel resource, using the latest version, augmented with the config fn options. func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ @@ -84,3 +98,43 @@ func WithRetentionDuration(retentionDuration string) manifest.CfgFn { cfg["retentionDuration"] = retentionDuration } } + +// AsRef returns a KRef for a Channel without namespace. +func AsRef(name string) *duckv1.KReference { + return &duckv1.KReference{ + Kind: EnvCfg.ChannelGK, + APIVersion: EnvCfg.ChannelV, + Name: name, + } +} + +// AsRef returns a KRef for a Channel without namespace. +func AsDestinationRef(name string) *duckv1.Destination { + return &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: EnvCfg.ChannelGK, + APIVersion: EnvCfg.ChannelV, + Name: name, + }, + } +} + +// Address returns a Channel's address. +func Address(ctx context.Context, name string, timings ...time.Duration) (*duckv1.Addressable, error) { + return addressable.Address(ctx, GVR(), name, timings...) +} + +// ValidateAddress validates the address retured by Address +func ValidateAddress(name string, validate addressable.ValidateAddressFn, timings ...time.Duration) feature.StepFn { + return func(ctx context.Context, t feature.T) { + addr, err := Address(ctx, name, timings...) + if err != nil { + t.Error(err) + return + } + if err := validate(addr); err != nil { + t.Error(err) + return + } + } +}