Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 39 additions & 32 deletions pkg/lbmanager/lbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// LBManager Responsible for the creation and deletion of externally accessible Services to access the Postgresql clusters managed by the Postgreslet.
type LBManager struct {
client.Client // todo: service cluster
LBIP string // todo: via configmap
PortRangeStart int32 // todo: via configmap
client.Client
LBIP string
PortRangeStart int32
PortRangeSize int32
}

// New Creates a new LBManager with the given configuration
func New(client client.Client, lbIP string, portRangeStart, portRangeSize int32) *LBManager {
return &LBManager{
Client: client,
Expand All @@ -27,6 +29,7 @@ func New(client client.Client, lbIP string, portRangeStart, portRangeSize int32)
}
}

// CreateSvcLBIfNone Creates a new Service of type LoadBalancer for the given Postgres resource if neccessary
func (m *LBManager) CreateSvcLBIfNone(ctx context.Context, in *api.Postgres) error {
if err := m.Get(ctx, client.ObjectKey{
Namespace: in.ToPeripheralResourceNamespace(),
Expand All @@ -36,30 +39,20 @@ func (m *LBManager) CreateSvcLBIfNone(ctx context.Context, in *api.Postgres) err
return fmt.Errorf("failed to fetch Service of type LoadBalancer: %w", err)
}

existingLBIP, nextFreePort, err := m.nextFreeSocket(ctx)
nextFreePort, err := m.nextFreePort(ctx)
if err != nil {
return fmt.Errorf("failed to get a free port for creating Service of type LoadBalancer: %w", err)
}
var lbIPToUse string
if m.LBIP != "" {
// a specific IP was configured in the config, so use that one
lbIPToUse = m.LBIP
} else if existingLBIP != "" {
// no ip was configured, but one is already in use, so use the existing one
lbIPToUse = existingLBIP
} else {
// nothing was configured, nothing exists yet, so use an empty address so a new loadbalancer will be created and assigned
lbIPToUse = ""
return fmt.Errorf("failed to get the next free port: %w", err)
}

if err := m.Create(ctx, in.ToSvcLB(lbIPToUse, nextFreePort)); err != nil {
if err := m.Create(ctx, in.ToSvcLB(m.LBIP, nextFreePort)); err != nil {
return fmt.Errorf("failed to create Service of type LoadBalancer: %w", err)
}
return nil
}
return nil
}

// DeleteSvcLB Deletes the corresponding Service of type LoadBalancer of the given Postgres resource.
func (m *LBManager) DeleteSvcLB(ctx context.Context, in *api.Postgres) error {
lb := &corev1.Service{}
lb.Namespace = in.ToPeripheralResourceNamespace()
Expand All @@ -70,37 +63,51 @@ func (m *LBManager) DeleteSvcLB(ctx context.Context, in *api.Postgres) error {
return nil
}

func (m *LBManager) nextFreeSocket(ctx context.Context) (string, int32, error) {
// nextFreeSocket finds any existing LoadBalancerIP and the next free port out of the configure port range.
func (m *LBManager) nextFreePort(ctx context.Context) (int32, error) {
// TODO prevent concurrency issues when calculating port / ip.

existingLBIP := ""

// Fetch all services managed by this postgreslet
lbs := &corev1.ServiceList{}
if err := m.List(ctx, lbs, client.MatchingLabels(api.SvcLoadBalancerLabel)); err != nil {
return existingLBIP, 0, fmt.Errorf("failed to fetch the list of services of type LoadBalancer: %w", err)
return 0, fmt.Errorf("failed to fetch the list of services of type LoadBalancer: %w", err)
}

// If there are none, this will be the first (managed) service we create, so start with PortRangeStart and return
if len(lbs.Items) == 0 {
return existingLBIP, m.PortRangeStart, nil
return m.PortRangeStart, nil
}

// Record weather any port is occupied
isOccupied := make([]bool, int(m.PortRangeSize))
// If there are already any managed services, store all the used ports in a slice.
portsInUse := []int32{}
for i := range lbs.Items {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about factoring this logic out into a testable function ?

svc := lbs.Items[i]
if len(svc.Spec.Ports) > 0 {
isOccupied[svc.Spec.Ports[0].Port-m.PortRangeStart] = true
}
if svc.Spec.LoadBalancerIP != "" {
existingLBIP = svc.Spec.LoadBalancerIP
portsInUse = append(portsInUse, svc.Spec.Ports[0].Port)
}
}

for i := range isOccupied {
if !isOccupied[i] {
return existingLBIP, m.PortRangeStart + int32(i), nil
// Now try all ports in the configured port range to find a free one.
// While not as effective as other implementations, this allows us to freely change PortRangeStart and PortRangeSize
// retroactively without breaking the implementation.
for port := m.PortRangeStart; port < m.PortRangeStart+m.PortRangeSize; port++ {
if containsElem(portsInUse, port) {
// Port already in use, try the next one
continue
}
// The postgreslet hasn't assigned this port yet, so use it.
return port, nil
}

return existingLBIP, 0, errors.New("no free port")
// If we made it this far, no free port could be found.
return 0, errors.New("no free port in the configured port range found")
}

func containsElem(s []int32, v int32) bool {
for _, elem := range s {
if elem == v {
return true
}
}
return false
}
121 changes: 121 additions & 0 deletions pkg/lbmanager/lbmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package lbmanager

import (
"context"
"fmt"
"testing"

api "github.com/fi-ts/postgreslet/api/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestLBManager_nextFreePort(t *testing.T) {
portRangeStart := int32(0)
portRangeSize := int32(5)

tests := []struct {
name string
lbMgr *LBManager
want int32
wantErr bool
}{
{
name: "no svc in the cluster",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts()).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
want: 0,
wantErr: false,
},
{
name: "one svc already in the cluster",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
want: 1,
wantErr: false,
},
{
name: "last free port left",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
want: 4,
wantErr: false,
},
{
name: "no free port",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 1, 2, 3, 4)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
want: 0,
wantErr: true,
},
{
name: "re-use releaased port",
lbMgr: &LBManager{
Client: fake.NewClientBuilder().WithScheme(scheme()).WithLists(svcListWithPorts(0, 2, 3)).Build(),
LBIP: "0.0.0.0",
PortRangeStart: portRangeStart,
PortRangeSize: portRangeSize,
},
want: 1,
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.lbMgr.nextFreePort(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("LBManager.nextFreePort() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("LBManager.nextFreePort() = %v, want %v", got, tt.want)
}
})
}
}

func scheme() *runtime.Scheme {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

return scheme
}

// svcListWithPorts generates a `ServiceList` containing `Service`s with ports respectively
func svcListWithPorts(ports ...int32) *corev1.ServiceList {
svcList := &corev1.ServiceList{}
for _, port := range ports {
svcList.Items = append(svcList.Items, *svcWithPort(port))
}
return svcList
}

func svcWithPort(port int32) *corev1.Service {
svc := corev1.Service{}
svc.Name = fmt.Sprintf("svc-with-port-%d", port)
svc.Labels = api.SvcLoadBalancerLabel
svc.Spec.Ports = []corev1.ServicePort{
{
Port: port,
},
}
return &svc
}