Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 70 additions & 35 deletions models/actions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/subtle"
"errors"
"fmt"
"math/rand"
"time"

auth_model "code.gitea.io/gitea/models/auth"
Expand Down Expand Up @@ -223,6 +224,17 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask

e := db.GetEngine(ctx)

// Create a new task record as early as possible to be able to reserve jobs
task := &ActionTask{
RunnerID: runner.ID,
Status: StatusBlocked,
}
// This is a requirement of the database schema
if err := task.GenerateToken(); err != nil {
return nil, false, err
}
// Insert the task on demand if task.ID == 0, if at least one job matches avoid unnecessary id increment

jobCond := builder.NewCond()
if runner.RepoID != 0 {
jobCond = builder.Eq{"repo_id": runner.RepoID}
Expand All @@ -235,19 +247,56 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond))
}

var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
return nil, false, err
}
var job *ActionRunJob

const limit = 10
// TODO store the last position to continue searching next time inside the runner record
// e.g. we would start again from zero if no job matches our known labels
// For stable paging
var lastUpdated timeutil.TimeStamp
// TODO: a more efficient way to filter labels
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
job = v
backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID))
for page := 0; job == nil; page++ {
var jobs []*ActionRunJob
// Load only 10 job in a batch without all fields for memory / db load reduction
if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on", "updated").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil {
return nil, false, err
}

for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
// Insert on demand, auto removed by aborted transaction if no job matches
if task.ID == 0 {
if _, err := e.Insert(task); err != nil {
return nil, false, err
}
}
// Reserve our job before preparing task, otherwise continue searching
v.TaskID = task.ID
if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}, "task_id"); err != nil {
return nil, false, err
} else if n == 1 {
var exist bool
// reload to get all fields
if job, exist, err = db.GetByID[ActionRunJob](ctx, v.ID); err != nil || !exist {
return nil, false, err
}
break
}
}
lastUpdated = v.Updated
}
if len(jobs) < limit {
break
}
// Randomly distribute retries over time to reduce contention
jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case <-time.After(jitter):
}
}
if job == nil {
return nil, false, nil
Expand All @@ -261,32 +310,27 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
job.Started = now
job.Status = StatusRunning

task := &ActionTask{
JobID: job.ID,
Attempt: job.Attempt,
RunnerID: runner.ID,
Started: now,
Status: StatusRunning,
RepoID: job.RepoID,
OwnerID: job.OwnerID,
CommitSHA: job.CommitSHA,
IsForkPullRequest: job.IsForkPullRequest,
}
if err := task.GenerateToken(); err != nil {
return nil, false, err
}

workflowJob, err := job.ParseJob()
if err != nil {
return nil, false, fmt.Errorf("load job %d: %w", job.ID, err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ends in a fetch loop such job should be updated to failed?

}

if _, err := e.Insert(task); err != nil {
task.Job = job
task.JobID = job.ID
task.Attempt = job.Attempt
task.Started = now
task.Status = StatusRunning
task.RepoID = job.RepoID
task.OwnerID = job.OwnerID
task.CommitSHA = job.CommitSHA
task.IsForkPullRequest = job.IsForkPullRequest
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)

if err := UpdateTask(ctx, task, "job_id", "attempt", "started", "status", "repo_id", "owner_id", "commit_sha", "is_fork_pull_request", "log_filename"); err != nil {
return nil, false, err
}

task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
if _, err := UpdateRunJob(ctx, job, builder.Eq{"id": job.ID}, "attempt", "started", "status"); err != nil {
return nil, false, err
}

Expand All @@ -308,15 +352,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
task.Steps = steps
}

job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err
} else if n != 1 {
return nil, false, nil
}

task.Job = job

if err := committer.Commit(); err != nil {
return nil, false, err
}
Expand Down
19 changes: 13 additions & 6 deletions routers/api/actions/runner/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,21 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar
return nil, status.Error(codes.Unauthenticated, "unregistered runner")
}

cols := []string{"last_online"}
runner.LastOnline = timeutil.TimeStampNow()
if methodName == "UpdateTask" || methodName == "UpdateLog" {
runner.LastActive = timeutil.TimeStampNow()
// Reduce db writes by only updating last active/online when needed
var cols []string
now := timeutil.TimeStampNow()
if runner.LastActive.AddDuration(actions_model.RunnerOfflineTime/2) < now {
runner.LastOnline = now
cols = append(cols, "last_online")
}
if (methodName == "UpdateTask" || methodName == "UpdateLog") && runner.LastActive.AddDuration(actions_model.RunnerIdleTime/2) < now {
runner.LastActive = now
cols = append(cols, "last_active")
}
if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil {
log.Error("can't update runner status: %v", err)
if cols != nil {
if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil {
log.Error("can't update runner status: %v", err)
}
}

ctx = context.WithValue(ctx, runnerCtxKey{}, runner)
Expand Down