Skip to content

Commit

Permalink
fix juicefs cache capacity (#2186)
Browse files Browse the repository at this point in the history
Signed-off-by: zwwhdls <zww@hdls.me>

Signed-off-by: zwwhdls <zww@hdls.me>
Signed-off-by: cheyang <cheyang@163.com>
  • Loading branch information
zwwhdls authored and cheyang committed Jan 3, 2023
1 parent c01bea6 commit f5ec9a1
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/ddc/juicefs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (j *JuiceFSEngine) queryCacheStatus() (states cacheStates, err error) {
err = e
return
}
states.cacheCapacity = utils.BytesSize(float64(cachesize))
states.cacheCapacity = utils.BytesSize(float64(cachesize * uint64(j.runtime.Spec.Replicas)))
}

var pods []v1.Pod
Expand Down
1 change: 1 addition & 0 deletions pkg/ddc/juicefs/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (j *JuiceFSEngine) CheckAndUpdateRuntimeStatus() (ready bool, err error) {
if err != nil {
return err
}
j.runtime = runtime

runtimeToUpdate := runtime.DeepCopy()
if reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
Expand Down
132 changes: 131 additions & 1 deletion pkg/ddc/juicefs/sync_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,139 @@ limitations under the License.

package juicefs

import cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
import (
"context"
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ctrl"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

// SyncRuntime pushes the current JuiceFSRuntime spec down to the workloads
// managed by this engine.
//
// The runtime is loaded with j.getRuntime and then applied in two ordered
// steps: first the workers (syncWorkerSpec), then the fuse (syncFuseSpec).
// The function stops at the first step that reports a change, so at most one
// workload is updated per call.
//
// It returns changed == true when one of the steps updated its workload, and
// a non-nil error (with changed == false) as soon as any step fails.
func (j *JuiceFSEngine) SyncRuntime(ctx cruntime.ReconcileRequestContext) (changed bool, err error) {
	current, err := j.getRuntime()
	if err != nil {
		return false, err
	}

	// Step 1: workers.
	workersUpdated, err := j.syncWorkerSpec(ctx, current)
	switch {
	case err != nil:
		return false, err
	case workersUpdated:
		j.Log.Info("Worker Spec is updated", "name", ctx.Name, "namespace", ctx.Namespace)
		return true, nil
	}

	// Step 2: fuse, only reached when the workers needed no update.
	fuseUpdated, err := j.syncFuseSpec(ctx, current)
	switch {
	case err != nil:
		return false, err
	case fuseUpdated:
		j.Log.Info("Fuse Spec is updated", "name", ctx.Name, "namespace", ctx.Namespace)
		return true, nil
	}

	return false, nil
}

// syncWorkerSpec makes the resources of the worker StatefulSet's container
// match runtime.Spec.Worker.Resources.
//
// The StatefulSet is fetched, compared and updated inside
// retry.RetryOnConflict, so the whole read-compare-update cycle is repeated
// when the update is rejected with a conflict. Only a pod template with
// exactly one container is considered; any other container count is left
// untouched.
//
// It returns changed == true only when an update was actually written; on any
// error changed is false. When the resulting error satisfies
// fluiderrs.IsDeprecated, a warning is logged and (false, nil) is returned.
// The ctx parameter is not used by this function.
func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime) (changed bool, err error) {
	j.Log.V(1).Info("syncWorkerSpec")
	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// The closure may run more than once; forget what a previous
		// (conflicted) attempt recorded so the result reflects this attempt only.
		changed = false

		workers, err := ctrl.GetWorkersAsStatefulset(j.Client,
			types.NamespacedName{Namespace: j.namespace, Name: j.getWorkerName()})
		if err != nil {
			return err
		}

		workersToUpdate := workers.DeepCopy()
		if len(workersToUpdate.Spec.Template.Spec.Containers) != 1 {
			return nil
		}

		container := &workersToUpdate.Spec.Template.Spec.Containers[0]
		workerResources := runtime.Spec.Worker.Resources
		if !utils.ResourceRequirementsEqual(container.Resources, workerResources) {
			j.Log.Info("The resource requirement is different.", "worker sts", container.Resources, "runtime", workerResources)
			container.Resources = workerResources
			changed = true
		} else {
			j.Log.V(1).Info("The resource requirement of workers is the same, skip")
		}

		if !changed {
			j.Log.V(1).Info("The resource requirement of worker is not set, skip")
			return nil
		}

		// Guard against a no-op write: the comparison above may report a
		// difference while the resulting object is still deeply equal.
		if reflect.DeepEqual(workers, workersToUpdate) {
			changed = false
			j.Log.V(1).Info("The resource requirement of worker is not changed, skip")
			return nil
		}
		j.Log.Info("The resource requirement of worker is updated")

		if err = j.Client.Update(context.TODO(), workersToUpdate); err != nil {
			j.Log.Error(err, "Failed to update the sts spec")
			// Nothing was persisted, so do not report a change.
			changed = false
			return err
		}
		return nil
	})

	if fluiderrs.IsDeprecated(err) {
		j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
		return false, nil
	}

	return changed, err
}

