Skip to content

Commit

Permalink
atc: refactor conn pools and algorithm
Browse files Browse the repository at this point in the history
Refactored the creation of all the different GC components.

Fixed a bug in the algorithm where, if there are two inputs with the
same resource id and version, the version candidate would be overwritten
twice. This caused the restore value to be set to the same value rather
than nil, so when restoring, the candidate was not properly reset to
empty. It also adds the behavior of preferring outputs over inputs:
outputs are always ordered before inputs, so the candidate is always set
to the output version (when they have the same resource id).

Extended the algorithm so that pinned-version-not-found errors are added
to the next build inputs table, making them appear in the build
preparation.

[#3602]

Signed-off-by: Clara Fu <cfu@pivotal.io>
  • Loading branch information
clarafu committed Jul 25, 2019
1 parent 702e961 commit d70ddf2
Show file tree
Hide file tree
Showing 10 changed files with 403 additions and 251 deletions.
252 changes: 107 additions & 145 deletions atc/atccmd/command.go
Expand Up @@ -188,6 +188,8 @@ type RunCommand struct {
AuthFlags skycmd.AuthFlags
MainTeamFlags skycmd.AuthTeamFlags `group:"Authentication (Main Team)" namespace:"main-team"`
} `group:"Authentication"`

closeDBConns []Closer
}

var HelpError = errors.New("must specify one of `--current-db-version`, `--supported-db-version`, or `--migrate-db-to-version`")
Expand Down Expand Up @@ -505,7 +507,8 @@ func (cmd *RunCommand) Runner(positionalArguments []string) (ifrit.Runner, error
}

