-
Notifications
You must be signed in to change notification settings - Fork 134
/
migrations.go
159 lines (136 loc) · 3.12 KB
/
migrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package database
import (
"bytes"
"context"
"io/fs"
"path"
"sort"
"strconv"
"strings"
"github.com/jackc/pgtype/pgxtype"
stakikfs "github.com/rakyll/statik/fs"
log "github.com/sirupsen/logrus"
)
// Migration represents a single, versioned database migration script
type Migration struct {
id int
name string
sql string
}
func NewMigration(id int, name string, sql string) Migration {
return Migration{
id: id,
name: name,
sql: sql,
}
}
func UpdateDatabase(ctx context.Context, db pgxtype.Querier, migrations []Migration) error {
log.Info("Updating postgres...")
version, err := readVersion(ctx, db)
if err != nil {
return err
}
log.Infof("Current version %v", version)
for _, m := range migrations {
if m.id > version {
log.Debugf("Executing %s", m.name)
_, err := db.Exec(ctx, m.sql)
if err != nil {
return err
}
version = m.id
err = setVersion(ctx, db, version)
if err != nil {
return err
}
}
}
log.Info("Database updated.")
return nil
}
func readVersion(ctx context.Context, db pgxtype.Querier) (int, error) {
_, err := db.Exec(ctx,
`CREATE SEQUENCE IF NOT EXISTS database_version START WITH 0 MINVALUE 0;`)
if err != nil {
return 0, err
}
result, err := db.Query(ctx,
`SELECT last_value FROM database_version`)
if err != nil {
return 0, err
}
defer result.Close()
var version int
result.Next()
err = result.Scan(&version)
return version, err
}
func setVersion(ctx context.Context, db pgxtype.Querier, version int) error {
_, err := db.Exec(ctx, `SELECT setval('database_version', $1)`, version)
return err
}
func ReadMigrations(fsys fs.FS, basePath string) ([]Migration, error) {
files, err := fs.ReadDir(fsys, basePath)
if err != nil {
return nil, err
}
sort.Slice(files, func(i, j int) bool { return files[i].Name() < files[j].Name() })
var migrations []Migration
for _, f := range files {
if f.IsDir() {
continue
}
bytes, err := fs.ReadFile(fsys, path.Join(basePath, f.Name()))
if err != nil {
return nil, err
}
id, err := strconv.Atoi(strings.Split(f.Name(), "_")[0])
if err != nil {
return nil, err
}
migrations = append(migrations, Migration{
id: id,
name: f.Name(),
sql: string(bytes),
})
}
return migrations, nil
}
// TODO: remove this when we've migrated over to iofs
func ReadMigrationsFromStatik(namespace string) ([]Migration, error) {
vfs, err := stakikfs.NewWithNamespace(namespace)
if err != nil {
return nil, err
}
dir, err := vfs.Open("/")
if err != nil {
return nil, err
}
files, err := dir.Readdir(-1)
if err != nil {
return nil, err
}
sort.Slice(files, func(i, j int) bool { return files[i].Name() < files[j].Name() })
var migrations []Migration
for _, f := range files {
file, err := vfs.Open("/" + f.Name())
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(file)
if err != nil {
return nil, err
}
id, err := strconv.Atoi(strings.Split(f.Name(), "_")[0])
if err != nil {
return nil, err
}
migrations = append(migrations, Migration{
id: id,
name: f.Name(),
sql: buf.String(),
})
}
return migrations, nil
}