Skip to content

Commit

Permalink
Merge pull request #8504 from SimonXming/reduce-redundant-schedule
Browse files Browse the repository at this point in the history
Reduce redundant job schedule
  • Loading branch information
xtremerui committed Aug 5, 2022
2 parents 0b54be4 + bed96fa commit f7a0bb0
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 43 deletions.
56 changes: 55 additions & 1 deletion atc/db/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,13 +1259,67 @@ var _ = Describe("Build", func() {
Expect(buildOutputs[0].Version).To(Equal(outputVersion))
})

Context("with a job in a separate team downstream of the same resource config", func() {
Context("with a job in a separate team downstream of the same non-trigger resource config", func() {
var otherScenario *dbtest.Scenario

var beforeTime, otherJobBeforeTime time.Time
var otherTeamBeforeTime, otherTeamOtherJobBeforeTime time.Time

BeforeEach(func() {
otherScenario = dbtest.Setup(
builder.WithTeam("other-team"),
builder.WithPipeline(pipelineConfig),
builder.WithResourceVersions("some-resource", atc.Version{"some": "version"}),
builder.WithResourceVersions("some-other-resource", atc.Version{"some": "other-version"}),
)

beforeTime = scenario.Job("some-job").ScheduleRequestedTime()
otherTeamBeforeTime = otherScenario.Job("some-job").ScheduleRequestedTime()

otherJobBeforeTime = scenario.Job("some-other-job").ScheduleRequestedTime()
otherTeamOtherJobBeforeTime = otherScenario.Job("some-other-job").ScheduleRequestedTime()
})

It("requests schedule on jobs which use the same config", func() {
Expect(scenario.Job("some-job").ScheduleRequestedTime()).To(BeTemporally(">", beforeTime))
Expect(otherScenario.Job("some-job").ScheduleRequestedTime()).To(BeTemporally("==", otherTeamBeforeTime))

Expect(scenario.Job("some-other-job").ScheduleRequestedTime()).To(BeTemporally("==", otherJobBeforeTime))
Expect(otherScenario.Job("some-other-job").ScheduleRequestedTime()).To(BeTemporally("==", otherTeamOtherJobBeforeTime))
})
})

Context("with a job in a separate team downstream of the same trigger resource config", func() {
var otherScenario *dbtest.Scenario

var beforeTime, otherJobBeforeTime time.Time
var otherTeamBeforeTime, otherTeamOtherJobBeforeTime time.Time

BeforeEach(func() {
pipelineConfig.Jobs = atc.JobConfigs{
{
Name: "some-job",
PlanSequence: []atc.Step{
{
Config: &atc.GetStep{
Name: "some-resource",
Trigger: true,
},
},
},
},
{
Name: "some-other-job",
PlanSequence: []atc.Step{
{
Config: &atc.GetStep{
Name: "some-other-resource",
Trigger: true,
},
},
},
},
}
otherScenario = dbtest.Setup(
builder.WithTeam("other-team"),
builder.WithPipeline(pipelineConfig),
Expand Down
26 changes: 19 additions & 7 deletions atc/db/resource_config_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,26 @@ func incrementCheckOrder(tx Tx, rcsID int, version string) error {
// Updating multiple rows using a SELECT subquery does not preserve the same
// order for the updates, which can lead to deadlocking.
func requestScheduleForJobsUsingResourceConfigScope(tx Tx, rcsID int) error {
rows, err := psql.Select("DISTINCT j.job_id").
From("job_inputs j").
Join("resources r ON r.id = j.resource_id").
Where(sq.Eq{
"r.resource_config_scope_id": rcsID,
"j.passed_job_id": nil,
rows, err := psql.Select("DISTINCT ji.job_id").
From("job_inputs ji").
Join("resources r ON r.id = ji.resource_id").
Join("jobs j ON j.id = ji.job_id").
Where(sq.Or{
sq.Eq{
"r.resource_config_scope_id": rcsID,
"ji.passed_job_id": nil,
"ji.trigger": true,
},
sq.And{
sq.Eq{
"r.resource_config_scope_id": rcsID,
"ji.passed_job_id": nil,
"ji.trigger": false,
},
sq.NotEq{"j.next_build_id": nil},
},
}).
OrderBy("j.job_id DESC").
OrderBy("ji.job_id DESC").
RunWith(tx).
Query()
if err != nil {
Expand Down
108 changes: 73 additions & 35 deletions atc/db/resource_config_scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,47 @@ import (
var _ = Describe("Resource Config Scope", func() {
var scenario *dbtest.Scenario
var resourceScope db.ResourceConfigScope

BeforeEach(func() {
scenario = dbtest.Setup(
builder.WithPipeline(atc.Config{
Resources: atc.ResourceConfigs{
{
Name: "some-resource",
Type: "some-base-resource-type",
Source: atc.Source{
"some": "source",
},
},
var pipelineConfig atc.Config = atc.Config{
Resources: atc.ResourceConfigs{
{
Name: "some-resource",
Type: "some-base-resource-type",
Source: atc.Source{
"some": "source",
},
Jobs: atc.JobConfigs{
},
},
Jobs: atc.JobConfigs{
{
Name: "some-job",
PlanSequence: []atc.Step{
{
Name: "some-job",
PlanSequence: []atc.Step{
{
Config: &atc.GetStep{
Name: "some-resource",
},
},
Config: &atc.GetStep{
Name: "some-resource",
},
},
},
},
{
Name: "downstream-job",
PlanSequence: []atc.Step{
{
Name: "downstream-job",
PlanSequence: []atc.Step{
{
Config: &atc.GetStep{
Name: "some-resource",
Passed: []string{"some-job"},
},
},
Config: &atc.GetStep{
Name: "some-resource",
Passed: []string{"some-job"},
},
},
{
Name: "some-other-job",
},
},
}),
},
{
Name: "some-other-job",
},
},
}

BeforeEach(func() {
scenario = dbtest.Setup(
builder.WithPipeline(pipelineConfig),
builder.WithResourceVersions("some-resource"),
)

Expand Down Expand Up @@ -138,8 +139,8 @@ var _ = Describe("Resource Config Scope", func() {
Expect(latestVR.CheckOrder()).To(Equal(2))
})

Context("when a new version is added", func() {
It("requests schedule on the jobs that use the resource", func() {
Context("when a new version is added for non-trigger resource", func() {
It("should not request schedule on the jobs that use the resource", func() {
err := resourceScope.SaveVersions(nil, originalVersionSlice)
Expect(err).ToNot(HaveOccurred())

Expand All @@ -152,7 +153,7 @@ var _ = Describe("Resource Config Scope", func() {
err = resourceScope.SaveVersions(nil, newVersions)
Expect(err).ToNot(HaveOccurred())

Expect(scenario.Job("some-job").ScheduleRequestedTime()).Should(BeTemporally(">", requestedSchedule))
Expect(scenario.Job("some-job").ScheduleRequestedTime()).Should(BeTemporally("==", requestedSchedule))
})

It("does not request schedule on the jobs that use the resource but through passed constraints", func() {
Expand Down Expand Up @@ -187,6 +188,43 @@ var _ = Describe("Resource Config Scope", func() {
Expect(scenario.Job("some-other-job").ScheduleRequestedTime()).Should(BeTemporally("==", requestedSchedule))
})
})

Context("when a new version is added for trigger resource", func() {
BeforeEach(func() {
pipelineConfig.Jobs = atc.JobConfigs{
{
Name: "some-job",
PlanSequence: []atc.Step{
{
Config: &atc.GetStep{
Name: "some-resource",
Trigger: true,
},
},
},
},
}
scenario.Run(
builder.WithPipeline(pipelineConfig),
)
})

It("requests schedule on the jobs that use the resource", func() {
err := resourceScope.SaveVersions(nil, originalVersionSlice)
Expect(err).ToNot(HaveOccurred())

requestedSchedule := scenario.Job("some-job").ScheduleRequestedTime()

newVersions := []atc.Version{
{"ref": "v0"},
{"ref": "v3"},
}
err = resourceScope.SaveVersions(nil, newVersions)
Expect(err).ToNot(HaveOccurred())

Expect(scenario.Job("some-job").ScheduleRequestedTime()).Should(BeTemporally(">", requestedSchedule))
})
})
})
})

Expand Down

0 comments on commit f7a0bb0

Please sign in to comment.