Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix overscaling when the controller is much faster than the listener #3371

Merged
merged 6 commits into from
Mar 20, 2024
4 changes: 4 additions & 0 deletions apis/actions.github.com/v1alpha1/ephemeralrunner_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type EphemeralRunner struct {
Status EphemeralRunnerStatus `json:"status,omitempty"`
}

// IsDone reports whether the EphemeralRunner has reached a terminal phase,
// i.e. its Status.Phase is either corev1.PodSucceeded or corev1.PodFailed.
func (er *EphemeralRunner) IsDone() bool {
	switch er.Status.Phase {
	case corev1.PodSucceeded, corev1.PodFailed:
		return true
	default:
		return false
	}
}

// EphemeralRunnerSpec defines the desired state of EphemeralRunner
type EphemeralRunnerSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
type EphemeralRunnerSetSpec struct {
// Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
Replicas int `json:"replicas,omitempty"`
// PatchID is the unique identifier for the patch issued by the listener app
PatchID int `json:"patchID"`
// NOTE: unlike Replicas, the patchID tag has no omitempty, so PatchID is
// always written to JSON, including when it is 0.

EphemeralRunnerSpec EphemeralRunnerSpec `json:"ephemeralRunnerSpec,omitempty"`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6957,9 +6957,14 @@ spec:
- containers
type: object
type: object
patchID:
description: PatchID is the unique identifier for the patch issued by the listener app
type: integer
replicas:
description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
type: integer
required:
- patchID
type: object
status:
description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet
Expand Down
2 changes: 1 addition & 1 deletion cmd/ghalistener/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Listener interface {
//go:generate mockery --name Worker --output ./mocks --outpkg mocks --case underscore
type Worker interface {
HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
HandleDesiredRunnerCount(ctx context.Context, count int) (int, error)
HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error)
}

func New(config config.Config) (*App, error) {
Expand Down
18 changes: 9 additions & 9 deletions cmd/ghalistener/app/mocks/worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions cmd/ghalistener/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func New(config Config) (*Listener, error) {
//go:generate mockery --name Handler --output ./mocks --outpkg mocks --case underscore
type Handler interface {
HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
HandleDesiredRunnerCount(ctx context.Context, count int) (int, error)
HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error)
}

// Listen listens for incoming messages and handles them using the provided handler.
Expand Down Expand Up @@ -145,7 +145,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
}
l.metrics.PublishStatistics(initialMessage.Statistics)

desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs)
desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs, 0)
if err != nil {
return fmt.Errorf("handling initial message failed: %w", err)
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti
l.metrics.PublishJobStarted(jobStarted)
}

desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs)
desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs, len(parsedMsg.jobsCompleted))
if err != nil {
return fmt.Errorf("failed to handle desired runner count: %w", err)
}
Expand Down Expand Up @@ -284,7 +284,6 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa
}

return msg, nil

}

