Skip to content

Commit

Permalink
Merge pull request openshift#635 from tnozicka/recovery-controller
Browse files Browse the repository at this point in the history
Add cert regeneration controller
  • Loading branch information
openshift-merge-robot committed Jan 25, 2020
2 parents 303d32d + 9a80e99 commit e40a08e
Show file tree
Hide file tree
Showing 21 changed files with 778 additions and 169 deletions.
22 changes: 22 additions & 0 deletions bindata/v4.1.0/kube-apiserver/pod.yaml
Expand Up @@ -109,6 +109,28 @@ spec:
name: resource-dir
- mountPath: /etc/kubernetes/static-pod-certs
name: cert-dir
- name: kube-apiserver-cert-regeneration-controller-REVISION
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: ${OPERATOR_IMAGE}
imagePullPolicy: IfNotPresent
terminationMessagePolicy: FallbackToLogsOnError
command: ["cluster-kube-apiserver-operator", "cert-regeneration-controller"]
args:
- --kubeconfig=/etc/kubernetes/static-pod-resources/configmaps/kube-apiserver-cert-syncer-kubeconfig/kubeconfig
- --namespace=$(POD_NAMESPACE)
- --tls-server-name=localhost-recovery
- -v=2
resources:
requests:
memory: 50Mi
cpu: 10m
volumeMounts:
- mountPath: /etc/kubernetes/static-pod-resources
name: resource-dir
- name: kube-apiserver-insecure-readyz-REVISION
image: ${OPERATOR_IMAGE}
imagePullPolicy: IfNotPresent
Expand Down
7 changes: 5 additions & 2 deletions cmd/cluster-kube-apiserver-operator/main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
goflag "flag"
"fmt"
"math/rand"
Expand All @@ -13,6 +14,7 @@ import (
utilflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/certregenerationcontroller"
"github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/insecurereadyz"
operatorcmd "github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/operator"
"github.com/openshift/cluster-kube-apiserver-operator/pkg/cmd/recoveryapiserver"
Expand All @@ -35,14 +37,14 @@ func main() {
logs.InitLogs()
defer logs.FlushLogs()

command := NewOperatorCommand()
command := NewOperatorCommand(context.Background())
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

func NewOperatorCommand() *cobra.Command {
func NewOperatorCommand(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{
Use: "cluster-kube-apiserver-operator",
Short: "OpenShift cluster kube-apiserver operator",
Expand All @@ -66,6 +68,7 @@ func NewOperatorCommand() *cobra.Command {
cmd.AddCommand(certsyncpod.NewCertSyncControllerCommand(operator.CertConfigMaps, operator.CertSecrets))
cmd.AddCommand(recoveryapiserver.NewRecoveryAPIServerCommand())
cmd.AddCommand(regeneratecerts.NewRegenerateCertsCommand())
cmd.AddCommand(certregenerationcontroller.NewCertRegenerationControllerCommand(ctx))
cmd.AddCommand(insecurereadyz.NewInsecureReadyzCommand())

return cmd
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -20,7 +20,7 @@ require (
github.com/kubernetes-sigs/kube-storage-version-migrator v0.0.0-20191127225502-51849bc15f17
github.com/openshift/api v0.0.0-20200122114642-1108c9abdb99
github.com/openshift/client-go v0.0.0-20200116152001-92a2713fa240
github.com/openshift/library-go v0.0.0-20200123002050-ef5fb66e6346
github.com/openshift/library-go v0.0.0-20200123173517-9d0011759106
github.com/prometheus/client_golang v1.1.0
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -307,8 +307,8 @@ github.com/openshift/api v0.0.0-20200122114642-1108c9abdb99 h1:WWUaFPHcREzq0p/Zf
github.com/openshift/api v0.0.0-20200122114642-1108c9abdb99/go.mod h1:fT6U/JfG8uZzemTRwZA2kBDJP5nWz7v05UHnty/D+pk=
github.com/openshift/client-go v0.0.0-20200116152001-92a2713fa240 h1:XYfJWv2Ch+qInGLDEedHRtDsJwnxyU1L8U7SY56NcA8=
github.com/openshift/client-go v0.0.0-20200116152001-92a2713fa240/go.mod h1:4riOwdj99Hd/q+iAcJZfNCsQQQMwURnZV6RL4WHYS5w=
github.com/openshift/library-go v0.0.0-20200123002050-ef5fb66e6346 h1:UH7+xLg4C6sOEBV5JZuOXHbeQhuvyakO3MwXD/2+M2Y=
github.com/openshift/library-go v0.0.0-20200123002050-ef5fb66e6346/go.mod h1:/P1rPwPkaaNtylv8PLYkOTbf6tCdaNYDNqL9Y8GzJfE=
github.com/openshift/library-go v0.0.0-20200123173517-9d0011759106 h1:ZNHyJJhPOXh3dhGtdOmBRHiwP1POmco9Iq0X3rOoIUU=
github.com/openshift/library-go v0.0.0-20200123173517-9d0011759106/go.mod h1:/P1rPwPkaaNtylv8PLYkOTbf6tCdaNYDNqL9Y8GzJfE=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
145 changes: 145 additions & 0 deletions pkg/cmd/certregenerationcontroller/cabundlesyncer.go
@@ -0,0 +1,145 @@
package certregenerationcontroller

import (
"context"
"fmt"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/targetconfigcontroller"
)

// workQueueKey is the single fixed key used for the one-item work queue.
const workQueueKey = "key"

// CABundleController composes individual certs into CA bundle that is used
// by kube-apiserver to validate clients.
// Cert recovery refreshes "kube-control-plane-signer-ca" and needs the containing
// bundle regenerated so kube-controller-manager and kube-scheduler can connect
// using client certs.
type CABundleController struct {
	// configMapGetter is used to write the regenerated bundle config map.
	configMapGetter corev1client.ConfigMapsGetter
	// configMapLister reads source config maps from the informer caches.
	configMapLister corev1listers.ConfigMapLister

	// eventRecorder reports bundle refreshes as Kubernetes events.
	eventRecorder events.Recorder

	// cachesToSync must all report synced before the worker starts.
	cachesToSync []cache.InformerSynced

	// queue only ever has one item, but it has nice error handling backoff/retry semantics
	queue workqueue.RateLimitingInterface
}

// NewCABundleController creates a controller that regenerates the client CA
// bundle whenever a ConfigMap changes in one of the namespaces the bundle is
// composed from. Any change simply enqueues the single fixed work item.
func NewCABundleController(
	configMapGetter corev1client.ConfigMapsGetter,
	kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
	eventRecorder events.Recorder,
) (*CABundleController, error) {
	c := &CABundleController{
		configMapGetter: configMapGetter,
		configMapLister: kubeInformersForNamespaces.ConfigMapLister(),
		eventRecorder:   eventRecorder.WithComponentSuffix("manage-client-ca-bundle-recovery-controller"),
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CABundleRecoveryController"),
	}

	handler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { c.queue.Add(workQueueKey) },
		UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
		DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
	}

	// we react to some config changes
	namespaces := []string{
		operatorclient.GlobalUserSpecifiedConfigNamespace,
		operatorclient.GlobalMachineSpecifiedConfigNamespace,
		operatorclient.OperatorNamespace,
		operatorclient.TargetNamespace,
	}
	for _, namespace := range namespaces {
		// Hoist the informer lookup so the event handler and the HasSynced
		// func are taken from the same informer instance instead of
		// traversing the accessor chain twice.
		configMapInformer := kubeInformersForNamespaces.InformersFor(namespace).Core().V1().ConfigMaps().Informer()
		configMapInformer.AddEventHandler(handler)
		c.cachesToSync = append(c.cachesToSync, configMapInformer.HasSynced)
	}

	return c, nil
}

// Run starts the CA bundle controller and blocks until ctx is cancelled.
// It waits for the informer caches to sync, then runs a single worker that
// drains the work queue once per second.
func (c *CABundleController) Run(ctx context.Context) {
	defer utilruntime.HandleCrash()

	// FIXME: These are missing a wait group to track goroutines and handle graceful termination
	// (@deads2k wants time to think it through)

	klog.Info("Starting CA bundle controller")
	defer func() {
		klog.Info("Shutting down CA bundle controller")
		// Shutting down the queue unblocks the worker goroutine.
		c.queue.ShutDown()
		klog.Info("CA bundle controller shut down")
	}()

	if !cache.WaitForNamedCacheSync("CABundleController", ctx.Done(), c.cachesToSync...) {
		return
	}

	// The anonymous closure wrapping this call was redundant; launch the
	// worker loop directly.
	go wait.UntilWithContext(ctx, c.runWorker, time.Second)

	<-ctx.Done()
}

// runWorker keeps processing queue items until the queue is shut down.
func (c *CABundleController) runWorker(ctx context.Context) {
	for {
		if !c.processNextItem(ctx) {
			return
		}
	}
}

// processNextItem takes one key off the queue and runs sync for it. It
// returns false only when the queue has been shut down; sync errors are
// rate-limit re-queued and do not stop the worker.
func (c *CABundleController) processNextItem(ctx context.Context) bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.sync(ctx)

	if err == nil {
		// Success: clear the rate-limiter history for this key.
		c.queue.Forget(key)
		return true
	}

	// Fix: removed the stray space before the colon in the error message.
	utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err))
	c.queue.AddRateLimited(key)

	return true
}

// sync regenerates the client CA bundle after a short debounce delay.
func (c *CABundleController) sync(ctx context.Context) error {
	// Always start 10 seconds later after a change occurred. Makes us less
	// likely to steal work and logs from the operator.
	delay := time.NewTimer(10 * time.Second)
	defer delay.Stop()
	select {
	case <-ctx.Done():
		// Shutting down; skip the work.
		return nil
	case <-delay.C:
	}

	_, changed, err := targetconfigcontroller.ManageClientCABundle(c.configMapLister, c.configMapGetter, c.eventRecorder)
	switch {
	case err != nil:
		return err
	case changed:
		klog.V(2).Info("Refreshed client CA bundle.")
	}

	return nil
}
153 changes: 153 additions & 0 deletions pkg/cmd/certregenerationcontroller/cmd.go
@@ -0,0 +1,153 @@
package certregenerationcontroller

import (
"context"
"fmt"
"time"

"github.com/spf13/cobra"

"k8s.io/client-go/kubernetes"

operatorv1 "github.com/openshift/api/operator/v1"
configeversionedclient "github.com/openshift/client-go/config/clientset/versioned"
configexternalinformers "github.com/openshift/client-go/config/informers/externalversions"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/operator/certrotation"
"github.com/openshift/library-go/pkg/operator/genericoperatorclient"
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/certrotationcontroller"
"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
"github.com/openshift/cluster-kube-apiserver-operator/pkg/version"
)

// Options holds the configuration for the cert regeneration controller command.
type Options struct {
	// controllerContext is populated by controllercmd when the command runs.
	controllerContext *controllercmd.ControllerContext

	// TLSServerName is the SNI server name to use when connecting to the
	// apiserver (e.g. a special localhost serving endpoint).
	TLSServerName string
}

// NewCertRegenerationControllerCommand returns a cobra command that runs the
// cert regeneration controller until ctx is cancelled.
func NewCertRegenerationControllerCommand(ctx context.Context) *cobra.Command {
	o := &Options{
		TLSServerName: "localhost-recovery",
	}

	cmd := controllercmd.
		NewControllerCommandConfig("cert-regeneration-controller", version.Get(), func(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
			o.controllerContext = controllerContext

			// Idiomatic guard clauses instead of the assign-then-check chain.
			if err := o.Validate(ctx); err != nil {
				return err
			}
			if err := o.Complete(ctx); err != nil {
				return err
			}
			return o.Run(ctx)
		}).NewCommandWithContext(ctx)
	cmd.Use = "cert-regeneration-controller"
	cmd.Short = "Start the Cluster Certificate Regeneration Controller"

	// StringVar instead of StringVarP: no shorthand flag is wanted, and
	// passing "" as the shorthand is just a roundabout way of saying that.
	cmd.PersistentFlags().StringVar(&o.TLSServerName, "tls-server-name", o.TLSServerName, "The SNI hostname to set for the server in kubeconfig")

	return cmd
}

// Validate checks the options for consistency. There is currently nothing to
// validate.
func (o *Options) Validate(ctx context.Context) error {
	return nil
}

// Complete fills in any derived option values. There is currently nothing to
// complete.
func (o *Options) Complete(ctx context.Context) error {
	return nil
}

// Run wires up the cert rotation and CA bundle controllers, starts them, and
// blocks until ctx is cancelled.
//
// Note the ordering: clients are built first, then the controllers (which
// request their informer resources), and only then are the informer factories
// started.
func (o *Options) Run(ctx context.Context) error {
	if len(o.TLSServerName) != 0 {
		// TLSServerName chooses the SNI serving endpoint on the apiserver.
		// Particularly useful when connecting to "localhost" and wanting to choose a special
		// serving endpoint like "localhost-recovery" that has long-lived serving certs
		// for localhost connections.
		o.controllerContext.KubeConfig.TLSClientConfig.ServerName = o.TLSServerName
		o.controllerContext.ProtoKubeConfig.TLSClientConfig.ServerName = o.TLSServerName
	}

	kubeClient, err := kubernetes.NewForConfig(o.controllerContext.ProtoKubeConfig)
	if err != nil {
		return fmt.Errorf("can't build kubernetes client: %w", err)
	}

	configClient, err := configeversionedclient.NewForConfig(o.controllerContext.KubeConfig)
	if err != nil {
		return fmt.Errorf("failed to create config client: %w", err)
	}

	configInformers := configexternalinformers.NewSharedInformerFactory(configClient, 10*time.Minute)

	kubeAPIServerInformersForNamespaces := v1helpers.NewKubeInformersForNamespaces(
		kubeClient,
		operatorclient.GlobalMachineSpecifiedConfigNamespace,
		operatorclient.GlobalUserSpecifiedConfigNamespace,
		operatorclient.OperatorNamespace,
		operatorclient.TargetNamespace,
	)

	operatorClient, dynamicInformers, err := genericoperatorclient.NewStaticPodOperatorClient(o.controllerContext.KubeConfig, operatorv1.GroupVersion.WithResource("kubeapiservers"))
	if err != nil {
		return err
	}

	certRotationScale, err := certrotation.GetCertRotationScale(kubeClient, operatorclient.GlobalUserSpecifiedConfigNamespace)
	if err != nil {
		return err
	}

	kubeAPIServerCertRotationController, err := certrotationcontroller.NewCertRotationControllerOnlyWhenExpired(
		kubeClient,
		operatorClient,
		configInformers,
		kubeAPIServerInformersForNamespaces,
		o.controllerContext.EventRecorder,
		certRotationScale,
	)
	if err != nil {
		return err
	}

	caBundleController, err := NewCABundleController(
		kubeClient.CoreV1(),
		kubeAPIServerInformersForNamespaces,
		o.controllerContext.EventRecorder,
	)
	if err != nil {
		return err
	}

	// We can't start informers until after the resources have been requested. Now is the time.
	configInformers.Start(ctx.Done())
	kubeAPIServerInformersForNamespaces.Start(ctx.Done())
	dynamicInformers.Start(ctx.Done())

	// FIXME: These are missing a wait group to track goroutines and handle graceful termination
	// (@deads2k wants time to think it through)

	// The anonymous closures wrapping these calls were redundant; launch the
	// controllers directly.
	go kubeAPIServerCertRotationController.Run(ctx, 1)
	go caBundleController.Run(ctx)

	<-ctx.Done()

	return nil
}
1 change: 1 addition & 0 deletions pkg/cmd/regeneratecerts/regenerate_certificates.go
Expand Up @@ -157,6 +157,7 @@ func (o *Options) Run() error {
Namespace: "openshift-kube-apiserver-operator",
}) // this is a fake object-reference that should hopefully place us in the correct namespace

// On manual request we want to rotate even if the certs are close to expiry to avoid case when some other cert becomes invalid just after.
kubeAPIServerCertRotationController, err := kubeapiservercertrotationcontroller.NewCertRotationController(
kubeClient,
nil,
Expand Down

0 comments on commit e40a08e

Please sign in to comment.