Skip to content

Commit

Permalink
[ISSUE-48][FEATURE][FOLLOW UP] Add controller component (#214)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
For issue #48 
I add some codes about controller module, and I'll add more unit tests in next PR.

### Why are the changes needed?
Support K8S


### Does this PR introduce _any_ user-facing change?
Yes, we will add the doc later


### How was this patch tested?
Manual test
  • Loading branch information
wangao1236 committed Sep 20, 2022
1 parent e5c3412 commit e6b4260
Show file tree
Hide file tree
Showing 20 changed files with 3,157 additions and 27 deletions.
56 changes: 56 additions & 0 deletions deploy/kubernetes/operator/cmd/controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"flag"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/controller"
)

func main() {
klog.InitFlags(nil)
cfg := &config.Config{}
cfg.AddFlags()
flag.Parse()

cfg.Complete()
klog.Infof("run config: %+v", cfg)

// create a manager for leader election.
mgr, err := ctrl.NewManager(cfg.RESTConfig, ctrl.Options{
LeaderElection: true,
LeaderElectionID: cfg.LeaderElectionID(),
})
if err != nil {
klog.Fatal(err)
}
// create a rss controller.
rc := controller.NewRSSController(cfg)
if err = mgr.Add(rc); err != nil {
klog.Fatal(err)
}
// start the rss controller.
if err = mgr.Start(cfg.RunCtx); err != nil {
klog.Fatal(err)
}
}
14 changes: 8 additions & 6 deletions deploy/kubernetes/operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ module github.com/apache/incubator-uniffle/deploy/kubernetes/operator
go 1.16

require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/onsi/gomega v1.20.0
github.com/parnurzeal/gorequest v0.2.16
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gomodules.xyz/jsonpatch/v2 v2.2.0
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
k8s.io/client-go v0.22.1
k8s.io/code-generator v0.22.1
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2
k8s.io/code-generator v0.22.2
k8s.io/klog/v2 v2.9.0
k8s.io/utils v0.0.0-20210802155522-efc7438f0176
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
moul.io/http2curl v1.0.0 // indirect
sigs.k8s.io/controller-runtime v0.10.0
)
56 changes: 36 additions & 20 deletions deploy/kubernetes/operator/go.sum

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions deploy/kubernetes/operator/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"flag"

"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/constants"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)

const (
flagWorkers = "workers"
)

// Config contains all configurations.
type Config struct {
Workers int
utils.GenericConfig
}

// LeaderElectionID returns leader election ID.
func (c *Config) LeaderElectionID() string {
return "rss-controller-" + constants.LeaderIDSuffix
}

// AddFlags adds all configurations to the global flags.
func (c *Config) AddFlags() {
flag.IntVar(&c.Workers, flagWorkers, 1, "Concurrency of the rss controller.")
c.GenericConfig.AddFlags()
}

// Complete is called before rss-controller runs.
func (c *Config) Complete() {
c.GenericConfig.Complete()
}
83 changes: 83 additions & 0 deletions deploy/kubernetes/operator/pkg/controller/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constants

const (
// ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers.
ContainerShuffleServerRPCPort int32 = 19997
// ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers.
ContainerShuffleServerHTTPPort int32 = 19996
// ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers.
ContainerCoordinatorRPCPort int32 = 19997
// ContainerCoordinatorHTTPPort indicates http port used in coordinator containers.
ContainerCoordinatorHTTPPort int32 = 19996

// ShuffleServerRPCPortEnv indicates environment name of rpc port used by shuffle servers.
ShuffleServerRPCPortEnv = "SERVER_RPC_PORT"
// ShuffleServerHTTPPortEnv indicates environment name of http port used by shuffle servers.
ShuffleServerHTTPPortEnv = "SERVER_HTTP_PORT"
// CoordinatorRPCPortEnv indicates environment name of rpc port used by coordinators.
CoordinatorRPCPortEnv = "COORDINATOR_RPC_PORT"
// CoordinatorHTTPPortEnv indicates environment name of http port used by coordinators.
CoordinatorHTTPPortEnv = "COORDINATOR_HTTP_PORT"
// RSSCoordinatorQuorumEnv indicates environment name of rss coordinator quorum used by shuffle servers.
RSSCoordinatorQuorumEnv = "RSS_COORDINATOR_QUORUM"
// XmxSizeEnv indicates environment name of xmx size used by coordinators or shuffle servers.
XmxSizeEnv = "XMX_SIZE"
// ServiceNameEnv indicates environment name of service name used by coordinators or shuffle servers.
ServiceNameEnv = "SERVICE_NAME"
// NodeNameEnv indicates environment name of physical node name used by coordinators or shuffle servers.
NodeNameEnv = "NODE_NAME"
// RssIPEnv indicates environment name of shuffle servers' ip addresses.
RssIPEnv = "RSS_IP"

// CoordinatorServiceName defines environment variable value of "SERVICE_NAME" used by coordinators.
CoordinatorServiceName = "coordinator"
// ShuffleServerServiceName defines environment variable value of "SERVICE_NAME" used by shuffle servers.
ShuffleServerServiceName = "server"

// ExcludeNodesFile indicates volume mounting name of exclude nodes file
ExcludeNodesFile = "exclude-nodes-file"

// UpdateStatusError means reason of updating status of rss error
UpdateStatusError = "UpdateStatusError"

// OwnerLabel is the label of configMap's owner.
OwnerLabel = "uniffle.apache.org/owner-label"

// ConfigurationVolumeName is the name of configMap volume records configuration of coordinators or shuffle servers.
ConfigurationVolumeName = "configuration"
)

