From 698ee54f70c284a6614eb20b29d9cc7b429ae399 Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Wed, 4 Jan 2023 15:24:53 -0800 Subject: [PATCH] feat(status) add UDP publish service Add CLI arguments for setting a UDP publish service and address overrides. If set, these will be used to populate status information for UDPIngresses. If not set, UDPIngress status will fall back to the default publish service/override, with the expectation that it is a dual-transport LoadBalancer. --- internal/dataplane/address_finder.go | 61 ++++++++++++++- internal/dataplane/address_finder_test.go | 38 +++++++++ internal/manager/config.go | 23 ++++-- internal/manager/controllerdef.go | 3 +- internal/manager/run.go | 5 +- internal/manager/setup.go | 95 ++++++++++++++++------- 6 files changed, 183 insertions(+), 42 deletions(-) diff --git a/internal/dataplane/address_finder.go b/internal/dataplane/address_finder.go index cf2d802add..3a60380c79 100644 --- a/internal/dataplane/address_finder.go +++ b/internal/dataplane/address_finder.go @@ -21,8 +21,10 @@ type AddressGetter func() ([]string, error) // AddressFinder is a threadsafe metadata object which can provide the current // live addresses in use by the dataplane at any point in time. type AddressFinder struct { - overrideAddresses []string - addressGetter AddressGetter + overrideAddresses []string + overrideAddressesUDP []string + addressGetter AddressGetter + addressGetterUDP AddressGetter lock sync.RWMutex } @@ -45,6 +47,14 @@ func (a *AddressFinder) SetGetter(getter AddressGetter) { a.addressGetter = getter } +// SetUDPGetter provides a callback function that the AddressFinder will use to +// dynamically retrieve the UDP addresses of the data-plane. +func (a *AddressFinder) SetUDPGetter(getter AddressGetter) { + a.lock.Lock() + defer a.lock.Unlock() + a.addressGetterUDP = getter +} + // SetOverrides hard codes a specific list of addresses to be the addresses // that this finder produces for the data-plane. To disable overrides, call // this method again with an empty list. @@ -54,6 +64,15 @@ func (a *AddressFinder) SetOverrides(addrs []string) { a.overrideAddresses = addrs } +// SetUDPOverrides hard codes a specific list of addresses to be the UDP addresses +// that this finder produces for the data-plane. To disable overrides, call +// this method again with an empty list. +func (a *AddressFinder) SetUDPOverrides(addrs []string) { + a.lock.Lock() + defer a.lock.Unlock() + a.overrideAddressesUDP = addrs +} + // GetAddresses provides a list of the addresses which the data-plane is // listening on for ingress network traffic. Addresses can either be IP // addresses or hostnames. @@ -72,6 +91,28 @@ func (a *AddressFinder) GetAddresses() ([]string, error) { return nil, fmt.Errorf("data-plane addresses can't be retrieved: no valid method available") } +// GetUDPAddresses provides a list of the UDP addresses which the data-plane is +// listening on for ingress network traffic. Addresses can either be IP +// addresses or hostnames. If UDP settings are not configured, falls back to GetAddresses(). +func (a *AddressFinder) GetUDPAddresses() ([]string, error) { + a.lock.RLock() + defer a.lock.RUnlock() + + if len(a.overrideAddressesUDP) > 0 { + return a.overrideAddressesUDP, nil + } + + if len(a.overrideAddresses) > 0 && a.addressGetterUDP == nil { + return a.overrideAddresses, nil + } + + if a.addressGetterUDP != nil { + return a.addressGetterUDP() + } + + return a.GetAddresses() +} + // GetLoadBalancerAddresses provides a list of the addresses which the // data-plane is listening on for ingress network traffic, but provides the // addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be @@ -81,7 +122,10 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI if err != nil { return nil, err } + return getAddressHelper(addrs) +} +func getAddressHelper(addrs []string) ([]netv1.IngressLoadBalancerIngress, error) { var loadBalancerAddresses []netv1.IngressLoadBalancerIngress for _, addr := range addrs { ing := netv1.IngressLoadBalancerIngress{} @@ -99,6 +143,19 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI return loadBalancerAddresses, nil } +// GetUDPLoadBalancerAddresses provides a list of the addresses which the +// data-plane is listening on for UDP network traffic, but provides the +// addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be +// IP addresses or hostnames. +func (a *AddressFinder) GetUDPLoadBalancerAddresses() ([]netv1.IngressLoadBalancerIngress, error) { + addrs, err := a.GetUDPAddresses() + if err != nil { + return nil, err + } + return getAddressHelper(addrs) + +} + // ----------------------------------------------------------------------------- // // ----------------------------------------------------------------------------- diff --git a/internal/dataplane/address_finder_test.go b/internal/dataplane/address_finder_test.go index 6a87e0f5e7..58efaa9183 100644 --- a/internal/dataplane/address_finder_test.go +++ b/internal/dataplane/address_finder_test.go @@ -74,3 +74,41 @@ func TestAddressFinder(t *testing.T) { require.Empty(t, lbs) require.Equal(t, fmt.Sprintf("%s is not a valid DNS hostname", invalidDNSAddrs[0]), err.Error()) } + +func TestUDPAddressFinder(t *testing.T) { + t.Log("generating a new AddressFinder") + finder := NewAddressFinder() + require.NotNil(t, finder) + require.Nil(t, finder.addressGetter) + + t.Log("generating fake AddressGetters") + defaultAddrs := []string{"127.0.0.1", "127.0.0.2"} + overrideAddrs := []string{"192.168.1.1", "192.168.1.2", "192.168.1.3"} + fakeGetter := func() ([]string, error) { return defaultAddrs, nil } + + defaultUDPAddrs := []string{"127.1.0.1", "127.1.0.2"} + overrideUDPAddrs := []string{"192.168.2.1", "192.168.2.2", "192.168.2.3"} + fakeUDPGetter := func() ([]string, error) { return defaultUDPAddrs, nil } + + t.Log("verifying getting a list of addresses from the finder after a getter function is provided") + finder.SetGetter(fakeGetter) + addrs, err := finder.GetUDPAddresses() + require.NoError(t, err) + require.Equal(t, defaultAddrs, addrs) + + t.Log("verifying that overrides take precedent over the getter") + finder.SetOverrides(overrideAddrs) + addrs, err = finder.GetAddresses() + require.NoError(t, err) + require.Equal(t, overrideAddrs, addrs) + + finder.SetUDPGetter(fakeUDPGetter) + addrs, err = finder.GetUDPAddresses() + require.NoError(t, err) + require.Equal(t, defaultUDPAddrs, addrs) + + finder.SetUDPOverrides(overrideUDPAddrs) + addrs, err = finder.GetUDPAddresses() + require.NoError(t, err) + require.Equal(t, overrideUDPAddrs, addrs) +} diff --git a/internal/manager/config.go b/internal/manager/config.go index d622af68d0..aa29eb23ad 100644 --- a/internal/manager/config.go +++ b/internal/manager/config.go @@ -67,9 +67,11 @@ type Config struct { GatewayAPIControllerName string // Ingress status - PublishService string - PublishStatusAddress []string - UpdateStatus bool + PublishService string + PublishServiceUDP string + PublishStatusAddress []string + PublishStatusAddressUDP []string + UpdateStatus bool // Kubernetes API toggling IngressExtV1beta1Enabled bool @@ -168,11 +170,16 @@ func (c *Config) FlagSet() *pflag.FlagSet { a comma-separated list of namespaces.`) // Ingress status - flagSet.StringVar(&c.PublishService, "publish-service", "", `Service fronting Ingress resources in "namespace/name" - format. The controller will update Ingress status information with this Service's endpoints.`) - flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{}, `User-provided addresses in - comma-separated string format, for use in lieu of "publish-service" when that Service lacks useful address - information (for example, in bare-metal environments).`) + flagSet.StringVar(&c.PublishService, "publish-service", "", `Service fronting routing resources in "namespace/name" + format. The controller will update route resource status information with this Service's endpoints.`) + flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{}, `User-provided address CSV. + For use in lieu of "publish-service" when that Service lacks useful address information (for example, + in bare-metal environments).`) + flagSet.StringVar(&c.PublishServiceUDP, "publish-service-udp", "", `Service fronting UDP routing resources in + "namespace/name" format. The controller will update UDP route status information with this Service's + endpoints. If omitted, the same Service will be used for both TCP and UDP routes.`) + flagSet.StringSliceVar(&c.PublishStatusAddressUDP, "publish-status-address-udp", []string{}, `User-provided + address CSV, for use in lieu of "publish-service-udp" when that Service lacks useful address information.`) flagSet.BoolVar(&c.UpdateStatus, "update-status", true, `Indicates if the ingress controller should update the status of resources (e.g. IP/Hostname for v1.Ingress, e.t.c.)`) diff --git a/internal/manager/controllerdef.go b/internal/manager/controllerdef.go index ddd0505530..bcc50ce442 100644 --- a/internal/manager/controllerdef.go +++ b/internal/manager/controllerdef.go @@ -59,6 +59,7 @@ func setupControllers( mgr manager.Manager, dataplaneClient *dataplane.KongClient, dataplaneAddressFinder *dataplane.AddressFinder, + udpDataplaneAddressFinder *dataplane.AddressFinder, kubernetesStatusQueue *status.Queue, c *Config, featureGates map[string]bool, @@ -197,7 +198,7 @@ func setupControllers( IngressClassName: c.IngressClassName, DisableIngressClassLookups: !c.IngressClassNetV1Enabled, StatusQueue: kubernetesStatusQueue, - DataplaneAddressFinder: dataplaneAddressFinder, + DataplaneAddressFinder: udpDataplaneAddressFinder, CacheSyncTimeout: c.CacheSyncTimeout, }, }, diff --git a/internal/manager/run.go b/internal/manager/run.go index 4f8087e891..8529f0059c 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -179,7 +179,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d } setupLog.Info("Initializing Dataplane Address Discovery") - dataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c) + dataplaneAddressFinder, udpDataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c) if err != nil { return err } @@ -190,7 +190,8 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d gateway.ControllerName = gatewayv1beta1.GatewayController(c.GatewayAPIControllerName) setupLog.Info("Starting Enabled Controllers") - controllers, err := setupControllers(mgr, dataplaneClient, dataplaneAddressFinder, kubernetesStatusQueue, c, featureGates) + controllers, err := setupControllers(mgr, dataplaneClient, + dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates) if err != nil { return fmt.Errorf("unable to setup controller as expected %w", err) } diff --git a/internal/manager/setup.go b/internal/manager/setup.go index d40690789f..ff82015b3e 100644 --- a/internal/manager/setup.go +++ b/internal/manager/setup.go @@ -200,51 +200,88 @@ func setupAdmissionServer( return nil } -func setupDataplaneAddressFinder(ctx context.Context, mgrc client.Client, c *Config) (*dataplane.AddressFinder, error) { +// setupDataplaneAddressFinder returns a default and UDP address finder. These finders return the override addresses if +// set or the publish service addresses if no overrides are set. If no UDP overrides or UDP publish service are set, +// the UDP finder will also return the default addresses. If no override or publish service is set, this function +// returns nil finders and an error. +func setupDataplaneAddressFinder( + ctx context.Context, + mgrc client.Client, + c *Config, +) (*dataplane.AddressFinder, *dataplane.AddressFinder, error) { dataplaneAddressFinder := dataplane.NewAddressFinder() + udpDataplaneAddressFinder := dataplane.NewAddressFinder() + var getter func() ([]string, error) if c.UpdateStatus { + // Default if overrideAddrs := c.PublishStatusAddress; len(overrideAddrs) > 0 { dataplaneAddressFinder.SetOverrides(overrideAddrs) } else if c.PublishService != "" { parts := strings.Split(c.PublishService, "/") if len(parts) != 2 { - return nil, fmt.Errorf("publish service %s is invalid, expecting /", c.PublishService) + return nil, nil, fmt.Errorf("publish service %s is invalid, expecting /", c.PublishService) } nsn := types.NamespacedName{ Namespace: parts[0], Name: parts[1], } - dataplaneAddressFinder.SetGetter(func() ([]string, error) { - svc := new(corev1.Service) - if err := mgrc.Get(ctx, nsn, svc); err != nil { - return nil, err - } + getter = generateAddressFinderGetter(ctx, mgrc, nsn) + dataplaneAddressFinder.SetGetter(getter) + } else { + return nil, nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address") + } - var addrs []string - switch svc.Spec.Type { //nolint:exhaustive - case corev1.ServiceTypeLoadBalancer: - for _, lbaddr := range svc.Status.LoadBalancer.Ingress { - if lbaddr.IP != "" { - addrs = append(addrs, lbaddr.IP) - } - if lbaddr.Hostname != "" { - addrs = append(addrs, lbaddr.Hostname) - } - } - default: - addrs = append(addrs, svc.Spec.ClusterIPs...) - } + // UDP. falls back to default if not configured + if udpOverrideAddrs := c.PublishStatusAddressUDP; len(udpOverrideAddrs) > 0 { + dataplaneAddressFinder.SetUDPOverrides(udpOverrideAddrs) + } else if c.PublishServiceUDP != "" { + parts := strings.Split(c.PublishServiceUDP, "/") + if len(parts) != 2 { + return nil, nil, fmt.Errorf("UDP publish service %s is invalid, expecting /", c.PublishService) + } + nsn := types.NamespacedName{ + Namespace: parts[0], + Name: parts[1], + } + udpDataplaneAddressFinder.SetGetter(generateAddressFinderGetter(ctx, mgrc, nsn)) + } else { + udpDataplaneAddressFinder.SetGetter(getter) + } + } + + return dataplaneAddressFinder, udpDataplaneAddressFinder, nil +} + +func generateAddressFinderGetter( + ctx context.Context, + mgrc client.Client, + nsn types.NamespacedName, +) func() ([]string, error) { + return func() ([]string, error) { + svc := new(corev1.Service) + if err := mgrc.Get(ctx, nsn, svc); err != nil { + return nil, err + } - if len(addrs) == 0 { - return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name) + var addrs []string + switch svc.Spec.Type { //nolint:exhaustive + case corev1.ServiceTypeLoadBalancer: + for _, lbaddr := range svc.Status.LoadBalancer.Ingress { + if lbaddr.IP != "" { + addrs = append(addrs, lbaddr.IP) } + if lbaddr.Hostname != "" { + addrs = append(addrs, lbaddr.Hostname) + } + } + default: + addrs = append(addrs, svc.Spec.ClusterIPs...) + } - return addrs, nil - }) - } else { - return nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address") + if len(addrs) == 0 { + return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name) } - } - return dataplaneAddressFinder, nil + return addrs, nil + } }