Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Commit

Permalink
Make Ray clusters work in Istio environments (#26)
Browse files Browse the repository at this point in the history
* adds gcs server and worker ports to ray api

so we can set these values explicitly instead of using ray's random
defaulting behavior

* adds validation for new api fields

* using recreate strategy in operator helm chart

i've seen two versions of the operator combat each other over how to
apply configurations during an upgrade. setting `enableLeaderElection: true`
addresses this matter indirectly but i think we really want to enforce
that 2 different versions of the operator never run at the same time.
this feels like a good use of the "Recreate" strategy.

* edits ray resources/controller to support istio

adds headless services for head/worker workloads
maps out communication ports in full
updates unit/integration tests

* adds the creation of peer authentication resources

in istio envs with STRICT mTLS, users may want/need to relax that
constraint so that (a) their clusters are fully-functional and (b) they
can interact with their clusters as they see fit.

* adds pkg/resources/istio

oops...missed this

* adds webhook integration tests for mTLS field
  • Loading branch information
sonnysideup committed Apr 22, 2021
1 parent feaf93e commit dd73cb6
Show file tree
Hide file tree
Showing 30 changed files with 732 additions and 101 deletions.
8 changes: 8 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type Autoscaling struct {
ScaleDownStabilizationWindowSeconds *int32 `json:"scaleDownStabilizationWindowSeconds,omitempty"`
}

// IstioConfig defines operator configuration parameters.
type IstioConfig struct {
// MutualTLSMode will be used to create a workload-specific peer
// authentication policy that takes precedence over a global and/or
// namespace-wide policy.
MutualTLSMode string `json:"istioMutualTLSMode,omitempty"`
}

// OCIImageDefinition describes where and when to fetch a container image.
type OCIImageDefinition struct {
// Registry where the container image is hosted.
Expand Down
9 changes: 9 additions & 0 deletions api/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ type RayClusterSpec struct {
// NodeManagerPort is the raylet port for the node manager.
NodeManagerPort int32 `json:"nodeManagerPort,omitempty"`

// GCSServerPort is the port for the global control store.
GCSServerPort int32 `json:"gcsServerPort,omitempty"`

// WorkerPorts specifies the range of ports used by worker processes.
WorkerPorts []int32 `json:"workerPorts,omitempty"`

// ObjectStoreMemoryBytes is initial amount of memory with which to start
// the object store.
ObjectStoreMemoryBytes *int64 `json:"objectStoreMemoryBytes,omitempty"`
Expand All @@ -130,6 +136,9 @@ type RayClusterSpec struct {
// EnvVars added to all every ray pod container.
EnvVars []corev1.EnvVar `json:"envVars,omitempty"`

// IstioConfig parameters for ray clusters.
IstioConfig `json:",inline"`

// Head node configuration parameters.
Head RayClusterHead `json:"head,omitempty"`

Expand Down
49 changes: 46 additions & 3 deletions api/v1alpha1/raycluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
"fmt"

securityv1beta1 "istio.io/api/security/v1beta1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -25,7 +26,9 @@ var (
rayDefaultClientServerPort int32 = 10001
rayDefaultObjectManagerPort int32 = 2384
rayDefaultNodeManagerPort int32 = 2385
rayDefaultGCSServerPort int32 = 2386
rayDefaultDashboardPort int32 = 8265
rayDefaultWorkerPorts = []int32{11_000, 11_001, 11_002, 11_003, 11_004}
rayDefaultRedisShardPorts = []int32{6380, 6381}
rayDefaultEnableDashboard = pointer.BoolPtr(true)
rayDefaultEnableNetworkPolicy = pointer.BoolPtr(true)
Expand All @@ -36,7 +39,7 @@ var (

rayDefaultImage = &OCIImageDefinition{
Repository: "rayproject/ray",
Tag: "1.2.0-cpu",
Tag: "1.3.0-cpu",
}
)

Expand Down Expand Up @@ -75,6 +78,14 @@ func (r *RayCluster) Default() {
log.Info("setting default object manager port", "value", rayDefaultObjectManagerPort)
r.Spec.ObjectManagerPort = rayDefaultObjectManagerPort
}
if r.Spec.GCSServerPort == 0 {
log.Info("setting default gcs server port", "value", rayDefaultGCSServerPort)
r.Spec.GCSServerPort = rayDefaultGCSServerPort
}
if r.Spec.WorkerPorts == nil {
log.Info("setting default worker ports", "value", rayDefaultWorkerPorts)
r.Spec.WorkerPorts = rayDefaultWorkerPorts
}
if r.Spec.NodeManagerPort == 0 {
log.Info("setting default node manager port", "value", rayDefaultNodeManagerPort)
r.Spec.NodeManagerPort = rayDefaultNodeManagerPort
Expand Down Expand Up @@ -103,7 +114,6 @@ func (r *RayCluster) Default() {
log.Info("setting default worker replicas", "value", *rayDefaultWorkerReplicas)
r.Spec.Worker.Replicas = rayDefaultWorkerReplicas
}

if r.Spec.Image == nil {
log.Info("setting default image", "value", *rayDefaultImage)
r.Spec.Image = rayDefaultImage
Expand Down Expand Up @@ -137,6 +147,9 @@ func (r *RayCluster) ValidateDelete() error {
func (r *RayCluster) validateRayCluster() error {
var allErrs field.ErrorList

if err := r.validateMutualTLSMode(); err != nil {
allErrs = append(allErrs, err)
}
if err := r.validateWorkerReplicas(); err != nil {
allErrs = append(allErrs, err)
}
Expand All @@ -161,12 +174,32 @@ func (r *RayCluster) validateRayCluster() error {
}

return apierrors.NewInvalid(
schema.GroupKind{Group: "distributed-compute.dominodatalab.com", Kind: "RayCluster"},
schema.GroupKind{Group: GroupVersion.Group, Kind: "RayCluster"},
r.Name,
allErrs,
)
}

func (r *RayCluster) validateMutualTLSMode() *field.Error {
if r.Spec.MutualTLSMode == "" {
return nil
}
if _, ok := securityv1beta1.PeerAuthentication_MutualTLS_Mode_value[r.Spec.MutualTLSMode]; ok {
return nil
}

var validModes []string
for s := range securityv1beta1.PeerAuthentication_MutualTLS_Mode_value {
validModes = append(validModes, s)
}

return field.Invalid(
field.NewPath("spec").Child("istioMutualTLSMode"),
r.Spec.MutualTLSMode,
fmt.Sprintf("mode must be one of the following: %v", validModes),
)
}

func (r *RayCluster) validateImage() field.ErrorList {
var errs field.ErrorList
fldPath := field.NewPath("spec").Child("image")
Expand Down Expand Up @@ -236,6 +269,13 @@ func (r *RayCluster) validatePorts() field.ErrorList {
}
}

for idx, port := range r.Spec.WorkerPorts {
name := fmt.Sprintf("workerPorts[%d]", idx)
if err := r.validatePort(port, field.NewPath("spec").Child(name)); err != nil {
errs = append(errs, err)
}
}

if err := r.validatePort(r.Spec.ClientServerPort, field.NewPath("spec").Child("clientServerPort")); err != nil {
errs = append(errs, err)
}
Expand All @@ -245,6 +285,9 @@ func (r *RayCluster) validatePorts() field.ErrorList {
if err := r.validatePort(r.Spec.NodeManagerPort, field.NewPath("spec").Child("nodeManagerPort")); err != nil {
errs = append(errs, err)
}
if err := r.validatePort(r.Spec.GCSServerPort, field.NewPath("spec").Child("gcsServerPort")); err != nil {
errs = append(errs, err)
}
if err := r.validatePort(r.Spec.DashboardPort, field.NewPath("spec").Child("dashboardPort")); err != nil {
errs = append(errs, err)
}
Expand Down
37 changes: 35 additions & 2 deletions api/v1alpha1/raycluster_webhook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ var _ = Describe("RayCluster", func() {
BeNumerically("==", 2385),
"node manager port should equal 2385",
)
Expect(rc.Spec.GCSServerPort).To(
BeNumerically("==", 2386),
"gcs server port should equal 2386",
)
Expect(rc.Spec.WorkerPorts).To(
Equal([]int32{11000, 11001, 11002, 11003, 11004}),
"worker ports should equal [11000, 11001, 11002, 11003, 11004]",
)
Expect(rc.Spec.DashboardPort).To(
BeNumerically("==", 8265),
"dashboard port should equal 8265",
Expand All @@ -86,8 +94,8 @@ var _ = Describe("RayCluster", func() {
"worker replicas should point to 1",
)
Expect(rc.Spec.Image).To(
Equal(&OCIImageDefinition{Repository: "rayproject/ray", Tag: "1.2.0-cpu"}),
`image reference should equal "rayproject/ray:1.2.0-cpu"`,
Equal(&OCIImageDefinition{Repository: "rayproject/ray", Tag: "1.3.0-cpu"}),
`image reference should equal "rayproject/ray:1.3.0-cpu"`,
)
})

Expand Down Expand Up @@ -223,6 +231,12 @@ var _ = Describe("RayCluster", func() {
Entry("rejects an invalid node manager port",
func(rc *RayCluster, val int32) { rc.Spec.NodeManagerPort = val },
),
Entry("rejects an invalid gcs server port",
func(rc *RayCluster, val int32) { rc.Spec.GCSServerPort = val },
),
Entry("rejects invalid worker ports",
func(rc *RayCluster, val int32) { rc.Spec.WorkerPorts = append(rc.Spec.WorkerPorts, val) },
),
Entry("rejects an invalid dashboard port",
func(rc *RayCluster, val int32) { rc.Spec.DashboardPort = val },
),
Expand Down Expand Up @@ -318,5 +332,24 @@ var _ = Describe("RayCluster", func() {
Expect(k8sClient.Create(ctx, rc)).ToNot(Succeed())
})
})

DescribeTable("With mutal tls mode set",
func(smode string, expectErr bool) {
rc := rayFixture(testNS.Name)
rc.Spec.MutualTLSMode = smode

if expectErr {
Expect(k8sClient.Create(ctx, rc)).To(HaveOccurred())
} else {
Expect(k8sClient.Create(ctx, rc)).NotTo(HaveOccurred())
}
},
Entry("empty string is valid", "", false),
Entry("UNSET is valid", "UNSET", false),
Entry("DISABLE is valid", "DISABLE", false),
Entry("PERMISSIVE is valid", "PERMISSIVE", false),
Entry("STRICT is valid", "STRICT", false),
Entry("GARBAGE is not valid", "GARBAGE", true),
)
})
})
21 changes: 21 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions cmd/crdapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ Apply Rules:
- When a definition is is missing, it will be created
- If a definition is already present, then it will be updated
- Updating definitions that have not changed results in a no-op`,
RunE: processIstioFlag(func(enabled bool) error {
return crd.Apply(context.Background(), enabled)
}),
RunE: func(cmd *cobra.Command, args []string) error {
return crd.Apply(context.Background(), istioEnabled)
},
}

func init() {
addIstioFlag(crdApplyCmd)
rootCmd.AddCommand(crdApplyCmd)
}
7 changes: 3 additions & 4 deletions cmd/crddelete.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ var crdDeleteCmd = &cobra.Command{
Any running distributed compute resources will be decommissioned when this
operation runs (i.e. your deployments will be deleted immediately). This will
only attempt to remove definitions that are already present in Kubernetes.`,
RunE: processIstioFlag(func(enabled bool) error {
return crd.Delete(context.Background(), enabled)
}),
RunE: func(cmd *cobra.Command, args []string) error {
return crd.Delete(context.Background(), istioEnabled)
},
}

func init() {
addIstioFlag(crdDeleteCmd)
rootCmd.AddCommand(crdDeleteCmd)
}
18 changes: 3 additions & 15 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/spf13/cobra"
)

var istioEnabled bool

var rootCmd = &cobra.Command{
Use: "distributed-compute-operator",
Short: "Kubernetes operator that manages parallel computing clusters.",
Expand All @@ -21,22 +23,8 @@ func Execute() {
}
}

func addIstioFlag(cmd *cobra.Command) {
cmd.Flags().BoolP("istio-enabled", "i", false, "Enable support for Istio sidecar container")
}

func processIstioFlag(op func(enabled bool) error) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
istioEnabled, err := cmd.Flags().GetBool("istio-enabled")
if err != nil {
return err
}

return op(istioEnabled)
}
}

func init() {
// NOTE: required until https://github.com/spf13/cobra/issues/587
rootCmd.SetHelpCommand(&cobra.Command{Hidden: true})
rootCmd.PersistentFlags().BoolVar(&istioEnabled, "istio-enabled", false, "Enable support for Istio sidecar container")
}
1 change: 1 addition & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var startCmd = &cobra.Command{
HealthProbeAddr: probeAddr,
WebhookServerPort: webhookPort,
EnableLeaderElection: enableLeaderElection,
IstioEnabled: istioEnabled,
ZapOptions: zapOpts,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ spec:
- name
type: object
type: array
gcsServerPort:
description: GCSServerPort is the port for the global control store.
format: int32
type: integer
head:
description: Head node configuration parameters.
properties:
Expand Down Expand Up @@ -3652,6 +3656,11 @@ spec:
type: string
type: object
type: array
istioMutualTLSMode:
description: MutualTLSMode will be used to create a workload-specific
peer authentication policy that takes precedence over a global and/or
namespace-wide policy.
type: string
networkPolicy:
description: NetworkPolicy parameters that grant intra-cluster and external
network access to cluster nodes.
Expand Down Expand Up @@ -7277,6 +7286,13 @@ spec:
type: object
type: array
type: object
workerPorts:
description: WorkerPorts specifies the range of ports used by worker
processes.
items:
format: int32
type: integer
type: array
type: object
status:
description: RayClusterStatus defines the observed state of a RayCluster
Expand Down
Loading

0 comments on commit dd73cb6

Please sign in to comment.