Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ccremer committed Aug 24, 2022
1 parent aead2d7 commit f964a9d
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 287 deletions.
120 changes: 120 additions & 0 deletions pkg/operator/jobcontroller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package jobcontroller

import (
"context"
"fmt"

"github.com/ccremer/clustercode/pkg/api/v1alpha1"
"github.com/ccremer/clustercode/pkg/internal/pipe"
internaltypes "github.com/ccremer/clustercode/pkg/internal/types"
"github.com/ccremer/clustercode/pkg/internal/utils"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// JobProvisioner reconciles Job objects.
type JobProvisioner struct {
	// Client is used to read and write Kubernetes objects.
	Client client.Client
	// Log is the logger used by the provisioner.
	Log logr.Logger
}

// JobContext holds the parameters of a single reconciliation.
//
// It embeds the context.Context of the reconcile call so it can be handed
// directly to client calls and to the pipeline.
type JobContext struct {
	context.Context
	// resolver lets pipeline steps declare which earlier steps they depend on.
	resolver pipeline.DependencyResolver[*JobContext]

	// job is the Job being reconciled.
	job *batchv1.Job
	// jobType is filled in by the getJobType step.
	jobType internaltypes.ClusterCodeJobType
	// task is filled in by the fetchTask step.
	task *v1alpha1.Task
	log  logr.Logger
	// sliceIndex is filled in by the determineSliceIndex step.
	sliceIndex int
}

// NewObject returns a pointer to a freshly allocated, zero-valued Job.
func (r *JobProvisioner) NewObject() *batchv1.Job {
	return new(batchv1.Job)
}

// Provision reacts to a completed Job by running the follow-up steps that
// belong to its clustercode job type.
//
// Jobs that do not carry a JobComplete=True condition are ignored. For
// completed jobs the pipeline determines the job type, fetches the Task
// returned by utils.GetOwner for the job and then, depending on the type:
//   - split: ensures the count job,
//   - slice: determines the slice index and updates the Task status,
//   - merge: ensures the cleanup job.
//
// The returned result is always empty; the error is the one reported by the
// pipeline, if any.
func (r *JobProvisioner) Provision(ctx context.Context, obj *batchv1.Job) (reconcile.Result, error) {
	// Nothing to set up for jobs that are still running (or failed).
	if !r.isJobComplete(obj) {
		r.Log.V(1).Info("job is not completed yet, ignoring reconcile")
		return reconcile.Result{}, nil
	}

	pctx := &JobContext{
		Context:  ctx,
		job:      obj,
		resolver: pipeline.NewDependencyRecorder[*JobContext](),
	}

	p := pipeline.NewPipeline[*JobContext]().WithBeforeHooks(pipe.DebugLogger[*JobContext](pctx))
	p.WithSteps(
		p.NewStep("determine job type", r.getJobType),
		p.NewStep("fetch task", r.fetchTask),
		p.When(r.isJobType(internaltypes.JobTypeSplit), "reconcile split job", r.ensureCountJob),
		p.WithNestedSteps("reconcile slice job", r.isJobType(internaltypes.JobTypeSlice),
			// Step names show up in pipeline logs and errors, so keep them descriptive.
			p.NewStep("determine slice index", r.determineSliceIndex),
			p.NewStep("update status", r.updateStatus),
		),
		p.When(r.isJobType(internaltypes.JobTypeMerge), "reconcile merge job", r.ensureCleanupJob),
	)
	err := p.RunWithContext(pctx)
	return reconcile.Result{}, err
}

// Deprovision is called for Jobs that are being deleted. It only writes a
// debug-level log line and returns an empty result without error.
func (r *JobProvisioner) Deprovision(_ context.Context, _ *batchv1.Job) (reconcile.Result, error) {
	debugLog := r.Log.V(1)
	debugLog.Info("job is being deleted, ignoring reconcile")

	var result reconcile.Result
	return result, nil
}

// isJobComplete reports whether the Job's status contains a condition of type
// JobComplete whose status is True.
func (r *JobProvisioner) isJobComplete(job *batchv1.Job) bool {
	completeType := string(batchv1.JobComplete)
	return meta.IsStatusConditionPresentAndEqual(
		castConditions(job.Status.Conditions),
		completeType,
		metav1.ConditionTrue,
	)
}

// isJobType builds a predicate that reports whether the job type stored in
// the given JobContext equals jobType.
func (r *JobProvisioner) isJobType(jobType internaltypes.ClusterCodeJobType) func(ctx *JobContext) bool {
	matches := func(jc *JobContext) bool {
		return jobType == jc.jobType
	}
	return matches
}

// getJobType is the pipeline step that derives the job type from the Job in
// ctx and stores it in ctx.jobType. The field is assigned even when the lookup
// fails; the lookup error is returned unchanged.
func (r *JobProvisioner) getJobType(ctx *JobContext) error {
	var err error
	ctx.jobType, err = getJobType(ctx.job)
	return err
}

// fetchTask is the pipeline step that loads the Task identified by
// utils.GetOwner(ctx.job) into ctx.task. ctx.task is replaced with a fresh
// object before the lookup, so it is non-nil even if the Get call fails.
func (r *JobProvisioner) fetchTask(ctx *JobContext) error {
	task := &v1alpha1.Task{}
	ctx.task = task

	ownerKey := utils.GetOwner(ctx.job)
	return r.Client.Get(ctx, ownerKey, task)
}

// getJobType reads the clustercode type label from the Job and maps it to one
// of the known job types in internaltypes.JobTypes.
//
// It returns an error if the label is absent or if its value does not match
// any known job type; in both cases the returned type is the empty string.
func getJobType(job *batchv1.Job) (internaltypes.ClusterCodeJobType, error) {
	key := internaltypes.ClustercodeTypeLabelKey
	set := labels.Set(job.Labels)
	if !set.Has(key) {
		return "", fmt.Errorf("missing label key '%s'", key)
	}

	label := set.Get(key)
	for _, jobType := range internaltypes.JobTypes {
		if label == string(jobType) {
			return jobType, nil
		}
	}
	return "", fmt.Errorf("value of label '%s' unrecognized: %s", key, label)
}

// castConditions converts batch/v1 Job conditions into generic metav1
// conditions, copying type, status, last transition time, reason and message.
// The result is nil when conditions is empty.
func castConditions(conditions []batchv1.JobCondition) (converted []metav1.Condition) {
	for i := range conditions {
		src := &conditions[i]
		cond := metav1.Condition{
			Type:               string(src.Type),
			Status:             metav1.ConditionStatus(src.Status),
			LastTransitionTime: src.LastTransitionTime,
			Reason:             src.Reason,
			Message:            src.Message,
		}
		converted = append(converted, cond)
	}
	return converted
}
160 changes: 160 additions & 0 deletions pkg/operator/jobcontroller/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package jobcontroller

import (
"fmt"
"path/filepath"
"strconv"

"github.com/ccremer/clustercode/pkg/api/v1alpha1"
internaltypes "github.com/ccremer/clustercode/pkg/internal/types"
"github.com/ccremer/clustercode/pkg/internal/utils"
"github.com/ccremer/clustercode/pkg/operator/blueprintcontroller"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// ensureCountJob creates or updates the "count" Job for the Task in ctx.
//
// The Job lives in the namespace of the reconciled Job, reuses that Job's
// service account, runs the default clustercode image with the "count"
// subcommand and mounts the Task's intermediate PVC. The Task is set as the
// controller reference of the Job. Requires the fetchTask step to have run.
func (r *JobProvisioner) ensureCountJob(ctx *JobContext) error {
	ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask)

	taskID := ctx.task.Spec.TaskId
	mountRoot := filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath)

	job := &batchv1.Job{}
	// The precision caps the task ID so that ID, dash and suffix stay within the name budget.
	job.Name = fmt.Sprintf("%.*s-%s", 62-len(internaltypes.JobTypeCount), taskID, internaltypes.JobTypeCount)
	job.Namespace = ctx.job.Namespace

	mutate := func() error {
		typeAndTaskLabels := labels.Merge(internaltypes.JobTypeCount.AsLabels(), taskID.AsLabels())
		managedLabels := labels.Merge(internaltypes.ClusterCodeLabels, typeAndTaskLabels)
		job.Labels = labels.Merge(job.Labels, managedLabels)

		container := corev1.Container{
			Name:            "clustercode",
			Image:           blueprintcontroller.DefaultClusterCodeContainerImage,
			ImagePullPolicy: corev1.PullIfNotPresent,
			Args: []string{
				"-d",
				"count",
				"--task-name=" + ctx.task.Name,
				"--namespace=" + ctx.job.Namespace,
			},
			VolumeMounts: []corev1.VolumeMount{
				{
					Name:      internaltypes.IntermediateSubMountPath,
					MountPath: mountRoot,
					SubPath:   ctx.task.Spec.Storage.IntermediatePvc.SubPath,
				},
			},
		}
		intermediateVolume := corev1.Volume{
			Name: internaltypes.IntermediateSubMountPath,
			VolumeSource: corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
					ClaimName: ctx.task.Spec.Storage.IntermediatePvc.ClaimName,
				},
			},
		}

		job.Spec = batchv1.JobSpec{
			BackoffLimit: pointer.Int32Ptr(0),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					ServiceAccountName: ctx.job.Spec.Template.Spec.ServiceAccountName,
					RestartPolicy:      corev1.RestartPolicyNever,
					Containers:         []corev1.Container{container},
					Volumes:            []corev1.Volume{intermediateVolume},
				},
			},
		}
		return controllerutil.SetControllerReference(ctx.task, job, r.Client.Scheme())
	}

	_, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, mutate)
	return err
}

