Skip to content

Commit

Permalink
feat(status) add UDP publish service
Browse files Browse the repository at this point in the history
Add CLI arguments for setting a UDP publish service and address
overrides. If set, these will be used to populate status information for
UDPIngresses. If not set, UDPIngress status will fall back to the
default publish service/override, with the expectation that it is a
dual-transport LoadBalancer.
  • Loading branch information
rainest committed Jan 4, 2023
1 parent 5581c8c commit 75fde8d
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 42 deletions.
61 changes: 59 additions & 2 deletions internal/dataplane/address_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ type AddressGetter func() ([]string, error)
// AddressFinder is a threadsafe metadata object which can provide the current
// live addresses in use by the dataplane at any point in time.
type AddressFinder struct {
overrideAddresses []string
addressGetter AddressGetter
overrideAddresses []string
overrideAddressesUDP []string
addressGetter AddressGetter
addressGetterUDP AddressGetter

lock sync.RWMutex
}
Expand All @@ -45,6 +47,14 @@ func (a *AddressFinder) SetGetter(getter AddressGetter) {
a.addressGetter = getter
}

// SetUDPGetter provides a callback function that the AddressFinder will use to
// dynamically retrieve the UDP addresses of the data-plane.
func (a *AddressFinder) SetUDPGetter(getter AddressGetter) {
a.lock.Lock()
defer a.lock.Unlock()
a.addressGetterUDP = getter
}

// SetOverrides hard codes a specific list of addresses to be the addresses
// that this finder produces for the data-plane. To disable overrides, call
// this method again with an empty list.
Expand All @@ -54,6 +64,15 @@ func (a *AddressFinder) SetOverrides(addrs []string) {
a.overrideAddresses = addrs
}

// SetUDPOverrides hard codes a specific list of addresses to be the UDP addresses
// that this finder produces for the data-plane. To disable overrides, call
// this method again with an empty list.
func (a *AddressFinder) SetUDPOverrides(addrs []string) {
a.lock.Lock()
defer a.lock.Unlock()
a.overrideAddressesUDP = addrs
}

// GetAddresses provides a list of the addresses which the data-plane is
// listening on for ingress network traffic. Addresses can either be IP
// addresses or hostnames.
Expand All @@ -72,6 +91,28 @@ func (a *AddressFinder) GetAddresses() ([]string, error) {
return nil, fmt.Errorf("data-plane addresses can't be retrieved: no valid method available")
}

// GetUDPAddresses provides a list of the UDP addresses which the data-plane is
// listening on for ingress network traffic. Addresses can either be IP
// addresses or hostnames. If UDP settings are not configured, falls back to GetAddresses().
func (a *AddressFinder) GetUDPAddresses() ([]string, error) {
a.lock.RLock()
defer a.lock.RUnlock()

if len(a.overrideAddressesUDP) > 0 {
return a.overrideAddressesUDP, nil
}

if len(a.overrideAddresses) > 0 && a.addressGetterUDP == nil {
return a.overrideAddresses, nil
}

if a.addressGetterUDP != nil {
return a.addressGetterUDP()
}

return a.GetAddresses()
}

// GetLoadBalancerAddresses provides a list of the addresses which the
// data-plane is listening on for ingress network traffic, but provides the
// addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be
Expand All @@ -81,7 +122,10 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI
if err != nil {
return nil, err
}
return getAddressHelper(addrs)
}

