Skip to content

Commit

Permalink
[#627] fix(operator): support specifying custom ports (#629)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Set coordinator/shuffler server's container port to the fields of RSS spec

### Why are the changes needed?
Fix #627.

### Does this PR introduce _any_ user-facing change?
For RSS cluster admin, they can set custom ports for shuffle servers and coordinators.

### How was this patch tested?
Manually verified.
  • Loading branch information
wangao1236 committed Feb 20, 2023
1 parent ffa26b4 commit b31a5c9
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
package constants

const (
// ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers.
ContainerShuffleServerRPCPort int32 = 19997
// ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers.
ContainerShuffleServerHTTPPort int32 = 19996
// ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers.
ContainerCoordinatorRPCPort int32 = 19997
// ContainerCoordinatorHTTPPort indicates http port used in coordinator containers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
name := GenerateNameByIndex(rss, i)
serviceName := appendHeadless(name)
current := fmt.Sprintf("%v:%v", serviceName, controllerconstants.ContainerShuffleServerRPCPort)
current := fmt.Sprintf("%v:%v", serviceName, *rss.Spec.Coordinator.RPCPort)
names = append(names, current)
}
return strings.Join(names, ",")
Expand Down Expand Up @@ -312,11 +312,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
ports := []corev1.ContainerPort{
{
ContainerPort: controllerconstants.ContainerCoordinatorRPCPort,
ContainerPort: *rss.Spec.Coordinator.RPCPort,
Protocol: corev1.ProtocolTCP,
},
{
ContainerPort: controllerconstants.ContainerCoordinatorHTTPPort,
ContainerPort: *rss.Spec.Coordinator.HTTPPort,
Protocol: corev1.ProtocolTCP,
},
}
Expand All @@ -329,11 +329,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
env := []corev1.EnvVar{
{
Name: controllerconstants.CoordinatorRPCPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
},
{
Name: controllerconstants.CoordinatorHTTPPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
},
{
Name: controllerconstants.XmxSizeEnv,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import (
)

const (
testRuntimeClassName = "test-runtime"
testRuntimeClassName = "test-runtime"
testRPCPort int32 = 19990
testHTTPPort int32 = 19991
)

// IsValidDeploy checks generated deployment, returns whether it is valid and error message.
Expand Down Expand Up @@ -86,6 +88,57 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
return rss
}

func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
return rss
}

func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.HTTPPort = pointer.Int32(testHTTPPort)
return rss
}

func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: controllerconstants.CoordinatorRPCPortEnv,
Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
},
{
Name: controllerconstants.CoordinatorHTTPPortEnv,
Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
},
{
Name: controllerconstants.XmxSizeEnv,
Value: rss.Spec.Coordinator.XmxSize,
},
{
Name: controllerconstants.ServiceNameEnv,
Value: controllerconstants.CoordinatorServiceName,
},
{
Name: controllerconstants.NodeNameEnv,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "spec.nodeName",
},
},
},
{
Name: controllerconstants.RssIPEnv,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.podIP",
},
},
},
}
}

func TestGenerateDeploy(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down Expand Up @@ -145,42 +198,7 @@ func TestGenerateDeploy(t *testing.T) {
rss: buildRssWithCustomENVs(),
IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
valid bool, err error) {
expectENVs := []corev1.EnvVar{
{
Name: controllerconstants.CoordinatorRPCPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
},
{
Name: controllerconstants.CoordinatorHTTPPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
},
{
Name: controllerconstants.XmxSizeEnv,
Value: rss.Spec.Coordinator.XmxSize,
},
{
Name: controllerconstants.ServiceNameEnv,
Value: controllerconstants.CoordinatorServiceName,
},
{
Name: controllerconstants.NodeNameEnv,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "spec.nodeName",
},
},
},
{
Name: controllerconstants.RssIPEnv,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.podIP",
},
},
},
}
expectENVs := buildCommonExpectedENVs(rss)
defaultEnvNames := sets.NewString()
for i := range expectENVs {
defaultEnvNames.Insert(expectENVs[i].Name)
Expand All @@ -202,6 +220,94 @@ func TestGenerateDeploy(t *testing.T) {
return
},
},
{
name: "set custom rpc port used by coordinator",
rss: buildRssWithCustomRPCPort(),
IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
valid bool, err error) {
// check envs
expectENVs := buildCommonExpectedENVs(rss)
for i := range expectENVs {
if expectENVs[i].Name == controllerconstants.CoordinatorRPCPortEnv {
expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10)
}
}
actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
valid = reflect.DeepEqual(expectENVs, actualENVs)
if !valid {
actualEnvBody, _ := json.Marshal(actualENVs)
expectEnvBody, _ := json.Marshal(expectENVs)
err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
string(actualEnvBody), string(expectEnvBody))
return
}

