-
Notifications
You must be signed in to change notification settings - Fork 246
/
driver.go
104 lines (87 loc) · 3.18 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package migrations
import (
"context"
"os"
"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/migrate"
)
// Names shared by the driver's reads and writes of the schema-version record,
// plus the environment variable used to point the client at an emulator.
const (
	// tableSchemaVersion is the table that Version reads and WriteVersion mutates.
	tableSchemaVersion = "schema_version"

	// colVersionNum is the single column read from and inserted into tableSchemaVersion.
	colVersionNum = "version_num"

	// emulatorSettingKey is the environment variable NewSpannerDriver sets when
	// it is given a non-empty emulator host.
	emulatorSettingKey = "SPANNER_EMULATOR_HOST"
)
// SpannerMigrationDriver runs schema migrations against a Cloud Spanner
// database.
//
// Two clients are held: the data client is used for reading and writing the
// schema-version row and for running transactions, while the admin client is
// required for DDL changes.
type SpannerMigrationDriver struct {
	// client is the data-plane client used by Version, RunTx and Close.
	client *spanner.Client
	// adminClient is the database admin client, handed to migrations via Conn.
	adminClient *admin.DatabaseAdminClient
}
// Wrapper bundles the driver's two Spanner clients so that they can be
// forwarded to a MigrationFunc for execution.
type Wrapper struct {
	// client is the data-plane Spanner client.
	client *spanner.Client
	// adminClient is the Spanner database admin client.
	adminClient *admin.DatabaseAdminClient
}
// NewSpannerDriver returns a migration driver for the given Cloud Spanner
// instance.
//
// database is passed unchanged to spanner.NewClient. credentialsFilePath is
// supplied to both clients through option.WithCredentialsFile. When
// emulatorHost is non-empty it is exported as the SPANNER_EMULATOR_HOST
// environment variable before the clients are created; note that this is a
// process-wide side effect.
//
// On any failure a nil driver and the underlying error are returned, and no
// client is left open.
func NewSpannerDriver(database, credentialsFilePath, emulatorHost string) (*SpannerMigrationDriver, error) {
	ctx := context.Background()

	if len(emulatorHost) > 0 {
		if err := os.Setenv(emulatorSettingKey, emulatorHost); err != nil {
			return nil, err
		}
	}

	// Log the effective emulator setting, which may come from the surrounding
	// environment rather than from emulatorHost.
	log.Ctx(ctx).Info().Str("spanner-emulator-host", os.Getenv(emulatorSettingKey)).Msg("spanner emulator")
	log.Ctx(ctx).Info().Str("credentials", credentialsFilePath).Str("db", database).Msg("connecting")

	client, err := spanner.NewClient(ctx, database, option.WithCredentialsFile(credentialsFilePath))
	if err != nil {
		return nil, err
	}

	adminClient, err := admin.NewDatabaseAdminClient(ctx, option.WithCredentialsFile(credentialsFilePath))
	if err != nil {
		// The data client was already created; release it so a failed
		// constructor does not leak it.
		client.Close()
		return nil, err
	}

	return &SpannerMigrationDriver{client: client, adminClient: adminClient}, nil
}
// Version returns the schema revision currently stored in the database.
//
// It reads the version_num column for all keys of the schema_version table
// using a single-use read. Every row visited overwrites the result, so if the
// table holds more than one row the value of the last row read is returned;
// if it holds none, the empty string is returned.
//
// A read error whose Spanner code is NotFound is interpreted as "no schema
// table, empty database" and reported as ("", nil). Any other error is
// returned unchanged alongside an empty version.
func (smd *SpannerMigrationDriver) Version(ctx context.Context) (string, error) {
	var revision string

	rows := smd.client.Single().Read(ctx, tableSchemaVersion, spanner.AllKeys(), []string{colVersionNum})
	err := rows.Do(func(row *spanner.Row) error {
		return row.Columns(&revision)
	})

	switch {
	case err == nil:
		return revision, nil
	case spanner.ErrCode(err) == codes.NotFound:
		// There is no schema table, empty database
		return "", nil
	default:
		return "", err
	}
}
// Conn exposes the driver's Spanner clients to a MigrationFunc by copying
// both client pointers into a new Wrapper value.
func (smd *SpannerMigrationDriver) Conn() Wrapper {
	var w Wrapper
	w.client = smd.client
	w.adminClient = smd.adminClient
	return w
}
// RunTx executes f inside a Spanner read-write transaction and returns the
// error, if any, produced by the transaction. The first value returned by
// ReadWriteTransaction is discarded.
//
// NOTE(review): the Spanner client presumably re-invokes the callback when a
// transaction is aborted and retried — confirm that f is safe to run more
// than once.
func (smd *SpannerMigrationDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[*spanner.ReadWriteTransaction]) error {
	runMigration := func(txCtx context.Context, tx *spanner.ReadWriteTransaction) error {
		return f(txCtx, tx)
	}

	if _, err := smd.client.ReadWriteTransaction(ctx, runMigration); err != nil {
		return err
	}
	return nil
}
// WriteVersion buffers, on the supplied transaction, the two mutations that
// move the recorded schema version from replaced to version: a delete of the
// schema_version row keyed by replaced, followed by an insert of a row whose
// version_num is version.
//
// The context argument is unused. The mutations are only buffered here; the
// returned error is whatever BufferWrite reports.
func (smd *SpannerMigrationDriver) WriteVersion(_ context.Context, rwt *spanner.ReadWriteTransaction, version, replaced string) error {
	dropOld := spanner.Delete(tableSchemaVersion, spanner.KeySetFromKeys(spanner.Key{replaced}))
	addNew := spanner.Insert(tableSchemaVersion, []string{colVersionNum}, []any{version})

	return rwt.BufferWrite([]*spanner.Mutation{dropOld, addNew})
}
// Close releases both Spanner clients held by the driver.
//
// The data client is closed first; its Close method reports no error. The
// admin client is closed afterwards and any error from that is returned.
// Clients that are nil (for example on a zero-value driver) are skipped.
// The context argument is unused.
//
// After Close, clients previously obtained through Conn must no longer be
// used, since they are the same underlying clients.
func (smd *SpannerMigrationDriver) Close(_ context.Context) error {
	if smd.client != nil {
		smd.client.Close()
	}
	if smd.adminClient != nil {
		// The admin client holds its own connection, so it must be closed
		// as well to avoid leaking it.
		return smd.adminClient.Close()
	}
	return nil
}
// Compile-time assertion that *SpannerMigrationDriver implements
// migrate.Driver for the Wrapper connection type and Spanner read-write
// transactions.
var _ migrate.Driver[Wrapper, *spanner.ReadWriteTransaction] = (*SpannerMigrationDriver)(nil)