Skip to content

Commit

Permalink
Merge pull request #116 from swamibluedata/swami-systemd-native
Browse files Browse the repository at this point in the history
Support for nativeSystemdSupport flag in config CR
  • Loading branch information
joel-bluedata committed Dec 4, 2018
2 parents 37d0ed8 + 36c43b6 commit 928caad
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 69 deletions.
38 changes: 27 additions & 11 deletions cmd/kubedirector/main.go
Expand Up @@ -42,20 +42,10 @@ func main() {

sdk.ExposeMetricsPort()

resource := "kubedirector.bluedata.io/v1alpha1"
kind := "KubeDirectorCluster"
namespace, err := k8sutil.GetWatchNamespace()
if err != nil {
logrus.Fatalf("failed to get watch namespace: %v", err)
}
// The resync period essentially determines how granularly we can detect
// the completion of cluster config changes. Making this too small can
// actually be bad in that there is benefit to batch-resolving changes,
// within KubeDirector but also especially with the cluster's app config
// scripts.
resyncPeriod := time.Duration(30) * time.Second
logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod)

// Fetch our deployment object
kdName, err := k8sutil.GetOperatorName()
if err != nil {
Expand All @@ -78,7 +68,33 @@ func main() {
validator.StartValidationServer(handler)
}()

sdk.Watch(resource, kind, namespace, resyncPeriod)
type watchInfo struct {
kind string
resyncPeriod time.Duration
}

// Add all CR kinds that we want to watch.
watchParams := []watchInfo{
{
kind: "KubeDirectorCluster",
// The resync period essentially determines how granularly we can detect
// the completion of cluster config changes. Making this too small can
// actually be bad in that there is benefit to batch-resolving changes,
// within KubeDirector but also especially with the cluster's app config
// scripts.
resyncPeriod: time.Duration(30) * time.Second,
},
{
kind: "KubeDirectorConfig",
resyncPeriod: 0,
},
}

resource := "kubedirector.bluedata.io/v1alpha1"
for _, w := range watchParams {
logrus.Infof("Watching %s, %s, %s, %d", resource, w.kind, namespace, w.resyncPeriod)
sdk.Watch(resource, w.kind, namespace, w.resyncPeriod)
}
sdk.Handle(handler)
sdk.Run(context.TODO())
}
6 changes: 6 additions & 0 deletions deploy/example_config/cr-config-okd.yaml
@@ -0,0 +1,6 @@
# Example KubeDirectorConfig CR for OKD/OpenShift clusters.
# OKD runs containers under a systemd-native runtime, so the cgroup/systemctl
# bind-mount workaround in KubeDirector can be disabled via this flag.
apiVersion: "kubedirector.bluedata.io/v1alpha1"
kind: "KubeDirectorConfig"
metadata:
  # KubeDirector looks up the global config object by this well-known name.
  name: "kd-global-config"
spec:
  # Skip the manual systemd support mounts; rely on the platform's native support.
  nativeSystemdSupport: true
