Skip to content

Commit

Permalink
fix #18
Browse files Browse the repository at this point in the history
Make device discovery run before cluster creation
  • Loading branch information
little-guy-lxr committed Oct 22, 2021
1 parent 66c6d98 commit b980353
Show file tree
Hide file tree
Showing 14 changed files with 1,376 additions and 184 deletions.
2 changes: 1 addition & 1 deletion cmd/discover/discover.go
Expand Up @@ -39,7 +39,7 @@ var DiscoverCmd = &cobra.Command{
}

func init() {
DiscoverCmd.Flags().DurationVar(&discoverDevicesInterval, "discover-interval", 60*time.Minute, "interval between discovering devices (default 60m)")
DiscoverCmd.Flags().DurationVar(&discoverDevicesInterval, "discover-interval", 60*time.Second, "interval between discovering devices (default 60m)")
utilruntime.Must(topolvmv1.AddToScheme(scheme))
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
DiscoverCmd.RunE = discover
Expand Down
20 changes: 19 additions & 1 deletion cmd/operator/operator.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package operator

import (
"context"
"flag"
"fmt"
topolvmv1 "github.com/alauda/topolvm-operator/api/v2"
Expand All @@ -32,6 +33,7 @@ import (
"os"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var OperatorCmd = &cobra.Command{
Expand All @@ -55,6 +57,10 @@ func addScheme() {
// +kubebuilder:scaffold:scheme
}

// AddToManagerFuncs is the list of controller-registration functions run by
// startOperator; each entry wires one controller into the controller-runtime
// manager using the operator's cluster context and configuration.
var AddToManagerFuncs = []func(manager.Manager, *cluster.Context, context.Context, cluster.OperatorConfig) error{
	controllers.Add,
}

func startOperator(cmd *cobra.Command, args []string) error {

cluster.SetLogLevel()
Expand Down Expand Up @@ -111,8 +117,20 @@ func startOperator(cmd *cobra.Command, args []string) error {
os.Exit(1)
}

opctx := context.TODO()
config := cluster.OperatorConfig{
Image: operatorImage,
}
for _, f := range AddToManagerFuncs {
if err := f(mgr, ctx, opctx, config); err != nil {
return err
}
}
if err != nil {
logger.Error(err, "problem running manager")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

logger.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
logger.Error(err, "problem running manager")
Expand Down
1 change: 1 addition & 0 deletions cmd/topolvm/topolvm.go
Expand Up @@ -45,6 +45,7 @@ var RootCmd = &cobra.Command{
func init() {
RootCmd.PersistentFlags().StringVar(&cluster.LogLevelRaw, "log-level", "INFO", "logging level for logging/tracing output (valid values: CRITICAL,ERROR,WARNING,NOTICE,INFO,DEBUG,TRACE)")
RootCmd.Flags().StringVar(&cluster.CSIKubeletRootDir, "csi-kubelet-root-dir", "/var/lib/kubelet/", "csi kubelet root dir")
RootCmd.Flags().StringVar(&cluster.EnableDiscoverDevices, "enable-discover-devices", "false", "enable discover devices")
RootCmd.Flags().BoolVar(&cluster.IsOperatorHub, "is-operator-hub", true, "is operator or not")
RootCmd.Flags().DurationVar(&cluster.CheckStatusInterval, "check-status-interval", 10*time.Second, "check cluster status interval")
flags.SetFlagsFromEnv(RootCmd.Flags(), TopolvmEnvVarPrefix)
Expand Down
178 changes: 178 additions & 0 deletions controllers/config_controller.go
@@ -0,0 +1,178 @@
package controllers

import (
"context"
"github.com/alauda/topolvm-operator/pkg/cluster"
"github.com/alauda/topolvm-operator/pkg/operator/discover"
"github.com/alauda/topolvm-operator/pkg/operator/k8sutil"
"github.com/alauda/topolvm-operator/pkg/operator/node"
"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
controllerName = "topolvm-operator-config-controller"
)

var configLogger = capnslog.NewPackageLogger("topolvm/operator", "config-setting")

// ReconcileConfig reconciles the operator settings ConfigMap and applies the
// configuration it carries (CSI kubelet root dir, device discovery) to the
// running cluster components.
type ReconcileConfig struct {
	client           client.Client          // controller-runtime client obtained from the manager
	context          *cluster.Context       // operator cluster context (Kubernetes clientsets)
	config           cluster.OperatorConfig // operator config; Parameters is refreshed from the ConfigMap on reconcile
	opManagerContext context.Context        // context used for API calls made during reconcile
}

// predicateController is the predicate function to trigger reconcile on
// operator configuration ConfigMap changes. Only events for the ConfigMap
// named cluster.OperatorSettingConfigMapName pass the filter, and only create
// and update events trigger a reconcile.
// NOTE(review): the client parameter is currently unused — confirm whether it
// can be dropped at the call site.
func predicateController(client client.Client) predicate.Funcs {
	return predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool {
			if cm, ok := e.Object.(*v1.ConfigMap); ok {
				return cm.Name == cluster.OperatorSettingConfigMapName
			}
			return false
		},

		UpdateFunc: func(e event.UpdateEvent) bool {
			// Use explicit names for the two ConfigMap versions instead of
			// shadowing the predeclared identifier `new`.
			oldCM, oldOK := e.ObjectOld.(*v1.ConfigMap)
			newCM, newOK := e.ObjectNew.(*v1.ConfigMap)
			if oldOK && newOK {
				if oldCM.Name == cluster.OperatorSettingConfigMapName && newCM.Name == cluster.OperatorSettingConfigMapName {
					// We still want to reconcile the operator manager if the configmap is updated
					return true
				}
			}
			return false
		},

		DeleteFunc: func(e event.DeleteEvent) bool {
			if cm, ok := e.Object.(*v1.ConfigMap); ok {
				if cm.Name == cluster.OperatorSettingConfigMapName {
					configLogger.Debug("operator configmap deleted, not reconciling")
					return false
				}
			}
			return false
		},

		GenericFunc: func(e event.GenericEvent) bool {
			return false
		},
	}
}

// newReconciler builds the reconcile.Reconciler for the operator configuration
// controller. clusterCtx carries the operator's Kubernetes clientsets and
// opManagerContext is the context used for API calls made during reconcile.
// The second parameter is renamed from "context" so it no longer shadows the
// standard library context package.
func newReconciler(mgr manager.Manager, clusterCtx *cluster.Context, opManagerContext context.Context, config cluster.OperatorConfig) reconcile.Reconciler {
	return &ReconcileConfig{
		client:           mgr.GetClient(),
		context:          clusterCtx,
		config:           config,
		opManagerContext: opManagerContext,
	}
}

// Add creates a new Operator configuration Controller and adds it to the
// Manager. The Manager will set fields on the Controller and Start it when the
// Manager is Started. The second parameter is renamed from "context" so it no
// longer shadows the standard library context package; the signature is
// otherwise unchanged.
func Add(mgr manager.Manager, clusterCtx *cluster.Context, opManagerContext context.Context, opConfig cluster.OperatorConfig) error {
	return add(mgr, newReconciler(mgr, clusterCtx, opManagerContext, opConfig))
}

// add registers the given reconciler with a new controller on the manager and
// wires up a watch on the operator settings ConfigMap.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	ctl, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}
	configLogger.Infof("%s successfully started", controllerName)

	// Watch for ConfigMap (operator config)
	cmSource := &source.Kind{
		Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}},
	}
	return ctl.Watch(cmSource, &handler.EnqueueRequestForObject{}, predicateController(mgr.GetClient()))
}

// Reconcile reads the operator settings ConfigMap referenced by the request
// and applies the configuration it carries to the cluster components.
// The Controller will requeue the Request to be processed again if the
// returned error is non-nil or Result.Requeue is true, otherwise upon
// completion it will remove the work from the queue.
// NOTE(review): ctx is unused here; reconcile uses c.opManagerContext for API
// calls — confirm that is deliberate.
func (c *ReconcileConfig) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	// workaround because the rook logging mechanism is not compatible with the controller-runtime logging interface
	reconcileResponse, err := c.reconcile(request)
	if err != nil {
		configLogger.Errorf("failed to reconcile %v", err)
	}

	return reconcileResponse, err
}

// reconcile fetches the operator's ConfigMap (falling back to defaults / env
// vars when it does not exist), refreshes the in-memory config parameters,
// then applies the CSI kubelet root dir and discovery daemonset settings.
func (c *ReconcileConfig) reconcile(request reconcile.Request) (reconcile.Result, error) {
	configLogger.Debugf("reconciling %s", request.NamespacedName)

	// Fetch the operator's configmap
	cm := &v1.ConfigMap{}
	switch err := c.client.Get(c.opManagerContext, request.NamespacedName, cm); {
	case err == nil:
		// Populate the operator's config
		c.config.Parameters = cm.Data
	case kerrors.IsNotFound(err):
		configLogger.Debug("operator's configmap resource not found. will use default value or env var.")
	default:
		// Error reading the object - requeue the request.
		return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to get operator's configmap")
	}

	// Propagate a changed CSI kubelet root dir to the node deployments.
	if err := c.updateCsiDriver(); err != nil {
		return opcontroller.ImmediateRetryResult, err
	}

	// Reconcile discovery daemon
	if err := c.starDiscoverDaemonset(); err != nil {
		return opcontroller.ImmediateRetryResult, err
	}

	configLogger.Infof("%s done reconciling", controllerName)
	return reconcile.Result{}, nil
}

// updateCsiDriver propagates a non-default CSI kubelet root dir to the topolvm
// node deployments. The value is read from the operator config parameters
// (KubeletRootPathEnv) with cluster.CSIKubeletRootDir as the fallback; when it
// equals the default, nothing is done.
func (c *ReconcileConfig) updateCsiDriver() error {
	kubeletRootDir := k8sutil.GetValue(c.config.Parameters, cluster.KubeletRootPathEnv, cluster.CSIKubeletRootDir)
	if kubeletRootDir != cluster.CSIKubeletRootDir {
		if err := node.UpdateNodeDeploymentCSIKubeletRootPath(c.context.Clientset, kubeletRootDir); err != nil {
			// Log with this file's configLogger (was clusterLogger, defined in
			// a sibling file) and fix the "updater" typo in the message.
			configLogger.Errorf("update csi kubelet path failed err:%s", err.Error())
			return err
		}
	}
	return nil
}

// starDiscoverDaemonset creates the device-discovery daemonset when discovery
// is enabled via the operator config parameters (DiscoverDevicesEnv), falling
// back to cluster.EnableDiscoverDevices.
// NOTE(review): the name looks like a typo for "startDiscoverDaemonset"; it is
// kept unchanged because reconcile calls it by this name.
func (c *ReconcileConfig) starDiscoverDaemonset() error {
	enabled := k8sutil.GetValue(c.config.Parameters, cluster.DiscoverDevicesEnv, cluster.EnableDiscoverDevices)
	if enabled != "true" {
		return nil
	}
	return discover.MakeDiscoverDevicesDaemonset(c.context.Clientset, cluster.DiscoverAppName, c.config.Image, false)
}
2 changes: 1 addition & 1 deletion controllers/parepareVg_controller.go
Expand Up @@ -655,7 +655,7 @@ func checkLoopDevice(executor exec.Executor, disks []topolvmv1.Disk, loops *[]to
} else {
if !created {
vgLogger.Debugf("get loop %s back file", ele.Name)
s := topolvmv1.LoopState{Name: ele.Name, Status: cluster.LoopCreateSuccessful}
s := topolvmv1.LoopState{Name: ele.Name, Status: cluster.LoopCreateSuccessful, DeviceName: ele.Name}
file, err := sys.GetLoopBackFile(executor, ele.Name)
if err != nil {
vgLogger.Errorf("get loop %s back file failed %v", ele.Name, err)
Expand Down

0 comments on commit b980353

Please sign in to comment.