Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use semver directory to carry the schema version #930

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ vendor/
.vscode/
.idea
dist/
migrate
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ run:
linters:
enable:
#- golint
- interfacer
- unconvert
#- dupl
- goconst
Expand Down
19 changes: 14 additions & 5 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"strings"
"time"

"github.com/gocql/gocql"
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"

"github.com/gocql/gocql"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
"github.com/hashicorp/go-multierror"
)

func init() {
Expand Down Expand Up @@ -165,7 +165,8 @@ func (c *Cassandra) Open(url string) (database.Driver, error) {
}

if len(u.Query().Get("disable-host-lookup")) > 0 {
if flag, err := strconv.ParseBool(u.Query().Get("disable-host-lookup")); err != nil && flag {
if flag, err := strconv.ParseBool(u.Query().Get("disable-host-lookup")); err != nil &&
flag {
cluster.DisableInitialHostLookup = true
} else if err != nil {
return nil, err
Expand Down Expand Up @@ -243,6 +244,10 @@ func (c *Cassandra) Run(migration io.Reader) error {
return nil
}

func (c *Cassandra) SetMigrationRecord(rec *database.MigrationRecord) error {
return c.SetVersion(rec.Version, rec.Dirty)
}

func (c *Cassandra) SetVersion(version int, dirty bool) error {
// DELETE instead of TRUNCATE because AWS Keyspaces does not support it
// see: https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html
Expand Down Expand Up @@ -293,7 +298,10 @@ func (c *Cassandra) Version() (version int, dirty bool, err error) {

func (c *Cassandra) Drop() error {
// select all tables in current schema
query := fmt.Sprintf(`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`, c.config.KeyspaceName)
query := fmt.Sprintf(
`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`,
c.config.KeyspaceName,
)
iter := c.session.Query(query).Iter()
var tableName string
for iter.Scan(&tableName) {
Expand Down Expand Up @@ -324,7 +332,8 @@ func (c *Cassandra) ensureVersionTable() (err error) {
}
}()

err = c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", c.config.MigrationsTable)).Exec()
err = c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", c.config.MigrationsTable)).
Exec()
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"strings"
"time"

"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
"github.com/hashicorp/go-multierror"
)

var (
Expand Down Expand Up @@ -163,6 +163,7 @@ func (ch *ClickHouse) Run(r io.Reader) error {

return nil
}

func (ch *ClickHouse) Version() (int, bool, error) {
var (
version int
Expand All @@ -178,6 +179,10 @@ func (ch *ClickHouse) Version() (int, bool, error) {
return version, dirty == 1, nil
}

func (ch *ClickHouse) SetMigrationRecord(rec *database.MigrationRecord) error {
return ch.SetVersion(rec.Version, rec.Dirty)
}

func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
var (
bool = func(v bool) uint8 {
Expand Down Expand Up @@ -261,7 +266,6 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
func (ch *ClickHouse) Drop() (err error) {
query := "SHOW TABLES FROM " + ch.config.DatabaseName
tables, err := ch.conn.Query(query)

if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
Expand Down Expand Up @@ -297,6 +301,7 @@ func (ch *ClickHouse) Lock() error {

return nil
}

func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
return database.ErrNotLocked
Expand Down
135 changes: 83 additions & 52 deletions database/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"strconv"

"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
"github.com/lib/pq"
"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
)

func init() {
Expand All @@ -24,8 +25,10 @@ func init() {
database.Register("crdb-postgres", &db)
}

var DefaultMigrationsTable = "schema_migrations"
var DefaultLockTable = "schema_lock"
var (
DefaultMigrationsTable = "schema_migrations"
DefaultLockTable = "schema_lock"
)

var (
ErrNilConfig = fmt.Errorf("no config")
Expand Down Expand Up @@ -147,67 +150,91 @@ func (c *CockroachDb) Close() error {
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Lock() error {
return database.CasRestoreOnErr(&c.isLocked, false, true, database.ErrLocked, func() (err error) {
return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
return database.CasRestoreOnErr(
&c.isLocked,
false,
true,
database.ErrLocked,
func() (err error) {
return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}
query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{
OrigErr: err,
Err: "failed to fetch migration lock",
Query: []byte(query),
}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}

query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}
query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{
OrigErr: err,
Err: "failed to set migration lock",
Query: []byte(query),
}
}

return nil
})
})
return nil
})
},
)
}

// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Unlock() error {
return database.CasRestoreOnErr(&c.isLocked, true, false, database.ErrNotLocked, func() (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
return database.CasRestoreOnErr(
&c.isLocked,
true,
false,
database.ErrNotLocked,
func() (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
return nil
// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
return nil
}
}
}

return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}
return database.Error{
OrigErr: err,
Err: "failed to release migration lock",
Query: []byte(query),
}
}

return nil
})
return nil
},
)
}

