Skip to content

Commit

Permalink
internal/postgres: update imported-by counts in chunks
Browse files Browse the repository at this point in the history
If the imported-by counts of too many rows change, the
/update-imported-by-count endpoint times out after 30 minutes, and
nothing gets inserted because the whole update runs as one transaction.

Instead, break the update into multiple smaller transactions. Because we
only update counts that have changed, every transaction that completes
brings some counts up to date, so each run makes progress.

For golang/go#47555

Change-Id: I546aaabcc5e0f99d71efe38748475274871382c4
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/341249
Reviewed-by: Jamal Carvalho <jamal@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
Trust: Julie Qiu <julie@golang.org>
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julie@golang.org>
  • Loading branch information
jba committed Aug 16, 2021
1 parent 1895d95 commit 24920d8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
36 changes: 28 additions & 8 deletions internal/postgres/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,16 +845,27 @@ func (db *DB) UpdateSearchDocumentsImportedByCount(ctx context.Context) (nUpdate
return db.UpdateSearchDocumentsImportedByCountWithCounts(ctx, changedCounts)
}

// countBatchSize is the maximum number of imported-by counts to update in a
// single transaction.
// It is a variable rather than a constant so that tests can change it.
var countBatchSize = 20_000

func (db *DB) UpdateSearchDocumentsImportedByCountWithCounts(ctx context.Context, counts map[string]int) (nUpdated int64, err error) {
defer derrors.WrapStack(&err, "UpdateSearchDocumentsImportedByCountWithCounts")
err = db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if err := insertImportedByCounts(ctx, tx, counts); err != nil {
for len(counts) > 0 {
var nu int64
err := db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if err := insertImportedByCounts(ctx, tx, counts, countBatchSize); err != nil {
return err
}
nu, err = updateImportedByCounts(ctx, tx)
return err
})
if err != nil {
return nUpdated, err
}
nUpdated, err = updateImportedByCounts(ctx, tx)
return err
})
return nUpdated, err
nUpdated += nu
}
return nUpdated, nil
}

// getSearchPackages returns the set of package paths that are in the search_documents table,
Expand Down Expand Up @@ -916,7 +927,10 @@ func (db *DB) computeImportedByCounts(ctx context.Context, curCounts map[string]
return newCounts, nil
}

func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[string]int) (err error) {
// insertImportedByCounts creates a temporary table and inserts at most limit
// rows into it, where each row is a key and value from the counts map. The
// inserted keys are deleted from counts.
func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[string]int, limit int) (err error) {
defer derrors.WrapStack(&err, "insertImportedByCounts(ctx, db, counts)")

const createTableQuery = `
Expand All @@ -928,9 +942,15 @@ func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[str
if _, err := db.Exec(ctx, createTableQuery); err != nil {
return fmt.Errorf("CREATE TABLE: %v", err)
}
values := make([]interface{}, 0, 2*len(counts))
var values []interface{}
i := 0
for p, c := range counts {
if i >= limit {
break
}
values = append(values, p, c)
delete(counts, p)
i++
}
columns := []string{"package_path", "imported_by_count"}
return db.BulkInsert(ctx, "computed_imported_by_counts", columns, values, "")
Expand Down
20 changes: 19 additions & 1 deletion internal/postgres/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func TestUpsertSearchDocumentLongerModulePath(t *testing.T) {
}

func TestUpdateSearchDocumentsImportedByCount(t *testing.T) {
t.Parallel()
// Don't run in parallel because it changes countBatchSize.
ctx := context.Background()

pkgPath := func(m *internal.Module) string { return m.Packages()[0].Path }
Expand Down Expand Up @@ -1189,6 +1189,24 @@ func TestUpdateSearchDocumentsImportedByCount(t *testing.T) {
updateImportedByCount(testDB)
validateImportedByCountAndGetSearchDocument(t, testDB, "mod.com/B/B", 1)
})
t.Run("multiple", func(t *testing.T) {
defer func(old int) { countBatchSize = old }(countBatchSize)
countBatchSize = 1

testDB, release := acquire(t)
defer release()

// Two modules with importers.
mA := insertPackageVersion(t, testDB, "A", "v1.0.0", nil)
mB := insertPackageVersion(t, testDB, "B", "v1.0.0", nil)
insertPackageVersion(t, testDB, "C", "v1.0.0", []string{"A"})
insertPackageVersion(t, testDB, "D", "v1.0.0", []string{"A"})
insertPackageVersion(t, testDB, "E", "v1.0.0", []string{"B"})

updateImportedByCount(testDB)
_ = validateImportedByCountAndGetSearchDocument(t, testDB, pkgPath(mA), 2)
_ = validateImportedByCountAndGetSearchDocument(t, testDB, pkgPath(mB), 1)
})
}

func TestGetPackagesForSearchDocumentUpsert(t *testing.T) {
Expand Down

0 comments on commit 24920d8

Please sign in to comment.