// syncFuseSpec makes the resources of the fuse DaemonSet's container match
// runtime.Spec.Fuse.Resources.
//
// The DaemonSet is fetched, compared and updated inside
// retry.RetryOnConflict, so the whole read-compare-update cycle is repeated
// when the update is rejected with a conflict. Only a pod template with
// exactly one container is considered; any other container count is left
// untouched.
//
// It returns changed == true only when an update was actually written; on any
// error changed is false. When the resulting error satisfies
// fluiderrs.IsDeprecated, a warning is logged and (false, nil) is returned.
// The ctx parameter is not used by this function.
func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime) (changed bool, err error) {
	j.Log.V(1).Info("syncFuseSpec")
	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// The closure may run more than once; forget what a previous
		// (conflicted) attempt recorded so the result reflects this attempt only.
		changed = false

		fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseDaemonsetName(), j.namespace)
		if err != nil {
			return err
		}

		fusesToUpdate := fuses.DeepCopy()
		if len(fusesToUpdate.Spec.Template.Spec.Containers) != 1 {
			return nil
		}

		container := &fusesToUpdate.Spec.Template.Spec.Containers[0]
		fuseResource := runtime.Spec.Fuse.Resources
		if !utils.ResourceRequirementsEqual(container.Resources, fuseResource) {
			j.Log.Info("The resource requirement is different.", "fuse ds", container.Resources, "runtime", fuseResource)
			container.Resources = fuseResource
			changed = true
		} else {
			j.Log.V(1).Info("The resource requirement of fuse is the same, skip")
		}

		if !changed {
			j.Log.V(1).Info("The resource requirement of fuse is not set, skip")
			return nil
		}

		// Guard against a no-op write: the comparison above may report a
		// difference while the resulting object is still deeply equal.
		if reflect.DeepEqual(fuses, fusesToUpdate) {
			changed = false
			j.Log.V(1).Info("The resource requirement of fuse is not changed, skip")
			return nil
		}
		j.Log.Info("The resource requirement of fuse is updated")

		if err = j.Client.Update(context.TODO(), fusesToUpdate); err != nil {
			// The object being written is the fuse DaemonSet, not a StatefulSet.
			j.Log.Error(err, "Failed to update the ds spec")
			// Nothing was persisted, so do not report a change.
			changed = false
			return err
		}
		return nil
	})

	if fluiderrs.IsDeprecated(err) {
		j.Log.Info("Warning: the current runtime is created by runtime controller before v0.7.0, update specs are not supported. To support these features, please create a new dataset", "details", err)
		return false, nil
	}

	return changed, err
}
Loading

0 comments on commit f5ec9a1

Please sign in to comment.