Skip to content

Commit 5ca6e1a

Browse files
authored
migrations: Add IndexStatus to store (sourcegraph#30270)
1 parent 4c35739 commit 5ca6e1a

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed

internal/database/migration/store/observability.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type Operations struct {
1111
down *observation.Operation
1212
ensureSchemaTable *observation.Operation
13+
indexStatus *observation.Operation
1314
lock *observation.Operation
1415
tryLock *observation.Operation
1516
up *observation.Operation
@@ -35,6 +36,7 @@ func NewOperations(observationContext *observation.Context) *Operations {
3536
return &Operations{
3637
down: op("Down"),
3738
ensureSchemaTable: op("EnsureSchemaTable"),
39+
indexStatus: op("IndexStatus"),
3840
lock: op("Lock"),
3941
tryLock: op("TryLock"),
4042
up: op("Up"),

internal/database/migration/store/store.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,45 @@ type Store struct {
2323
operations *Operations
2424
}
2525

26+
// IndexStatus describes the state of an index. Is{Valid,Ready,Live} is taken
27+
// from the `pg_index` system table. If the index is currently being created,
28+
// then the remaining reference fields will be populated describing the index
29+
// creation progress.
30+
type IndexStatus struct {
31+
IsValid bool
32+
IsReady bool
33+
IsLive bool
34+
Phase *string
35+
LockersDone *int
36+
LockersTotal *int
37+
BlocksDone *int
38+
BlocksTotal *int
39+
TuplesDone *int
40+
TuplesTotal *int
41+
}
42+
43+
// CreateIndexConcurrentlyPhases is an ordered list of phases that occur during
44+
// a CREATE INDEX CONCURRENTLY operation. The phase of an ongoing operation can
45+
// found in the system view `view pg_stat_progress_create_index` (since PG 12).
46+
//
47+
// If the phase value found in the system view may not match these values exactly
48+
// and may only indicate a prefix. The phase may have more specific information
49+
// following the initial phase description. Do not compare phase values exactly.
50+
//
51+
// See https://www.postgresql.org/docs/12/progress-reporting.html#CREATE-INDEX-PROGRESS-REPORTING.
52+
var CreateIndexConcurrentlyPhases = []string{
53+
"initializing",
54+
"waiting for writers before build",
55+
"building index",
56+
"waiting for writers before validation",
57+
"index validation: scanning index",
58+
"index validation: sorting tuples",
59+
"index validation: scanning table",
60+
"waiting for old snapshots",
61+
"waiting for readers before marking dead",
62+
"waiting for readers before dropping",
63+
}
64+
2665
func NewWithDB(db dbutil.DB, migrationsTable string, operations *Operations) *Store {
2766
return &Store{
2867
Store: basestore.NewWithDB(db, sql.TxOptions{}),
@@ -198,6 +237,36 @@ func (s *Store) Down(ctx context.Context, definition definition.Definition) (err
198237
return nil
199238
}
200239

240+
// IndexStatus returns an object describing the current validity status and creation progress of the
241+
// index with the given name. If the index does not exist, a false-valued flag is returned.
242+
func (s *Store) IndexStatus(ctx context.Context, tableName, indexName string) (_ IndexStatus, _ bool, err error) {
243+
ctx, endObservation := s.operations.indexStatus.With(ctx, &err, observation.Args{})
244+
defer endObservation(1, observation.Args{})
245+
246+
return scanFirstIndexStatus(s.Query(ctx, sqlf.Sprintf(indexStatusQuery, tableName, indexName)))
247+
}
248+
249+
const indexStatusQuery = `
250+
-- source: internal/database/migration/store/store.go:IndexStatus
251+
SELECT
252+
pi.indisvalid,
253+
pi.indisready,
254+
pi.indislive,
255+
p.phase,
256+
p.lockers_total,
257+
p.lockers_done,
258+
p.blocks_total,
259+
p.blocks_done,
260+
p.tuples_total,
261+
p.tuples_done
262+
FROM pg_stat_all_indexes ai
263+
JOIN pg_index pi ON pi.indexrelid = ai.indexrelid
264+
LEFT JOIN pg_stat_progress_create_index p ON p.relid = ai.relid AND p.index_relid = ai.indexrelid
265+
WHERE
266+
ai.relname = %s AND
267+
ai.indexrelname = %s
268+
`
269+
201270
func (s *Store) runMigrationQuery(ctx context.Context, definitionVersion int, up bool, query *sqlf.Query) (err error) {
202271
targetVersion := definitionVersion
203272
expectedCurrentVersion := definitionVersion - 1
@@ -329,3 +398,32 @@ func scanMigrationLogs(rows *sql.Rows, queryErr error) (_ []migrationLog, err er
329398

330399
return logs, nil
331400
}
401+
402+
// scanFirstIndexStatus scans a slice of index status objects from the return value of `*Store.query`.
403+
func scanFirstIndexStatus(rows *sql.Rows, queryErr error) (status IndexStatus, _ bool, err error) {
404+
if queryErr != nil {
405+
return IndexStatus{}, false, queryErr
406+
}
407+
defer func() { err = basestore.CloseRows(rows, err) }()
408+
409+
if rows.Next() {
410+
if err := rows.Scan(
411+
&status.IsValid,
412+
&status.IsReady,
413+
&status.IsLive,
414+
&status.Phase,
415+
&status.LockersDone,
416+
&status.LockersTotal,
417+
&status.BlocksDone,
418+
&status.BlocksTotal,
419+
&status.TuplesDone,
420+
&status.TuplesTotal,
421+
); err != nil {
422+
return IndexStatus{}, false, err
423+
}
424+
425+
return status, true, nil
426+
}
427+
428+
return IndexStatus{}, false, nil
429+
}

internal/database/migration/store/store_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package store
22

33
import (
44
"context"
5+
"database/sql"
56
"strings"
67
"testing"
8+
"time"
79

810
"github.com/google/go-cmp/cmp"
911
"github.com/keegancsmith/sqlf"
12+
"golang.org/x/sync/errgroup"
1013

1114
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
1215
"github.com/sourcegraph/sourcegraph/internal/database/dbtest"
@@ -425,6 +428,159 @@ func TestDown(t *testing.T) {
425428
})
426429
}
427430

431+
func TestIndexStatus(t *testing.T) {
432+
db := dbtest.NewDB(t)
433+
store := testStore(db)
434+
ctx := context.Background()
435+
436+
if _, err := db.ExecContext(ctx, "CREATE TABLE tbl (id text, name text);"); err != nil {
437+
t.Fatalf("unexpected error: %s", err)
438+
}
439+
440+
// Index does not (yet) exist
441+
if _, ok, err := store.IndexStatus(ctx, "tbl", "idx"); err != nil {
442+
t.Fatalf("unexpected error: %s", err)
443+
} else if ok {
444+
t.Fatalf("unexpected index status")
445+
}
446+
447+
// Wrap context in a small timeout; we do tight for-loops here to determine
448+
// when we can continue on to/unblock the next operation, but none of the
449+
// steps should take any significant time.
450+
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
451+
group, groupCtx := errgroup.WithContext(ctx)
452+
defer cancel()
453+
454+
whileEmpty := func(ctx context.Context, conn dbutil.DB, query string) error {
455+
for {
456+
rows, err := conn.QueryContext(ctx, query)
457+
if err != nil {
458+
return err
459+
}
460+
461+
lockVisible := rows.Next()
462+
463+
if err := basestore.CloseRows(rows, nil); err != nil {
464+
return err
465+
}
466+
467+
if lockVisible {
468+
return nil
469+
}
470+
}
471+
}
472+
473+
// Create separate connections to precise control contention of resources
474+
// so we can examine what this method returns while an index is being created.
475+
476+
conns := make([]*sql.Conn, 3)
477+
for i := 0; i < 3; i++ {
478+
conn, err := db.Conn(ctx)
479+
if err != nil {
480+
t.Fatalf("failed to open new connection: %s", err)
481+
}
482+
t.Cleanup(func() { conn.Close() })
483+
484+
conns[i] = conn
485+
}
486+
connA, connB, connC := conns[0], conns[1], conns[2]
487+
488+
lockQuery := `SELECT pg_advisory_lock(10, 10)`
489+
unlockQuery := `SELECT pg_advisory_unlock(10, 10)`
490+
createIndexQuery := `CREATE INDEX CONCURRENTLY idx ON tbl(id)`
491+
492+
// Session A
493+
// Successfully take and hold advisory lock
494+
if _, err := connA.ExecContext(ctx, lockQuery); err != nil {
495+
t.Fatalf("unexpected error: %s", err)
496+
}
497+
498+
// Session B
499+
// Try to take advisory lock; blocked by Session A
500+
group.Go(func() error {
501+
_, err := connB.ExecContext(groupCtx, lockQuery)
502+
return err
503+
})
504+
505+
// Session C
506+
// try to create index concurrently; blocked by session B waiting on session A
507+
group.Go(func() error {
508+
// Wait until we can see Session B's lock before attempting to create index
509+
if err := whileEmpty(groupCtx, connC, "SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND NOT granted"); err != nil {
510+
t.Fatalf("unexpected error: %s", err)
511+
}
512+
513+
_, err := connC.ExecContext(groupCtx, createIndexQuery)
514+
return err
515+
})
516+
517+
// Wait until we can see Session C's lock before querying index status
518+
if err := whileEmpty(ctx, db, "SELECT 1 FROM pg_locks WHERE locktype = 'relation' AND mode = 'ShareUpdateExclusiveLock'"); err != nil {
519+
t.Fatalf("unexpected error: %s", err)
520+
}
521+
522+
// "waiting for old snapshots" will be the phase that is blocked by the concurrent
523+
// sessions holding advisory locks. We may happen to hit one of the earlier phases
524+
// if we're quick enough, so we'll keep polling progress until we hit the target.
525+
blockingPhase := "waiting for old snapshots"
526+
nonblockingPhasePrefixes := make([]string, 0, len(CreateIndexConcurrentlyPhases))
527+
for _, prefix := range CreateIndexConcurrentlyPhases {
528+
if prefix == blockingPhase {
529+
break
530+
}
531+
532+
nonblockingPhasePrefixes = append(nonblockingPhasePrefixes, prefix)
533+
}
534+
compareWithPrefix := func(value, prefix string) bool {
535+
return value == prefix || strings.HasPrefix(value, prefix+":")
536+
}
537+
538+
retryLoop:
539+
for {
540+
if status, ok, err := store.IndexStatus(ctx, "tbl", "idx"); err != nil {
541+
t.Fatalf("unexpected error: %s", err)
542+
} else if !ok {
543+
t.Fatalf("expected index status")
544+
} else if status.Phase == nil {
545+
t.Fatalf("unexpected phase. want=%q have=nil", blockingPhase)
546+
} else if *status.Phase == blockingPhase {
547+
break
548+
} else {
549+
for _, prefix := range nonblockingPhasePrefixes {
550+
if compareWithPrefix(*status.Phase, prefix) {
551+
continue retryLoop
552+
}
553+
}
554+
555+
t.Fatalf("unexpected phase. want=%q have=%q", blockingPhase, *status.Phase)
556+
}
557+
}
558+
559+
// Session A
560+
// Unlock, unblocking both Session B and Session C
561+
if _, err := connA.ExecContext(ctx, unlockQuery); err != nil {
562+
t.Fatalf("unexpected error: %s", err)
563+
}
564+
565+
// Wait for index creation to complete
566+
if err := group.Wait(); err != nil {
567+
t.Fatalf("unexpected error: %s", err)
568+
}
569+
570+
if status, ok, err := store.IndexStatus(ctx, "tbl", "idx"); err != nil {
571+
t.Fatalf("unexpected error: %s", err)
572+
} else if !ok {
573+
t.Fatalf("expected index status")
574+
} else {
575+
if !status.IsValid {
576+
t.Fatalf("unexpected isvalid. want=%v have=%v", true, status.IsValid)
577+
}
578+
if status.Phase != nil {
579+
t.Fatalf("unexpected phase. want=%v have=%v", nil, status.Phase)
580+
}
581+
}
582+
}
583+
428584
func testStore(db dbutil.DB) *Store {
429585
return NewWithDB(db, "test_migrations_table", NewOperations(&observation.TestContext))
430586
}

0 commit comments

Comments
 (0)