func getAddressHelper(addrs []string) ([]netv1.IngressLoadBalancerIngress, error) {
var loadBalancerAddresses []netv1.IngressLoadBalancerIngress
for _, addr := range addrs {
ing := netv1.IngressLoadBalancerIngress{}
Expand All @@ -99,6 +143,19 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI
return loadBalancerAddresses, nil
}

// GetUDPLoadBalancerAddresses provides a list of the addresses which the
// data-plane is listening on for UDP network traffic, but provides the
// addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be
// IP addresses or hostnames.
func (a *AddressFinder) GetUDPLoadBalancerAddresses() ([]netv1.IngressLoadBalancerIngress, error) {
addrs, err := a.GetUDPAddresses()
if err != nil {
return nil, err
}
return getAddressHelper(addrs)

}

// -----------------------------------------------------------------------------
//
// -----------------------------------------------------------------------------
Expand Down
38 changes: 38 additions & 0 deletions internal/dataplane/address_finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,41 @@ func TestAddressFinder(t *testing.T) {
require.Empty(t, lbs)
require.Equal(t, fmt.Sprintf("%s is not a valid DNS hostname", invalidDNSAddrs[0]), err.Error())
}

func TestUDPAddressFinder(t *testing.T) {
t.Log("generating a new AddressFinder")
finder := NewAddressFinder()
require.NotNil(t, finder)
require.Nil(t, finder.addressGetter)

t.Log("generating fake AddressGetters")
defaultAddrs := []string{"127.0.0.1", "127.0.0.2"}
overrideAddrs := []string{"192.168.1.1", "192.168.1.2", "192.168.1.3"}
fakeGetter := func() ([]string, error) { return defaultAddrs, nil }

defaultUDPAddrs := []string{"127.1.0.1", "127.1.0.2"}
overrideUDPAddrs := []string{"192.168.2.1", "192.168.2.2", "192.168.2.3"}
fakeUDPGetter := func() ([]string, error) { return defaultUDPAddrs, nil }

t.Log("verifying getting a list of addresses from the finder after a getter function is provided")
finder.SetGetter(fakeGetter)
addrs, err := finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, defaultAddrs, addrs)

t.Log("verifying that overrides take precedent over the getter")
finder.SetOverrides(overrideAddrs)
addrs, err = finder.GetAddresses()
require.NoError(t, err)
require.Equal(t, overrideAddrs, addrs)

finder.SetUDPGetter(fakeUDPGetter)
addrs, err = finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, defaultUDPAddrs, addrs)

finder.SetUDPOverrides(overrideUDPAddrs)
addrs, err = finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, overrideUDPAddrs, addrs)
}
23 changes: 15 additions & 8 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ type Config struct {
GatewayAPIControllerName string

// Ingress status
PublishService string
PublishStatusAddress []string
UpdateStatus bool
PublishService string
PublishServiceUDP string
PublishStatusAddress []string
PublishStatusAddressUDP []string
UpdateStatus bool

// Kubernetes API toggling
IngressExtV1beta1Enabled bool
Expand Down Expand Up @@ -168,11 +170,16 @@ func (c *Config) FlagSet() *pflag.FlagSet {
a comma-separated list of namespaces.`)

// Ingress status
flagSet.StringVar(&c.PublishService, "publish-service", "", `Service fronting Ingress resources in "namespace/name"
format. The controller will update Ingress status information with this Service's endpoints.`)
flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{}, `User-provided addresses in
comma-separated string format, for use in lieu of "publish-service" when that Service lacks useful address
information (for example, in bare-metal environments).`)
flagSet.StringVar(&c.PublishService, "publish-service", "", `Service fronting routing resources in "namespace/name"
format. The controller will update route resource status information with this Service's endpoints.`)
flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{}, `User-provided address CSV.
For use in lieu of "publish-service" when that Service lacks useful address information (for example,
in bare-metal environments).`)
flagSet.StringVar(&c.PublishServiceUDP, "publish-service-udp", "", `Service fronting UDP routing resources in
"namespace/name" format. The controller will update UDP route status information with this Service's
endpoints. If omitted, the same Service will be used for both TCP and UDP routes.`)
flagSet.StringSliceVar(&c.PublishStatusAddressUDP, "publish-status-address-udp", []string{}, `User-provided
address CSV, for use in lieu of "publish-service-udp" when that Service lacks useful address information.`)
flagSet.BoolVar(&c.UpdateStatus, "update-status", true,
`Indicates if the ingress controller should update the status of resources (e.g. IP/Hostname for v1.Ingress, e.t.c.)`)

Expand Down
3 changes: 2 additions & 1 deletion internal/manager/controllerdef.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func setupControllers(
mgr manager.Manager,
dataplaneClient *dataplane.KongClient,
dataplaneAddressFinder *dataplane.AddressFinder,
udpDataplaneAddressFinder *dataplane.AddressFinder,
kubernetesStatusQueue *status.Queue,
c *Config,
featureGates map[string]bool,
Expand Down Expand Up @@ -197,7 +198,7 @@ func setupControllers(
IngressClassName: c.IngressClassName,
DisableIngressClassLookups: !c.IngressClassNetV1Enabled,
StatusQueue: kubernetesStatusQueue,
DataplaneAddressFinder: dataplaneAddressFinder,
DataplaneAddressFinder: udpDataplaneAddressFinder,
CacheSyncTimeout: c.CacheSyncTimeout,
},
},
Expand Down
5 changes: 3 additions & 2 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
}

setupLog.Info("Initializing Dataplane Address Discovery")
dataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c)
dataplaneAddressFinder, udpDataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c)
if err != nil {
return err
}
Expand All @@ -190,7 +190,8 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
gateway.ControllerName = gatewayv1beta1.GatewayController(c.GatewayAPIControllerName)

