Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pq.Array to avoid hitting parameter limits #8528

Merged
merged 2 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 7 additions & 16 deletions atc/db/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/lib/pq"
"time"

"code.cloudfoundry.org/lager"
Expand Down Expand Up @@ -993,36 +992,28 @@ func (p *pipeline) DeleteBuildEventsByBuildIDs(buildIDs []int) error {
return nil
}

interfaceBuildIDs := make([]interface{}, len(buildIDs))
for i, buildID := range buildIDs {
interfaceBuildIDs[i] = buildID
}

indexStrings := make([]string, len(buildIDs))
for i := range indexStrings {
indexStrings[i] = "$" + strconv.Itoa(i+1)
}

tx, err := p.conn.Begin()
if err != nil {
return err
}

defer Rollback(tx)

a := pq.Array(buildIDs)

_, err = tx.Exec(`
DELETE FROM `+p.eventsTable()+`
WHERE build_id IN (`+strings.Join(indexStrings, ",")+`)
`, interfaceBuildIDs...)
WHERE build_id = ANY($1)
`, a)
if err != nil {
return err
}

_, err = tx.Exec(`
UPDATE builds
SET reap_time = now()
WHERE id IN (`+strings.Join(indexStrings, ",")+`)
`, interfaceBuildIDs...)
WHERE id = ANY($1)
`, a)
if err != nil {
return err
}
Expand Down
46 changes: 42 additions & 4 deletions atc/db/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package db_test

import (
"fmt"
"strconv"
"time"

"code.cloudfoundry.org/clock"
"fmt"
"github.com/concourse/concourse/atc/creds"
"github.com/concourse/concourse/atc/creds/credsfakes"
"github.com/concourse/concourse/atc/db/dbtest"
"github.com/concourse/concourse/vars"
"github.com/lib/pq"
"strconv"
"time"

"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/db"
Expand Down Expand Up @@ -1452,6 +1452,44 @@ var _ = Describe("Pipeline", func() {
// Not required behavior, just a sanity check for what I think will happen
Expect(build4DB.ReapTime()).To(Equal(build1DB.ReapTime()))
})
It("deletes all build logs when there are more than 65_536", func() {
txn, err := dbConn.Begin()
Expect(err).ToNot(HaveOccurred())

// we break the abstractions so that we can efficiently bulk insert a heap of build events
// if we did this the "right" way (like the test above) it's excruciatingly slow.
stmt, err := txn.Prepare(pq.CopyIn(fmt.Sprintf("pipeline_build_events_%d", pipeline.ID()), "event_id", "build_id", "type", "version", "payload"))
Expect(err).ToNot(HaveOccurred())

numToInsert := 250_000
ids := make([]int, numToInsert)
for i := 0; i < numToInsert; i++ {
_, err = stmt.Exec(i, i, i, i, "")
Expect(err).ToNot(HaveOccurred())
ids[i] = i
}

_, err = stmt.Exec()
Expect(err).ToNot(HaveOccurred())

err = stmt.Close()
Expect(err).ToNot(HaveOccurred())

err = txn.Commit()
Expect(err).ToNot(HaveOccurred())

var count int
err = dbConn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM pipeline_build_events_%d", pipeline.ID())).Scan(&count)
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(numToInsert))

err = pipeline.DeleteBuildEventsByBuildIDs(ids)
Expect(err).ToNot(HaveOccurred())

err = dbConn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM pipeline_build_events_%d", pipeline.ID())).Scan(&count)
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(0))
})
})

Describe("Jobs", func() {
Expand Down