func (c *CockroachDb) Run(migration io.Reader) error {
Expand All @@ -225,6 +252,10 @@ func (c *CockroachDb) Run(migration io.Reader) error {
return nil
}

func (c *CockroachDb) SetMigrationRecord(rec *database.MigrationRecord) error {
return c.SetVersion(rec.Version, rec.Dirty)
}

func (c *CockroachDb) SetVersion(version int, dirty bool) error {
return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) error {
if _, err := tx.Exec(`DELETE FROM "` + c.config.MigrationsTable + `"`); err != nil {
Expand Down
17 changes: 15 additions & 2 deletions database/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ var (

const NilVersion int = -1

var driversMu sync.RWMutex
var drivers = make(map[string]Driver)
var (
driversMu sync.RWMutex
drivers = make(map[string]Driver)
)

type MigrationRecord struct {
Version int
Identifier string
Dirty bool
}

// Driver is the interface every database driver must implement.
//
Expand Down Expand Up @@ -65,6 +73,11 @@ type Driver interface {
// Run applies a migration to the database. migration is guaranteed to be not nil.
Run(migration io.Reader) error

// SetMigrationRecord saves version, identifier and dirty state etc.
// Migrate will call this function before and after each call to Run.
// version must be >= -1. -1 means NilVersion.
SetMigrationRecord(rec *MigrationRecord) error

// SetVersion saves version and dirty state.
// Migrate will call this function before and after each call to Run.
// version must be >= -1. -1 means NilVersion.
Expand Down
18 changes: 12 additions & 6 deletions database/driver_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package database
package database_test

import (
"io"
"testing"

"github.com/golang-migrate/migrate/v4/database"
)

func ExampleDriver() {
Expand All @@ -18,7 +20,7 @@ type mockDriver struct {
url string
}

func (m *mockDriver) Open(url string) (Driver, error) {
func (m *mockDriver) Open(url string) (database.Driver, error) {
return &mockDriver{
url: url,
}, nil
Expand All @@ -40,6 +42,10 @@ func (m *mockDriver) Run(migration io.Reader) error {
return nil
}

func (m *mockDriver) SetMigrationRecord(rec *database.MigrationRecord) error {
return nil
}

func (m *mockDriver) SetVersion(version int, dirty bool) error {
return nil
}
Expand All @@ -53,14 +59,14 @@ func (m *mockDriver) Drop() error {
}

func TestRegisterTwice(t *testing.T) {
Register("mock", &mockDriver{})
database.Register("mock", &mockDriver{})

var err interface{}
func() {
defer func() {
err = recover()
}()
Register("mock", &mockDriver{})
database.Register("mock", &mockDriver{})
}()

if err == nil {
Expand All @@ -76,7 +82,7 @@ func TestOpen(t *testing.T) {
defer func() {
_ = recover()
}()
Register("mock", &mockDriver{})
database.Register("mock", &mockDriver{})
}()

cases := []struct {
Expand All @@ -95,7 +101,7 @@ func TestOpen(t *testing.T) {

for _, c := range cases {
t.Run(c.url, func(t *testing.T) {
d, err := Open(c.url)
d, err := database.Open(c.url)

if err == nil {
if c.err {
Expand Down
Loading