diff --git a/models/actions/task.go b/models/actions/task.go index 8b4ecf28f7a0b..4905afa3850ec 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -8,6 +8,7 @@ import ( "crypto/subtle" "errors" "fmt" + "math/rand" "time" auth_model "code.gitea.io/gitea/models/auth" @@ -223,6 +224,17 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask e := db.GetEngine(ctx) + // Create a new task record as early as possible to be able to reserve jobs + task := &ActionTask{ + RunnerID: runner.ID, + Status: StatusBlocked, + } + // This is a requirement of the database schema + if err := task.GenerateToken(); err != nil { + return nil, false, err + } + // Insert the task on demand if task.ID == 0, if at least one job matches avoid unnecessary id increment + jobCond := builder.NewCond() if runner.RepoID != 0 { jobCond = builder.Eq{"repo_id": runner.RepoID} @@ -235,19 +247,56 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond)) } - var jobs []*ActionRunJob - if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { - return nil, false, err - } + var job *ActionRunJob + const limit = 10 + // TODO store the last position to continue searching next time inside the runner record + // e.g. we would start again from zero if no job matches our known labels + // For stable paging + var lastUpdated timeutil.TimeStamp // TODO: a more efficient way to filter labels - var job *ActionRunJob log.Trace("runner labels: %v", runner.AgentLabels) - for _, v := range jobs { - if runner.CanMatchLabels(v.RunsOn) { - job = v + backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID)) + for page := 0; job == nil; page++ { + var jobs []*ActionRunJob + // Load only 10 job in a batch without all fields for memory / db load reduction + if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on", "updated").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil { + return nil, false, err + } + + for _, v := range jobs { + if runner.CanMatchLabels(v.RunsOn) { + // Insert on demand, auto removed by aborted transaction if no job matches + if task.ID == 0 { + if _, err := e.Insert(task); err != nil { + return nil, false, err + } + } + // Reserve our job before preparing task, otherwise continue searching + v.TaskID = task.ID + if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}, "task_id"); err != nil { + return nil, false, err + } else if n == 1 { + var exist bool + // reload to get all fields + if job, exist, err = db.GetByID[ActionRunJob](ctx, v.ID); err != nil || !exist { + return nil, false, err + } + break + } + } + lastUpdated = v.Updated + } + if len(jobs) < limit { break } + // Randomly distribute retries over time to reduce contention + jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + case <-time.After(jitter): + } } if job == nil { return nil, false, nil @@ -261,32 +310,27 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask job.Started = now job.Status = StatusRunning - task := &ActionTask{ - JobID: job.ID, - Attempt: job.Attempt, - RunnerID: runner.ID, - Started: now, - Status: StatusRunning, - RepoID: job.RepoID, - OwnerID: job.OwnerID, - CommitSHA: job.CommitSHA, - IsForkPullRequest: job.IsForkPullRequest, - } - if err := task.GenerateToken(); err != nil { - return nil, false, err - } - workflowJob, err := job.ParseJob() if err != nil { return nil, false, fmt.Errorf("load job %d: %w", job.ID, err) } - if _, err := e.Insert(task); err != nil { + task.Job = job + task.JobID = job.ID + task.Attempt = job.Attempt + task.Started = now + task.Status = StatusRunning + task.RepoID = job.RepoID + task.OwnerID = job.OwnerID + task.CommitSHA = job.CommitSHA + task.IsForkPullRequest = job.IsForkPullRequest + task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) + + if err := UpdateTask(ctx, task, "job_id", "attempt", "started", "status", "repo_id", "owner_id", "commit_sha", "is_fork_pull_request", "log_filename"); err != nil { return nil, false, err } - task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) - if err := UpdateTask(ctx, task, "log_filename"); err != nil { + if _, err := UpdateRunJob(ctx, job, builder.Eq{"id": job.ID}, "attempt", "started", "status"); err != nil { return nil, false, err } @@ -308,15 +352,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask task.Steps = steps } - job.TaskID = task.ID - if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { - return nil, false, err - } else if n != 1 { - return nil, false, nil - } - - task.Job = job - if err := committer.Commit(); err != nil { return nil, false, err } diff --git a/routers/api/actions/runner/interceptor.go b/routers/api/actions/runner/interceptor.go index 521ba910e3b7c..b88444a817e0b 100644 --- a/routers/api/actions/runner/interceptor.go +++ b/routers/api/actions/runner/interceptor.go @@ -45,14 +45,21 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar return nil, status.Error(codes.Unauthenticated, "unregistered runner") } - cols := []string{"last_online"} - runner.LastOnline = timeutil.TimeStampNow() - if methodName == "UpdateTask" || methodName == "UpdateLog" { - runner.LastActive = timeutil.TimeStampNow() + // Reduce db writes by only updating last active/online when needed + var cols []string + now := timeutil.TimeStampNow() + if runner.LastActive.AddDuration(actions_model.RunnerOfflineTime/2) < now { + runner.LastOnline = now + cols = append(cols, "last_online") + } + if (methodName == "UpdateTask" || methodName == "UpdateLog") && runner.LastActive.AddDuration(actions_model.RunnerIdleTime/2) < now { + runner.LastActive = now cols = append(cols, "last_active") } - if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil { - log.Error("can't update runner status: %v", err) + if cols != nil { + if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil { + log.Error("can't update runner status: %v", err) + } } ctx = context.WithValue(ctx, runnerCtxKey{}, runner)