2 changes: 2 additions & 0 deletions deploy/kubedirector/crd-config.yaml
Expand Up @@ -27,3 +27,5 @@ spec:
defaultServiceType:
type: string
pattern: '^NodePort$|^LoadBalancer$'
nativeSystemdSupport:
type: boolean
2 changes: 2 additions & 0 deletions pkg/apis/kubedirector.bluedata.io/v1alpha1/register.go
Expand Up @@ -43,6 +43,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&KubeDirectorCluster{},
&KubeDirectorClusterList{},
&KubeDirectorConfig{},
&KubeDirectorConfigList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/kubedirector.bluedata.io/v1alpha1/types.go
Expand Up @@ -226,6 +226,7 @@ type KubeDirectorConfig struct {

// ConfigSpec is the spec provided for an app definition.
type ConfigSpec struct {
StorageClass *string `json:"defaultStorageClassName,omitempty"`
ServiceType *string `json:"defaultServiceType,omitempty"`
StorageClass *string `json:"defaultStorageClassName,omitempty"`
ServiceType *string `json:"defaultServiceType,omitempty"`
NativeSystemdSupport bool `json:"nativeSystemdSupport"`
}
13 changes: 9 additions & 4 deletions pkg/executor/statefulset.go
Expand Up @@ -46,10 +46,11 @@ var defaultMountFolders = []string{"/usr", "/opt", "/var", "/etc"}
// implementing the given role.
func CreateStatefulSet(
cr *kdv1.KubeDirectorCluster,
nativeSystemdSupport bool,
role *kdv1.Role,
) (*appsv1.StatefulSet, error) {

statefulSet, err := getStatefulset(cr, role, 0)
statefulSet, err := getStatefulset(cr, nativeSystemdSupport, role, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,6 +126,7 @@ func DeleteStatefulSet(
// given role.
func getStatefulset(
cr *kdv1.KubeDirectorCluster,
nativeSystemdSupport bool,
role *kdv1.Role,
replicas int32,
) (*appsv1.StatefulSet, error) {
Expand Down Expand Up @@ -197,6 +199,7 @@ func getStatefulset(
volumeMounts, volumes, volumesErr := generateVolumeMounts(
cr,
role,
nativeSystemdSupport,
persistDirs,
)

Expand Down Expand Up @@ -382,11 +385,13 @@ func generateInitContainerLaunch(persistDirs []string) string {
}

// generateVolumeMounts generates all of an app container's volume and mount
// specs for persistent storage, tmpfs, and systemctl support that are
// appropriate for members of the given role.
// specs for persistent storage, tmpfs and systemctl support that are
// appropriate for members of the given role. For systemctl support,
// nativeSystemdSupport flag is examined along with the app requirement.
func generateVolumeMounts(
cr *kdv1.KubeDirectorCluster,
role *kdv1.Role,
nativeSystemdSupport bool,
persistDirs []string,
) ([]v1.VolumeMount, []v1.Volume, error) {

Expand All @@ -407,7 +412,7 @@ func generateVolumeMounts(
return volumeMounts, volumes, err
}

if isSystemdReqd {
if isSystemdReqd && !nativeSystemdSupport {
cgroupVolMnts, cgroupVols := generateSystemdSupport(cr)
volumeMounts = append(volumeMounts, cgroupVolMnts...)
volumes = append(volumes, cgroupVols...)
Expand Down
24 changes: 12 additions & 12 deletions pkg/reconciler/cluster.go
Expand Up @@ -34,7 +34,7 @@ import (
func syncCluster(
event sdk.Event,
cr *kdv1.KubeDirectorCluster,
handlerState *handlerClusterState,
handler *Handler,
) error {

// Exit early if deleting the resource.
Expand All @@ -44,13 +44,13 @@ func syncCluster(
shared.EventReasonCluster,
"deleted",
)
deleteStatusGen(cr, handlerState)
removeClusterAppReference(cr, handlerState)
deleteStatusGen(cr, handler)
removeClusterAppReference(cr, handler)
return nil
}

// Otherwise, make sure this cluster marks a reference to its app.
ensureClusterAppReference(cr, handlerState)
ensureClusterAppReference(cr, handler)

// Make sure we have a Status object to work with.
if cr.Status == nil {
Expand All @@ -68,7 +68,7 @@ func syncCluster(
maxWait := 4096 * time.Second
for {
cr.Status.GenerationUid = uuid.New().String()
writeStatusGen(cr, handlerState, cr.Status.GenerationUid)
writeStatusGen(cr, handler, cr.Status.GenerationUid)
updateErr := executor.UpdateStatus(cr)
if updateErr == nil {
return
Expand Down Expand Up @@ -105,7 +105,7 @@ func syncCluster(

// Ignore stale poll-driven handler for a resource we have since
// updated. Also for a new CR just update the status state/gen.
shouldProcessCR := handleStatusGen(cr, handlerState)
shouldProcessCR := handleStatusGen(cr, handler)

// Regardless of whether the status gen is as expected, make sure the CR
// finalizers are as we want them. We use a finalizer to prevent races
Expand Down Expand Up @@ -138,7 +138,7 @@ func syncCluster(
return clusterServiceErr
}

roles, state, rolesErr := syncRoles(cr)
roles, state, rolesErr := syncRoles(cr, handler)
if rolesErr != nil {
errLog("roles", rolesErr)
return rolesErr
Expand Down Expand Up @@ -199,11 +199,11 @@ func syncCluster(
// reject old/stale versions of the CR.
func handleStatusGen(
cr *kdv1.KubeDirectorCluster,
handlerState *handlerClusterState,
handler *Handler,
) bool {

incoming := cr.Status.GenerationUid
lastKnown, ok := ReadStatusGen(cr, handlerState)
lastKnown, ok := ReadStatusGen(cr, handler)
if !ok {
if incoming == "" {
shared.LogInfo(
Expand All @@ -220,9 +220,9 @@ func handleStatusGen(
"unknown with incoming gen uid %s",
incoming,
)
writeStatusGen(cr, handlerState, incoming)
ValidateStatusGen(cr, handlerState)
ensureClusterAppReference(cr, handlerState)
writeStatusGen(cr, handler, incoming)
ValidateStatusGen(cr, handler)
ensureClusterAppReference(cr, handler)
return true
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/reconciler/config.go
@@ -0,0 +1,41 @@
// Copyright 2018 BlueData Software, Inc.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reconciler

import (
kdv1 "github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
"github.com/operator-framework/operator-sdk/pkg/sdk"
)

// syncConfig runs the reconciliation logic for the KubeDirectorConfig CR.
// It is invoked because of a change in or addition of such a resource, or
// by periodic polling on an existing one. Currently the only work done here
// is caching the config data in the handler structure on add/change, and
// clearing that cache when the resource is deleted.
func syncConfig(
	event sdk.Event,
	cr *kdv1.KubeDirectorConfig,
	handler *Handler,
) error {

	// On delete, forget any cached global config and stop.
	if event.Deleted {
		removeGlobalConfig(handler)
		return nil
	}

	// Add or change: record the latest config for other reconcilers to read.
	addGlobalConfig(handler, cr)
	return nil
}
9 changes: 6 additions & 3 deletions pkg/reconciler/handler.go
Expand Up @@ -27,11 +27,12 @@ import (
// resources that we are watching.
func NewHandler() *Handler {
return &Handler{
ClusterState: handlerClusterState{
lock: sync.RWMutex{},
lock: sync.RWMutex{},
clusterState: handlerClusterState{
clusterStatusGens: make(map[types.UID]StatusGen),
clusterAppTypes: make(map[string]string),
},
globalConfig: nil,
}
}

Expand All @@ -40,7 +41,9 @@ func NewHandler() *Handler {
// Handle dispatches an incoming event to the reconciliation routine that
// matches the event object's CR kind. Events for kinds we do not handle
// are ignored (nil error).
func (h *Handler) Handle(ctx context.Context, event sdk.Event) error {
	switch o := event.Object.(type) {
	case *v1alpha1.KubeDirectorCluster:
		return syncCluster(event, o, h)
	case *v1alpha1.KubeDirectorConfig:
		return syncConfig(event, o, h)
	}
	return nil
}
12 changes: 9 additions & 3 deletions pkg/reconciler/roles.go
Expand Up @@ -34,6 +34,7 @@ import (
// can be referenced by the later syncs for other concerns.
func syncRoles(
cr *kdv1.KubeDirectorCluster,
handler *Handler,
) ([]*roleInfo, clusterStateInternal, error) {

// Construct the role info slice. Bail out now if that fails.
Expand Down Expand Up @@ -61,14 +62,14 @@ func syncRoles(
switch {
case r.statefulSet == nil && r.roleStatus == nil:
// Role did not previously exist. Create it now.
createErr := handleRoleCreate(cr, r, &anyMembersChanged)
createErr := handleRoleCreate(cr, r, handler, &anyMembersChanged)
if createErr != nil {
return nil, clusterMembersUnknown, createErr
}
case r.statefulSet == nil && r.roleStatus != nil:
// Role exists but there is no statefulset for it in k8s.
// Hmm, weird. Statefulset was deleted out-of-band? Let's fix.
reCreateErr := handleRoleReCreate(cr, r, &anyMembersChanged)
reCreateErr := handleRoleReCreate(cr, r, handler, &anyMembersChanged)
if reCreateErr != nil {
return nil, clusterMembersUnknown, reCreateErr
}
Expand Down Expand Up @@ -244,6 +245,7 @@ func calcRoleMembersByState(
func handleRoleCreate(
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
handler *Handler,
anyMembersChanged *bool,
) error {

Expand All @@ -260,9 +262,12 @@ func handleRoleCreate(
role.roleSpec.Name,
)

nativeSystemdSupport := getNativeSystemdSupport(handler)

// Create the associated statefulset.
statefulSet, createErr := executor.CreateStatefulSet(
cr,
nativeSystemdSupport,
role.roleSpec,
)
if createErr != nil {
Expand Down Expand Up @@ -304,6 +309,7 @@ func handleRoleCreate(
func handleRoleReCreate(
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
handler *Handler,
anyMembersChanged *bool,
) error {

Expand All @@ -314,7 +320,7 @@ func handleRoleReCreate(
role.roleStatus.StatefulSet = ""
} else {
// Create a new statefulset for the role.
return handleRoleCreate(cr, role, anyMembersChanged)
return handleRoleCreate(cr, role, handler, anyMembersChanged)
}
} else {
shared.LogInfof(
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/types.go
Expand Up @@ -17,6 +17,7 @@ package reconciler
import (
"sync"

"github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
kdv1 "github.com/bluek8s/kubedirector/pkg/apis/kubedirector.bluedata.io/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -28,13 +29,14 @@ type StatusGen struct {
}

type handlerClusterState struct {
lock sync.RWMutex
clusterStatusGens map[types.UID]StatusGen
clusterAppTypes map[string]string
}

// Handler holds all mutable state shared across event handling: per-cluster
// bookkeeping plus the cached global config CR (nil until one is observed).
// The lock guards both fields; Handler must not be copied once in use.
type Handler struct {
	lock         sync.RWMutex
	clusterState handlerClusterState
	globalConfig *v1alpha1.KubeDirectorConfig
}

type clusterState string
Expand Down

0 comments on commit 928caad

Please sign in to comment.