Skip to content

Commit

Permalink
Merge cfa8244 into b7d138e
Browse files Browse the repository at this point in the history
  • Loading branch information
bpineau authored Apr 25, 2018
2 parents b7d138e + cfa8244 commit 38c9942
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 212 deletions.
53 changes: 18 additions & 35 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/spf13/cobra"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/client"
"github.com/bpineau/katafygio/pkg/controller"
"github.com/bpineau/katafygio/pkg/event"
Expand All @@ -22,9 +20,21 @@ import (

const appName = "katafygio"

func runE(cmd *cobra.Command, args []string) (err error) {
var (
restcfg client.Interface

// RootCmd is our main entry point, launching runE()
RootCmd = &cobra.Command{
Use: appName,
Short: "Backup Kubernetes cluster as yaml files",
Long: "Backup Kubernetes cluster as yaml files in a git repository.\n" +
"--exclude-kind (-x) and --exclude-object (-y) may be specified several times.",
PreRun: bindConf,
RunE: runE,
}
)

resync := time.Duration(resyncInt) * time.Second
func runE(cmd *cobra.Command, args []string) (err error) {
logger := log.New(logLevel, logServer, logOutput)

if restcfg == nil {
Expand All @@ -34,34 +44,21 @@ func runE(cmd *cobra.Command, args []string) (err error) {
}
}

conf := &config.KfConfig{
DryRun: dryRun,
DumpMode: dumpMode,
Logger: logger,
LocalDir: localDir,
GitURL: gitURL,
Filter: filter,
ExcludeKind: exclkind,
ExcludeObject: exclobj,
HealthPort: healthP,
Client: restcfg,
ResyncIntv: resync,
}

repo, err := git.New(logger, dryRun, localDir, gitURL).Start()
if err != nil {
conf.Logger.Fatalf("failed to start git repo handler: %v", err)
return fmt.Errorf("failed to start git repo handler: %v", err)
}

evts := event.New()
fact := controller.NewFactory(logger, filter, resyncInt, exclobj)
reco := recorder.New(logger, evts, localDir, resyncInt*2, dryRun).Start()
obsv := observer.New(conf, evts, &controller.Factory{}).Start()
obsv := observer.New(logger, restcfg, evts, fact, exclkind).Start()
http := health.New(logger, healthP).Start()

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
if !conf.DumpMode {
if !dumpMode {
<-sigterm
}

Expand All @@ -73,20 +70,6 @@ func runE(cmd *cobra.Command, args []string) (err error) {
return nil
}

var (
restcfg client.Interface

// RootCmd is our main entry point, launching runE()
RootCmd = &cobra.Command{
Use: appName,
Short: "Backup Kubernetes cluster as yaml files",
Long: "Backup Kubernetes cluster as yaml files in a git repository.\n" +
"--exclude-kind (-x) and --exclude-object (-y) may be specified several times.",
PreRun: bindConf,
RunE: runE,
}
)

// Execute adds all child commands to the root command and sets their flags.
func Execute() error {
return RootCmd.Execute()
Expand Down
55 changes: 0 additions & 55 deletions config/config.go

This file was deleted.

30 changes: 0 additions & 30 deletions config/config_test.go

This file was deleted.

92 changes: 60 additions & 32 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"
"time"

"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/event"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,25 +36,44 @@ type Interface interface {
Stop()
}

type logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
}

// Factory generate controllers
type Factory struct{}
type Factory struct {
logger logger
filter string
resyncIntv time.Duration
excluded []string
}

// Controller is a generic kubernetes controller
type Controller struct {
name string
stopCh chan struct{}
doneCh chan struct{}
syncCh chan struct{}
notifier event.Notifier
config *config.KfConfig
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
name string
stopCh chan struct{}
doneCh chan struct{}
syncCh chan struct{}
notifier event.Notifier
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
logger logger
resyncIntv time.Duration
excluded []string
}

// New return a kubernetes controller using the provided client
func New(client cache.ListerWatcher, notifier event.Notifier, name string, config *config.KfConfig) *Controller {

selector := metav1.ListOptions{LabelSelector: config.Filter}
func New(client cache.ListerWatcher,
notifier event.Notifier,
log logger,
name string,
filter string,
resync time.Duration,
excluded []string,
) *Controller {

selector := metav1.ListOptions{LabelSelector: filter}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.List(selector)
Expand All @@ -68,7 +86,7 @@ func New(client cache.ListerWatcher, notifier event.Notifier, name string, confi
informer := cache.NewSharedIndexInformer(
lw,
&unstructured.Unstructured{},
config.ResyncIntv,
resync,
cache.Indexers{},
)

Expand Down Expand Up @@ -96,20 +114,22 @@ func New(client cache.ListerWatcher, notifier event.Notifier, name string, confi
})

return &Controller{
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
syncCh: make(chan struct{}, 1),
notifier: notifier,
name: name,
config: config,
queue: queue,
informer: informer,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
syncCh: make(chan struct{}, 1),
notifier: notifier,
name: name,
queue: queue,
informer: informer,
logger: log,
resyncIntv: resync,
excluded: excluded,
}
}

// Start launchs the controller in the background
func (c *Controller) Start() {
c.config.Logger.Infof("Starting %s controller", c.name)
c.logger.Infof("Starting %s controller", c.name)
defer utilruntime.HandleCrash()

go c.informer.Run(c.stopCh)
Expand All @@ -126,7 +146,7 @@ func (c *Controller) Start() {

// Stop halts the controller
func (c *Controller) Stop() {
c.config.Logger.Infof("Stopping %s controller", c.name)
c.logger.Infof("Stopping %s controller", c.name)
<-c.syncCh
close(c.stopCh)
c.queue.ShutDown()
Expand All @@ -148,7 +168,7 @@ func (c *Controller) processNextItem() bool {
defer c.queue.Done(key)

if strings.Compare(key.(string), canaryKey) == 0 {
c.config.Logger.Infof("Initial sync completed for %s controller", c.name)
c.logger.Infof("Initial sync completed for %s controller", c.name)
c.syncCh <- struct{}{}
c.queue.Forget(key)
return true
Expand All @@ -160,11 +180,11 @@ func (c *Controller) processNextItem() bool {
// No error, reset the ratelimit counters
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < maxProcessRetry {
c.config.Logger.Errorf("Error processing %s (will retry): %v", key, err)
c.logger.Errorf("Error processing %s (will retry): %v", key, err)
c.queue.AddRateLimited(key)
} else {
// err != nil and too many retries
c.config.Logger.Errorf("Error processing %s (giving up): %v", key, err)
c.logger.Errorf("Error processing %s (giving up): %v", key, err)
c.queue.Forget(key)
}

Expand All @@ -178,7 +198,7 @@ func (c *Controller) processItem(key string) error {
return fmt.Errorf("error fetching %s from store: %v", key, err)
}

for _, obj := range c.config.ExcludeObject {
for _, obj := range c.excluded {
if strings.Compare(strings.ToLower(obj), strings.ToLower(c.name+":"+key)) == 0 {
return nil
}
Expand All @@ -200,8 +220,6 @@ func (c *Controller) processItem(key string) error {
delete(md, attr)
}

c.config.Logger.Debugf("Found %s/%s [%s]", obj.GetAPIVersion(), obj.GetKind(), key)

yml, err := yaml.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal %s: %v", key, err)
Expand All @@ -215,7 +233,17 @@ func (c *Controller) enqueue(notif *event.Notification) {
c.notifier.Send(notif)
}

// NewFactory create a controller factory
func NewFactory(logger logger, filter string, resync int, excluded []string) *Factory {
return &Factory{
logger: logger,
filter: filter,
resyncIntv: time.Duration(resync) * time.Second,
excluded: excluded,
}
}

// NewController create a controller.Controller
func (f *Factory) NewController(client cache.ListerWatcher, notifier event.Notifier, name string, config *config.KfConfig) Interface {
return New(client, notifier, name, config)
func (f *Factory) NewController(client cache.ListerWatcher, notifier event.Notifier, name string) Interface {
return New(client, notifier, f.logger, name, f.filter, f.resyncIntv, f.excluded)
}
Loading

0 comments on commit 38c9942

Please sign in to comment.