// PropertyKey defines property key in configuration of coordinators or shuffle servers.
type PropertyKey string

const (
// RPCServerPort represent rss port property in configuration of coordinators or shuffle servers.
RPCServerPort PropertyKey = "rss.rpc.server.port"
// JettyHTTPPort represent http port property in configuration of coordinators or shuffle servers.
JettyHTTPPort PropertyKey = "rss.jetty.http.port"

// CoordinatorQuorum represent coordinator quorum property in configuration of shuffle servers.
CoordinatorQuorum PropertyKey = "rss.coordinator.quorum"
// StorageBasePath represent storage base path property in configuration of shuffle servers.
StorageBasePath PropertyKey = "rss.storage.basePath"

// CoordinatorExcludeNodesPath represent exclude nodes path property in configuration of coordinators.
CoordinatorExcludeNodesPath PropertyKey = "rss.coordinator.exclude.nodes.file.path"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
"context"
"reflect"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubefake "k8s.io/client-go/kubernetes/fake"

unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/controller/config"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/generated/clientset/versioned/fake"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)

func buildEmptyPhaseRssObj() *unifflev1alpha1.RemoteShuffleService {
return &unifflev1alpha1.RemoteShuffleService{
ObjectMeta: metav1.ObjectMeta{
Name: testRssName,
Namespace: testNamespace,
ResourceVersion: "test",
},
Spec: unifflev1alpha1.RemoteShuffleServiceSpec{
Coordinator: &unifflev1alpha1.CoordinatorConfig{
ExcludeNodesFilePath: "/exclude_nodes",
},
},
Status: unifflev1alpha1.RemoteShuffleServiceStatus{},
}
}

func TestProcessEmptyPhaseRss(t *testing.T) {
rss := buildEmptyPhaseRssObj()

rssClient := fake.NewSimpleClientset(rss)
kubeClient := kubefake.NewSimpleClientset()

rc := newRSSController(&config.Config{
GenericConfig: utils.GenericConfig{
KubeClient: kubeClient,
RSSClient: rssClient,
},
})

for _, tt := range []struct {
name string
expectedRssStatus unifflev1alpha1.RemoteShuffleServiceStatus
expectedNeedRetry bool
expectedError error
}{
{
name: "process rss object which has just been created, and whose status phase is empty",
expectedRssStatus: unifflev1alpha1.RemoteShuffleServiceStatus{
Phase: unifflev1alpha1.RSSPending,
},
expectedNeedRetry: false,
},
} {
needRetry, err := rc.processNormal(rss)
if err != nil {
t.Errorf("process rss object failed: %v", err)
return
}
if needRetry != tt.expectedNeedRetry {
t.Errorf("unexpected result indicates whether to retrys: %v, expected: %v",
needRetry, tt.expectedNeedRetry)
return
}
updatedRss, getErr := rssClient.UniffleV1alpha1().RemoteShuffleServices(rss.Namespace).
Get(context.TODO(), rss.Name, metav1.GetOptions{})
if getErr != nil {
t.Errorf("get updated rss object failed: %v", err)
return
}
if !reflect.DeepEqual(updatedRss.Status, tt.expectedRssStatus) {
t.Errorf("unexpected status of updated rss object: %+v, expected: %+v",
updatedRss.Status, tt.expectedRssStatus)
return
}
}
}
Loading

0 comments on commit e6b4260

Please sign in to comment.