onExit := func() {
for _, closer := range []Closer{lockConn, apiConn, backendConn, buildGCConn, workerGCConn, resourceCacheUseGCConn, resourceConfigGCConn, resourceCacheGCConn, resourceConfigCheckSessionGCConn, artifactGCConn, containerGCConn, volumeGCConn, storage} {
closeDBConns := append([]Closer{lockConn, storage}, cmd.closeDBConns...)
for _, closer := range closeDBConns {
closer.Close()
}
}
Expand Down Expand Up @@ -925,157 +928,113 @@ func (cmd *RunCommand) constructGCMembers(
return nil, err
}

// Construct build collector with it's designated connection pool
bCollectorBuildFactory := db.NewBuildFactory(dbConn["build-collector"], lockFactory, cmd.GC.OneOffBuildGracePeriod)
members = append(members, grouper.Member{
Name: "build-collector", Runner: lockrunner.NewRunner(
logger.Session("build-collector-runner"),
gc.NewBuildCollector(bCollectorBuildFactory),
"build-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct worker collector
wCollectorWorkerLifecycle := db.NewWorkerLifecycle(dbConn["worker-collector"])
members = append(members, grouper.Member{
Name: "worker-collector", Runner: lockrunner.NewRunner(
logger.Session("worker-collector-runner"),
gc.NewWorkerCollector(wCollectorWorkerLifecycle),
"worker-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct resource cache use collector
rcacheuCollectorResourceCacheLifecycle := db.NewResourceCacheLifecycle(dbConn["resource-cache-use-collector"])
members = append(members, grouper.Member{
Name: "resource-cache-use-collector", Runner: lockrunner.NewRunner(
logger.Session("resource-cache-use-collector-runner"),
gc.NewResourceCacheUseCollector(rcacheuCollectorResourceCacheLifecycle),
"resource-cache-use-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct resource config collector
rconfigCollectorResourceConfigFactory := db.NewResourceConfigFactory(dbConn["resource-config-collector"], lockFactory)
members = append(members, grouper.Member{
Name: "resource-config-collector", Runner: lockrunner.NewRunner(
logger.Session("resource-config-collector-runner"),
gc.NewResourceConfigCollector(rconfigCollectorResourceConfigFactory),
"resource-config-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct resource cache collector
rcacheCollectorResourceCacheLifecycle := db.NewResourceCacheLifecycle(dbConn["resource-cache-use-collector"])
members = append(members, grouper.Member{
Name: "resource-cache-collector", Runner: lockrunner.NewRunner(
logger.Session("resource-cache-collector-runner"),
gc.NewResourceCacheCollector(rcacheCollectorResourceCacheLifecycle),
"resource-cache-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct resource config check session collector
rccsCollectorResourceConfigCheckSessionLifecycle := db.NewResourceConfigCheckSessionLifecycle(dbConn["resource-config-check-session-collector"])
members = append(members, grouper.Member{
Name: "resource-config-check-session-collector", Runner: lockrunner.NewRunner(
logger.Session("resource-config-check-session-collector-runner"),
gc.NewResourceConfigCheckSessionCollector(
rccsCollectorResourceConfigCheckSessionLifecycle,
),
"resource-config-check-session-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})
// A list of all the garbage collector components
gcConns := []string{
"build-collector",
"worker-collector",
"resource-cache-use-collector",
"resource-config-collector",
"resource-cache-collector",
"resource-config-check-session-collector",
"artifact-collector",
"volume-collector",
"container-collector",
}

// Construct artifact collector
aCollectorArtifactLifecycle := db.NewArtifactLifecycle(dbConn["artifact-collector"])
members = append(members, grouper.Member{
Name: "artifact-collector", Runner: lockrunner.NewRunner(
logger.Session("artifact-collector-runner"),
gc.NewArtifactCollector(aCollectorArtifactLifecycle),
"artifact-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})
for _, name := range gcConns {
gcConn, err := cmd.constructDBConn(retryingDriverName, logger, 1, name, lockFactory)
if err != nil {
return nil, err
}

// Construct volume collector
vCollectorVolumeRepository := db.NewVolumeRepository(dbConn["volume-collector"])
members = append(members, grouper.Member{
Name: "volume-collector", Runner: lockrunner.NewRunner(
logger.Session("volume-collector-runner"),
gc.NewVolumeCollector(
vCollectorVolumeRepository,
var collector gc.Collector
switch name {
case "build-collector":
buildFactory := db.NewBuildFactory(gcConn, lockFactory, cmd.GC.OneOffBuildGracePeriod)
collector = gc.NewBuildCollector(buildFactory)
case "worker-collector":
workerLifecycle := db.NewWorkerLifecycle(gcConn)
collector = gc.NewWorkerCollector(workerLifecycle)
case "resource-cache-use-collector":
resourceCacheLifecycle := db.NewResourceCacheLifecycle(gcConn)
collector = gc.NewResourceCacheUseCollector(resourceCacheLifecycle)
case "resource-config-collector":
resourceConfigFactory := db.NewResourceConfigFactory(gcConn, lockFactory)
collector = gc.NewResourceConfigCollector(resourceConfigFactory)
case "resource-cache-collector":
resourceCacheLifecycle := db.NewResourceCacheLifecycle(gcConn)
collector = gc.NewResourceCacheCollector(resourceCacheLifecycle)
case "resource-config-check-session-collector":
resourceConfigCheckSessionLifecycle := db.NewResourceConfigCheckSessionLifecycle(gcConn)
collector = gc.NewResourceConfigCheckSessionCollector(
resourceConfigCheckSessionLifecycle,
)
case "artifact-collector":
artifactLifecycle := db.NewArtifactLifecycle(gcConn)
collector = gc.NewArtifactCollector(artifactLifecycle)
case "volume-collector":
volumeRepository := db.NewVolumeRepository(gcConn)
collector = gc.NewVolumeCollector(
volumeRepository,
cmd.GC.MissingGracePeriod,
),
"volume-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})

// Construct container collector
cCollectorContainerRepository := db.NewContainerRepository(dbConn["container-collector"])
cCollectorResourceCacheFactory := db.NewResourceCacheFactory(dbConn["container-collector"], lockFactory)
cCollectorResourceConfigFactory := db.NewResourceConfigFactory(dbConn["container-collector"], lockFactory)
cCollectorFetchSourceFactory := resource.NewFetchSourceFactory(cCollectorResourceCacheFactory, resourceFactory)
cCollectorResourceFetcher := resource.NewFetcher(clock.NewClock(), lockFactory, cCollectorFetchSourceFactory)
cCollectorImageResourceFetcherFactory := image.NewImageResourceFetcherFactory(
cCollectorResourceCacheFactory,
cCollectorResourceConfigFactory,
cCollectorResourceFetcher,
resourceFactory,
)
cCollectorWorkerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(dbConn["container-collector"])
cCollectorTaskCacheFactory := db.NewTaskCacheFactory(dbConn["container-collector"])
cCollectorWorkerTaskCacheFactory := db.NewWorkerTaskCacheFactory(dbConn["container-collector"])
cCollectorVolumeRepository := db.NewVolumeRepository(dbConn["container-collector"])
cCollectorTeamFactory := db.NewTeamFactory(dbConn["container-collector"], lockFactory)
cCollectorWorkerFactory := db.NewWorkerFactory(dbConn["container-collector"])
cCollectorWorkerProvider := worker.NewDBWorkerProvider(
lockFactory,
retryhttp.NewExponentialBackOffFactory(5*time.Minute),
image.NewImageFactory(cCollectorImageResourceFetcherFactory),
cCollectorResourceCacheFactory,
cCollectorResourceConfigFactory,
cCollectorWorkerBaseResourceTypeFactory,
cCollectorTaskCacheFactory,
cCollectorWorkerTaskCacheFactory,
cCollectorVolumeRepository,
cCollectorTeamFactory,
cCollectorWorkerFactory,
workerVersion,
cmd.BaggageclaimResponseHeaderTimeout,
)
members = append(members, grouper.Member{
Name: "container-collector", Runner: lockrunner.NewRunner(
logger.Session("container-collector-runner"),
gc.NewContainerCollector(
cCollectorContainerRepository,
)
case "container-collector":
containerRepository := db.NewContainerRepository(gcConn)
resourceCacheFactory := db.NewResourceCacheFactory(gcConn, lockFactory)
resourceConfigFactory := db.NewResourceConfigFactory(gcConn, lockFactory)
fetchSourceFactory := resource.NewFetchSourceFactory(resourceCacheFactory, resourceFactory)
resourceFetcher := resource.NewFetcher(clock.NewClock(), lockFactory, fetchSourceFactory)
imageResourceFetcherFactory := image.NewImageResourceFetcherFactory(
resourceCacheFactory,
resourceConfigFactory,
resourceFetcher,
resourceFactory,
)
workerBaseResourceTypeFactory := db.NewWorkerBaseResourceTypeFactory(gcConn)
taskCacheFactory := db.NewTaskCacheFactory(gcConn)
workerTaskCacheFactory := db.NewWorkerTaskCacheFactory(gcConn)
volumeRepository := db.NewVolumeRepository(gcConn)
teamFactory := db.NewTeamFactory(gcConn, lockFactory)
workerFactory := db.NewWorkerFactory(gcConn)
workerProvider := worker.NewDBWorkerProvider(
lockFactory,
retryhttp.NewExponentialBackOffFactory(5*time.Minute),
image.NewImageFactory(imageResourceFetcherFactory),
resourceCacheFactory,
resourceConfigFactory,
workerBaseResourceTypeFactory,
taskCacheFactory,
workerTaskCacheFactory,
volumeRepository,
teamFactory,
workerFactory,
workerVersion,
cmd.BaggageclaimResponseHeaderTimeout,
)
collector = gc.NewContainerCollector(
containerRepository,
gc.NewWorkerJobRunner(
logger.Session("container-collector-worker-job-runner"),
cCollectorWorkerProvider,
logger.Session(name+"-runner"),
workerProvider,
time.Minute,
),
cmd.GC.MissingGracePeriod,
),
"container-collector",
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})
)
default:
panic("garbage collector does not exist: " + name)
}

members = append(members, grouper.Member{
Name: name,
Runner: lockrunner.NewRunner(
logger.Session(name+"-runner"),
collector,
name,
lockFactory,
clock.NewClock(),
cmd.GC.Interval,
)})
}

return members, nil
}
Expand Down Expand Up @@ -1366,6 +1325,9 @@ func (cmd *RunCommand) constructDBConn(
// Prepare
dbConn.SetMaxOpenConns(maxConn)

// Add connection to the connection pool list
cmd.closeDBConns = append(cmd.closeDBConns, dbConn)

return dbConn, nil
}

Expand Down
1 change: 1 addition & 0 deletions atc/db/build.go
Expand Up @@ -364,6 +364,7 @@ func (b *build) Finish(status BuildStatus) error {
}

// XXX: NEED TESTS
// XXX: Should we order the outputs returned by the version? Greatest to least if they have the same resource ID?
rows, err := psql.Select("o.resource_id", "o.version_md5").
From("build_resource_config_version_outputs o").
Where(sq.Eq{
Expand Down
11 changes: 6 additions & 5 deletions atc/db/build_test.go
Expand Up @@ -1467,10 +1467,10 @@ var _ = Describe("Build", func() {
versionsDB, err := pipeline.LoadVersionsDB()
Expect(err).ToNot(HaveOccurred())

buildID, found, err := versionsDB.LatestConstraintBuildID(build.ID(), otherJob.ID())
passedJobs := map[int]bool{otherJob.ID(): true}
buildPipes, err := versionsDB.LatestBuildPipes(build.ID(), passedJobs)
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeTrue())
Expect(buildID).To(Equal(otherBuild.ID()))
Expect(buildPipes[otherJob.ID()]).To(Equal(otherBuild.ID()))
})
})

Expand All @@ -1492,9 +1492,10 @@ var _ = Describe("Build", func() {
versionsDB, err := pipeline.LoadVersionsDB()
Expect(err).ToNot(HaveOccurred())

_, found, err := versionsDB.LatestConstraintBuildID(build.ID(), otherJob.ID())
passedJobs := map[int]bool{otherJob.ID(): true}
buildPipes, err := versionsDB.LatestBuildPipes(build.ID(), passedJobs)
Expect(err).ToNot(HaveOccurred())
Expect(found).To(BeFalse())
Expect(buildPipes).To(HaveLen(0))
})
})
})
Expand Down
5 changes: 4 additions & 1 deletion atc/db/pipeline.go
Expand Up @@ -15,6 +15,8 @@ import (
gocache "github.com/patrickmn/go-cache"
)

const algorithmLimitRows = 100

type ErrResourceNotFound struct {
Name string
}
Expand Down Expand Up @@ -686,7 +688,8 @@ var schedulerCache = gocache.New(10*time.Second, 10*time.Second)

func (p *pipeline) LoadVersionsDB() (*VersionsDB, error) {
db := &VersionsDB{
Conn: p.conn,
Conn: p.conn,
LimitRows: algorithmLimitRows,

Cache: schedulerCache,

Expand Down

0 comments on commit d70ddf2

Please sign in to comment.