From dd29c58bd554721046176c1eb08566351627394b Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 16:37:23 +0100 Subject: [PATCH 1/7] Improve FetchTask reliability / performance --- models/actions/task.go | 105 +++++++++++++--------- routers/api/actions/runner/interceptor.go | 19 ++-- 2 files changed, 78 insertions(+), 46 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index 8b4ecf28f7a0b..dc51ea5243474 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -8,6 +8,7 @@ import ( "crypto/subtle" "errors" "fmt" + "math/rand" "time" auth_model "code.gitea.io/gitea/models/auth" @@ -223,6 +224,20 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask e := db.GetEngine(ctx) + // Create a new task record as early as possible to be able to reserve jobs + task := &ActionTask{ + RunnerID: runner.ID, + Status: StatusBlocked, + } + // This is a requirement of the database schema + if err := task.GenerateToken(); err != nil { + return nil, false, err + } + + if _, err := e.Insert(task); err != nil { + return nil, false, err + } + jobCond := builder.NewCond() if runner.RepoID != 0 { jobCond = builder.Eq{"repo_id": runner.RepoID} @@ -235,18 +250,46 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond)) } - var jobs []*ActionRunJob - if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { - return nil, false, err - } - - // TODO: a more efficient way to filter labels var job *ActionRunJob - log.Trace("runner labels: %v", runner.AgentLabels) - for _, v := range jobs { - if runner.CanMatchLabels(v.RunsOn) { - job = v - break + + const limit = 10 + // TODO store the last position to continue searching next time inside the runner record + // e.g. we would start again from zero if no job matches our known labels + // For stable paging + var lastUpdated timeutil.TimeStamp + for page := 0; job == nil; page++ { + var jobs []*ActionRunJob + // Load only 10 job in a batch without all fields for memory / db load reduction + if err := e.Where("task_id=? AND status=? AND updated>=?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit, page*limit).Find(&jobs); err != nil { + return nil, false, err + } + + // TODO: a more efficient way to filter labels + log.Trace("runner labels: %v", runner.AgentLabels) + backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(runner.ID))) + for _, v := range jobs { + if runner.CanMatchLabels(v.RunsOn) { + // Reserve our job before preparing task, otherwise continue searching + v.TaskID = task.ID + if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}); err != nil { + return nil, false, err + } else if n == 1 { + var exist bool + // reload to get all fields + if job, exist, err = db.GetByID[ActionRunJob](ctx, v.ID); err != nil || !exist { + return nil, false, err + } + break + } + } + lastUpdated = v.Updated + } + // Randomly distribute retries over time to reduce contention + jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + case <-time.After(jitter): } } if job == nil { @@ -261,32 +304,23 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask job.Started = now job.Status = StatusRunning - task := &ActionTask{ - JobID: job.ID, - Attempt: job.Attempt, - RunnerID: runner.ID, - Started: now, - Status: StatusRunning, - RepoID: job.RepoID, - OwnerID: job.OwnerID, - CommitSHA: job.CommitSHA, - IsForkPullRequest: job.IsForkPullRequest, - } - if err := task.GenerateToken(); err != nil { - return nil, false, err - } - workflowJob, err := job.ParseJob() if err != nil { return nil, false, fmt.Errorf("load job %d: %w", job.ID, err) } - if _, err := e.Insert(task); err != nil { - return nil, false, err - } - + task.Job = job + task.JobID = job.ID + task.Attempt = job.Attempt + task.Started = now + task.Status = StatusRunning + task.RepoID = job.RepoID + task.OwnerID = job.OwnerID + task.CommitSHA = job.CommitSHA + task.IsForkPullRequest = job.IsForkPullRequest task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) - if err := UpdateTask(ctx, task, "log_filename"); err != nil { + + if err := UpdateTask(ctx, task, "job_id", "attempt", "started", "status", "repo_id", "owner_id", "commit_sha", "is_fork_pull_request", "log_filename"); err != nil { return nil, false, err } @@ -308,15 +342,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask task.Steps = steps } - job.TaskID = task.ID - if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { - return nil, false, err - } else if n != 1 { - return nil, false, nil - } - - task.Job = job - if err := committer.Commit(); err != nil { return nil, false, err } diff --git a/routers/api/actions/runner/interceptor.go b/routers/api/actions/runner/interceptor.go index 521ba910e3b7c..b88444a817e0b 100644 --- a/routers/api/actions/runner/interceptor.go +++ b/routers/api/actions/runner/interceptor.go @@ -45,14 +45,21 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar return nil, status.Error(codes.Unauthenticated, "unregistered runner") } - cols := []string{"last_online"} - runner.LastOnline = timeutil.TimeStampNow() - if methodName == "UpdateTask" || methodName == "UpdateLog" { - runner.LastActive = timeutil.TimeStampNow() + // Reduce db writes by only updating last active/online when needed + var cols []string + now := timeutil.TimeStampNow() + if runner.LastActive.AddDuration(actions_model.RunnerOfflineTime/2) < now { + runner.LastOnline = now + cols = append(cols, "last_online") + } + if (methodName == "UpdateTask" || methodName == "UpdateLog") && runner.LastActive.AddDuration(actions_model.RunnerIdleTime/2) < now { + runner.LastActive = now cols = append(cols, "last_active") } - if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil { - log.Error("can't update runner status: %v", err) + if cols != nil { + if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil { + log.Error("can't update runner status: %v", err) + } } ctx = context.WithValue(ctx, runnerCtxKey{}, runner) From 6cca172a65a375c070caee02008cd976fa5879cc Mon Sep 17 00:00:00 2001 From: ChristopherHX Date: Fri, 14 Nov 2025 17:04:20 +0100 Subject: [PATCH 2/7] remove cast --- models/actions/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/actions/task.go b/models/actions/task.go index dc51ea5243474..45674f33098d0 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -266,7 +266,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask // TODO: a more efficient way to filter labels log.Trace("runner labels: %v", runner.AgentLabels) - backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(runner.ID))) + backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID)) for _, v := range jobs { if runner.CanMatchLabels(v.RunsOn) { // Reserve our job before preparing task, otherwise continue searching From 8699c5a30dfd982d02a54a013be5742b0233f202 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 17:24:12 +0100 Subject: [PATCH 3/7] fix paging --- models/actions/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/actions/task.go b/models/actions/task.go index 45674f33098d0..01035d60c487c 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -260,7 +260,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask for page := 0; job == nil; page++ { var jobs []*ActionRunJob // Load only 10 job in a batch without all fields for memory / db load reduction - if err := e.Where("task_id=? AND status=? AND updated>=?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit, page*limit).Find(&jobs); err != nil { + if err := e.Where("task_id=? AND status=? AND updated>=?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { return nil, false, err } From 03a2c06347c3bbf79bebddd14076c19180f6d0cb Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 17:28:47 +0100 Subject: [PATCH 4/7] fixup --- models/actions/task.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/models/actions/task.go b/models/actions/task.go index 01035d60c487c..7fb21d82bed01 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -260,9 +260,12 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask for page := 0; job == nil; page++ { var jobs []*ActionRunJob // Load only 10 job in a batch without all fields for memory / db load reduction - if err := e.Where("task_id=? AND status=? AND updated>=?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { + if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { return nil, false, err } + if len(jobs) == 0 { + break + } // TODO: a more efficient way to filter labels log.Trace("runner labels: %v", runner.AgentLabels) From fd08159c18d72505415091c13f3217e73807f3db Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 17:30:29 +0100 Subject: [PATCH 5/7] fix more problems --- models/actions/task.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index 7fb21d82bed01..2f635f97e7d1c 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -257,19 +257,16 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask // e.g. we would start again from zero if no job matches our known labels // For stable paging var lastUpdated timeutil.TimeStamp + // TODO: a more efficient way to filter labels + log.Trace("runner labels: %v", runner.AgentLabels) + backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID)) for page := 0; job == nil; page++ { var jobs []*ActionRunJob // Load only 10 job in a batch without all fields for memory / db load reduction if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { return nil, false, err } - if len(jobs) == 0 { - break - } - // TODO: a more efficient way to filter labels - log.Trace("runner labels: %v", runner.AgentLabels) - backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID)) for _, v := range jobs { if runner.CanMatchLabels(v.RunsOn) { // Reserve our job before preparing task, otherwise continue searching @@ -287,6 +284,9 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } lastUpdated = v.Updated } + if len(jobs) < limit { + break + } // Randomly distribute retries over time to reduce contention jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter select { From 4a5bd7508c87c9331fd75df057f1631631d1798c Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 18:31:15 +0100 Subject: [PATCH 6/7] fix update job status to db --- models/actions/task.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index 2f635f97e7d1c..495a7645a3445 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -263,7 +263,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask for page := 0; job == nil; page++ { var jobs []*ActionRunJob // Load only 10 job in a batch without all fields for memory / db load reduction - if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { + if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on", "updated").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { return nil, false, err } @@ -271,7 +271,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask if runner.CanMatchLabels(v.RunsOn) { // Reserve our job before preparing task, otherwise continue searching v.TaskID = task.ID - if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}); err != nil { + if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}, "task_id"); err != nil { return nil, false, err } else if n == 1 { var exist bool @@ -327,6 +327,10 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask return nil, false, err } + if _, err := UpdateRunJob(ctx, job, builder.Eq{"id": job.ID}, "attempt", "started", "status"); err != nil { + return nil, false, err + } + if len(workflowJob.Steps) > 0 { steps := make([]*ActionTaskStep, len(workflowJob.Steps)) for i, v := range workflowJob.Steps { From e68da9a037e65ed969a40e811c8d79c3023eaf57 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 14 Nov 2025 20:18:55 +0100 Subject: [PATCH 7/7] prevent increment task.ID in idle --- models/actions/task.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/models/actions/task.go b/models/actions/task.go index 495a7645a3445..4905afa3850ec 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -233,10 +233,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask if err := task.GenerateToken(); err != nil { return nil, false, err } - - if _, err := e.Insert(task); err != nil { - return nil, false, err - } + // Insert the task on demand if task.ID == 0, if at least one job matches avoid unnecessary id increment jobCond := builder.NewCond() if runner.RepoID != 0 { @@ -269,6 +266,12 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask for _, v := range jobs { if runner.CanMatchLabels(v.RunsOn) { + // Insert on demand, auto removed by aborted transaction if no job matches + if task.ID == 0 { + if _, err := e.Insert(task); err != nil { + return nil, false, err + } + } // Reserve our job before preparing task, otherwise continue searching v.TaskID = task.ID if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}, "task_id"); err != nil {