Skip to content

Commit

Permalink
BackupInstance controller
Browse files Browse the repository at this point in the history
  • Loading branch information
earlgreyz committed Apr 29, 2018
1 parent 70cf6bc commit 08e9bba
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 13 deletions.
5 changes: 5 additions & 0 deletions operator.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/grtl/mysql-operator/pkg/client/clientset/versioned"
"github.com/grtl/mysql-operator/pkg/controller/backupinstance"
"github.com/grtl/mysql-operator/pkg/controller/backupschedule"
"github.com/grtl/mysql-operator/pkg/controller/cluster"
backupinstancecrd "github.com/grtl/mysql-operator/pkg/crd/backupinstance"
Expand Down Expand Up @@ -70,6 +71,10 @@ func main() {
backupScheduleController := backupschedule.NewBackupScheduleController(clientset, kubeClientset)
go backupScheduleController.Run(ctx)

logrus.Debug("Starting the backup instance controller")
backupInstanceController := backupinstance.NewBackupInstanceController(clientset, kubeClientset)
go backupInstanceController.Run(ctx)

logrus.Info("Listening for events")

signals := make(chan os.Signal, 1)
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/backupinstance/backupinstance_suite_test.go
@@ -0,0 +1,13 @@
package backupinstance_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestBackupSchedule(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Backup Schedule Suite")
}
78 changes: 78 additions & 0 deletions pkg/controller/backupinstance/controller.go
@@ -0,0 +1,78 @@
package backupinstance

import (
"context"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

crv1 "github.com/grtl/mysql-operator/pkg/apis/cr/v1"
"github.com/grtl/mysql-operator/pkg/client/clientset/versioned"
"github.com/grtl/mysql-operator/pkg/client/informers/externalversions"
"github.com/grtl/mysql-operator/pkg/controller"
)

// NewBackupInstanceController returns new backup instance controller.
func NewBackupInstanceController(clientset versioned.Interface, kubeClientset kubernetes.Interface) controller.Controller {
return &backupInstanceController{
Base: controller.NewControllerBase(),
clientset: clientset,
}
}

type backupInstanceController struct {
controller.Base
clientset versioned.Interface
}

func (c *backupInstanceController) Run(ctx context.Context) error {
factory := externalversions.NewSharedInformerFactory(c.clientset, 0)
informer := factory.Cr().V1().MySQLBackupInstances().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
})
informer.Run(ctx.Done())
<-ctx.Done()
return ctx.Err()
}

func (c *backupInstanceController) onAdd(obj interface{}) {
backup := obj.(*crv1.MySQLBackupInstance)

logBackupInstanceEventBegin(backup, BackupInstanceAdded)

logBackupInstanceEventSuccess(backup, BackupInstanceAdded)

// Run hooks
for _, hook := range c.GetHooks() {
hook.OnAdd(backup)
}
}

func (c *backupInstanceController) onUpdate(oldObj, newObj interface{}) {
newBackup := newObj.(*crv1.MySQLBackupInstance)

logBackupInstanceEventBegin(newBackup, BackupInstanceUpdated)

logBackupInstanceEventSuccess(newBackup, BackupInstanceUpdated)

// Run hooks
for _, hook := range c.GetHooks() {
hook.OnUpdate(newBackup)
}
}

func (c *backupInstanceController) onDelete(obj interface{}) {
backup := obj.(*crv1.MySQLBackupInstance)

logBackupInstanceEventBegin(backup, BackupInstanceDeleted)

logBackupInstanceEventSuccess(backup, BackupInstanceDeleted)

// Run hooks
for _, hook := range c.GetHooks() {
hook.OnDelete(backup)
}
}
113 changes: 113 additions & 0 deletions pkg/controller/backupinstance/controller_test.go
@@ -0,0 +1,113 @@
package backupinstance_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

. "github.com/grtl/mysql-operator/pkg/controller/backupinstance"

"context"
"io/ioutil"

"k8s.io/apimachinery/pkg/watch"

"github.com/nauyey/factory"
"github.com/sirupsen/logrus"

crv1 "github.com/grtl/mysql-operator/pkg/apis/cr/v1"
"github.com/grtl/mysql-operator/pkg/controller"
testingFactory "github.com/grtl/mysql-operator/pkg/testing/factory"
)

