Skip to content

Commit

Permalink
Merge branch 'master' into nikola-jokic/indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
nikola-jokic committed Apr 18, 2024
2 parents 3e5215e + 9e191cd commit c46fdd1
Show file tree
Hide file tree
Showing 40 changed files with 1,228 additions and 398 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gha-e2e-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
TARGET_ORG: actions-runner-controller
TARGET_REPO: arc_e2e_test_dummy
IMAGE_NAME: "arc-test-image"
IMAGE_VERSION: "0.9.0"
IMAGE_VERSION: "0.9.1"

concurrency:
# This will make sure we only apply the concurrency limits on pull requests
Expand Down
4 changes: 2 additions & 2 deletions charts/gha-runner-scale-set-controller/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.9.0
version: 0.9.1

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.9.0"
appVersion: "0.9.1"

home: https://github.com/actions/actions-runner-controller

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ spec:
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.topologySpreadConstraints }}
topologySpreadConstraints:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
Expand Down
10 changes: 10 additions & 0 deletions charts/gha-runner-scale-set-controller/tests/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func TestTemplate_ControllerDeployment_Defaults(t *testing.T) {

assert.Len(t, deployment.Spec.Template.Spec.NodeSelector, 0)
assert.Nil(t, deployment.Spec.Template.Spec.Affinity)
assert.Len(t, deployment.Spec.Template.Spec.TopologySpreadConstraints, 0)
assert.Len(t, deployment.Spec.Template.Spec.Tolerations, 0)

managerImage := "ghcr.io/actions/gha-runner-scale-set-controller:dev"
Expand Down Expand Up @@ -424,6 +425,9 @@ func TestTemplate_ControllerDeployment_Customize(t *testing.T) {
"tolerations[0].key": "foo",
"affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].key": "foo",
"affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].operator": "bar",
"topologySpreadConstraints[0].labelSelector.matchLabels.foo": "bar",
"topologySpreadConstraints[0].maxSkew": "1",
"topologySpreadConstraints[0].topologyKey": "foo",
"priorityClassName": "test-priority-class",
"flags.updateStrategy": "eventual",
"flags.logLevel": "info",
Expand Down Expand Up @@ -487,6 +491,11 @@ func TestTemplate_ControllerDeployment_Customize(t *testing.T) {
assert.Equal(t, "foo", deployment.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key)
assert.Equal(t, "bar", string(deployment.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Operator))

assert.Len(t, deployment.Spec.Template.Spec.TopologySpreadConstraints, 1)
assert.Equal(t, "bar", deployment.Spec.Template.Spec.TopologySpreadConstraints[0].LabelSelector.MatchLabels["foo"])
assert.Equal(t, int32(1), deployment.Spec.Template.Spec.TopologySpreadConstraints[0].MaxSkew)
assert.Equal(t, "foo", deployment.Spec.Template.Spec.TopologySpreadConstraints[0].TopologyKey)

assert.Len(t, deployment.Spec.Template.Spec.Tolerations, 1)
assert.Equal(t, "foo", deployment.Spec.Template.Spec.Tolerations[0].Key)

Expand Down Expand Up @@ -745,6 +754,7 @@ func TestTemplate_ControllerDeployment_WatchSingleNamespace(t *testing.T) {

assert.Len(t, deployment.Spec.Template.Spec.NodeSelector, 0)
assert.Nil(t, deployment.Spec.Template.Spec.Affinity)
assert.Len(t, deployment.Spec.Template.Spec.TopologySpreadConstraints, 0)
assert.Len(t, deployment.Spec.Template.Spec.Tolerations, 0)

managerImage := "ghcr.io/actions/gha-runner-scale-set-controller:dev"
Expand Down
8 changes: 5 additions & 3 deletions charts/gha-runner-scale-set-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ tolerations: []

affinity: {}

topologySpreadConstraints: []

# Mount volumes in the container.
volumes: []
volumeMounts: []
Expand All @@ -81,9 +83,9 @@ volumeMounts: []
# PriorityClass: system-cluster-critical
priorityClassName: ""

## If `metrics:` object is not provided, or commented out, the following flags
## will be applied the controller-manager and listener pods with empty values:
## `--metrics-addr`, `--listener-metrics-addr`, `--listener-metrics-endpoint`.
## If `metrics:` object is not provided, or commented out, the following flags
## will be applied the controller-manager and listener pods with empty values:
## `--metrics-addr`, `--listener-metrics-addr`, `--listener-metrics-endpoint`.
## This will disable metrics.
##
## To enable metrics, uncomment the following lines.
Expand Down
4 changes: 2 additions & 2 deletions charts/gha-runner-scale-set/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.9.0
version: 0.9.1

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.9.0"
appVersion: "0.9.1"

home: https://github.com/actions/actions-runner-controller

Expand Down
2 changes: 1 addition & 1 deletion charts/gha-runner-scale-set/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ githubConfigSecret:
# kubernetesModeServiceAccount:
# annotations:

## template is the PodSpec for each listener Pod
## listenerTemplate is the PodSpec for each listener Pod
## For reference: https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#PodSpec
# listenerTemplate:
# spec:
Expand Down
8 changes: 6 additions & 2 deletions cmd/ghalistener/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,19 @@ func (app *App) Run(ctx context.Context) error {
}

g, ctx := errgroup.WithContext(ctx)
metricsCtx, cancelMetrics := context.WithCancelCause(ctx)

g.Go(func() error {
app.logger.Info("Starting listener")
return app.listener.Listen(ctx, app.worker)
listnerErr := app.listener.Listen(ctx, app.worker)
cancelMetrics(fmt.Errorf("Listener exited: %w", listnerErr))
return listnerErr
})

if app.metrics != nil {
g.Go(func() error {
app.logger.Info("Starting metrics server")
return app.metrics.ListenAndServe(ctx)
return app.metrics.ListenAndServe(metricsCtx)
})
}

Expand Down
25 changes: 16 additions & 9 deletions cmd/ghalistener/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
type Client interface {
GetAcquirableJobs(ctx context.Context, runnerScaleSetId int) (*actions.AcquirableJobList, error)
CreateMessageSession(ctx context.Context, runnerScaleSetId int, owner string) (*actions.RunnerScaleSetSession, error)
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64) (*actions.RunnerScaleSetMessage, error)
GetMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, lastMessageId int64, maxCapacity int) (*actions.RunnerScaleSetMessage, error)
DeleteMessage(ctx context.Context, messageQueueUrl, messageQueueAccessToken string, messageId int64) error
AcquireJobs(ctx context.Context, runnerScaleSetId int, messageQueueAccessToken string, requestIds []int64) ([]int64, error)
RefreshMessageSession(ctx context.Context, runnerScaleSetId int, sessionId *uuid.UUID) (*actions.RunnerScaleSetSession, error)
Expand Down Expand Up @@ -80,6 +80,7 @@ type Listener struct {

// updated fields
lastMessageID int64 // The ID of the last processed message.
maxCapacity int // The maximum number of runners that can be created.
session *actions.RunnerScaleSetSession // The session for managing the runner scale set.
}

Expand All @@ -89,10 +90,11 @@ func New(config Config) (*Listener, error) {
}

listener := &Listener{
scaleSetID: config.ScaleSetID,
client: config.Client,
logger: config.Logger,
metrics: metrics.Discard,
scaleSetID: config.ScaleSetID,
client: config.Client,
logger: config.Logger,
metrics: metrics.Discard,
maxCapacity: config.MaxRunners,
}

if config.Metrics != nil {
Expand Down Expand Up @@ -164,11 +166,16 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
}

if msg == nil {
_, err := handler.HandleDesiredRunnerCount(ctx, 0, 0)
if err != nil {
return fmt.Errorf("handling nil message failed: %w", err)
}

continue
}

// New context is created to avoid cancelation during message handling.
if err := l.handleMessage(context.Background(), handler, msg); err != nil {
// Remove cancellation from the context to avoid cancelling the message handling.
if err := l.handleMessage(context.WithoutCancel(ctx), handler, msg); err != nil {
return fmt.Errorf("failed to handle message: %w", err)
}
}
Expand Down Expand Up @@ -262,7 +269,7 @@ func (l *Listener) createSession(ctx context.Context) error {

func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessage, error) {
l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
msg, err := l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
if err == nil { // if NO error
return msg, nil
}
Expand All @@ -278,7 +285,7 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa

l.logger.Info("Getting next message", "lastMessageID", l.lastMessageID)

msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID)
msg, err = l.client.GetMessage(ctx, l.session.MessageQueueUrl, l.session.MessageQueueAccessToken, l.lastMessageID, l.maxCapacity)
if err != nil { // if NO error
return nil, fmt.Errorf("failed to get next message after message session refresh: %w", err)
}
Expand Down
21 changes: 13 additions & 8 deletions cmd/ghalistener/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,14 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()
config.Client = client

l, err := New(config)
Expand All @@ -148,10 +149,11 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.HttpClientSideError{Code: http.StatusNotFound}).Once()
config.Client = client

l, err := New(config)
Expand All @@ -170,6 +172,7 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -185,12 +188,12 @@ func TestListener_getMessage(t *testing.T) {
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()

client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Once()

want := &actions.RunnerScaleSetMessage{
MessageId: 1,
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(want, nil).Once()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(want, nil).Once()

config.Client = client

Expand All @@ -214,6 +217,7 @@ func TestListener_getMessage(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -229,7 +233,7 @@ func TestListener_getMessage(t *testing.T) {
}
client.On("RefreshMessageSession", ctx, mock.Anything, mock.Anything).Return(session, nil).Once()

client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).Return(nil, &actions.MessageQueueTokenExpiredError{}).Twice()

config.Client = client

Expand Down Expand Up @@ -450,6 +454,7 @@ func TestListener_Listen(t *testing.T) {
config := Config{
ScaleSetID: 1,
Metrics: metrics.Discard,
MaxRunners: 10,
}

client := listenermocks.NewClient(t)
Expand All @@ -470,7 +475,7 @@ func TestListener_Listen(t *testing.T) {
MessageType: "RunnerScaleSetJobMessages",
Statistics: &actions.RunnerScaleSetStatistic{},
}
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything).
client.On("GetMessage", ctx, mock.Anything, mock.Anything, mock.Anything, 10).
Return(msg, nil).
Run(
func(mock.Arguments) {
Expand All @@ -479,8 +484,8 @@ func TestListener_Listen(t *testing.T) {
).
Once()

// Ensure delete message is called with background context
client.On("DeleteMessage", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
// Ensure delete message is called without cancel
client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

config.Client = client

Expand Down
18 changes: 9 additions & 9 deletions cmd/ghalistener/listener/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion cmd/ghalistener/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"strconv"
"time"

"github.com/actions/actions-runner-controller/github/actions"
"github.com/go-logr/logr"
Expand Down Expand Up @@ -338,7 +339,9 @@ func (e *exporter) ListenAndServe(ctx context.Context) error {
e.logger.Info("starting metrics server", "addr", e.srv.Addr)
go func() {
<-ctx.Done()
e.logger.Info("stopping metrics server")
e.logger.Info("stopping metrics server", "err", ctx.Err())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
e.srv.Shutdown(ctx)
}()
return e.srv.ListenAndServe()
Expand Down
8 changes: 4 additions & 4 deletions cmd/ghalistener/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
"jobsCompleted", jobsCompleted,
}

if w.lastPatch == targetRunnerCount && jobsCompleted == 0 {
w.logger.Info("Skipping patch", logValues...)
return targetRunnerCount, nil
if count == 0 && jobsCompleted == 0 {
w.lastPatchID = 0
} else {
w.lastPatchID++
}

w.lastPatchID++
w.lastPatch = targetRunnerCount

original, err := json.Marshal(
Expand Down
4 changes: 2 additions & 2 deletions cmd/githubrunnerscalesetlistener/autoScalerMessageListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *AutoScalerClient) Close() error {
return m.client.Close()
}

func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error) error {
func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler func(msg *actions.RunnerScaleSetMessage) error, maxCapacity int) error {
if m.initialMessage != nil {
err := handler(m.initialMessage)
if err != nil {
Expand All @@ -141,7 +141,7 @@ func (m *AutoScalerClient) GetRunnerScaleSetMessage(ctx context.Context, handler
}

for {
message, err := m.client.GetMessage(ctx, m.lastMessageId)
message, err := m.client.GetMessage(ctx, m.lastMessageId, maxCapacity)
if err != nil {
return fmt.Errorf("get message failed from refreshing client. %w", err)
}
Expand Down
Loading

0 comments on commit c46fdd1

Please sign in to comment.