Skip to content
This repository was archived by the owner on Jul 10, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ submarine-security/spark-security/derby.log

# submarine-cloud-v2
submarine-cloud-v2/vendor/*
submarine-cloud-v2/submarine-operator

# vscode file
.project
Expand Down
6 changes: 6 additions & 0 deletions submarine-cloud-v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ kubectl apply -f artifacts/examples/example-submarine.yaml

# Step3: Run unit test
go test
```

# Build Project
```bash
go build -o submarine-operator
./submarine-operator
```
185 changes: 185 additions & 0 deletions submarine-cloud-v2/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
appsinformers "k8s.io/client-go/informers/apps/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
informers "submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
"k8s.io/client-go/util/workqueue"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/apimachinery/pkg/util/wait"
submarinescheme "submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
)

const controllerAgentName = "submarine-controller"

// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
submarineclientset clientset.Interface

submarinesSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}

// NewController returns a new sample controller
func NewController(
kubeclientset kubernetes.Interface,
submarineclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
submarineInformer informers.SubmarineInformer) *Controller {

// TODO: Create event broadcaster
// Add Submarine types to the default Kubernetes Scheme so Events can be
// logged for Submarine types.
utilruntime.Must(submarinescheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})


// Initialize controller
controller := &Controller{
kubeclientset: kubeclientset,
submarineclientset: submarineclientset,
submarinesSynced: submarineInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Submarines"),
recorder: recorder,
}

// Setting up event handler for Submarine
klog.Info("Setting up event handlers")
submarineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueSubmarine,
UpdateFunc: func(old, new interface{}) {
controller.enqueueSubmarine(new)
},
})

// TODO: Setting up event handler for other resources. E.g. namespace

return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

// Start the informer factories to begin populating the informer caches
klog.Info("Starting Submarine controller")

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.submarinesSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("Starting workers")
// Launch two workers to process Submarine resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")

return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// TODO: Maintain workqueue
defer c.workqueue.Done(obj)
key, _ := obj.(string)
c.syncHandler(key)
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// TODO: business logic
klog.Info("syncHandler: ", key)
return nil
}

// enqueueFoo takes a Submarine resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Submarine.
func (c *Controller) enqueueSubmarine(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}

// key: [namespace]/[CR name]
// Example: default/example-submarine
c.workqueue.Add(key)
}
1 change: 1 addition & 0 deletions submarine-cloud-v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down
46 changes: 45 additions & 1 deletion submarine-cloud-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ package main

import (
"flag"
"time"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog/v2"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
"os"
"submarine-cloud-v2/pkg/signals"
clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
informers "submarine-cloud-v2/pkg/generated/informers/externalversions"
)

var (
Expand All @@ -32,7 +39,44 @@ func main() {
klog.InitFlags(nil)
flag.Parse()

// TODO: Create a Submarine operator
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

submarineClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building submarine clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
submarineInformerFactory := informers.NewSharedInformerFactory(submarineClient, time.Second*30)

// TODO: Pass informers to NewController()
// ex: namespace informer

// Create a Submarine operator
controller := NewController(kubeClient, submarineClient,
kubeInformerFactory.Apps().V1().Deployments(),
submarineInformerFactory.Submarine().V1alpha1().Submarines())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
submarineInformerFactory.Start(stopCh)

// Run controller
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}

func init() {
Expand Down
44 changes: 44 additions & 0 deletions submarine-cloud-v2/pkg/signals/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package signals

import (
"os"
"os/signal"
)

var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
close(onlyOneSignalHandler) // panics when called twice

stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // second signal. Exit directly.
}()

return stop
}
25 changes: 25 additions & 0 deletions submarine-cloud-v2/pkg/signals/signal_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package signals

import (
"os"
"syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
24 changes: 24 additions & 0 deletions submarine-cloud-v2/pkg/signals/signal_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package signals

import (
"os"
)

var shutdownSignals = []os.Signal{os.Interrupt}