// ensureCleanupJob creates or updates the "cleanup" Job for the Task in ctx.
//
// The Job lives in the namespace of the reconciled Job, reuses that Job's
// service account, runs the default clustercode image with the "cleanup"
// subcommand as user 1000 / group 0, and gets the Task's source and
// intermediate PVCs attached through utils.AddPvcVolume. The Task is added as
// an owner reference of the Job. Requires the fetchTask step to have run.
func (r *JobProvisioner) ensureCleanupJob(ctx *JobContext) error {
	ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask)

	taskId := ctx.task.Spec.TaskId
	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{
		// The precision caps the task ID so that ID, dash and suffix stay within the name budget.
		Name:      fmt.Sprintf("%.*s-%s", 62-len(internaltypes.JobTypeCleanup), taskId, internaltypes.JobTypeCleanup),
		Namespace: ctx.job.Namespace,
	}}

	_, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error {
		job.Labels = labels.Merge(job.Labels, labels.Merge(internaltypes.ClusterCodeLabels, labels.Merge(internaltypes.JobTypeCleanup.AsLabels(), taskId.AsLabels())))
		job.Spec = batchv1.JobSpec{
			BackoffLimit: pointer.Int32Ptr(0),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					SecurityContext: &corev1.PodSecurityContext{
						RunAsUser:  pointer.Int64Ptr(1000),
						RunAsGroup: pointer.Int64Ptr(0),
						FSGroup:    pointer.Int64Ptr(0),
					},
					ServiceAccountName: ctx.job.Spec.Template.Spec.ServiceAccountName,
					RestartPolicy:      corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{
							Name:            "clustercode",
							Image:           blueprintcontroller.DefaultClusterCodeContainerImage,
							ImagePullPolicy: corev1.PullIfNotPresent,
							Args: []string{
								"-d",
								"--namespace=" + ctx.job.Namespace,
								"cleanup",
								"--task-name=" + ctx.task.Name,
							},
						},
					},
				},
			},
		}
		// Volumes and mounts are attached exclusively through utils.AddPvcVolume.
		// A hand-written mount for the source volume would register the same
		// volume name a second time.
		utils.AddPvcVolume(job, internaltypes.SourceSubMountPath, filepath.Join("/clustercode", internaltypes.SourceSubMountPath), ctx.task.Spec.Storage.SourcePvc)
		utils.AddPvcVolume(job, internaltypes.IntermediateSubMountPath, filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath), ctx.task.Spec.Storage.IntermediatePvc)

		return controllerutil.SetOwnerReference(ctx.task, job, r.Client.Scheme())
	})
	return err
}

