@@ -19,8 +19,8 @@ import (
19
19
20
20
type Store struct {
21
21
* basestore.Store
22
- migrationsTable string
23
- operations * Operations
22
+ schemaName string
23
+ operations * Operations
24
24
}
25
25
26
26
// IndexStatus describes the state of an index. Is{Valid,Ready,Live} is taken
@@ -64,17 +64,17 @@ var CreateIndexConcurrentlyPhases = []string{
64
64
65
65
func NewWithDB (db dbutil.DB , migrationsTable string , operations * Operations ) * Store {
66
66
return & Store {
67
- Store : basestore .NewWithDB (db , sql.TxOptions {}),
68
- migrationsTable : migrationsTable ,
69
- operations : operations ,
67
+ Store : basestore .NewWithDB (db , sql.TxOptions {}),
68
+ schemaName : migrationsTable ,
69
+ operations : operations ,
70
70
}
71
71
}
72
72
73
73
func (s * Store ) With (other basestore.ShareableStore ) * Store {
74
74
return & Store {
75
- Store : s .Store .With (other ),
76
- migrationsTable : s .migrationsTable ,
77
- operations : s .operations ,
75
+ Store : s .Store .With (other ),
76
+ schemaName : s .schemaName ,
77
+ operations : s .operations ,
78
78
}
79
79
}
80
80
@@ -85,9 +85,9 @@ func (s *Store) Transact(ctx context.Context) (*Store, error) {
85
85
}
86
86
87
87
return & Store {
88
- Store : txBase ,
89
- migrationsTable : s .migrationsTable ,
90
- operations : s .operations ,
88
+ Store : txBase ,
89
+ schemaName : s .schemaName ,
90
+ operations : s .operations ,
91
91
}, nil
92
92
}
93
93
@@ -98,8 +98,8 @@ func (s *Store) EnsureSchemaTable(ctx context.Context) (err error) {
98
98
defer endObservation (1 , observation.Args {})
99
99
100
100
queries := []* sqlf.Query {
101
- sqlf .Sprintf (`CREATE TABLE IF NOT EXISTS %s(version bigint NOT NULL PRIMARY KEY)` , quote (s .migrationsTable )),
102
- sqlf .Sprintf (`ALTER TABLE %s ADD COLUMN IF NOT EXISTS dirty boolean NOT NULL` , quote (s .migrationsTable )),
101
+ sqlf .Sprintf (`CREATE TABLE IF NOT EXISTS %s(version bigint NOT NULL PRIMARY KEY)` , quote (s .schemaName )),
102
+ sqlf .Sprintf (`ALTER TABLE %s ADD COLUMN IF NOT EXISTS dirty boolean NOT NULL` , quote (s .schemaName )),
103
103
104
104
sqlf .Sprintf (`CREATE TABLE IF NOT EXISTS migration_logs(id SERIAL PRIMARY KEY)` ),
105
105
sqlf .Sprintf (`ALTER TABLE migration_logs ADD COLUMN IF NOT EXISTS migration_logs_schema_version integer NOT NULL` ),
@@ -131,7 +131,7 @@ func (s *Store) Version(ctx context.Context) (version int, dirty bool, ok bool,
131
131
ctx , endObservation := s .operations .version .With (ctx , & err , observation.Args {})
132
132
defer endObservation (1 , observation.Args {})
133
133
134
- rows , err := s .Query (ctx , sqlf .Sprintf (`SELECT version, dirty FROM %s` , quote (s .migrationsTable )))
134
+ rows , err := s .Query (ctx , sqlf .Sprintf (`SELECT version, dirty FROM %s` , quote (s .schemaName )))
135
135
if err != nil {
136
136
return 0 , false , false , err
137
137
}
@@ -212,7 +212,7 @@ func (s *Store) TryLock(ctx context.Context) (_ bool, _ func(err error) error, e
212
212
}
213
213
214
214
func (s * Store ) lockKey () int32 {
215
- return locker .StringKey (fmt .Sprintf ("%s:migrations" , s .migrationsTable ))
215
+ return locker .StringKey (fmt .Sprintf ("%s:migrations" , s .schemaName ))
216
216
}
217
217
218
218
// Up runs the given definition's up query.
@@ -276,16 +276,21 @@ func (s *Store) WithMigrationLog(ctx context.Context, definition definition.Defi
276
276
expectedCurrentVersion = definitionVersion
277
277
}
278
278
279
- logID , err := s .setVersion (ctx , up , expectedCurrentVersion , targetVersion , definitionVersion )
279
+ logID , err := s .createMigrationLog (ctx , up , expectedCurrentVersion , targetVersion , definitionVersion )
280
280
if err != nil {
281
281
return err
282
282
}
283
283
284
+ defer func () {
285
+ if err == nil {
286
+ err = s .Exec (ctx , sqlf .Sprintf (`UPDATE %s SET dirty = false` , quote (s .schemaName )))
287
+ }
288
+ }()
284
289
defer func () {
285
290
if execErr := s .Exec (ctx , sqlf .Sprintf (
286
291
`UPDATE migration_logs SET finished_at = NOW(), success = %s, error_message = %s WHERE id = %d` ,
287
292
err == nil ,
288
- strPtr (err ),
293
+ errMsgPtr (err ),
289
294
logID ,
290
295
)); execErr != nil {
291
296
err = multierror .Append (err , execErr )
@@ -296,14 +301,10 @@ func (s *Store) WithMigrationLog(ctx context.Context, definition definition.Defi
296
301
return err
297
302
}
298
303
299
- if err := s .Exec (ctx , sqlf .Sprintf (`UPDATE %s SET dirty=false` , quote (s .migrationsTable ))); err != nil {
300
- return err
301
- }
302
-
303
304
return nil
304
305
}
305
306
306
- func (s * Store ) setVersion (ctx context.Context , up bool , expectedCurrentVersion , targetVersion , sourceVersion int ) (_ int , err error ) {
307
+ func (s * Store ) createMigrationLog (ctx context.Context , up bool , expectedCurrentVersion , targetVersion , sourceVersion int ) (_ int , err error ) {
307
308
tx , err := s .Transact (ctx )
308
309
if err != nil {
309
310
return 0 , err
@@ -314,7 +315,6 @@ func (s *Store) setVersion(ctx context.Context, up bool, expectedCurrentVersion,
314
315
cta := "This condition should not be reachable by normal use of the migration store via the runner and indicates a bug. Please report this issue."
315
316
return errors .Errorf (description + "\n \n " + cta + "\n " , args ... )
316
317
}
317
-
318
318
if currentVersion , dirty , ok , err := tx .Version (ctx ); err != nil {
319
319
return 0 , err
320
320
} else if dirty {
@@ -324,12 +324,12 @@ func (s *Store) setVersion(ctx context.Context, up bool, expectedCurrentVersion,
324
324
return 0 , assertionFailure ("expected schema to have version %d, but has version %d\n " , expectedCurrentVersion , currentVersion )
325
325
}
326
326
327
- if err := tx .Exec (ctx , sqlf .Sprintf (`DELETE FROM %s` , quote (s .migrationsTable ))); err != nil {
327
+ if err := tx .Exec (ctx , sqlf .Sprintf (`DELETE FROM %s` , quote (s .schemaName ))); err != nil {
328
328
return 0 , err
329
329
}
330
330
}
331
331
332
- if err := tx .Exec (ctx , sqlf .Sprintf (`INSERT INTO %s (version, dirty) VALUES (%s, true)` , quote (s .migrationsTable ), targetVersion )); err != nil {
332
+ if err := tx .Exec (ctx , sqlf .Sprintf (`INSERT INTO %s (version, dirty) VALUES (%s, true)` , quote (s .schemaName ), targetVersion )); err != nil {
333
333
return 0 , err
334
334
}
335
335
@@ -345,7 +345,7 @@ func (s *Store) setVersion(ctx context.Context, up bool, expectedCurrentVersion,
345
345
RETURNING id
346
346
` ,
347
347
currentMigrationLogSchemaVersion ,
348
- s .migrationsTable ,
348
+ s .schemaName ,
349
349
sourceVersion ,
350
350
up ,
351
351
)))
@@ -358,7 +358,7 @@ func (s *Store) setVersion(ctx context.Context, up bool, expectedCurrentVersion,
358
358
359
359
var quote = sqlf .Sprintf
360
360
361
- func strPtr (err error ) * string {
361
+ func errMsgPtr (err error ) * string {
362
362
if err == nil {
363
363
return nil
364
364
}
0 commit comments