diff --git a/cmd/kubedirector/main.go b/cmd/kubedirector/main.go
index 2e6caf41..fc363c7c 100644
--- a/cmd/kubedirector/main.go
+++ b/cmd/kubedirector/main.go
@@ -42,20 +42,10 @@ func main() {
 
 	sdk.ExposeMetricsPort()
 
-	resource := "kubedirector.bluedata.io/v1alpha1"
-	kind := "KubeDirectorCluster"
 	namespace, err := k8sutil.GetWatchNamespace()
 	if err != nil {
 		logrus.Fatalf("failed to get watch namespace: %v", err)
 	}
-	// The resync period essentially determines how granularly we can detect
-	// the completion of cluster config changes. Making this too small can
-	// actually be bad in that there is benefit to batch-resolving changes,
-	// within KubeDirector but also especially with the cluster's app config
-	// scripts.
-	resyncPeriod := time.Duration(30) * time.Second
-	logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod)
-
 	// Fetch our deployment object
 	kdName, err := k8sutil.GetOperatorName()
 	if err != nil {
@@ -78,7 +68,33 @@ func main() {
 		validator.StartValidationServer(handler)
 	}()
 
-	sdk.Watch(resource, kind, namespace, resyncPeriod)
+	type watchInfo struct {
+		kind         string
+		resyncPeriod time.Duration
+	}
+
+	// Add all CR kinds that we want to watch.
+	watchParams := []watchInfo{
+		{
+			kind: "KubeDirectorCluster",
+			// The resync period essentially determines how granularly we can detect
+			// the completion of cluster config changes. Making this too small can
+			// actually be bad in that there is benefit to batch-resolving changes,
+			// within KubeDirector but also especially with the cluster's app config
+			// scripts.
+			resyncPeriod: time.Duration(30) * time.Second,
+		},
+		{
+			kind:         "KubeDirectorConfig",
+			resyncPeriod: 0,
+		},
+	}
+
+	resource := "kubedirector.bluedata.io/v1alpha1"
+	for _, w := range watchParams {
+		logrus.Infof("Watching %s, %s, %s, %d", resource, w.kind, namespace, w.resyncPeriod)
+		sdk.Watch(resource, w.kind, namespace, w.resyncPeriod)
+	}
 	sdk.Handle(handler)
 	sdk.Run(context.TODO())
 }
diff --git a/deploy/example_config/cr-config-okd.yaml b/deploy/example_config/cr-config-okd.yaml
new file mode 100644
index 00000000..2537dd0c
--- /dev/null
+++ b/deploy/example_config/cr-config-okd.yaml
@@ -0,0 +1,6 @@
+apiVersion: "kubedirector.bluedata.io/v1alpha1"
+kind: "KubeDirectorConfig"
+metadata:
+  name: "kd-global-config"
+spec:
+  nativeSystemdSupport: true
diff --git a/deploy/kubedirector/crd-config.yaml b/deploy/kubedirector/crd-config.yaml
index 6eab7b39..f38554a1 100644
--- a/deploy/kubedirector/crd-config.yaml
+++ b/deploy/kubedirector/crd-config.yaml
@@ -27,3 +27,5 @@ spec:
             defaultServiceType:
               type: string
               pattern: '^NodePort$|^LoadBalancer$'
+            nativeSystemdSupport:
+              type: boolean
diff --git a/pkg/apis/kubedirector.bluedata.io/v1alpha1/register.go b/pkg/apis/kubedirector.bluedata.io/v1alpha1/register.go
index f30dac1f..92f0cce8 100644
--- a/pkg/apis/kubedirector.bluedata.io/v1alpha1/register.go
+++ b/pkg/apis/kubedirector.bluedata.io/v1alpha1/register.go
@@ -43,6 +43,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 	scheme.AddKnownTypes(SchemeGroupVersion,
 		&KubeDirectorCluster{},
 		&KubeDirectorClusterList{},
+		&KubeDirectorConfig{},
+		&KubeDirectorConfigList{},
 	)
 	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
 	return nil
diff --git a/pkg/apis/kubedirector.bluedata.io/v1alpha1/types.go b/pkg/apis/kubedirector.bluedata.io/v1alpha1/types.go
index c3cc48cd..3eb86c81 100644
--- a/pkg/apis/kubedirector.bluedata.io/v1alpha1/types.go
+++ b/pkg/apis/kubedirector.bluedata.io/v1alpha1/types.go
@@ -226,6 +226,7 @@ type KubeDirectorConfig struct {
 
 // ConfigSpec is the spec provided for an app definition.
 type ConfigSpec struct {
-	StorageClass *string `json:"defaultStorageClassName,omitempty"`
-	ServiceType  *string `json:"defaultServiceType,omitempty"`
+	StorageClass         *string `json:"defaultStorageClassName,omitempty"`
+	ServiceType          *string `json:"defaultServiceType,omitempty"`
+	NativeSystemdSupport bool    `json:"nativeSystemdSupport"`
 }
diff --git a/pkg/executor/statefulset.go b/pkg/executor/statefulset.go
index fbbbcd66..e79f6494 100644
--- a/pkg/executor/statefulset.go
+++ b/pkg/executor/statefulset.go
@@ -46,10 +46,11 @@ var defaultMountFolders = []string{"/usr", "/opt", "/var", "/etc"}
 // implementing the given role.
 func CreateStatefulSet(
 	cr *kdv1.KubeDirectorCluster,
+	nativeSystemdSupport bool,
 	role *kdv1.Role,
 ) (*appsv1.StatefulSet, error) {
 
-	statefulSet, err := getStatefulset(cr, role, 0)
+	statefulSet, err := getStatefulset(cr, nativeSystemdSupport, role, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -125,6 +126,7 @@ func DeleteStatefulSet(
 // given role.
 func getStatefulset(
 	cr *kdv1.KubeDirectorCluster,
+	nativeSystemdSupport bool,
 	role *kdv1.Role,
 	replicas int32,
 ) (*appsv1.StatefulSet, error) {
@@ -197,6 +199,7 @@ func getStatefulset(
 	volumeMounts, volumes, volumesErr := generateVolumeMounts(
 		cr,
 		role,
+		nativeSystemdSupport,
 		persistDirs,
 	)
 
@@ -382,11 +385,13 @@ func generateInitContainerLaunch(persistDirs []string) string {
 }
 
 // generateVolumeMounts generates all of an app container's volume and mount
-// specs for persistent storage, tmpfs, and systemctl support that are
-// appropriate for members of the given role.
+// specs for persistent storage, tmpfs and systemctl support that are
+// appropriate for members of the given role. For systemctl support,
+// nativeSystemdSupport flag is examined along with the app requirement.
 func generateVolumeMounts(
 	cr *kdv1.KubeDirectorCluster,
 	role *kdv1.Role,
+	nativeSystemdSupport bool,
 	persistDirs []string,
 ) ([]v1.VolumeMount, []v1.Volume, error) {
 
@@ -407,7 +412,7 @@ func generateVolumeMounts(
 		return volumeMounts, volumes, err
 	}
 
-	if isSystemdReqd {
+	if isSystemdReqd && !nativeSystemdSupport {
 		cgroupVolMnts, cgroupVols := generateSystemdSupport(cr)
 		volumeMounts = append(volumeMounts, cgroupVolMnts...)
 		volumes = append(volumes, cgroupVols...)
diff --git a/pkg/reconciler/cluster.go b/pkg/reconciler/cluster.go
index db269774..8c551e32 100644
--- a/pkg/reconciler/cluster.go
+++ b/pkg/reconciler/cluster.go
@@ -34,7 +34,7 @@ import (
 func syncCluster(
 	event sdk.Event,
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) error {
 
 	// Exit early if deleting the resource.
@@ -44,13 +44,13 @@
 			shared.EventReasonCluster,
 			"deleted",
 		)
-		deleteStatusGen(cr, handlerState)
-		removeClusterAppReference(cr, handlerState)
+		deleteStatusGen(cr, handler)
+		removeClusterAppReference(cr, handler)
 		return nil
 	}
 
 	// Otherwise, make sure this cluster marks a reference to its app.
-	ensureClusterAppReference(cr, handlerState)
+	ensureClusterAppReference(cr, handler)
 
 	// Make sure we have a Status object to work with.
 	if cr.Status == nil {
@@ -68,7 +68,7 @@
 		maxWait := 4096 * time.Second
 		for {
 			cr.Status.GenerationUid = uuid.New().String()
-			writeStatusGen(cr, handlerState, cr.Status.GenerationUid)
+			writeStatusGen(cr, handler, cr.Status.GenerationUid)
 			updateErr := executor.UpdateStatus(cr)
 			if updateErr == nil {
 				return
@@ -105,7 +105,7 @@
 
 	// Ignore stale poll-driven handler for a resource we have since
 	// updated. Also for a new CR just update the status state/gen.
-	shouldProcessCR := handleStatusGen(cr, handlerState)
+	shouldProcessCR := handleStatusGen(cr, handler)
 
 	// Regardless of whether the status gen is as expected, make sure the CR
 	// finalizers are as we want them. We use a finalizer to prevent races
@@ -138,7 +138,7 @@
 		return clusterServiceErr
 	}
 
-	roles, state, rolesErr := syncRoles(cr)
+	roles, state, rolesErr := syncRoles(cr, handler)
 	if rolesErr != nil {
 		errLog("roles", rolesErr)
 		return rolesErr
@@ -199,11 +199,11 @@
 // reject old/stale versions of the CR.
 func handleStatusGen(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) bool {
 
 	incoming := cr.Status.GenerationUid
-	lastKnown, ok := ReadStatusGen(cr, handlerState)
+	lastKnown, ok := ReadStatusGen(cr, handler)
 	if !ok {
 		if incoming == "" {
 			shared.LogInfo(
@@ -220,9 +220,9 @@
 			"unknown with incoming gen uid %s",
 			incoming,
 		)
-		writeStatusGen(cr, handlerState, incoming)
-		ValidateStatusGen(cr, handlerState)
-		ensureClusterAppReference(cr, handlerState)
+		writeStatusGen(cr, handler, incoming)
+		ValidateStatusGen(cr, handler)
+		ensureClusterAppReference(cr, handler)
 		return true
 	}
 
diff --git a/pkg/reconciler/config.go b/pkg/reconciler/config.go
new file mode 100644
index 00000000..db4b6660
--- /dev/null
+++ b/pkg/reconciler/config.go
@@ -0,0 +1,41 @@
+// Copyright 2018 BlueData Software, Inc.
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+//     http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+	kdv1 "github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
+	"github.com/operator-framework/operator-sdk/pkg/sdk"
+)
+
+// syncConfig runs the reconciliation logic for config cr. It is invoked because of a
+// change in or addition of a KubeDirectorConfig resource, or a periodic
+// polling to check on such a resource. Currently all we do is set the config data
+// in handler structure on add/change and on deletes set config data to be nil
+func syncConfig(
+	event sdk.Event,
+	cr *kdv1.KubeDirectorConfig,
+	handler *Handler,
+) error {
+
+	// Exit early if deleting the resource.
+	if event.Deleted {
+		removeGlobalConfig(handler)
+		return nil
+	}
+
+	addGlobalConfig(handler, cr)
+
+	return nil
+}
diff --git a/pkg/reconciler/handler.go b/pkg/reconciler/handler.go
index 6c8eee1f..d6feb067 100644
--- a/pkg/reconciler/handler.go
+++ b/pkg/reconciler/handler.go
@@ -27,11 +27,12 @@ import (
 // resources that we are watching.
 func NewHandler() *Handler {
 	return &Handler{
-		ClusterState: handlerClusterState{
-			lock: sync.RWMutex{},
+		lock: sync.RWMutex{},
+		clusterState: handlerClusterState{
 			clusterStatusGens: make(map[types.UID]StatusGen),
 			clusterAppTypes:   make(map[string]string),
 		},
+		globalConfig: nil,
 	}
 }
 
@@ -40,7 +41,9 @@ func NewHandler() *Handler {
 func (h *Handler) Handle(ctx context.Context, event sdk.Event) error {
 	switch o := event.Object.(type) {
 	case *v1alpha1.KubeDirectorCluster:
-		return syncCluster(event, o, &(h.ClusterState))
+		return syncCluster(event, o, h)
+	case *v1alpha1.KubeDirectorConfig:
+		return syncConfig(event, o, h)
 	}
 	return nil
 }
diff --git a/pkg/reconciler/roles.go b/pkg/reconciler/roles.go
index b1c111d8..adda6100 100644
--- a/pkg/reconciler/roles.go
+++ b/pkg/reconciler/roles.go
@@ -34,6 +34,7 @@ import (
 // can be referenced by the later syncs for other concerns.
 func syncRoles(
 	cr *kdv1.KubeDirectorCluster,
+	handler *Handler,
 ) ([]*roleInfo, clusterStateInternal, error) {
 
 	// Construct the role info slice. Bail out now if that fails.
@@ -61,14 +62,14 @@ func syncRoles(
 		switch {
 		case r.statefulSet == nil && r.roleStatus == nil:
 			// Role did not previously exist. Create it now.
-			createErr := handleRoleCreate(cr, r, &anyMembersChanged)
+			createErr := handleRoleCreate(cr, r, handler, &anyMembersChanged)
 			if createErr != nil {
 				return nil, clusterMembersUnknown, createErr
 			}
 		case r.statefulSet == nil && r.roleStatus != nil:
 			// Role exists but there is no statefulset for it in k8s.
 			// Hmm, weird. Statefulset was deleted out-of-band? Let's fix.
-			reCreateErr := handleRoleReCreate(cr, r, &anyMembersChanged)
+			reCreateErr := handleRoleReCreate(cr, r, handler, &anyMembersChanged)
 			if reCreateErr != nil {
 				return nil, clusterMembersUnknown, reCreateErr
 			}
@@ -244,6 +245,7 @@ func calcRoleMembersByState(
 func handleRoleCreate(
 	cr *kdv1.KubeDirectorCluster,
 	role *roleInfo,
+	handler *Handler,
 	anyMembersChanged *bool,
 ) error {
 
@@ -260,9 +262,12 @@ func handleRoleCreate(
 		role.roleSpec.Name,
 	)
 
+	nativeSystemdSupport := getNativeSystemdSupport(handler)
+
 	// Create the associated statefulset.
 	statefulSet, createErr := executor.CreateStatefulSet(
 		cr,
+		nativeSystemdSupport,
 		role.roleSpec,
 	)
 	if createErr != nil {
@@ -304,6 +309,7 @@ func handleRoleCreate(
 func handleRoleReCreate(
 	cr *kdv1.KubeDirectorCluster,
 	role *roleInfo,
+	handler *Handler,
 	anyMembersChanged *bool,
 ) error {
 
@@ -314,7 +320,7 @@
 			role.roleStatus.StatefulSet = ""
 		} else {
 			// Create a new statefulset for the role.
-			return handleRoleCreate(cr, role, anyMembersChanged)
+			return handleRoleCreate(cr, role, handler, anyMembersChanged)
 		}
 	} else {
 		shared.LogInfof(
diff --git a/pkg/reconciler/types.go b/pkg/reconciler/types.go
index 0ec08f7a..664da860 100644
--- a/pkg/reconciler/types.go
+++ b/pkg/reconciler/types.go
@@ -17,6 +17,7 @@ package reconciler
 import (
 	"sync"
 
+	"github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
 	kdv1 "github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -28,13 +29,14 @@ type StatusGen struct {
 }
 
 type handlerClusterState struct {
-	lock              sync.RWMutex
 	clusterStatusGens map[types.UID]StatusGen
 	clusterAppTypes   map[string]string
 }
 
 type Handler struct {
-	ClusterState handlerClusterState
+	lock         sync.RWMutex
+	clusterState handlerClusterState
+	globalConfig *v1alpha1.KubeDirectorConfig
 }
 
 type clusterState string
diff --git a/pkg/reconciler/util.go b/pkg/reconciler/util.go
index f6502a99..4ad7fe9a 100644
--- a/pkg/reconciler/util.go
+++ b/pkg/reconciler/util.go
@@ -22,11 +22,11 @@
 // validated flag.
 func ReadStatusGen(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) (StatusGen, bool) {
-	handlerState.lock.RLock()
-	defer handlerState.lock.RUnlock()
-	val, ok := handlerState.clusterStatusGens[cr.UID]
+	handler.lock.RLock()
+	defer handler.lock.RUnlock()
+	val, ok := handler.clusterState.clusterStatusGens[cr.UID]
 	return val, ok
 }
 
@@ -34,46 +34,46 @@ func ReadStatusGen(
 // The validated flag will begin as false.
 func writeStatusGen(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 	newGenUid string,
 ) {
-	handlerState.lock.Lock()
-	defer handlerState.lock.Unlock()
-	handlerState.clusterStatusGens[cr.UID] = StatusGen{Uid: newGenUid}
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	handler.clusterState.clusterStatusGens[cr.UID] = StatusGen{Uid: newGenUid}
 }
 
 // ValidateStatusGen provides threadsafe mark-validated of a status gen.
 func ValidateStatusGen(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) {
-	handlerState.lock.Lock()
-	defer handlerState.lock.Unlock()
-	val, ok := handlerState.clusterStatusGens[cr.UID]
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	val, ok := handler.clusterState.clusterStatusGens[cr.UID]
 	if ok {
 		val.Validated = true
-		handlerState.clusterStatusGens[cr.UID] = val
+		handler.clusterState.clusterStatusGens[cr.UID] = val
 	}
 }
 
 // deleteStatusGen provides threadsafe delete of a status gen.
 func deleteStatusGen(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) {
-	handlerState.lock.Lock()
-	defer handlerState.lock.Unlock()
-	delete(handlerState.clusterStatusGens, cr.UID)
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	delete(handler.clusterState.clusterStatusGens, cr.UID)
 }
 
 // ClustersUsingApp returns the list of cluster names referencing the given app.
 func ClustersUsingApp(
 	app string,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) []string {
 
 	var clusters []string
-	handlerState.lock.RLock()
-	defer handlerState.lock.RUnlock()
+	handler.lock.RLock()
+	defer handler.lock.RUnlock()
 	// This is a relationship that needs to be query-able given either ONLY
 	// the app name (in this function) or ONLY the cluster name (in
 	// removeClusterAppReference). Since the app CR deletion/update triggers
@@ -81,7 +81,7 @@ func ClustersUsingApp(
 	// check by just walking the list of associations. It's also nice to go
 	// ahead and gather all the offending cluster CR names to report back to
 	// the client.
-	for clusterKey, appName := range handlerState.clusterAppTypes {
+	for clusterKey, appName := range handler.clusterState.clusterAppTypes {
 		if appName == app {
 			clusters = append(clusters, clusterKey)
 		}
@@ -92,22 +92,52 @@
 // ensureClusterAppReference notes that an app type is in use by this cluster.
 func ensureClusterAppReference(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) {
 	clusterKey := cr.Namespace + "/" + cr.Name
-	handlerState.lock.Lock()
-	defer handlerState.lock.Unlock()
-	handlerState.clusterAppTypes[clusterKey] = cr.Spec.AppID
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	handler.clusterState.clusterAppTypes[clusterKey] = cr.Spec.AppID
 }
 
 // removeClusterAppReference notes that an app type is no longer in use by
 // this cluster.
 func removeClusterAppReference(
 	cr *kdv1.KubeDirectorCluster,
-	handlerState *handlerClusterState,
+	handler *Handler,
 ) {
 	clusterKey := cr.Namespace + "/" + cr.Name
-	handlerState.lock.Lock()
-	defer handlerState.lock.Unlock()
-	delete(handlerState.clusterAppTypes, clusterKey)
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	delete(handler.clusterState.clusterAppTypes, clusterKey)
+}
+
+// removeGlobalConfig removes the globalConfig from handler structure
+func removeGlobalConfig(handler *Handler) {
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	handler.globalConfig = nil
+}
+
+// addGlobalConfig adds the globalConfig CR data to handler structure
+func addGlobalConfig(
+	handler *Handler,
+	cr *kdv1.KubeDirectorConfig,
+) {
+	handler.lock.Lock()
+	defer handler.lock.Unlock()
+	handler.globalConfig = cr
+}
+
+// getNativeSystemdSupport extracts the flag definition from globalConfig CR data
+// if present, otherwise returns false
+func getNativeSystemdSupport(
+	handler *Handler,
+) bool {
+	handler.lock.RLock()
+	defer handler.lock.RUnlock()
+	if handler.globalConfig != nil {
+		return handler.globalConfig.Spec.NativeSystemdSupport
+	}
+	return false
 }
diff --git a/pkg/validator/app.go b/pkg/validator/app.go
index 69953118..748a3152 100644
--- a/pkg/validator/app.go
+++ b/pkg/validator/app.go
@@ -185,7 +185,7 @@ func admitAppCR(
 	if ar.Request.Operation == v1beta1.Update || ar.Request.Operation == v1beta1.Delete {
 		references := reconciler.ClustersUsingApp(
 			ar.Request.Name,
-			&(handlerState.ClusterState),
+			handlerState,
 		)
 		if len(references) != 0 {
 			referencesStr := strings.Join(references, ", ")
diff --git a/pkg/validator/cluster.go b/pkg/validator/cluster.go
index dcf1ba40..23c1f486 100644
--- a/pkg/validator/cluster.go
+++ b/pkg/validator/cluster.go
@@ -418,7 +418,7 @@ func admitClusterCR(
 	if clusterCR.Status != nil {
 		expectedStatusGen, ok := reconciler.ReadStatusGen(
 			&clusterCR,
-			&(handlerState.ClusterState),
+			handlerState,
 		)
 		// Reject this write if either of:
 		// - KubeDirector doesn't know about the cluster resource
@@ -443,7 +443,7 @@
 	}
 	reconciler.ValidateStatusGen(
 		&clusterCR,
-		&(handlerState.ClusterState),
+		handlerState,
 	)
 
 	// Shortcut out of here if the spec is not being changed. Among other
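
The change above follows a simple pattern: syncConfig caches the most recently seen KubeDirectorConfig in the Handler (addGlobalConfig/removeGlobalConfig), and role creation later reads the cached value under the shared lock, falling back to a default when no config CR exists (getNativeSystemdSupport). The sketch below shows that pattern in isolation; it is a minimal standalone approximation, and every name in it other than NativeSystemdSupport is illustrative rather than taken from the KubeDirector codebase.

package main

import (
	"fmt"
	"sync"
)

// configSpec mirrors the shape of the ConfigSpec added in this change;
// only the field used by the example is included.
type configSpec struct {
	NativeSystemdSupport bool
}

// handler caches the most recently observed global config behind an RWMutex,
// in the same spirit as Handler.globalConfig above. nil means "no config CR".
type handler struct {
	lock         sync.RWMutex
	globalConfig *configSpec
}

// setGlobalConfig stands in for addGlobalConfig/removeGlobalConfig: callers
// pass the CR's spec on add/update and nil on delete.
func (h *handler) setGlobalConfig(cfg *configSpec) {
	h.lock.Lock()
	defer h.lock.Unlock()
	h.globalConfig = cfg
}

// nativeSystemdSupport mirrors getNativeSystemdSupport: read under the shared
// read lock and fall back to false when no config has been observed yet.
func (h *handler) nativeSystemdSupport() bool {
	h.lock.RLock()
	defer h.lock.RUnlock()
	if h.globalConfig != nil {
		return h.globalConfig.NativeSystemdSupport
	}
	return false
}

func main() {
	h := &handler{}
	fmt.Println(h.nativeSystemdSupport()) // false: no config CR observed yet

	h.setGlobalConfig(&configSpec{NativeSystemdSupport: true})
	fmt.Println(h.nativeSystemdSupport()) // true: config CR added or updated

	h.setGlobalConfig(nil)
	fmt.Println(h.nativeSystemdSupport()) // false again: config CR deleted
}

In the diff itself, the equivalent flow is: applying deploy/example_config/cr-config-okd.yaml triggers syncConfig, which calls addGlobalConfig; a later handleRoleCreate calls getNativeSystemdSupport, and when the flag is true, generateVolumeMounts skips the cgroup/systemctl volume mounts even for apps that require systemd.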