Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/.testcoverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ threshold:

# (optional; default 0)
# Minimum coverage percentage required for each package.
package: 60
package: 80

# (optional; default 0)
# Minimum overall project coverage percentage required.
total: 60
total: 80

# Holds regexp rules which will override thresholds for matched files or packages
# using their paths.
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ go.work.sum

# kubebuilder binaries
bin/

# vscode settings
.vscode/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# operation-cache-controller
[![Go Report Card](https://goreportcard.com/badge/github.com/Azure/operation-cache-controller)](https://goreportcard.com/report/github.com/Azure/operation-cache-controller)
[![Go Test Status](https://github.com/Azure/operation-cache-controller/.github/workflows/test.yml/badge.svg)](https://github.com/Azure/operation-cache-controller/.github/workflows/test.yml)
[![Go Test Status](https://github.com/Azure/operation-cache-controller/actions/workflows/test.yml/badge.svg)](https://github.com/Azure/operation-cache-controller/actions/workflows/test.yml)
[![Go Test Coverage](https://raw.githubusercontent.com/Azure/operation-cache-controller/badges/.badges/main/coverage.svg)](/.github/.testcoverage.yml)

A k8s controller used to manage operations and cache the outcome of that operation
Expand Down
15 changes: 13 additions & 2 deletions api/v1/cache_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,25 @@ type CacheSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Foo is an example field of Cache. Edit cache_types.go to remove/update
Foo string `json:"foo,omitempty"`
OperationTemplate OperationSpec `json:"operationTemplate"`

// Strategy is the cache strategy
// +kubebuilder:validation:optional
Strategy string `json:"strategy,omitempty"`

// ExpireTime is the RFC3339-format time when the cache will be expired. If not set, the cache is never expired.
// +kubebuilder:validation:optional
// +kubebuilder:validation:Pattern:=`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`
ExpireTime string `json:"expireTime,omitempty"`
}

// CacheStatus defines the observed state of Cache.
type CacheStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
CacheKey string `json:"cacheKey"`
KeepAliveCount int32 `json:"keepAlive"`
AvailableCaches []string `json:"availableCaches,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
10 changes: 8 additions & 2 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7,544 changes: 7,543 additions & 1 deletion config/crd/bases/app.github.com_caches.yaml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/controller/appdeployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
func newTestJobSpec() batchv1.JobSpec {
return batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down
231 changes: 231 additions & 0 deletions internal/controller/cache_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package controller

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

appsv1 "github.com/Azure/operation-cache-controller/api/v1"
ctrlutils "github.com/Azure/operation-cache-controller/internal/utils/controller"
oputils "github.com/Azure/operation-cache-controller/internal/utils/controller/operation"
"github.com/Azure/operation-cache-controller/internal/utils/reconciler"
)

//go:generate mockgen -destination=./mocks/mock_cache_adapter.go -package=mocks github.com/Azure/operation-cache-controller/internal/controller CacheAdapterInterface

// CacheAdapterInterface defines the ordered reconciliation steps for a Cache
// custom resource. Each step returns a reconciler.OperationResult telling the
// caller whether to continue with the next step, stop, or requeue.
type CacheAdapterInterface interface {
	// CheckCacheExpiry deletes the Cache CR once its spec.expireTime has passed.
	CheckCacheExpiry(ctx context.Context) (reconciler.OperationResult, error)
	// EnsureCacheInitialized populates status defaults (AvailableCaches, CacheKey).
	EnsureCacheInitialized(ctx context.Context) (reconciler.OperationResult, error)
	// CalculateKeepAliveCount sets the desired number of cached operations in status.
	CalculateKeepAliveCount(ctx context.Context) (reconciler.OperationResult, error)
	// AdjustCache creates/deletes owned Operations to match the keep-alive count.
	AdjustCache(ctx context.Context) (reconciler.OperationResult, error)
}

// CacheAdapter implements CacheAdapterInterface for a single Cache resource.
// It holds the CR being reconciled plus the dependencies needed to act on it.
type CacheAdapter struct {
	cache    *appsv1.Cache // the Cache CR being reconciled
	logger   logr.Logger
	client   client.Client
	scheme   *runtime.Scheme
	recorder record.EventRecorder
	// setControllerReferenceFunc sets the owner reference on Operations created
	// for this cache; injected as a function so it can be stubbed in tests.
	setControllerReferenceFunc func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error
}

// NewCacheAdapter builds a CacheAdapterInterface bound to the given Cache CR
// and its reconciliation dependencies. fn is the function used to set the
// controller owner reference on Operations created for this cache (normally
// controllerutil.SetControllerReference; injectable for tests).
func NewCacheAdapter(ctx context.Context,
	cache *appsv1.Cache, logger logr.Logger, client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder,
	fn func(owner, controlled metav1.Object, scheme *runtime.Scheme, opts ...controllerutil.OwnerReferenceOption) error) CacheAdapterInterface {
	adapter := &CacheAdapter{
		cache:                      cache,
		logger:                     logger,
		client:                     client,
		scheme:                     scheme,
		recorder:                   recorder,
		setControllerReferenceFunc: fn,
	}
	return adapter
}

// updateStatus persists the current in-memory status of the cache CR via the
// status subresource, wrapping any failure with context.
func (c *CacheAdapter) updateStatus(ctx context.Context) error {
	err := c.client.Status().Update(ctx, c.cache)
	if err == nil {
		return nil
	}
	return fmt.Errorf("unable to update cache status: %w", err)
}

// CheckCacheExpiry deletes the cache CR once its spec.expireTime (RFC3339) has
// passed. An empty expire time means the cache never expires; an unparsable
// one is logged and otherwise ignored so reconciliation can proceed.
func (c *CacheAdapter) CheckCacheExpiry(ctx context.Context) (reconciler.OperationResult, error) {
	expireAt := c.cache.Spec.ExpireTime
	if expireAt == "" {
		return reconciler.ContinueProcessing()
	}
	deadline, parseErr := time.Parse(time.RFC3339, expireAt)
	if parseErr != nil {
		c.logger.Error(parseErr, "failed to parse expire time")
		// TODO: set cache expiry condition if needed
		return reconciler.ContinueProcessing()
	}
	if !time.Now().After(deadline) {
		// Not yet expired; nothing to do.
		return reconciler.ContinueProcessing()
	}
	c.logger.Info("cache is expired, deleting cache cr")
	if err := c.client.Delete(ctx, c.cache); err != nil {
		return reconciler.RequeueWithError(err)
	}
	return reconciler.StopProcessing()
}

// EnsureCacheInitialized fills in status defaults: an empty (non-nil)
// AvailableCaches slice and a cache key derived from the operation template's
// applications. The status is then persisted.
func (c *CacheAdapter) EnsureCacheInitialized(ctx context.Context) (reconciler.OperationResult, error) {
	status := &c.cache.Status
	if status.AvailableCaches == nil {
		status.AvailableCaches = []string{}
	}
	if status.CacheKey == "" {
		status.CacheKey = ctrlutils.NewCacheKeyFromApplications(c.cache.Spec.OperationTemplate.Applications)
	}
	return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx))
}

// CalculateKeepAliveCount determines the desired number of cached operations
// and records it in status. Until a cache service exists to provide the
// keepAliveCount, a fixed value is used. The status write is skipped when the
// value is already up to date, avoiding a redundant API call on every reconcile.
func (c *CacheAdapter) CalculateKeepAliveCount(ctx context.Context) (reconciler.OperationResult, error) {
	// before we have cache service to provide the keepAliveCount, we use fixed value
	const defaultKeepAliveCount = int32(5)
	if c.cache.Status.KeepAliveCount == defaultKeepAliveCount {
		// Already current; no need to hit the API server again.
		return reconciler.ContinueProcessing()
	}
	c.cache.Status.KeepAliveCount = defaultKeepAliveCount
	return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx))
}

// forEachOperationConcurrently runs do against every operation in its own
// goroutine, waits for all of them to finish, and returns the joined errors
// (nil when every call succeeded). This factors out the fan-out/join logic
// previously duplicated between create and delete.
func (c *CacheAdapter) forEachOperationConcurrently(ops []*appsv1.Operation, do func(op *appsv1.Operation) error) error {
	var wg sync.WaitGroup
	// Buffered to len(ops) so every goroutine can send without blocking.
	errChan := make(chan error, len(ops))
	for _, op := range ops {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errChan <- do(op)
		}()
	}
	wg.Wait()
	close(errChan)
	var errs error
	for err := range errChan {
		errs = errors.Join(errs, err)
	}
	return errs
}

// createOperationsAsync creates all given operations concurrently and returns
// the joined errors from any failed creations.
func (c *CacheAdapter) createOperationsAsync(ctx context.Context, ops []*appsv1.Operation) error {
	return c.forEachOperationConcurrently(ops, func(op *appsv1.Operation) error {
		return c.client.Create(ctx, op)
	})
}

// deleteOperationsAsync deletes all given operations concurrently and returns
// the joined errors from any failed deletions.
func (c *CacheAdapter) deleteOperationsAsync(ctx context.Context, ops []*appsv1.Operation) error {
	return c.forEachOperationConcurrently(ops, func(op *appsv1.Operation) error {
		return c.client.Delete(ctx, op)
	})
}

// operationReady reports whether the operation has reached the Reconciled
// phase and can therefore be counted as an available cache entry.
func operationReady(op *appsv1.Operation) bool {
	return op.Status.Phase == oputils.PhaseReconciled
}

// initOperationFromCache builds a new Operation named operationName in the
// cache's namespace: annotated as cache-mode, labeled with the (possibly
// truncated) cache key, and with its spec copied from the cache's operation
// template. The original nil-checks on GetAnnotations/GetLabels were dead
// code — a freshly allocated Operation always returns nil for both — so the
// maps are now constructed directly.
func (c *CacheAdapter) initOperationFromCache(operationName string) *appsv1.Operation {
	// TODO: set up requirement label instead
	// Kubernetes label values are limited to 63 characters; truncate the key.
	cacheKeyLabelValue := c.cache.Status.CacheKey
	if len(cacheKeyLabelValue) > 63 {
		cacheKeyLabelValue = cacheKeyLabelValue[:63]
	}

	op := &appsv1.Operation{}
	op.SetAnnotations(map[string]string{
		ctrlutils.AnnotationNameCacheMode: ctrlutils.AnnotationValueTrue,
	})
	op.SetLabels(map[string]string{
		ctrlutils.LabelNameCacheKey: cacheKeyLabelValue,
	})
	op.SetNamespace(c.cache.Namespace)
	op.SetName(operationName)
	op.Spec = c.cache.Spec.OperationTemplate
	return op
}

// AdjustCache reconciles the set of owned Operations toward the desired
// keep-alive count: surplus operations are deleted (not-ready ones first,
// then enough ready ones to reach the target) and missing operations are
// created from the cache's operation template. AvailableCaches in status is
// refreshed from the currently ready operations.
//
// Fix: the operation name prefix previously sliced CacheKey[:8]
// unconditionally, which panics when the cache key is shorter than eight
// characters (e.g. before status initialization); the slice is now guarded.
func (c *CacheAdapter) AdjustCache(ctx context.Context) (reconciler.OperationResult, error) {
	var ownedOps appsv1.OperationList
	if err := c.client.List(ctx, &ownedOps, client.InNamespace(c.cache.Namespace), client.MatchingFields{cacheOwnerKey: c.cache.Name}); err != nil {
		return reconciler.RequeueWithError(err)
	}
	availableCaches := []string{}
	for _, op := range ownedOps.Items {
		if operationReady(&op) {
			availableCaches = append(availableCaches, op.Name)
		}
	}
	c.cache.Status.AvailableCaches = availableCaches

	keepAliveCount := int(c.cache.Status.KeepAliveCount)
	cacheBalance := len(availableCaches) - keepAliveCount
	switch {
	case cacheBalance == 0:
		// do nothing: should we remove the not available operations?
	case cacheBalance > 0:
		// remove all the not available operations and cut available operations down to keepAliveCount
		availableCacheNumToRemove := cacheBalance
		opsToRemove := []*appsv1.Operation{}
		for _, op := range ownedOps.Items {
			if !operationReady(&op) {
				opsToRemove = append(opsToRemove, &op)
			} else if availableCacheNumToRemove > 0 {
				opsToRemove = append(opsToRemove, &op)
				availableCacheNumToRemove--
			}
		}
		c.logger.Info("removing operations", "operations", opsToRemove)
		if err := c.deleteOperationsAsync(ctx, opsToRemove); err != nil {
			return reconciler.RequeueWithError(err)
		}
	case cacheBalance < 0:
		if len(ownedOps.Items) < keepAliveCount {
			// also count not available operations, create new operations to meet the keepAliveCount
			// Guard against a cache key shorter than 8 characters to avoid a
			// slice-out-of-range panic when building operation names.
			keyPrefix := c.cache.Status.CacheKey
			if len(keyPrefix) > 8 {
				keyPrefix = keyPrefix[:8]
			}
			opsToCreate := []*appsv1.Operation{}
			opsNumToCreate := keepAliveCount - len(ownedOps.Items)
			for range opsNumToCreate {
				opName := fmt.Sprintf("cached-operation-%s-%s", keyPrefix, strings.ToLower(ctrlutils.GenerateRandomString(5)))
				opToCreate := c.initOperationFromCache(opName)
				if err := c.setControllerReferenceFunc(c.cache, opToCreate, c.scheme); err != nil {
					return reconciler.RequeueWithError(err)
				}
				opsToCreate = append(opsToCreate, opToCreate)
			}
			c.logger.Info("creating operations", "operations", opsToCreate)
			if err := c.createOperationsAsync(ctx, opsToCreate); err != nil {
				return reconciler.RequeueWithError(err)
			}
		}
		// else do nothing: we assume that any not ready operations are in progress and will be ready
		// we can bring in stuck operations handling if we consider that's one case for cache controller to solve
	}
	return reconciler.RequeueOnErrorOrContinue(c.updateStatus(ctx))
}
Loading
Loading