// determineSliceIndex is the pipeline step that parses the slice index label
// of the Job in ctx as a decimal integer and stores it in ctx.sliceIndex.
//
// It returns an error if the label is missing or is not a valid integer; in
// that case ctx.sliceIndex is left untouched.
func (r *JobProvisioner) determineSliceIndex(ctx *JobContext) error {
	key := internaltypes.ClustercodeSliceIndexLabelKey

	raw, ok := ctx.job.Labels[key]
	if !ok {
		return fmt.Errorf("cannot determine slice index, missing label '%s'", key)
	}

	parsed, parseErr := strconv.Atoi(raw)
	if parseErr != nil {
		return fmt.Errorf("cannot determine slice index from label '%s': %w", key, parseErr)
	}

	ctx.sliceIndex = parsed
	return nil
}

// updateStatus is the pipeline step that records the slice of the completed
// Job in the Task status and persists the status.
//
// The slice is added to Status.SlicesFinished (at most once per slice index),
// Status.SlicesFinishedCount is set to the length of that list, and every
// entry with the same slice index is dropped from Status.SlicesScheduled.
// Requires the fetchTask and determineSliceIndex steps to have run.
func (r *JobProvisioner) updateStatus(ctx *JobContext) error {
	ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask, r.determineSliceIndex)

	finishedList := ctx.task.Status.SlicesFinished

	// A completed Job can be reconciled more than once (e.g. after a controller
	// restart); appending unconditionally would duplicate the entry and inflate
	// the finished count.
	alreadyFinished := false
	for _, ref := range finishedList {
		if ref.SliceIndex == ctx.sliceIndex {
			alreadyFinished = true
			break
		}
	}
	if !alreadyFinished {
		finishedList = append(finishedList, v1alpha1.ClustercodeSliceRef{
			SliceIndex: ctx.sliceIndex,
			JobName:    ctx.job.Name,
		})
	}
	ctx.task.Status.SlicesFinished = finishedList
	ctx.task.Status.SlicesFinishedCount = len(finishedList)

	// Keep only the scheduled slices that belong to other indices.
	var scheduled []v1alpha1.ClustercodeSliceRef
	for _, ref := range ctx.task.Status.SlicesScheduled {
		if ref.SliceIndex != ctx.sliceIndex {
			scheduled = append(scheduled, ref)
		}
	}
	ctx.task.Status.SlicesScheduled = scheduled
	return r.Client.Status().Update(ctx, ctx.task)
}
Loading

0 comments on commit f964a9d

Please sign in to comment.