var _ = Describe("Backup Instance Controller", func() {
// Turn off logging output
logrus.SetOutput(ioutil.Discard)

var (
backup *crv1.MySQLBackupInstance

watcher *watch.FakeWatcher
scheduleController controller.Controller
eventsHook controller.EventsHook
)

BeforeEach(func() {
// Initialize the controller
watcher, scheduleController = NewFakeBackupInstanceController(16)
eventsHook = controller.NewEventsHook(16)

// Setup fake Backup Instance
backup = new(crv1.MySQLBackupInstance)
err := factory.Build(testingFactory.MySQLBackupInstanceFactory).To(backup)
Expect(err).NotTo(HaveOccurred())
})

JustBeforeEach(func() {
err := scheduleController.AddHook(eventsHook)
Expect(err).NotTo(HaveOccurred())

watcher.Add(backup)
})

When("Backup Instance is added", func() {
It("should get processed by the controller", func(done Done) {
var event controller.Event

ctx, cancelFunc := context.WithCancel(context.Background())
go scheduleController.Run(ctx)
defer cancelFunc()

Eventually(eventsHook.GetEventsChan()).Should(Receive(&event))
Expect(event.Type).To(Equal(controller.EventAdded))
Expect(event.Object).To(Equal(backup))

close(done)
})
})

When("Backup Instance is updated", func() {
It("should get processed by the controller", func(done Done) {
var event controller.Event

ctx, cancelFunc := context.WithCancel(context.Background())
go scheduleController.Run(ctx)
defer cancelFunc()

// Ignore added event
Eventually(eventsHook.GetEventsChan()).Should(Receive(&event))
Expect(event.Type).To(Equal(controller.EventAdded))

// Update backup backup
watcher.Modify(backup)

Eventually(eventsHook.GetEventsChan()).Should(Receive(&event))
Expect(event.Type).To(Equal(controller.EventUpdated))
Expect(event.Object).To(Equal(backup))

close(done)
})
})

When("Backup Instance is deleted", func() {
It("should get processed by the controller", func(done Done) {
var event controller.Event

ctx, cancelFunc := context.WithCancel(context.Background())
go scheduleController.Run(ctx)
defer cancelFunc()

// Ignore added event
Eventually(eventsHook.GetEventsChan()).Should(Receive(&event))
Expect(event.Type).To(Equal(controller.EventAdded))

// Update Backup Instance
watcher.Delete(backup)

Eventually(eventsHook.GetEventsChan()).Should(Receive(&event))
Expect(event.Type).To(Equal(controller.EventDeleted))
Expect(event.Object).To(Equal(backup))

close(done)
})
})
})
23 changes: 23 additions & 0 deletions pkg/controller/backupinstance/fake.go
@@ -0,0 +1,23 @@
package backupinstance

import (
"k8s.io/apimachinery/pkg/watch"
kubeFake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/testing"

"github.com/grtl/mysql-operator/pkg/client/clientset/versioned/fake"
"github.com/grtl/mysql-operator/pkg/controller"
)

// NewFakeBackupInstanceController returns new operator controller among with
// prepended watcher. Created controller uses fake clientSets. Size indicates
// watcher events channel buffer.
func NewFakeBackupInstanceController(size int) (*watch.FakeWatcher, controller.Controller) {
kubeClientset := kubeFake.NewSimpleClientset()
clientset := fake.NewSimpleClientset()

watcher := watch.NewFakeWithChanSize(size, false)
clientset.PrependWatchReactor("mysqlbackupinstances", testing.DefaultWatchReactor(watcher, nil))
fakeController := NewBackupInstanceController(clientset, kubeClientset)
return watcher, fakeController
}
9 changes: 9 additions & 0 deletions pkg/controller/backupinstance/fake_test.go
@@ -0,0 +1,9 @@
package backupinstance_test

import (
. "github.com/onsi/ginkgo"
)

var _ = Describe("Fake", func() {

})
26 changes: 26 additions & 0 deletions pkg/controller/backupinstance/logging.go
@@ -0,0 +1,26 @@
package backupinstance

import (
crv1 "github.com/grtl/mysql-operator/pkg/apis/cr/v1"
"github.com/grtl/mysql-operator/pkg/logging"
)

// Event represents an event processed by the Backup Schedule controller.
type Event string

// Available event types.
const (
BackupInstanceAdded Event = "Added"
BackupInstanceUpdated Event = "Updated"
BackupInstanceDeleted Event = "Deleted"
)

func logBackupInstanceEventBegin(backup *crv1.MySQLBackupInstance, event Event) {
logging.LogBackupInstance(backup).WithField(
"event", event).Info("Received BackupInstance event")
}

func logBackupInstanceEventSuccess(backup *crv1.MySQLBackupInstance, event Event) {
logging.LogBackupInstance(backup).WithField(
"event", event).Info("Successfully processed BackupInstance event")
}

0 comments on commit 08e9bba

Please sign in to comment.