Skip to content

Commit

Permalink
Merge pull request #284 from concourse/optimize-cache-busting
Browse files Browse the repository at this point in the history
optimize versions db cache busting
  • Loading branch information
Joshua Winters committed Jun 22, 2018
2 parents dd3361e + d9416f3 commit ef5c21f
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 107 deletions.
25 changes: 20 additions & 5 deletions db/build.go
Expand Up @@ -334,6 +334,13 @@ func (b *build) Finish(status BuildStatus) error {
}
}

if b.jobID != 0 {
err = bumpCacheIndex(tx, b.pipelineID)
if err != nil {
return err
}
}

err = tx.Commit()
if err != nil {
return err
Expand Down Expand Up @@ -742,6 +749,11 @@ func (b *build) SaveInput(input BuildInput) error {
return err
}

err = bumpCacheIndex(tx, b.pipelineID)
if err != nil {
return err
}

return tx.Commit()
}

Expand Down Expand Up @@ -793,6 +805,11 @@ func (b *build) UseInputs(inputs []BuildInput) error {
}
}

err = bumpCacheIndex(tx, b.pipelineID)
if err != nil {
return err
}

return tx.Commit()
}

Expand Down Expand Up @@ -904,8 +921,7 @@ func (b *build) GetVersionedResources() (SavedVersionedResources, error) {
vr.version,
vr.metadata,
vr.type,
r.name,
vr.modified_time
r.name
FROM builds b
INNER JOIN jobs j ON b.job_id = j.id
INNER JOIN build_inputs bi ON bi.build_id = b.id
Expand All @@ -920,8 +936,7 @@ func (b *build) GetVersionedResources() (SavedVersionedResources, error) {
vr.version,
vr.metadata,
vr.type,
r.name,
vr.modified_time
r.name
FROM builds b
INNER JOIN jobs j ON b.job_id = j.id
INNER JOIN build_outputs bo ON bo.build_id = b.id
Expand All @@ -945,7 +960,7 @@ func (b *build) getVersionedResources(resourceRequest string) (SavedVersionedRes
var versionedResource SavedVersionedResource
var versionJSON []byte
var metadataJSON []byte
err = rows.Scan(&versionedResource.ID, &versionedResource.Enabled, &versionJSON, &metadataJSON, &versionedResource.Type, &versionedResource.Resource, &versionedResource.ModifiedTime)
err = rows.Scan(&versionedResource.ID, &versionedResource.Enabled, &versionJSON, &metadataJSON, &versionedResource.Type, &versionedResource.Resource)
if err != nil {
return nil, err
}
Expand Down
118 changes: 82 additions & 36 deletions db/migration/bindata.go

Large diffs are not rendered by default.

@@ -0,0 +1,6 @@
BEGIN;
ALTER TABLE pipelines DROP COLUMN cache_index;
ALTER TABLE versioned_resources ADD COLUMN modified_time timestamp without time zone DEFAULT now() NOT NULL;
ALTER TABLE build_inputs ADD COLUMN modified_time timestamp without time zone DEFAULT now() NOT NULL;
ALTER TABLE build_outputs ADD COLUMN modified_time timestamp without time zone DEFAULT now() NOT NULL;
COMMIT;
@@ -0,0 +1,6 @@
BEGIN;
ALTER TABLE pipelines ADD COLUMN cache_index integer NOT NULL DEFAULT 1;
ALTER TABLE versioned_resources DROP COLUMN modified_time;
ALTER TABLE build_inputs DROP COLUMN modified_time;
ALTER TABLE build_outputs DROP COLUMN modified_time;
COMMIT;
118 changes: 65 additions & 53 deletions db/pipeline.go
Expand Up @@ -117,7 +117,7 @@ type pipeline struct {
paused bool
public bool

cachedAt time.Time
cacheIndex int
versionsDB *algorithm.VersionsDB

conn Conn
Expand Down Expand Up @@ -405,6 +405,11 @@ func (p *pipeline) SaveResourceVersions(config atc.ResourceConfig, versions []at
}
}

err = bumpCacheIndex(tx, p.id)
if err != nil {
return err
}

return tx.Commit()
}

Expand Down Expand Up @@ -574,7 +579,7 @@ func (p *pipeline) GetLatestVersionedResource(resourceName string) (SavedVersion
},
}

err := psql.Select("v.id, v.enabled, v.type, v.version, v.metadata, v.modified_time, v.check_order").
err := psql.Select("v.id, v.enabled, v.type, v.version, v.metadata, v.check_order").
From("versioned_resources v, resources r").
Where(sq.Eq{
"r.name": resourceName,
Expand All @@ -585,7 +590,7 @@ func (p *pipeline) GetLatestVersionedResource(resourceName string) (SavedVersion
Limit(1).
RunWith(p.conn).
QueryRow().
Scan(&svr.ID, &svr.Enabled, &svr.Type, &versionBytes, &metadataBytes, &svr.ModifiedTime, &svr.CheckOrder)
Scan(&svr.ID, &svr.Enabled, &svr.Type, &versionBytes, &metadataBytes, &svr.CheckOrder)
if err != nil {
if err == sql.ErrNoRows {
return SavedVersionedResource{}, false, nil
Expand Down Expand Up @@ -1008,12 +1013,18 @@ func (p *pipeline) Destroy() error {
}

func (p *pipeline) LoadVersionsDB() (*algorithm.VersionsDB, error) {
latestModifiedTime, err := p.getLatestModifiedTime()
var cacheIndex int
err := psql.Select("cache_index").
From("pipelines").
Where(sq.Eq{"id": p.id}).
RunWith(p.conn).
QueryRow().
Scan(&cacheIndex)
if err != nil {
return nil, err
}

if p.versionsDB != nil && p.cachedAt.Equal(latestModifiedTime) {
if p.versionsDB != nil && p.cacheIndex == cacheIndex {
return p.versionsDB, nil
}

Expand Down Expand Up @@ -1165,7 +1176,7 @@ func (p *pipeline) LoadVersionsDB() (*algorithm.VersionsDB, error) {
}

p.versionsDB = db
p.cachedAt = latestModifiedTime
p.cacheIndex = cacheIndex

return db, nil
}
Expand Down Expand Up @@ -1311,6 +1322,11 @@ func (p *pipeline) saveOutput(buildID int, vr VersionedResource) error {
return err
}

err = bumpCacheIndex(tx, p.id)
if err != nil {
return err
}

return tx.Commit()
}

Expand Down Expand Up @@ -1386,12 +1402,11 @@ func (p *pipeline) saveVersionedResource(tx Tx, resourceID int, vr VersionedReso

var id int
var enabled bool
var modifiedTime time.Time
var checkOrder int

result, err := tx.Exec(`
INSERT INTO versioned_resources (resource_id, type, version, metadata, modified_time)
SELECT $1, $2, $3, $4, now()
INSERT INTO versioned_resources (resource_id, type, version, metadata)
SELECT $1, $2, $3, $4
WHERE NOT EXISTS (
SELECT 1
FROM versioned_resources
Expand Down Expand Up @@ -1420,18 +1435,17 @@ func (p *pipeline) saveVersionedResource(tx Tx, resourceID int, vr VersionedReso
if len(vr.Metadata) > 0 {
err = psql.Update("versioned_resources").
Set("metadata", string(metadataJSON)).
Set("modified_time", sq.Expr("now()")).
Where(sq.Eq{
"resource_id": resourceID,
"type": vr.Type,
"version": string(versionJSON),
}).
Suffix("RETURNING id, enabled, metadata, modified_time, check_order").
Suffix("RETURNING id, enabled, metadata, check_order").
RunWith(tx).
QueryRow().
Scan(&id, &enabled, &savedMetadata, &modifiedTime, &checkOrder)
Scan(&id, &enabled, &savedMetadata, &checkOrder)
} else {
err = psql.Select("id, enabled, metadata, modified_time, check_order").
err = psql.Select("id, enabled, metadata, check_order").
From("versioned_resources").
Where(sq.Eq{
"resource_id": resourceID,
Expand All @@ -1440,7 +1454,7 @@ func (p *pipeline) saveVersionedResource(tx Tx, resourceID int, vr VersionedReso
}).
RunWith(tx).
QueryRow().
Scan(&id, &enabled, &savedMetadata, &modifiedTime, &checkOrder)
Scan(&id, &enabled, &savedMetadata, &checkOrder)
}
if err != nil {
return SavedVersionedResource{}, false, err
Expand All @@ -1453,9 +1467,8 @@ func (p *pipeline) saveVersionedResource(tx Tx, resourceID int, vr VersionedReso

created := rowsAffected != 0
return SavedVersionedResource{
ID: id,
Enabled: enabled,
ModifiedTime: modifiedTime,
ID: id,
Enabled: enabled,

VersionedResource: vr,
CheckOrder: checkOrder,
Expand All @@ -1482,11 +1495,17 @@ func (p *pipeline) incrementCheckOrderWhenNewerVersion(tx Tx, resourceID int, re
}

func (p *pipeline) toggleVersionedResource(versionedResourceID int, enable bool) error {
tx, err := p.conn.Begin()
if err != nil {
return err
}

defer Rollback(tx)

rows, err := psql.Update("versioned_resources").
Set("enabled", enable).
Set("modified_time", sq.Expr("now()")).
Where(sq.Eq{"id": versionedResourceID}).
RunWith(p.conn).
RunWith(tx).
Exec()
if err != nil {
return err
Expand All @@ -1501,41 +1520,12 @@ func (p *pipeline) toggleVersionedResource(versionedResourceID int, enable bool)
return nonOneRowAffectedError{rowsAffected}
}

return nil
}
err = bumpCacheIndex(tx, p.id)
if err != nil {
return err
}

func (p *pipeline) getLatestModifiedTime() (time.Time, error) {
var maxModifiedTime time.Time

err := p.conn.QueryRow(`
SELECT
CASE
WHEN b_max > vr_max AND b_max > bi_max THEN b_max
WHEN bi_max > vr_max THEN bi_max
ELSE vr_max
END
FROM
(
SELECT COALESCE(MAX(b.end_time), 'epoch') as b_max
FROM builds b
WHERE b.pipeline_id = $1
) bo,
(
SELECT COALESCE(MAX(bi.modified_time), 'epoch') as bi_max
FROM build_inputs bi
LEFT OUTER JOIN versioned_resources v ON v.id = bi.versioned_resource_id
LEFT OUTER JOIN resources r ON r.id = v.resource_id
WHERE r.pipeline_id = $1
) bi,
(
SELECT COALESCE(MAX(vr.modified_time), 'epoch') as vr_max
FROM versioned_resources vr
LEFT OUTER JOIN resources r ON r.id = vr.resource_id
WHERE r.pipeline_id = $1
) vr
`, p.id).Scan(&maxModifiedTime)

return maxModifiedTime, err
return tx.Commit()
}

func (p *pipeline) getBuildsFrom(view string) (map[string]Build, error) {
Expand Down Expand Up @@ -1563,6 +1553,28 @@ func (p *pipeline) getBuildsFrom(view string) (map[string]Build, error) {
return nextBuilds, nil
}

func bumpCacheIndex(tx Tx, pipelineID int) error {
res, err := psql.Update("pipelines").
Set("cache_index", sq.Expr("cache_index + 1")).
Where(sq.Eq{"id": pipelineID}).
RunWith(tx).
Exec()
if err != nil {
return err
}

rows, err := res.RowsAffected()
if err != nil {
return err
}

if rows != 1 {
return nonOneRowAffectedError{rows}
}

return nil
}

func getNewBuildNameForJob(tx Tx, jobName string, pipelineID int) (string, int, error) {
var buildName string
var jobID int
Expand Down
7 changes: 2 additions & 5 deletions db/pipeline_resource.go
@@ -1,8 +1,6 @@
package db

import (
"time"

"github.com/concourse/atc"
)

Expand All @@ -14,9 +12,8 @@ type VersionedResource struct {
}

type SavedVersionedResource struct {
ID int
Enabled bool
ModifiedTime time.Time
ID int
Enabled bool
VersionedResource
CheckOrder int
}
Expand Down

0 comments on commit ef5c21f

Please sign in to comment.