setupLog.Info("Starting Enabled Controllers")
controllers, err := setupControllers(mgr, dataplaneClient, dataplaneAddressFinder, kubernetesStatusQueue, c, featureGates)
controllers, err := setupControllers(mgr, dataplaneClient,
dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates)
if err != nil {
return fmt.Errorf("unable to setup controller as expected %w", err)
}
Expand Down
96 changes: 67 additions & 29 deletions internal/manager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,51 +200,89 @@ func setupAdmissionServer(
return nil
}

func setupDataplaneAddressFinder(ctx context.Context, mgrc client.Client, c *Config) (*dataplane.AddressFinder, error) {
// setupDataplaneAddressFinder returns a default and UDP address finder. These finders return the override addresses if
// set or the publish service addresses if no overrides are set. If no UDP overrides or UDP publish service are set,
// the UDP finder will also return the default addresses. If no override or publish service is set, this function
// returns nil finders and an error.
func setupDataplaneAddressFinder(
ctx context.Context,
mgrc client.Client,
c *Config,
) (*dataplane.AddressFinder, *dataplane.AddressFinder, error) {
dataplaneAddressFinder := dataplane.NewAddressFinder()
udpDataplaneAddressFinder := dataplane.NewAddressFinder()
var getter func() ([]string, error)
if c.UpdateStatus {
// Default
if overrideAddrs := c.PublishStatusAddress; len(overrideAddrs) > 0 {
dataplaneAddressFinder.SetOverrides(overrideAddrs)
} else if c.PublishService != "" {
parts := strings.Split(c.PublishService, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
return nil, nil, fmt.Errorf("publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
}
nsn := types.NamespacedName{
Namespace: parts[0],
Name: parts[1],
}
dataplaneAddressFinder.SetGetter(func() ([]string, error) {
svc := new(corev1.Service)
if err := mgrc.Get(ctx, nsn, svc); err != nil {
return nil, err
}
getter = generateAddressFinderGetter(ctx, mgrc, c, nsn)
dataplaneAddressFinder.SetGetter(getter)
} else {
return nil, nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address")
}

var addrs []string
switch svc.Spec.Type { //nolint:exhaustive
case corev1.ServiceTypeLoadBalancer:
for _, lbaddr := range svc.Status.LoadBalancer.Ingress {
if lbaddr.IP != "" {
addrs = append(addrs, lbaddr.IP)
}
if lbaddr.Hostname != "" {
addrs = append(addrs, lbaddr.Hostname)
}
}
default:
addrs = append(addrs, svc.Spec.ClusterIPs...)
}
// UDP. falls back to default if not configured
if udpOverrideAddrs := c.PublishStatusAddressUDP; len(udpOverrideAddrs) > 0 {
dataplaneAddressFinder.SetUDPOverrides(udpOverrideAddrs)
} else if c.PublishServiceUDP != "" {
parts := strings.Split(c.PublishServiceUDP, "/")
if len(parts) != 2 {
return nil, nil, fmt.Errorf("UDP publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
}
nsn := types.NamespacedName{
Namespace: parts[0],
Name: parts[1],
}
udpDataplaneAddressFinder.SetGetter(generateAddressFinderGetter(ctx, mgrc, c, nsn))
} else {
udpDataplaneAddressFinder.SetGetter(getter)
}
}

return dataplaneAddressFinder, udpDataplaneAddressFinder, nil
}

func generateAddressFinderGetter(
ctx context.Context,
mgrc client.Client,
c *Config,
nsn types.NamespacedName,
) func() ([]string, error) {
return func() ([]string, error) {
svc := new(corev1.Service)
if err := mgrc.Get(ctx, nsn, svc); err != nil {
return nil, err
}

if len(addrs) == 0 {
return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name)
var addrs []string
switch svc.Spec.Type { //nolint:exhaustive
case corev1.ServiceTypeLoadBalancer:
for _, lbaddr := range svc.Status.LoadBalancer.Ingress {
if lbaddr.IP != "" {
addrs = append(addrs, lbaddr.IP)
}
if lbaddr.Hostname != "" {
addrs = append(addrs, lbaddr.Hostname)
}
}
default:
addrs = append(addrs, svc.Spec.ClusterIPs...)
}

return addrs, nil
})
} else {
return nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address")
if len(addrs) == 0 {
return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name)
}
}

return dataplaneAddressFinder, nil
return addrs, nil
}
}

0 comments on commit 75fde8d

Please sign in to comment.