Skip to content

Commit

Permalink
add support for index-based label and minimal service (#181)
Browse files Browse the repository at this point in the history
* add support for index-based label and minimal service

The minimal service setup only creates the headless service for the main
broker pod, decreasing the networking overhead. In addition, I have needed
to have a label to identify the broker several times, so it makes sense to
add that.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Jun 5, 2023
1 parent b873771 commit c45dd27
Show file tree
Hide file tree
Showing 15 changed files with 229 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
fail-fast: false
matrix:
test: [["hello-world", "ghcr.io/flux-framework/flux-restful-api:latest", 60],
["minimal-service", "ghcr.io/flux-framework/flux-restful-api:latest", 60],
["post", "ghcr.io/flux-framework/flux-restful-api:latest", 60],
["batch", "ghcr.io/flux-framework/flux-restful-api:latest", 60],
["singularity", "ghcr.io/rse-ops/singularity:tag-mamba", 60],
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/minicluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ type FluxSpec struct {
// +optional
OptionFlags string `json:"optionFlags"`

// Only expose the broker service (to reduce load on DNS)
// +optional
MinimalService bool `json:"minimalService"`

// Log level to use for flux logging (only in non TestMode)
// +kubebuilder:default=6
// +default=6
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
"format": "int32",
"default": 6
},
"minimalService": {
"description": "Only expose the broker service (to reduce load on DNS)",
"type": "boolean",
"default": false
},
"optionFlags": {
"description": "Flux option flags, usually provided with -o optional - if needed, default option flags for the server These can also be set in the user interface to override here. This is only valid for a FluxRunner \"runFlux\" true",
"type": "string",
Expand Down
8 changes: 8 additions & 0 deletions api/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions chart/templates/minicluster-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ spec:
description: Log level to use for flux logging (only in non TestMode)
format: int32
type: integer
minimalService:
description: Only expose the broker service (to reduce load on DNS)
type: boolean
optionFlags:
description: Flux option flags, usually provided with -o optional
- if needed, default option flags for the server These can also
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/flux-framework.org_miniclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ spec:
description: Log level to use for flux logging (only in non TestMode)
format: int32
type: integer
minimalService:
description: Only expose the broker service (to reduce load on
DNS)
type: boolean
optionFlags:
description: Flux option flags, usually provided with -o optional
- if needed, default option flags for the server These can also
Expand Down
70 changes: 70 additions & 0 deletions controllers/flux/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2022-2023 Lawrence Livermore National Security, LLC
(c.f. AUTHORS, NOTICE.LLNS, COPYING)
This is part of the Flux resource manager framework.
For details, see https://github.com/flux-framework.
SPDX-License-Identifier: Apache-2.0
*/

package controllers

import (
"context"
"encoding/json"
api "flux-framework/flux-operator/api/v1alpha1"
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
)

// patchPayload is one operation of a JSON Patch document. addBrokerLabel
// marshals a slice of these and sends it with types.JSONPatchType.
type patchPayload struct {
	// Operation is the patch verb, serialized as "op" (e.g. "add").
	Operation string `json:"op"`
	// Path is the location in the object to modify, serialized as "path"
	// (e.g. "/metadata/labels/job-index").
	Path string `json:"path"`
	// Value is the string written at Path, serialized as "value".
	Value string `json:"value"`
}

// addBrokerLabel adds a "job-index" label to the pods of the MiniCluster.
//
// Pod names are expected to look like "<cluster name>-<index>-<suffix>"; the
// index is parsed from the name and stored in the label, so the lead broker
// ends up with job-index=0 and can be used as a service selector.
//
// Only pods that carry this cluster's "job-name" label and do not yet have a
// "job-index" label are listed, so pods of other workloads in the namespace
// are never touched and already-labeled pods are not patched again.
//
// It returns an empty ctrl.Result and the first error hit while creating the
// clientset, listing pods, encoding the patch, or patching a pod.
func (r *MiniClusterReconciler) addBrokerLabel(
	ctx context.Context,
	cluster *api.MiniCluster,
) (ctrl.Result, error) {

	// creates the clientset
	clientset, err := kubernetes.NewForConfig(r.RESTConfig)
	if err != nil {
		return ctrl.Result{}, err
	}

	// List only the pods of this MiniCluster that are still missing the label.
	// Without the job-name requirement every unlabeled pod in the namespace
	// would be patched with a bogus index derived from an unrelated name.
	selector := fmt.Sprintf("job-name=%s,!job-index", cluster.Name)
	pods, err := clientset.CoreV1().Pods(cluster.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return ctrl.Result{}, err
	}

	// The prefix is the same for every pod, so build it once.
	prefix := cluster.Name + "-"

	// Index into the slice to avoid copying each Pod struct.
	for i := range pods.Items {
		pod := &pods.Items[i]
		name := pod.GetName()

		// Skip anything that does not follow the expected naming scheme
		// rather than writing a meaningless label value.
		if !strings.HasPrefix(name, prefix) {
			continue
		}

		// "<index>-<suffix>" -> "<index>"
		podIndex := strings.SplitN(strings.TrimPrefix(name, prefix), "-", 2)[0]

		payload := []patchPayload{{
			Operation: "add",
			Path:      "/metadata/labels/job-index",
			Value:     podIndex,
		}}
		payloadBytes, err := json.Marshal(payload)
		if err != nil {
			return ctrl.Result{}, fmt.Errorf("encoding job-index patch for pod %q: %w", name, err)
		}

		_, err = clientset.CoreV1().Pods(pod.GetNamespace()).Patch(ctx, name, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
		if err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil
}
11 changes: 10 additions & 1 deletion controllers/flux/minicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
}
}

// Create headless service for the MiniCluster
// Create headless service for the MiniCluster OR single service for the broker
selector := map[string]string{"job-name": cluster.Name}
if cluster.Spec.Flux.MinimalService {
selector = map[string]string{"job-index": "0"}
}
result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
if err != nil {
return result, err
Expand Down Expand Up @@ -137,6 +140,12 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
}
}

// Add the single label for the broker pod
result, err = r.addBrokerLabel(ctx, cluster)
if err != nil {
return result, err
}

// If we get here, update the status to be ready
status := jobctrl.GetCondition(cluster)
if status != jobctrl.ConditionJobReady {
Expand Down
37 changes: 35 additions & 2 deletions controllers/flux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@ func (r *MiniClusterReconciler) exposeServices(
selector map[string]string,
) (ctrl.Result, error) {

// This service is for the restful API
// Create either the headless service or broker service
existing := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: cluster.Namespace}, existing)
if err != nil {
if errors.IsNotFound(err) {
_, err = r.createHeadlessService(ctx, cluster, serviceName, selector)

if cluster.Spec.Flux.MinimalService {
_, err = r.createBrokerService(ctx, cluster, serviceName, selector)
} else {
_, err = r.createHeadlessService(ctx, cluster, serviceName, selector)
}

}
return ctrl.Result{}, err
Expand Down Expand Up @@ -74,6 +79,34 @@ func (r *MiniClusterReconciler) createHeadlessService(
return service, err
}

// createBrokerService creates a service for the lead broker.
//
// The service is named serviceName, is created in the MiniCluster's
// namespace, exposes TCP port 8050, and routes to the pods matched by
// selector. The MiniCluster is set as the controller reference of the
// service before it is created.
//
// It returns the service object that was built together with any error from
// setting the controller reference or from creating the service.
func (r *MiniClusterReconciler) createBrokerService(
	ctx context.Context,
	cluster *api.MiniCluster,
	serviceName string,
	selector map[string]string,
) (*corev1.Service, error) {

	// Pass the name and namespace as key/value pairs, matching the error log below.
	r.log.Info("Creating minimal broker service", "Service", serviceName, "Namespace", cluster.Namespace)
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: cluster.Namespace},
		Spec: corev1.ServiceSpec{
			// Target port should be set to same value as port, by default
			Ports: []corev1.ServicePort{{
				Port:     int32(8050),
				Protocol: "TCP",
			}},
			Selector: selector,
		},
	}

	// Do not create a service we failed to attach to its owning MiniCluster.
	if err := ctrl.SetControllerReference(cluster, service, r.Scheme); err != nil {
		r.log.Error(err, "🔴 Set controller reference for minimal broker service", "Service", service.Name)
		return service, err
	}

	err := r.New(ctx, service)
	if err != nil {
		r.log.Error(err, "🔴 Create minimal broker service", "Service", service.Name)
	}
	return service, err
}

// exposeService creates a port-specific service for the MiniCluster
func (r *MiniClusterReconciler) exposeService(
ctx context.Context,
Expand Down
30 changes: 30 additions & 0 deletions docs/getting_started/custom-resource-definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ To add custom labels for your job, add a set of key value pairs (strings) to a "
job-attribute-b: dinosaur-b
```

Note that by default, each pod is given a `job-index` label that corresponds to its
pod index. For example, the lead broker has `job-index=0`, which can be used as a service
selector.

### deadline

Expand Down Expand Up @@ -355,6 +358,33 @@ flux:

In the above, we would add `--wrap=strace,-e,network,-tt` to flux start commands.

#### minimalService

By default, the Flux MiniCluster will be created with a headless service across the cluster,
meaning that all pods can ping one another via a fully qualified hostname. As an example,
the 0 index (the lead broker) of an indexed job will be available at:

```
flux-sample-0.flux-service.flux-operator.svc.cluster.local
```

Where "flux-sample" is the name of the job. Index 1 would be at:

```
flux-sample-1.flux-service.flux-operator.svc.cluster.local
```

However, only the lead broker (index 0) needs to be reachable by the other pods.
If you set `minimalService` to true, the operator creates a service for the lead
broker only, which keeps the networking setup minimal.

```yaml
flux:
minimalService: true
```

The drawback is that you cannot ping the other nodes by hostname.

#### connectTimeout

For Flux versions 0.50.0 and later, you can customize the zeromq timeout. This is done
Expand Down
4 changes: 4 additions & 0 deletions examples/dist/flux-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ spec:
description: Log level to use for flux logging (only in non TestMode)
format: int32
type: integer
minimalService:
description: Only expose the broker service (to reduce load on
DNS)
type: boolean
optionFlags:
description: Flux option flags, usually provided with -o optional
- if needed, default option flags for the server These can also
Expand Down
21 changes: 21 additions & 0 deletions examples/tests/minimal-service/minicluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: flux-framework.org/v1alpha1
kind: MiniCluster
metadata:
name: flux-sample
namespace: flux-operator
spec:
# Number of pods to create for MiniCluster
size: 4
tasks: 4

logging:
quiet: true

# Minimal service for Flux means only the lead broker gets an address
flux:
minimalService: true

# This is a list because a pod can support multiple containers
containers:
- image: ghcr.io/flux-framework/flux-restful-api:latest
command: echo hello-world
4 changes: 4 additions & 0 deletions examples/tests/minimal-service/test.out.correct
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
hello-world
hello-world
hello-world
hello-world
1 change: 1 addition & 0 deletions sdk/python/v1alpha1/docs/FluxSpec.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Name | Type | Description | Notes
**connect_timeout** | **str** | Single user executable to provide to flux start | [optional] [default to '5s']
**install_root** | **str** | Install root location | [optional] [default to '/usr']
**log_level** | **int** | Log level to use for flux logging (only in non TestMode) | [optional] [default to 6]
**minimal_service** | **bool** | Only expose the broker service (to reduce load on DNS) | [optional] [default to False]
**option_flags** | **str** | Flux option flags, usually provided with -o optional - if needed, default option flags for the server These can also be set in the user interface to override here. This is only valid for a FluxRunner \&quot;runFlux\&quot; true | [optional] [default to '']
**wrap** | **str** | Commands for flux start --wrap | [optional]

Expand Down
30 changes: 29 additions & 1 deletion sdk/python/v1alpha1/fluxoperator/models/flux_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FluxSpec(object):
'connect_timeout': 'str',
'install_root': 'str',
'log_level': 'int',
'minimal_service': 'bool',
'option_flags': 'str',
'wrap': 'str'
}
Expand All @@ -44,11 +45,12 @@ class FluxSpec(object):
'connect_timeout': 'connectTimeout',
'install_root': 'installRoot',
'log_level': 'logLevel',
'minimal_service': 'minimalService',
'option_flags': 'optionFlags',
'wrap': 'wrap'
}

def __init__(self, connect_timeout='5s', install_root='/usr', log_level=6, option_flags='', wrap=None, local_vars_configuration=None): # noqa: E501
def __init__(self, connect_timeout='5s', install_root='/usr', log_level=6, minimal_service=False, option_flags='', wrap=None, local_vars_configuration=None): # noqa: E501
"""FluxSpec - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration.get_default_copy()
Expand All @@ -57,6 +59,7 @@ def __init__(self, connect_timeout='5s', install_root='/usr', log_level=6, optio
self._connect_timeout = None
self._install_root = None
self._log_level = None
self._minimal_service = None
self._option_flags = None
self._wrap = None
self.discriminator = None
Expand All @@ -67,6 +70,8 @@ def __init__(self, connect_timeout='5s', install_root='/usr', log_level=6, optio
self.install_root = install_root
if log_level is not None:
self.log_level = log_level
if minimal_service is not None:
self.minimal_service = minimal_service
if option_flags is not None:
self.option_flags = option_flags
if wrap is not None:
Expand Down Expand Up @@ -141,6 +146,29 @@ def log_level(self, log_level):

self._log_level = log_level

@property
def minimal_service(self):
    """Gets the minimal_service of this FluxSpec.  # noqa: E501

    Only expose the broker service (to reduce load on DNS)  # noqa: E501

    The stored value is returned unchanged. It is None when the model was
    constructed with ``minimal_service=None``, because the constructor only
    assigns the attribute for non-None values.

    :return: The minimal_service of this FluxSpec.  # noqa: E501
    :rtype: bool
    """
    return self._minimal_service

@minimal_service.setter
def minimal_service(self, minimal_service):
    """Sets the minimal_service of this FluxSpec.

    Only expose the broker service (to reduce load on DNS)  # noqa: E501

    The value is stored as given; no validation is performed here.

    :param minimal_service: The minimal_service of this FluxSpec.  # noqa: E501
    :type minimal_service: bool
    """

    self._minimal_service = minimal_service

@property
def option_flags(self):
"""Gets the option_flags of this FluxSpec. # noqa: E501
Expand Down

0 comments on commit c45dd27

Please sign in to comment.