// check ports
expectPorts := []corev1.ContainerPort{
{
ContainerPort: testRPCPort,
Protocol: corev1.ProtocolTCP,
},
{
ContainerPort: *rss.Spec.Coordinator.HTTPPort,
Protocol: corev1.ProtocolTCP,
},
}
actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
valid = reflect.DeepEqual(expectPorts, actualPorts)
if !valid {
actualPortsBody, _ := json.Marshal(actualPorts)
expectPortsBody, _ := json.Marshal(expectPorts)
err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
string(actualPortsBody), string(expectPortsBody))
}
return
},
},
{
name: "set custom http port used by coordinator",
rss: buildRssWithCustomHTTPPort(),
IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
valid bool, err error) {
// check envs
expectENVs := buildCommonExpectedENVs(rss)
for i := range expectENVs {
if expectENVs[i].Name == controllerconstants.CoordinatorHTTPPortEnv {
expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10)
}
}
actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
valid = reflect.DeepEqual(expectENVs, actualENVs)
if !valid {
actualEnvBody, _ := json.Marshal(actualENVs)
expectEnvBody, _ := json.Marshal(expectENVs)
err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
string(actualEnvBody), string(expectEnvBody))
return
}

// check ports
expectPorts := []corev1.ContainerPort{
{
ContainerPort: *rss.Spec.Coordinator.RPCPort,
Protocol: corev1.ProtocolTCP,
},
{
ContainerPort: testHTTPPort,
Protocol: corev1.ProtocolTCP,
},
}
actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
valid = reflect.DeepEqual(expectPorts, actualPorts)
if !valid {
actualPortsBody, _ := json.Marshal(actualPorts)
expectPortsBody, _ := json.Marshal(expectPorts)
err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
string(actualPortsBody), string(expectPortsBody))
}
return
},
},
} {
t.Run(tt.name, func(tc *testing.T) {
deploy := GenerateDeploy(tt.rss, 0)
Expand Down Expand Up @@ -269,4 +375,8 @@ func TestGenerateAddresses(t *testing.T) {
rss := buildRssWithLabels()
quorum := GenerateAddresses(rss)
assertion.Contains(quorum, "headless")

rss = buildRssWithCustomRPCPort()
quorum = GenerateAddresses(rss)
assertion.Contains(quorum, strconv.FormatInt(int64(testRPCPort), 10))
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet
constants.AnnotationRssName: rss.Name,
constants.AnnotationRssUID: string(rss.UID),
constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
controllerconstants.ContainerShuffleServerHTTPPort),
*rss.Spec.ShuffleServer.HTTPPort),
constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
controllerconstants.ContainerShuffleServerRPCPort),
*rss.Spec.ShuffleServer.RPCPort),
},
},
Spec: podSpec,
Expand Down Expand Up @@ -217,11 +217,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
ports := []corev1.ContainerPort{
{
ContainerPort: controllerconstants.ContainerShuffleServerRPCPort,
ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
Protocol: corev1.ProtocolTCP,
},
{
ContainerPort: controllerconstants.ContainerShuffleServerHTTPPort,
ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
Protocol: corev1.ProtocolTCP,
},
}
Expand All @@ -234,11 +234,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
env := []corev1.EnvVar{
{
Name: controllerconstants.ShuffleServerRPCPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
},
{
Name: controllerconstants.ShuffleServerHTTPPortEnv,
Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10),
Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
},
{
Name: controllerconstants.RSSCoordinatorQuorumEnv,
Expand Down
Loading

0 comments on commit b31a5c9

Please sign in to comment.