Skip to content

Commit 35034a8

Browse files
authored
migrations: Backfill migration_logs content on startup if empty (sourcegraph#30511)
1 parent 7d8d927 commit 35034a8

File tree

2 files changed

+158
-21
lines changed

2 files changed

+158
-21
lines changed

internal/database/migration/store/store.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ func (s *Store) Transact(ctx context.Context) (*Store, error) {
5454

5555
const currentMigrationLogSchemaVersion = 1
5656

57+
// EnsureSchemaTable creates the bookeeping tables required to track this schema
58+
// if they do not already exist. If old versions of the tables exist, this method
59+
// will attempt to update them in a backward-compatible manner.
5760
func (s *Store) EnsureSchemaTable(ctx context.Context) (err error) {
5861
ctx, endObservation := s.operations.ensureSchemaTable.With(ctx, &err, observation.Args{})
5962
defer endObservation(1, observation.Args{})
@@ -73,6 +76,39 @@ func (s *Store) EnsureSchemaTable(ctx context.Context) (err error) {
7376
sqlf.Sprintf(`ALTER TABLE migration_logs ADD COLUMN IF NOT EXISTS error_message text`),
7477
}
7578

79+
minMigrationVersions := map[string]int{
80+
"schema_migrations": 1528395834,
81+
"codeintel_schema_migrations": 1000000015,
82+
"codeinsights_schema_migrations": 1000000000,
83+
"test_migrations_table_backfill": 1000000000, // used in tests
84+
}
85+
if minMigrationVersion, ok := minMigrationVersions[s.schemaName]; ok {
86+
queries = append(queries, sqlf.Sprintf(`
87+
WITH
88+
schema_version AS (SELECT * FROM %s LIMIT 1),
89+
min_log AS (SELECT MIN(version) AS version FROM migration_logs WHERE schema = %s),
90+
target_version AS (SELECT MIN(version) AS version FROM (SELECT version FROM schema_version UNION SELECT version - 1 FROM min_log) s)
91+
INSERT INTO migration_logs (
92+
migration_logs_schema_version,
93+
schema,
94+
version,
95+
up,
96+
success,
97+
started_at,
98+
finished_at
99+
)
100+
SELECT %s, %s, version, true, true, NOW(), NOW()
101+
FROM generate_series(%s, (SELECT version FROM target_version)) version
102+
WHERE NOT (SELECT dirty FROM schema_version)
103+
`,
104+
quote(s.schemaName),
105+
s.schemaName,
106+
currentMigrationLogSchemaVersion,
107+
s.schemaName,
108+
minMigrationVersion,
109+
))
110+
}
111+
76112
tx, err := s.Transact(ctx)
77113
if err != nil {
78114
return err

internal/database/migration/store/store_test.go

Lines changed: 122 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,126 @@ func TestEnsureSchemaTable(t *testing.T) {
2424
store := testStore(db)
2525
ctx := context.Background()
2626

27-
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM test_migrations_table")); err == nil {
28-
t.Fatalf("expected query to fail due to missing schema table")
27+
tableNames := []string{
28+
"migration_logs",
29+
defaultTestTableName,
2930
}
3031

31-
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM migration_logs")); err == nil {
32-
t.Fatalf("expected query to fail due to missing logs table")
32+
// Test initially missing tables
33+
for _, tableName := range tableNames {
34+
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM %s", quote(tableName))); err == nil {
35+
t.Fatalf("expected query to fail due to missing table %q", tableName)
36+
}
3337
}
3438

3539
if err := store.EnsureSchemaTable(ctx); err != nil {
3640
t.Fatalf("unexpected error ensuring schema table exists: %s", err)
3741
}
3842

39-
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM test_migrations_table")); err != nil {
40-
t.Fatalf("unexpected error querying version table: %s", err)
41-
}
42-
43-
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM migration_logs")); err != nil {
44-
t.Fatalf("unexpected error querying logs table: %s", err)
43+
// Test tables were created
44+
for _, tableName := range tableNames {
45+
if err := store.Exec(ctx, sqlf.Sprintf("SELECT * FROM %s", quote(tableName))); err != nil {
46+
t.Fatalf("unexpected error querying %q: %s", tableName, err)
47+
}
4548
}
4649

50+
// Test idempotency
4751
if err := store.EnsureSchemaTable(ctx); err != nil {
4852
t.Fatalf("expected method to be idempotent, got error: %s", err)
4953
}
5054
}
5155

56+
func TestEnsureTableBackfills(t *testing.T) {
57+
t.Run("fresh database", func(t *testing.T) {
58+
db := dbtest.NewDB(t)
59+
store := testStore(db)
60+
ctx := context.Background()
61+
62+
if err := store.EnsureSchemaTable(ctx); err != nil {
63+
t.Fatalf("unexpected error ensuring schema table exists: %s", err)
64+
}
65+
66+
assertLogs(t, ctx, store, nil)
67+
})
68+
69+
k := 5
70+
n := k * 5
71+
tableName := "test_migrations_table_backfill"
72+
73+
expectedLogs := make([]migrationLog, 0, n)
74+
for i := 0; i <= n; i++ {
75+
s := true
76+
expectedLogs = append(expectedLogs, migrationLog{Schema: tableName, Version: 1000000000 + i, Up: true, Success: &s})
77+
}
78+
79+
runTest := func(k int) func(t *testing.T) {
80+
return func(t *testing.T) {
81+
db := dbtest.NewDB(t)
82+
store := NewWithDB(db, tableName, NewOperations(&observation.TestContext))
83+
ctx := context.Background()
84+
85+
if err := store.Exec(ctx, sqlf.Sprintf(`CREATE TABLE %s(version bigint NOT NULL PRIMARY KEY, dirty boolean NOT NULL)`, quote(tableName))); err != nil {
86+
t.Fatalf("unexpected error: %s", err)
87+
}
88+
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO %s VALUES (%s, false)`, quote(tableName), 1000000000+n)); err != nil {
89+
t.Fatalf("unexpected error: %s", err)
90+
}
91+
92+
if err := store.Exec(ctx, sqlf.Sprintf(`
93+
CREATE TABLE migration_logs(
94+
id SERIAL PRIMARY KEY,
95+
migration_logs_schema_version integer NOT NULL,
96+
schema text NOT NULL,
97+
version integer NOT NULL,
98+
up bool NOT NULL,
99+
started_at timestamptz NOT NULL,
100+
finished_at timestamptz,
101+
success boolean,
102+
error_message text
103+
)
104+
`)); err != nil {
105+
t.Fatalf("unexpected error: %s", err)
106+
}
107+
108+
for i := 0; i < k; i++ {
109+
if err := store.Exec(ctx, sqlf.Sprintf(`
110+
INSERT INTO migration_logs(
111+
migration_logs_schema_version,
112+
schema,
113+
version,
114+
up,
115+
started_at,
116+
finished_at,
117+
success
118+
) VALUES (
119+
1,
120+
%s,
121+
%s,
122+
true,
123+
NOW(),
124+
NOW(),
125+
true
126+
)`,
127+
tableName,
128+
1000000000+n-i,
129+
)); err != nil {
130+
t.Fatalf("unexpected error: %s", err)
131+
}
132+
}
133+
134+
if err := store.EnsureSchemaTable(ctx); err != nil {
135+
t.Fatalf("unexpected error ensuring schema table exists: %s", err)
136+
}
137+
138+
assertLogs(t, ctx, store, expectedLogs)
139+
}
140+
}
141+
142+
t.Run("full insert", runTest(0)) // no migration logs
143+
t.Run("partial insert", runTest(k)) // k migration logs
144+
t.Run("nothing to insert", runTest(n)) // n migration logs
145+
}
146+
52147
func TestVersion(t *testing.T) {
53148
db := dbtest.NewDB(t)
54149
store := testStore(db)
@@ -77,10 +172,10 @@ func TestVersion(t *testing.T) {
77172

78173
for _, testCase := range testCases {
79174
t.Run(testCase.name, func(t *testing.T) {
80-
if err := store.Exec(ctx, sqlf.Sprintf(`DELETE FROM test_migrations_table`)); err != nil {
175+
if err := store.Exec(ctx, sqlf.Sprintf(`DELETE FROM %s`, quote(defaultTestTableName))); err != nil {
81176
t.Fatalf("unexpected error clearing data: %s", err)
82177
}
83-
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO test_migrations_table VALUES (%s, %s)`, testCase.version, testCase.dirty)); err != nil {
178+
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO %s VALUES (%s, %s)`, quote(defaultTestTableName), testCase.version, testCase.dirty)); err != nil {
84179
t.Fatalf("unexpected error inserting data: %s", err)
85180
}
86181

@@ -154,7 +249,7 @@ func TestVersions(t *testing.T) {
154249
error_message
155250
) VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s)`,
156251
currentMigrationLogSchemaVersion,
157-
"test_migrations_table",
252+
defaultTestTableName,
158253
migrationLog.version,
159254
migrationLog.up,
160255
migrationLog.success,
@@ -227,7 +322,7 @@ func TestWrappedUp(t *testing.T) {
227322
if err := store.EnsureSchemaTable(ctx); err != nil {
228323
t.Fatalf("unexpected error ensuring schema table exists: %s", err)
229324
}
230-
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO test_migrations_table VALUES (15, false)`)); err != nil {
325+
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO %s VALUES (15, false)`, quote(defaultTestTableName))); err != nil {
231326
t.Fatalf("unexpected error setting initial version: %s", err)
232327
}
233328

@@ -268,7 +363,7 @@ func TestWrappedUp(t *testing.T) {
268363

269364
assertLogs(t, ctx, store, []migrationLog{
270365
{
271-
Schema: "test_migrations_table",
366+
Schema: defaultTestTableName,
272367
Version: 16,
273368
Up: true,
274369
Success: boolPtr(true),
@@ -331,7 +426,7 @@ func TestWrappedUp(t *testing.T) {
331426

332427
assertLogs(t, ctx, store, []migrationLog{
333428
{
334-
Schema: "test_migrations_table",
429+
Schema: defaultTestTableName,
335430
Version: 17,
336431
Up: true,
337432
Success: boolPtr(false),
@@ -374,7 +469,7 @@ func TestWrappedDown(t *testing.T) {
374469
if err := store.EnsureSchemaTable(ctx); err != nil {
375470
t.Fatalf("unexpected error ensuring schema table exists: %s", err)
376471
}
377-
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO test_migrations_table VALUES (14, false)`)); err != nil {
472+
if err := store.Exec(ctx, sqlf.Sprintf(`INSERT INTO %s VALUES (14, false)`, quote(defaultTestTableName))); err != nil {
378473
t.Fatalf("unexpected error setting initial version: %s", err)
379474
}
380475
if err := store.Exec(ctx, sqlf.Sprintf(`
@@ -429,7 +524,7 @@ func TestWrappedDown(t *testing.T) {
429524

430525
assertLogs(t, ctx, store, []migrationLog{
431526
{
432-
Schema: "test_migrations_table",
527+
Schema: defaultTestTableName,
433528
Version: 14,
434529
Up: false,
435530
Success: boolPtr(true),
@@ -487,7 +582,7 @@ func TestWrappedDown(t *testing.T) {
487582

488583
assertLogs(t, ctx, store, []migrationLog{
489584
{
490-
Schema: "test_migrations_table",
585+
Schema: defaultTestTableName,
491586
Version: 13,
492587
Up: false,
493588
Success: boolPtr(false),
@@ -675,8 +770,14 @@ retryLoop:
675770
}
676771
}
677772

773+
const defaultTestTableName = "test_migrations_table"
774+
678775
func testStore(db dbutil.DB) *Store {
679-
return NewWithDB(db, "test_migrations_table", NewOperations(&observation.TestContext))
776+
return testStoreWithName(db, defaultTestTableName)
777+
}
778+
779+
func testStoreWithName(db dbutil.DB, name string) *Store {
780+
return NewWithDB(db, name, NewOperations(&observation.TestContext))
680781
}
681782

682783
func strPtr(v string) *string {
@@ -698,7 +799,7 @@ func truncateLogs(t *testing.T, ctx context.Context, store *Store) {
698799
func assertLogs(t *testing.T, ctx context.Context, store *Store, expectedLogs []migrationLog) {
699800
t.Helper()
700801

701-
logs, err := scanMigrationLogs(store.Query(ctx, sqlf.Sprintf(`SELECT schema, version, up, success FROM migration_logs ORDER BY started_at`)))
802+
logs, err := scanMigrationLogs(store.Query(ctx, sqlf.Sprintf(`SELECT schema, version, up, success FROM migration_logs ORDER BY version`)))
702803
if err != nil {
703804
t.Fatalf("unexpected error scanning logs: %s", err)
704805
}

0 commit comments

Comments
 (0)