From 57c4d1e85c2fb9468724df2cf111e11febcff3d3 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 23 Oct 2023 14:37:49 -0400 Subject: [PATCH] Made the async prober private (#3421) Signed-off-by: Calum Murray --- control-plane/pkg/prober/async_prober.go | 10 +++--- control-plane/pkg/prober/async_prober_test.go | 24 ++++++------- control-plane/pkg/prober/composite_prober.go | 14 ++++---- .../pkg/prober/composite_prober_test.go | 20 +++++------ control-plane/pkg/prober/prober.go | 34 +++++++++---------- control-plane/pkg/prober/prober_test.go | 6 ++-- .../pkg/prober/probertesting/mock_prober.go | 9 +---- .../prober/probertesting/mock_prober_test.go | 10 +++--- control-plane/pkg/reconciler/broker/broker.go | 4 +-- .../pkg/reconciler/channel/channel.go | 4 +-- .../pkg/reconciler/channel/v2/channelv2.go | 4 +-- .../pkg/reconciler/sink/kafka_sink.go | 4 +-- 12 files changed, 68 insertions(+), 75 deletions(-) diff --git a/control-plane/pkg/prober/async_prober.go b/control-plane/pkg/prober/async_prober.go index 929226a2cb..c4c69a9f99 100644 --- a/control-plane/pkg/prober/async_prober.go +++ b/control-plane/pkg/prober/async_prober.go @@ -32,7 +32,7 @@ var ( cacheExpiryTime = time.Minute * 30 ) -type IPsLister func(addressable Addressable) ([]string, error) +type IPsLister func(addressable proberAddressable) ([]string, error) type asyncProber struct { client httpClient @@ -47,7 +47,7 @@ type asyncProber struct { // NewAsync creates an async Prober. // // It reports status changes using the provided EnqueueFunc. -func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) Prober { +func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPsLister, enqueue EnqueueFunc) prober { logger := logging.FromContext(ctx).Desugar(). With(zap.String("scope", "prober")) @@ -65,7 +65,7 @@ func NewAsync(ctx context.Context, client httpClient, port string, IPsLister IPs } } -func NewAsyncWithTLS(ctx context.Context, port string, IPsLister IPsLister, enqueue EnqueueFunc, caCerts *string) (Prober, error) { +func NewAsyncWithTLS(ctx context.Context, port string, IPsLister IPsLister, enqueue EnqueueFunc, caCerts *string) (prober, error) { newClient, err := makeHttpClientWithTLS(caCerts) if err != nil { return nil, err @@ -73,7 +73,7 @@ func NewAsyncWithTLS(ctx context.Context, port string, IPsLister IPsLister, enqu return NewAsync(ctx, newClient, port, IPsLister, enqueue), nil } -func (a *asyncProber) Probe(ctx context.Context, addressable Addressable, expected Status) Status { +func (a *asyncProber) probe(ctx context.Context, addressable proberAddressable, expected Status) Status { address := addressable.Address IPs, err := a.IPsLister(addressable) if err != nil { @@ -165,7 +165,7 @@ func (a *asyncProber) enqueueArg(_ string, arg interface{}) { a.enqueue(arg.(types.NamespacedName)) } -func (a *asyncProber) RotateRootCaCerts(caCerts *string) error { +func (a *asyncProber) rotateRootCaCerts(caCerts *string) error { newClient, err := makeHttpClientWithTLS(caCerts) if err != nil { return err diff --git a/control-plane/pkg/prober/async_prober_test.go b/control-plane/pkg/prober/async_prober_test.go index f0486edf15..9eb32f88d5 100644 --- a/control-plane/pkg/prober/async_prober_test.go +++ b/control-plane/pkg/prober/async_prober_test.go @@ -59,7 +59,7 @@ func TestAsyncProber(t *testing.T) { name string pods []*corev1.Pod podsLabelsSelector labels.Selector - addressable Addressable + addressable proberAddressable responseStatusCode int wantStatus Status wantRequeueCountMin int @@ -70,7 +70,7 @@ func TestAsyncProber(t *testing.T) { name: "no pods", pods: []*corev1.Pod{}, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: Addressable{ + addressable: proberAddressable{ Address: &url.URL{Scheme: "http", Path: "/b1/b1"}, ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"}, }, @@ -92,7 +92,7 @@ func TestAsyncProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: Addressable{ + addressable: proberAddressable{ Address: &url.URL{Scheme: "http", Path: "/b1/b1"}, ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"}, }, @@ -115,7 +115,7 @@ func TestAsyncProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: Addressable{ + addressable: proberAddressable{ Address: &url.URL{Scheme: "http", Path: "/b1/b1"}, ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"}, }, @@ -138,7 +138,7 @@ func TestAsyncProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: Addressable{ + addressable: proberAddressable{ Address: &url.URL{Scheme: "https", Path: "/b1/b1"}, ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"}, }, @@ -185,7 +185,7 @@ func TestAsyncProber(t *testing.T) { u, _ := url.Parse(s.URL) wantRequeueCountMin := atomic.NewInt64(int64(tc.wantRequeueCountMin)) - var IPsLister IPsLister = func(addressable Addressable) ([]string, error) { + var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) { pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector) if err != nil { return nil, err @@ -196,7 +196,7 @@ func TestAsyncProber(t *testing.T) { } return ips, nil } - var prober Prober + var prober prober var err error if tc.useTLS { prober, err = NewAsyncWithTLS(ctx, u.Port(), IPsLister, func(key types.NamespacedName) { @@ -210,7 +210,7 @@ func TestAsyncProber(t *testing.T) { } probeFunc := func() bool { - status := prober.Probe(ctx, tc.addressable, tc.wantStatus) + status := prober.probe(ctx, tc.addressable, tc.wantStatus) return status == tc.wantStatus } @@ -258,7 +258,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) { u, err := url.Parse(addrString) require.NoError(t, err) - addressable := Addressable{ + addressable := proberAddressable{ Address: &url.URL{Scheme: "https", Path: "/b1/b1", Host: addrString}, ResourceKey: types.NamespacedName{Namespace: "b1", Name: "b1"}, } @@ -273,7 +273,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) { } podinformer.Get(ctx).Informer().GetStore().Add(pod) labelSelector := labels.SelectorFromSet(map[string]string{"app": "p"}) - var IPsLister IPsLister = func(addressable Addressable) ([]string, error) { + var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) { pods, err := podinformer.Get(ctx).Lister().List(labelSelector) if err != nil { return nil, err @@ -292,7 +292,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) { require.NoError(t, err) probeFunc := func() bool { - status := prober.Probe(ctx, addressable, wantStatus) + status := prober.probe(ctx, addressable, wantStatus) return status == wantStatus } @@ -302,7 +302,7 @@ func TestAsyncProberRotateCACerts(t *testing.T) { require.Eventuallyf(tt, func() bool { return wantRequeueCountMin.Load() == 1 }, 5*time.Second, 250*time.Millisecond, "got %d, want 1", wantRequeueCountMin.Load()) }) t.Run("one pod - TLS certs after rotation", func(tt *testing.T) { - prober.RotateRootCaCerts(pointer.String(string(CA2))) + prober.rotateRootCaCerts(pointer.String(string(CA2))) s.TLSConfig.GetCertificate = func(chi *tls.ClientHelloInfo) (*tls.Certificate, error) { cert, err := tls.X509KeyPair(Crt2, Key2) return &cert, err diff --git a/control-plane/pkg/prober/composite_prober.go b/control-plane/pkg/prober/composite_prober.go index 8c51b21bf7..9167f98299 100644 --- a/control-plane/pkg/prober/composite_prober.go +++ b/control-plane/pkg/prober/composite_prober.go @@ -26,8 +26,8 @@ var ( ) type compositeProber struct { - httpProber Prober - httpsProber Prober + httpProber prober + httpsProber prober } // NewComposite creates a composite prober. @@ -50,17 +50,17 @@ func NewCompositeNoTLS(ctx context.Context, httpPort string, IPsLister IPsLister return NewComposite(ctx, httpPort, "443", IPsLister, enqueue, &emptyCaCerts) } -func (c *compositeProber) Probe(ctx context.Context, addressable NewAddressable, expected Status) Status { +func (c *compositeProber) Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status { var status Status for _, addr := range addressable.AddressStatus.Addresses { - oldAddressable := Addressable{ + oldAddressable := proberAddressable{ ResourceKey: addressable.ResourceKey, Address: addr.URL.URL(), } if addr.URL.Scheme == "https" { - status = c.httpsProber.Probe(ctx, oldAddressable, expected) + status = c.httpsProber.probe(ctx, oldAddressable, expected) } else if addr.URL.Scheme == "http" { - status = c.httpProber.Probe(ctx, oldAddressable, expected) + status = c.httpProber.probe(ctx, oldAddressable, expected) } if status != expected { return status @@ -71,6 +71,6 @@ func (c *compositeProber) Probe(ctx context.Context, addressable NewAddressable, func (c *compositeProber) RotateRootCaCerts(caCerts *string) error { // we don't need to rotate the certs on the http prober as it isn't using them - err := c.httpsProber.RotateRootCaCerts(caCerts) + err := c.httpsProber.rotateRootCaCerts(caCerts) return err } diff --git a/control-plane/pkg/prober/composite_prober_test.go b/control-plane/pkg/prober/composite_prober_test.go index a3370e2d81..d10715ebb5 100644 --- a/control-plane/pkg/prober/composite_prober_test.go +++ b/control-plane/pkg/prober/composite_prober_test.go @@ -46,7 +46,7 @@ func TestCompositeProber(t *testing.T) { name string pods []*corev1.Pod podsLabelsSelector labels.Selector - addressable NewAddressable + addressable ProberAddressable responseStatusCode int wantStatus Status wantRequeueCountMin int @@ -66,7 +66,7 @@ func TestCompositeProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "http", Path: "/b1/b1"}, @@ -97,7 +97,7 @@ func TestCompositeProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "https", Path: "/b1/b1"}, @@ -128,7 +128,7 @@ func TestCompositeProber(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "http", Path: "/b1/b1"}, @@ -197,7 +197,7 @@ func TestCompositeProber(t *testing.T) { } } - var IPsLister IPsLister = func(addressable Addressable) ([]string, error) { + var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) { pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector) if err != nil { return nil, err @@ -238,7 +238,7 @@ func TestCompositeProberNoTLS(t *testing.T) { name string pods []*corev1.Pod podsLabelsSelector labels.Selector - addressable NewAddressable + addressable ProberAddressable responseStatusCode int wantStatus Status wantRequeueCountMin int @@ -258,7 +258,7 @@ func TestCompositeProberNoTLS(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "http", Path: "/b1/b1"}, @@ -289,7 +289,7 @@ func TestCompositeProberNoTLS(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "https", Path: "/b1/b1"}, @@ -320,7 +320,7 @@ func TestCompositeProberNoTLS(t *testing.T) { }, }, podsLabelsSelector: labels.SelectorFromSet(map[string]string{"app": "p"}), - addressable: NewAddressable{ + addressable: ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &duckv1.Addressable{ URL: &apis.URL{Scheme: "http", Path: "/b1/b1"}, @@ -389,7 +389,7 @@ func TestCompositeProberNoTLS(t *testing.T) { } } - var IPsLister IPsLister = func(addressable Addressable) ([]string, error) { + var IPsLister IPsLister = func(addressable proberAddressable) ([]string, error) { pods, err := podinformer.Get(ctx).Lister().List(tc.podsLabelsSelector) if err != nil { return nil, err diff --git a/control-plane/pkg/prober/prober.go b/control-plane/pkg/prober/prober.go index 92ab008b83..47919efc45 100644 --- a/control-plane/pkg/prober/prober.go +++ b/control-plane/pkg/prober/prober.go @@ -29,8 +29,8 @@ import ( "knative.dev/pkg/network" ) -// Addressable contains addressable resource data for the prober. -type Addressable struct { +// proberAddressable contains addressable resource data for the prober. +type proberAddressable struct { // Addressable address. Address *url.URL // Resource key. @@ -41,15 +41,15 @@ type Addressable struct { type EnqueueFunc func(key types.NamespacedName) // Prober probes an addressable resource. -type Prober interface { +type prober interface { // Probe probes the provided Addressable resource and returns its Status. - Probe(ctx context.Context, addressable Addressable, expected Status) Status + probe(ctx context.Context, addressable proberAddressable, expected Status) Status // RotateRootCaCerts rotates the CA certs used to make http requests - RotateRootCaCerts(caCerts *string) error + rotateRootCaCerts(caCerts *string) error } -// NewAddressable contains addressable resource data for the new prober -type NewAddressable struct { +// ProberAddressable contains addressable resource data for the new prober +type ProberAddressable struct { // Addressable status AddressStatus *duckv1.AddressStatus // Resource key @@ -59,7 +59,7 @@ type NewAddressable struct { // NewProber probes an addressable resource type NewProber interface { // Probe probes the provided NewAddressable resource and returns its Status - Probe(ctx context.Context, addressable NewAddressable, expected Status) Status + Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status // RotateRootCaCerts rotates the CA certs used to make http requests RotateRootCaCerts(caCerts *string) error } @@ -68,21 +68,21 @@ type NewProber interface { // ordinary functions as Prober. If f is a function // with the appropriate signature, Func(f) is a // Prober that calls f. -type Func func(ctx context.Context, addressable Addressable, expected Status) Status +type Func func(ctx context.Context, addressable proberAddressable, expected Status) Status // Probe implements the Prober interface for Func. -func (p Func) Probe(ctx context.Context, addressable Addressable, expected Status) Status { +func (p Func) probe(ctx context.Context, addressable proberAddressable, expected Status) Status { return p(ctx, addressable, expected) } // RotateRootCaCerts is an empty implementation to complete the Prober interface for Func. -func (p Func) RotateRootCaCerts(caCerts *string) error { +func (p Func) rotateRootCaCerts(caCerts *string) error { return nil } -type NewFunc func(ctx context.Context, addressable NewAddressable, expected Status) Status +type NewFunc func(ctx context.Context, addressable ProberAddressable, expected Status) Status -func (p NewFunc) Probe(ctx context.Context, addressable NewAddressable, expected Status) Status { +func (p NewFunc) Probe(ctx context.Context, addressable ProberAddressable, expected Status) Status { return p(ctx, addressable, expected) } @@ -127,7 +127,7 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s } func IPsListerFromService(svc types.NamespacedName) IPsLister { - return func(addressable Addressable) ([]string, error) { + return func(addressable proberAddressable) ([]string, error) { return []string{GetIPForService(svc)}, nil } } @@ -139,7 +139,7 @@ func GetIPForService(svc types.NamespacedName) string { type IPListerWithMapping interface { Register(svc types.NamespacedName, ip string) Unregister(svc types.NamespacedName) - List(addressable Addressable) ([]string, error) + List(addressable proberAddressable) ([]string, error) } type ipListerWithMapping struct { @@ -171,7 +171,7 @@ func (m *ipListerWithMapping) Unregister(svc types.NamespacedName) { delete(m.mapping, a) } -func (m *ipListerWithMapping) List(addressable Addressable) ([]string, error) { +func (m *ipListerWithMapping) List(addressable proberAddressable) ([]string, error) { a := addressable.ResourceKey.String() m.mx.RLock() @@ -185,7 +185,7 @@ func (m *ipListerWithMapping) List(addressable Addressable) ([]string, error) { } func IdentityIPsLister() IPsLister { - return func(addressable Addressable) ([]string, error) { + return func(addressable proberAddressable) ([]string, error) { return []string{addressable.Address.Host}, nil } } diff --git a/control-plane/pkg/prober/prober_test.go b/control-plane/pkg/prober/prober_test.go index cafb496ae3..29f1e1db02 100644 --- a/control-plane/pkg/prober/prober_test.go +++ b/control-plane/pkg/prober/prober_test.go @@ -30,12 +30,12 @@ func TestFuncProbe(t *testing.T) { calls := atomic.NewInt32(0) status := StatusReady - p := Func(func(ctx context.Context, addressable Addressable, expected Status) Status { + p := Func(func(ctx context.Context, addressable proberAddressable, expected Status) Status { calls.Inc() return status }) - s := p.Probe(context.Background(), Addressable{}, status) + s := p.probe(context.Background(), proberAddressable{}, status) require.Equal(t, status, s, s.String()) require.Equal(t, int32(1), calls.Load()) @@ -60,7 +60,7 @@ func TestIPsListerFromService(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := IPsListerFromService(tt.svc)(Addressable{}) + got, err := IPsListerFromService(tt.svc)(proberAddressable{}) if tt.wantErr != (err != nil) { t.Errorf("Got err %v, wantErr %v", err, tt.wantErr) } diff --git a/control-plane/pkg/prober/probertesting/mock_prober.go b/control-plane/pkg/prober/probertesting/mock_prober.go index c1de9b2dea..7d081bc02b 100644 --- a/control-plane/pkg/prober/probertesting/mock_prober.go +++ b/control-plane/pkg/prober/probertesting/mock_prober.go @@ -22,15 +22,8 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" ) -// MockProber returns a prober that always returns the provided status. -func MockProber(status prober.Status) prober.Prober { - return prober.Func(func(ctx context.Context, addressable prober.Addressable, expected prober.Status) prober.Status { - return status - }) -} - func MockNewProber(status prober.Status) prober.NewProber { - return prober.NewFunc(func(ctx context.Context, addressable prober.NewAddressable, expected prober.Status) prober.Status { + return prober.NewFunc(func(ctx context.Context, addressable prober.ProberAddressable, expected prober.Status) prober.Status { return status }) } diff --git a/control-plane/pkg/prober/probertesting/mock_prober_test.go b/control-plane/pkg/prober/probertesting/mock_prober_test.go index c2c6aba914..ac2559ac84 100644 --- a/control-plane/pkg/prober/probertesting/mock_prober_test.go +++ b/control-plane/pkg/prober/probertesting/mock_prober_test.go @@ -30,30 +30,30 @@ func TestMockProber(t *testing.T) { name string status prober.Status ctx context.Context - addressable prober.Addressable + addressable prober.ProberAddressable }{ { name: "unknown", status: prober.StatusUnknown, ctx: context.Background(), - addressable: prober.Addressable{}, + addressable: prober.ProberAddressable{}, }, { name: "ready", status: prober.StatusReady, ctx: context.Background(), - addressable: prober.Addressable{}, + addressable: prober.ProberAddressable{}, }, { name: "notReady", status: prober.StatusNotReady, ctx: context.Background(), - addressable: prober.Addressable{}, + addressable: prober.ProberAddressable{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := MockProber(tt.status).Probe(tt.ctx, tt.addressable, tt.status) + got := MockNewProber(tt.status).Probe(tt.ctx, tt.addressable, tt.status) if diff := cmp.Diff(tt.status, got); diff != "" { t.Error("(-want, got)", diff) } diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 63866adc57..014c6d35f0 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -261,7 +261,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: broker.GetNamespace(), @@ -356,7 +356,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update address := receiver.HTTPAddress(ingressHost, broker) - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, Addresses: []duckv1.Addressable{address}, diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index cc705f1987..9952cd10c9 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -346,7 +346,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), @@ -429,7 +429,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update address := receiver.HTTPAddress(r.IngressHost, channel) - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, Addresses: []duckv1.Addressable{address}, diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 601cdafaa5..09e4d95049 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -313,7 +313,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), @@ -419,7 +419,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update address := receiver.HTTPAddress(r.IngressHost, channel) - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, Addresses: []duckv1.Addressable{address}, diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index d12d02746e..41fdb89f57 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -277,7 +277,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: ks.GetNamespace(), @@ -349,7 +349,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update address := receiver.HTTPAddress(r.IngressHost, ks) - proberAddressable := prober.NewAddressable{ + proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, Addresses: []duckv1.Addressable{address},