Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (o *Options) Run() error {
return fmt.Errorf("cannot create authz controllers: %w", err)
}

controlManager := control.NewManager(mgr.GetClient())
controlManager := control.NewManager(mgr.GetClient(), o.CRDMode)

xdsManager := xds.NewManager()
xds.RegisterService(
Expand Down
3 changes: 2 additions & 1 deletion config/crds/clusterlink.net_imports.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ spec:
type: object
type: array
targetPort:
description: TargetPort of the imported service.
description: TargetPort of the imported service. This is the internal
(non user-facing) listening port used by the dataplane pods.
type: integer
required:
- port
Expand Down
6 changes: 6 additions & 0 deletions config/operator/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- clusterlink.net
resources:
- imports
verbs:
- update
- apiGroups:
- clusterlink.net
resources:
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/clusterlink.net/v1alpha1/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ImportSpec struct {
// Port of the imported service.
Port uint16 `json:"port"`
// TargetPort of the imported service.
// This is the internal (non user-facing) listening port used by the dataplane pods.
TargetPort uint16 `json:"targetPort,omitempty"`
// Sources to import from.
Sources []ImportSource `json:"sources"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/bootstrap/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
{{ if .crdMode }}
- apiGroups: ["clusterlink.net"]
resources: ["imports"]
verbs: ["update"]
{{ end }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
69 changes: 48 additions & 21 deletions pkg/controlplane/control/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
// This includes target port generation for imported services, as well as
// k8s service creation per imported service.
type Manager struct {
client client.Client
ports *portManager
client client.Client
crdMode bool
ports *portManager

logger *logrus.Entry
}
Expand Down Expand Up @@ -103,14 +104,6 @@ func (m *Manager) DeleteLegacyExport(namespace string, exportSpec *api.ExportSpe
func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name)

// TODO: port manager should map ports to imports, and be able to detect conflicts
port, err := m.ports.Lease(imp.Spec.TargetPort)
if err != nil {
return fmt.Errorf("cannot generate listening port: %w", err)
}

imp.Spec.TargetPort = port

newService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: imp.Name,
Expand All @@ -130,26 +123,59 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
}

var oldService v1.Service
err = m.client.Get(
var create bool
err := m.client.Get(
ctx,
types.NamespacedName{
Name: imp.Name,
Namespace: imp.Namespace,
},
&oldService)
if err != nil {
if errors.IsNotFound(err) {
return m.client.Create(ctx, newService)
if !errors.IsNotFound(err) {
return err
}

return err
create = true
}

if serviceChanged(&oldService, newService) {
return m.client.Update(ctx, newService)
// if service exists, and import specifies a random (0) target port,
// then use existing service target port instead of allocating a new port
if !create && len(oldService.Spec.Ports) == 1 && imp.Spec.TargetPort == 0 {
imp.Spec.TargetPort = uint16(oldService.Spec.Ports[0].TargetPort.IntVal)
}

return nil
newPort := imp.Spec.TargetPort == 0

fullName := imp.Namespace + "/" + imp.Name
port, err := m.ports.Lease(fullName, imp.Spec.TargetPort)
if err != nil {
return fmt.Errorf("cannot generate listening port: %w", err)
}

if newPort {
imp.Spec.TargetPort = port
newService.Spec.Ports[0].TargetPort = intstr.FromInt32(int32(port))

if m.crdMode {
if err := m.client.Update(ctx, imp); err != nil {
m.ports.Release(port)
return err
}
}
}

if create {
err = m.client.Create(ctx, newService)
} else if serviceChanged(&oldService, newService) {
err = m.client.Update(ctx, newService)
}

if err != nil && newPort {
m.ports.Release(port)
}

return err
}

// DeleteImport removes the listening socket of a previously imported service.
Expand Down Expand Up @@ -193,12 +219,13 @@ func serviceChanged(svc1, svc2 *v1.Service) bool {
}

// NewManager returns a new control manager.
func NewManager(cl client.Client) *Manager {
func NewManager(cl client.Client, crdMode bool) *Manager {
logger := logrus.WithField("component", "controlplane.control.manager")

return &Manager{
client: cl,
ports: newPortManager(),
logger: logger,
client: cl,
crdMode: crdMode,
ports: newPortManager(),
logger: logger,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// portManager leases ports for use by imported services.
type portManager struct {
lock sync.Mutex
leasedPorts map[uint16]struct{}
leasedPorts map[uint16]string

logger *logrus.Entry
}
Expand Down Expand Up @@ -74,8 +74,8 @@ func (m *portManager) getRandomFreePort() uint16 {
return port
}

// Lease marks a port as taken. If port is 0, some random free port is returned.
func (m *portManager) Lease(port uint16) (uint16, error) {
// Lease marks a port as taken by the given name. If port is 0, some random free port is returned.
func (m *portManager) Lease(name string, port uint16) (uint16, error) {
m.logger.Infof("Leasing: %d.", port)

m.lock.Lock()
Expand All @@ -89,13 +89,13 @@ func (m *portManager) Lease(port uint16) (uint16, error) {
port = m.getRandomFreePort()
m.logger.Infof("Generated port: %d.", port)
} else {
if _, ok := m.leasedPorts[port]; ok {
return 0, fmt.Errorf("port %d is already leased", port)
if leaseName, ok := m.leasedPorts[port]; ok && leaseName != name {
return 0, fmt.Errorf("port %d is already leased to '%s'", port, leaseName)
}
}

// mark port is leased
m.leasedPorts[port] = struct{}{}
m.leasedPorts[port] = name

return port, nil
}
Expand All @@ -120,7 +120,7 @@ func newPortManager() *portManager {
).Info("Initialized.")

return &portManager{
leasedPorts: make(map[uint16]struct{}),
leasedPorts: make(map[uint16]string),
logger: logger,
}
}
6 changes: 6 additions & 0 deletions pkg/operator/controller/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type InstanceReconciler struct {
// +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=list;get;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch
// +kubebuilder:rbac:groups=clusterlink.net,resources=imports,verbs=update
// +kubebuilder:rbac:groups="apps",resources=deployments,verbs=list;get;watch;create;update;patch;delete
//nolint:lll // Ignore long line warning for Kubebuilder command.
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=list;get;watch;create;update;patch;delete
Expand Down Expand Up @@ -456,6 +457,11 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name
Resources: []string{"pods"},
Verbs: []string{"get", "list", "watch"},
},
{
APIGroups: []string{"clusterlink.net"},
Resources: []string{"imports"},
Verbs: []string{"update"},
},
},
}

Expand Down