func (l *Listener) deleteLastMessage(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions cmd/ghalistener/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func TestListener_Listen(t *testing.T) {

var called bool
handler := listenermocks.NewHandler(t)
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
Return(0, nil).
Run(
func(mock.Arguments) {
Expand Down Expand Up @@ -485,11 +485,11 @@ func TestListener_Listen(t *testing.T) {
config.Client = client

handler := listenermocks.NewHandler(t)
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
Return(0, nil).
Once()

handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
Return(0, nil).
Once()

Expand Down
4 changes: 2 additions & 2 deletions cmd/ghalistener/listener/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestInitialMetrics(t *testing.T) {
config.Client = client

handler := listenermocks.NewHandler(t)
handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs).
handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs, 0).
Return(sessionStatistics.TotalAssignedJobs, nil).
Once()

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestHandleMessageMetrics(t *testing.T) {

handler := listenermocks.NewHandler(t)
handler.On("HandleJobStarted", mock.Anything, jobsStarted[0]).Return(nil).Once()
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).Return(desiredResult, nil).Once()
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 2).Return(desiredResult, nil).Once()

client := listenermocks.NewClient(t)
client.On("DeleteMessage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
Expand Down
18 changes: 9 additions & 9 deletions cmd/ghalistener/listener/mocks/handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 17 additions & 9 deletions cmd/ghalistener/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ type Config struct {
// The Worker's role is to process the messages it receives from the listener.
// It then initiates Kubernetes API requests to carry out the necessary actions.
type Worker struct {
clientset *kubernetes.Clientset
config Config
lastPatch int
logger *logr.Logger
clientset *kubernetes.Clientset
config Config
lastPatch int
lastPatchID int
logger *logr.Logger
}

var _ listener.Handler = (*Worker)(nil)

func New(config Config, options ...Option) (*Worker, error) {
w := &Worker{
config: config,
lastPatch: -1,
config: config,
lastPatch: -1,
lastPatchID: -1,
}

conf, err := rest.InClusterConfig()
Expand Down Expand Up @@ -161,7 +163,7 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
// The function then scales the ephemeral runner set by applying the merge patch.
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
// If any error occurs during the process, it returns an error with a descriptive message.
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) {
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
Expand All @@ -172,17 +174,22 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int,
"min", w.config.MinRunners,
"max", w.config.MaxRunners,
"currentRunnerCount", w.lastPatch,
"jobsCompleted", jobsCompleted,
}

if targetRunnerCount == w.lastPatch {
w.logger.Info("Skipping patching of EphemeralRunnerSet as the desired count has not changed", logValues...)
if w.lastPatch == targetRunnerCount && jobsCompleted == 0 {
w.logger.Info("Skipping patch", logValues...)
return targetRunnerCount, nil
}

w.lastPatchID++
w.lastPatch = targetRunnerCount

original, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: -1,
PatchID: -1,
},
},
)
Expand All @@ -194,6 +201,7 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int,
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: targetRunnerCount,
PatchID: w.lastPatchID,
},
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6957,9 +6957,14 @@ spec:
- containers
type: object
type: object
patchID:
description: PatchID is the unique identifier for the patch issued by the listener app
type: integer
replicas:
description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
type: integer
required:
- patchID
type: object
status:
description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet
Expand Down
1 change: 1 addition & 0 deletions controllers/actions.github.com/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const AutoscalingRunnerSetCleanupFinalizerName = "actions.github.com/cleanup-pro
const (
AnnotationKeyGitHubRunnerGroupName = "actions.github.com/runner-group-name"
AnnotationKeyGitHubRunnerScaleSetName = "actions.github.com/runner-scale-set-name"
AnnotationKeyPatchID = "actions.github.com/patch-id"
)

// Labels applied to listener roles
Expand Down
28 changes: 19 additions & 9 deletions controllers/actions.github.com/ephemeralrunner_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

// A runner that IsDone reports as finished only needs its owned resources
// removed. This check runs before the finalizer handling that follows, so a
// finished runner returns from here without reaching it.
if ephemeralRunner.IsDone() {
	log.Info("Cleaning up resources after ephemeral runner termination", "phase", ephemeralRunner.Status.Phase)
	done, err := r.cleanupResources(ctx, ephemeralRunner, log)
	if err != nil {
		log.Error(err, "Failed to clean up ephemeral runner owned resources")
		return ctrl.Result{}, err
	}
	if !done {
		// cleanupResources returned done == false without an error, so
		// requeue and check again on the next pass.
		log.Info("Waiting for ephemeral runner owned resources to be deleted")
		return ctrl.Result{Requeue: true}, nil
	}
	// Stop reconciling on this object.
	// The EphemeralRunnerSet is responsible for cleaning it up.
	log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase)
	return ctrl.Result{}, nil
}

if !controllerutil.ContainsFinalizer(ephemeralRunner, ephemeralRunnerActionsFinalizerName) {
log.Info("Adding runner registration finalizer")
err := patch(ctx, r.Client, ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
Expand All @@ -159,13 +176,6 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

if ephemeralRunner.Status.Phase == corev1.PodSucceeded || ephemeralRunner.Status.Phase == corev1.PodFailed {
// Stop reconciling on this object.
// The EphemeralRunnerSet is responsible for cleaning it up.
log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase)
return ctrl.Result{}, nil
}

if ephemeralRunner.Status.RunnerId == 0 {
log.Info("Creating new ephemeral runner registration and updating status with runner config")
return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log)
Expand Down Expand Up @@ -324,7 +334,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme
}
}
return false, nil
case err != nil && !kerrors.IsNotFound(err):
case !kerrors.IsNotFound(err):
return false, err
}
log.Info("Pod is deleted")
Expand All @@ -341,7 +351,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme
}
}
return false, nil
case err != nil && !kerrors.IsNotFound(err):
case !kerrors.IsNotFound(err):
return false, err
}
log.Info("Secret is deleted")
Expand Down