From 7208a16bf3a18db687dc3ea3d109c571bd7037f8 Mon Sep 17 00:00:00 2001 From: Andy Newman Date: Mon, 14 Sep 2020 11:10:46 -0700 Subject: [PATCH 1/3] Add advisory locking to mongodb --- database/mongodb/README.md | 2 + database/mongodb/mongodb.go | 140 +++++++++++++++++++++++++++++-- database/mongodb/mongodb_test.go | 55 +++++++++++- go.mod | 1 + go.sum | 2 + 5 files changed, 190 insertions(+), 10 deletions(-) diff --git a/database/mongodb/README.md b/database/mongodb/README.md index 4204581a9..8c1e7d326 100644 --- a/database/mongodb/README.md +++ b/database/mongodb/README.md @@ -13,6 +13,8 @@ |------------|---------------------|-------------| | `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection | | `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing| +| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking | +| `x-advisory-lock-backoff-seconds` | `15` | The max time that the advisory lock will wait during exponential backoff if the db is already locked. | | `dbname` | `DatabaseName` | The name of the database to connect to | | `user` | | The user to sign in as. Can be omitted | | `password` | | The user's password. Can be omitted | diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 95992ecb2..2f06b1011 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -3,16 +3,20 @@ package mongodb import ( "context" "fmt" - "io" - "io/ioutil" - "net/url" - "strconv" - + "github.com/cenkalti/backoff/v4" "github.com/golang-migrate/migrate/v4/database" + "github.com/hashicorp/go-multierror" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx" "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" + "io" + "io/ioutil" + "net/url" + os "os" + "strconv" + "time" ) func init() { @@ -23,6 +27,11 @@ func init() { var DefaultMigrationsCollection = "schema_migrations" +const DefaultLockingCollection = "migrate_advisory_lock" // the collection to use for advisory locking by default. +const LockingKey = "locking_key" // the key to lock on, will have a unique=true index on it +const lockKeyUniqueValue = 0 // the unique value to lock on. If multiple clients try to insert the same key, it will fail (locked). +const LockingBackoffTime = 15 // the default maximum time to wait for a lock to be released + var ( ErrNoDatabaseName = fmt.Errorf("no database name") ErrNilConfig = fmt.Errorf("no config") @@ -31,13 +40,14 @@ var ( type Mongo struct { client *mongo.Client db *mongo.Database - config *Config } type Config struct { DatabaseName string MigrationsCollection string + LockingCollection string + LockingBackoffTime int TransactionMode bool } @@ -46,6 +56,15 @@ type versionInfo struct { Dirty bool `bson:"dirty"` } +type lockObj struct { + Key int `bson:"locking_key"` + Pid int `bson:"pid"` + Name string `bson:"hostname"` +} +type findFilter struct { + Key int `bson:"locking_key"` +} + func WithInstance(instance *mongo.Client, config *Config) (database.Driver, error) { if config == nil { return nil, ErrNilConfig @@ -56,17 +75,31 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro if len(config.MigrationsCollection) == 0 { config.MigrationsCollection = DefaultMigrationsCollection } + if len(config.LockingCollection) == 0 { + config.LockingCollection = DefaultLockingCollection + } + if config.LockingBackoffTime <= 0 { + config.LockingBackoffTime = LockingBackoffTime + } + mc := &Mongo{ client: instance, db: instance.Database(config.DatabaseName), config: config, } + if err := mc.ensureLockTable(); err != nil { + return nil, err + } + if err := mc.ensureVersionTable(); err != nil { + return nil, err + } + return mc, nil } func (m *Mongo) Open(dsn string) (database.Driver, error) { - //connsting is experimental package, but it used for parse connection string in mongo.Connect function + //connstring is experimental package, but it used for parse connection string in mongo.Connect function uri, err := connstring.Parse(dsn) if err != nil { return nil, err @@ -74,10 +107,11 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { if len(uri.Database) == 0 { return nil, ErrNoDatabaseName } - unknown := url.Values(uri.UnknownOptions) migrationsCollection := unknown.Get("x-migrations-collection") + lockCollection := unknown.Get("x-advisory-lock-collection") + lockingBackoffTime, _ := strconv.Atoi(unknown.Get("x-advisory-lock-backoff-seconds")) transactionMode, _ := strconv.ParseBool(unknown.Get("x-transaction-mode")) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(dsn)) @@ -90,6 +124,8 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { mc, err := WithInstance(client, &Config{ DatabaseName: uri.Database, MigrationsCollection: migrationsCollection, + LockingCollection: lockCollection, + LockingBackoffTime: lockingBackoffTime, TransactionMode: transactionMode, }) if err != nil { @@ -184,10 +220,96 @@ func (m *Mongo) Drop() error { return m.db.Drop(context.TODO()) } -func (m *Mongo) Lock() error { +// Note that this could possibly have a race condition +// if three migrate processes try to create the index at the exact same time, but it +// takes a while for the first call to build the index (although it's an empty collection, so that may not take long) +// then two of them could return successful, while the index is still building, leading to +// the second and third processes to successfully insert a document (and "acquire" the lock), +// as duplicate keys would be allowed. +// +// This may not be an issue, if the collection is empty, and creating the lock takes next to no time. +// +func (m *Mongo) ensureLockTable() error { + indexes := m.db.Collection(m.config.LockingCollection).Indexes() + indexOptions := options.Index().SetUnique(true).SetName("lock_unique_key") + indexKeys := bsonx.MDoc{ + LockingKey: bsonx.Int32(-1), + } + _, err := indexes.CreateOne(context.TODO(), mongo.IndexModel{ + Options: indexOptions, + Keys: indexKeys, + }) + if err != nil { + return err + } + return nil +} + +// ensureVersionTable checks if versions table exists and, if not, creates it. +// Note that this function locks the database, which deviates from the usual +// convention of "caller locks" in the MongoDb type. +func (m *Mongo) ensureVersionTable() (err error) { + if err = m.Lock(); err != nil { + return err + } + + defer func() { + if e := m.Unlock(); e != nil { + if err == nil { + err = e + } else { + err = multierror.Append(err, e) + } + } + }() + + if err != nil { + return err + } + if _, _, err = m.Version(); err != nil { + return err + } return nil } +// Utilizes advisory locking on the config.LockingCollection collection +// This uses a unique index on the `locking_key` field. +func (m *Mongo) Lock() error { + pid := os.Getpid() + hostname, err := os.Hostname() + if err != nil { + hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) + } + + newLockObj := lockObj{ + Key: lockKeyUniqueValue, + Pid: pid, + Name: hostname, + } + operation := func() error { + _, err := m.db.Collection(m.config.LockingCollection).InsertOne(context.TODO(), newLockObj) + return err + } + exponentialBackOff := backoff.NewExponentialBackOff() + duration := time.Duration(m.config.LockingBackoffTime) * time.Second + exponentialBackOff.MaxElapsedTime = duration + exponentialBackOff.MaxInterval = exponentialBackOff.MaxElapsedTime / 10 + + err = backoff.Retry(operation, exponentialBackOff) + if err != nil { + return database.ErrLocked + } + return nil + +} func (m *Mongo) Unlock() error { + + filter := findFilter{ + Key: lockKeyUniqueValue, + } + _, err := m.db.Collection(m.config.LockingCollection).DeleteMany(context.TODO(), filter) + if err != nil { + return err + } return nil } diff --git a/database/mongodb/mongodb_test.go b/database/mongodb/mongodb_test.go index c0d09f387..6d104e1a8 100644 --- a/database/mongodb/mongodb_test.go +++ b/database/mongodb/mongodb_test.go @@ -92,7 +92,7 @@ func Test(t *testing.T) { } }() dt.TestNilVersion(t, d) - //TestLockAndUnlock(t, d) driver doesn't support lock on database level + dt.TestLockAndUnlock(t, d) dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`))) dt.TestSetVersion(t, d) dt.TestDrop(t, d) @@ -180,6 +180,59 @@ func TestWithAuth(t *testing.T) { }) } +func TestLockWorks(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := mongoConnectionString(ip, port) + p := &Mongo{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`))) + + mc := d.(*Mongo) + + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Unlock() + if err != nil { + t.Fatal(err) + } + + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Unlock() + if err != nil { + t.Fatal(err) + } + + //try to hit a lock conflict + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Lock() + if err == nil { + t.Fatal("should have failed, mongo should be locked already") + } + }) +} + func TestTransaction(t *testing.T) { transactionSpecs := []dktesting.ContainerSpec{ {ImageName: "mongo:4", Options: dktest.Options{PortRequired: true, ReadyFunc: isReady, diff --git a/go.mod b/go.mod index b8746d8fb..fcef92a8d 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go v1.17.7 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cenkalti/backoff/v4 v4.0.2 github.com/cockroachdb/cockroach-go v0.0.0-20190925194419-606b3d062051 github.com/cznic/mathutil v0.0.0-20180504122225-ca4c9f2c1369 // indirect github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec diff --git a/go.sum b/go.sum index 2ef1a3e45..aea4b9257 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAK github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= +github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= From 73760ed4cb2869936da9dc945da1ab308af0d708 Mon Sep 17 00:00:00 2001 From: Andy Newman Date: Wed, 23 Sep 2020 12:28:16 -0700 Subject: [PATCH 2/3] Implement PR feedback rename constants rename query params add new timeout interval query param add new feature flag use timeout context --- database/mongodb/README.md | 4 +- database/mongodb/mongodb.go | 105 +++++++++++++++++++------------ database/mongodb/mongodb_test.go | 14 +++++ 3 files changed, 82 insertions(+), 41 deletions(-) diff --git a/database/mongodb/README.md b/database/mongodb/README.md index 8c1e7d326..4bd76f63b 100644 --- a/database/mongodb/README.md +++ b/database/mongodb/README.md @@ -13,8 +13,10 @@ |------------|---------------------|-------------| | `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection | | `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing| +| `x-advisory-locking` | `true` | Feature flag for advisory locking, if set to false, disable advisory locking | | `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking | -| `x-advisory-lock-backoff-seconds` | `15` | The max time that the advisory lock will wait during exponential backoff if the db is already locked. | +| `x-advisory-lock-timout` | `15` | The max time in seconds that the advisory lock will wait if the db is already locked. | +| `x-advisory-lock-timout-interval` | `10` | The max timeout in seconds interval that the advisory lock will wait if the db is already locked. | | `dbname` | `DatabaseName` | The name of the database to connect to | | `user` | | The user to sign in as. Can be omitted | | `password` | | The user's password. Can be omitted | diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 2f06b1011..9dae2ac3c 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -9,7 +9,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" "io" "io/ioutil" @@ -28,9 +27,12 @@ func init() { var DefaultMigrationsCollection = "schema_migrations" const DefaultLockingCollection = "migrate_advisory_lock" // the collection to use for advisory locking by default. -const LockingKey = "locking_key" // the key to lock on, will have a unique=true index on it const lockKeyUniqueValue = 0 // the unique value to lock on. If multiple clients try to insert the same key, it will fail (locked). -const LockingBackoffTime = 15 // the default maximum time to wait for a lock to be released +const DefaultLockTimeout = 15 // the default maximum time to wait for a lock to be released. +const DefaultLockTimeoutInterval = 10 // the default maximum intervals time for the locking timout. +const DefaultAdvisoryLockingFlag = true // the default value for the advisory locking feature flag. Default is true. +const LockIndexName = "lock_unique_key" // the name of the index which adds unique constraint to the locking_key field. +const contextWaitTimeout = 5 * time.Second // how long to wait for the request to mongo to block/wait for. var ( ErrNoDatabaseName = fmt.Errorf("no database name") @@ -43,23 +45,28 @@ type Mongo struct { config *Config } +type Locking struct { + CollectionName string + Timeout int + UseAdvisoryLocking bool + Interval int +} type Config struct { DatabaseName string MigrationsCollection string - LockingCollection string - LockingBackoffTime int TransactionMode bool + Locking Locking } - type versionInfo struct { Version int `bson:"version"` Dirty bool `bson:"dirty"` } type lockObj struct { - Key int `bson:"locking_key"` - Pid int `bson:"pid"` - Name string `bson:"hostname"` + Key int `bson:"locking_key"` + Pid int `bson:"pid"` + Hostname string `bson:"hostname"` + CreatedAt time.Time `bson:"created_at"` } type findFilter struct { Key int `bson:"locking_key"` @@ -75,11 +82,14 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro if len(config.MigrationsCollection) == 0 { config.MigrationsCollection = DefaultMigrationsCollection } - if len(config.LockingCollection) == 0 { - config.LockingCollection = DefaultLockingCollection + if len(config.Locking.CollectionName) == 0 { + config.Locking.CollectionName = DefaultLockingCollection + } + if config.Locking.Timeout <= 0 { + config.Locking.Timeout = DefaultLockTimeout } - if config.LockingBackoffTime <= 0 { - config.LockingBackoffTime = LockingBackoffTime + if config.Locking.Interval <= 0 { + config.Locking.Interval = DefaultLockTimeoutInterval } mc := &Mongo{ @@ -88,8 +98,10 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro config: config, } - if err := mc.ensureLockTable(); err != nil { - return nil, err + if mc.config.Locking.UseAdvisoryLocking { + if err := mc.ensureLockTable(); err != nil { + return nil, err + } } if err := mc.ensureVersionTable(); err != nil { return nil, err @@ -111,22 +123,32 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { migrationsCollection := unknown.Get("x-migrations-collection") lockCollection := unknown.Get("x-advisory-lock-collection") - lockingBackoffTime, _ := strconv.Atoi(unknown.Get("x-advisory-lock-backoff-seconds")) transactionMode, _ := strconv.ParseBool(unknown.Get("x-transaction-mode")) + advisoryLockingFlag, err := strconv.ParseBool(unknown.Get("x-advisory-locking")) + if err != nil { + advisoryLockingFlag = DefaultAdvisoryLockingFlag + } + lockingTimout, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timeout")) + maxLockingIntervals, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timout-interval")) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(dsn)) if err != nil { return nil, err } + if err = client.Ping(context.TODO(), nil); err != nil { return nil, err } mc, err := WithInstance(client, &Config{ DatabaseName: uri.Database, MigrationsCollection: migrationsCollection, - LockingCollection: lockCollection, - LockingBackoffTime: lockingBackoffTime, TransactionMode: transactionMode, + Locking: Locking{ + CollectionName: lockCollection, + Timeout: lockingTimout, + UseAdvisoryLocking: advisoryLockingFlag, + Interval: maxLockingIntervals, + }, }) if err != nil { return nil, err @@ -220,24 +242,13 @@ func (m *Mongo) Drop() error { return m.db.Drop(context.TODO()) } -// Note that this could possibly have a race condition -// if three migrate processes try to create the index at the exact same time, but it -// takes a while for the first call to build the index (although it's an empty collection, so that may not take long) -// then two of them could return successful, while the index is still building, leading to -// the second and third processes to successfully insert a document (and "acquire" the lock), -// as duplicate keys would be allowed. -// -// This may not be an issue, if the collection is empty, and creating the lock takes next to no time. -// func (m *Mongo) ensureLockTable() error { - indexes := m.db.Collection(m.config.LockingCollection).Indexes() - indexOptions := options.Index().SetUnique(true).SetName("lock_unique_key") - indexKeys := bsonx.MDoc{ - LockingKey: bsonx.Int32(-1), - } + indexes := m.db.Collection(m.config.Locking.CollectionName).Indexes() + + indexOptions := options.Index().SetUnique(true).SetName(LockIndexName) _, err := indexes.CreateOne(context.TODO(), mongo.IndexModel{ Options: indexOptions, - Keys: indexKeys, + Keys: findFilter{Key: -1}, }) if err != nil { return err @@ -275,6 +286,9 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { + if !m.config.Locking.UseAdvisoryLocking { + return nil + } pid := os.Getpid() hostname, err := os.Hostname() if err != nil { @@ -282,32 +296,43 @@ func (m *Mongo) Lock() error { } newLockObj := lockObj{ - Key: lockKeyUniqueValue, - Pid: pid, - Name: hostname, + Key: lockKeyUniqueValue, + Pid: pid, + Hostname: hostname, + CreatedAt: time.Now(), } operation := func() error { - _, err := m.db.Collection(m.config.LockingCollection).InsertOne(context.TODO(), newLockObj) + timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) + defer cancelFunc() return err } exponentialBackOff := backoff.NewExponentialBackOff() - duration := time.Duration(m.config.LockingBackoffTime) * time.Second + duration := time.Duration(m.config.Locking.Timeout) * time.Second exponentialBackOff.MaxElapsedTime = duration - exponentialBackOff.MaxInterval = exponentialBackOff.MaxElapsedTime / 10 + exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second err = backoff.Retry(operation, exponentialBackOff) if err != nil { return database.ErrLocked } + return nil } func (m *Mongo) Unlock() error { + if !m.config.Locking.UseAdvisoryLocking { + return nil + } filter := findFilter{ Key: lockKeyUniqueValue, } - _, err := m.db.Collection(m.config.LockingCollection).DeleteMany(context.TODO(), filter) + + ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) + defer cancel() + if err != nil { return err } diff --git a/database/mongodb/mongodb_test.go b/database/mongodb/mongodb_test.go index 6d104e1a8..50587e29a 100644 --- a/database/mongodb/mongodb_test.go +++ b/database/mongodb/mongodb_test.go @@ -221,7 +221,21 @@ func TestLockWorks(t *testing.T) { t.Fatal(err) } + // disable locking, validate wer can lock twice + mc.config.Locking.UseAdvisoryLocking = false + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + err = mc.Lock() + if err != nil { + t.Fatal(err) + } + + // re-enable locking, //try to hit a lock conflict + mc.config.Locking.UseAdvisoryLocking = true + mc.config.Locking.Timeout = 1 err = mc.Lock() if err != nil { t.Fatal(err) From 2c2f6910f4f72ba862c5c93811e41a7db7320f24 Mon Sep 17 00:00:00 2001 From: Andy Newman Date: Fri, 25 Sep 2020 12:04:58 -0700 Subject: [PATCH 3/3] Rename mongo flag for enabling locking. Add validation around parsing mongo params. --- database/mongodb/README.md | 2 +- database/mongodb/mongodb.go | 75 ++++++++++++++++++++++++-------- database/mongodb/mongodb_test.go | 4 +- 3 files changed, 61 insertions(+), 20 deletions(-) diff --git a/database/mongodb/README.md b/database/mongodb/README.md index 4bd76f63b..32b736fae 100644 --- a/database/mongodb/README.md +++ b/database/mongodb/README.md @@ -14,7 +14,7 @@ | `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection | | `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing| | `x-advisory-locking` | `true` | Feature flag for advisory locking, if set to false, disable advisory locking | -| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking | +| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking.| | `x-advisory-lock-timout` | `15` | The max time in seconds that the advisory lock will wait if the db is already locked. | | `x-advisory-lock-timout-interval` | `10` | The max timeout in seconds interval that the advisory lock will wait if the db is already locked. | | `dbname` | `DatabaseName` | The name of the database to connect to | diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 9dae2ac3c..17ca804f2 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -46,10 +46,10 @@ type Mongo struct { } type Locking struct { - CollectionName string - Timeout int - UseAdvisoryLocking bool - Interval int + CollectionName string + Timeout int + Enabled bool + Interval int } type Config struct { DatabaseName string @@ -98,7 +98,7 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro config: config, } - if mc.config.Locking.UseAdvisoryLocking { + if mc.config.Locking.Enabled { if err := mc.ensureLockTable(); err != nil { return nil, err } @@ -123,14 +123,22 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { migrationsCollection := unknown.Get("x-migrations-collection") lockCollection := unknown.Get("x-advisory-lock-collection") - transactionMode, _ := strconv.ParseBool(unknown.Get("x-transaction-mode")) - advisoryLockingFlag, err := strconv.ParseBool(unknown.Get("x-advisory-locking")) + transactionMode, err := parseBoolean(unknown.Get("x-transaction-mode"), false) if err != nil { - advisoryLockingFlag = DefaultAdvisoryLockingFlag + return nil, err + } + advisoryLockingFlag, err := parseBoolean(unknown.Get("x-advisory-locking"), DefaultAdvisoryLockingFlag) + if err != nil { + return nil, err + } + lockingTimout, err := parseInt(unknown.Get("x-advisory-lock-timeout"), DefaultLockTimeout) + if err != nil { + return nil, err + } + maxLockingIntervals, err := parseInt(unknown.Get("x-advisory-lock-timout-interval"), DefaultLockTimeoutInterval) + if err != nil { + return nil, err } - lockingTimout, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timeout")) - maxLockingIntervals, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timout-interval")) - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(dsn)) if err != nil { return nil, err @@ -144,10 +152,10 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { MigrationsCollection: migrationsCollection, TransactionMode: transactionMode, Locking: Locking{ - CollectionName: lockCollection, - Timeout: lockingTimout, - UseAdvisoryLocking: advisoryLockingFlag, - Interval: maxLockingIntervals, + CollectionName: lockCollection, + Timeout: lockingTimout, + Enabled: advisoryLockingFlag, + Interval: maxLockingIntervals, }, }) if err != nil { @@ -156,6 +164,39 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) { return mc, nil } +//Parse the url param, convert it to boolean +// returns error if param invalid. returns defaultValue if param not present +func parseBoolean(urlParam string, defaultValue bool) (bool, error) { + + // if parameter passed, parse it (otherwise return default value) + if urlParam != "" { + result, err := strconv.ParseBool(urlParam) + if err != nil { + return false, err + } + return result, nil + } + + // if no url Param passed, return default value + return defaultValue, nil +} + +//Parse the url param, convert it to int +// returns error if param invalid. returns defaultValue if param not present +func parseInt(urlParam string, defaultValue int) (int, error) { + + // if parameter passed, parse it (otherwise return default value) + if urlParam != "" { + result, err := strconv.Atoi(urlParam) + if err != nil { + return -1, err + } + return result, nil + } + + // if no url Param passed, return default value + return defaultValue, nil +} func (m *Mongo) SetVersion(version int, dirty bool) error { migrationsCollection := m.db.Collection(m.config.MigrationsCollection) if err := migrationsCollection.Drop(context.TODO()); err != nil { @@ -286,7 +327,7 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { - if !m.config.Locking.UseAdvisoryLocking { + if !m.config.Locking.Enabled { return nil } pid := os.Getpid() @@ -321,7 +362,7 @@ func (m *Mongo) Lock() error { } func (m *Mongo) Unlock() error { - if !m.config.Locking.UseAdvisoryLocking { + if !m.config.Locking.Enabled { return nil } diff --git a/database/mongodb/mongodb_test.go b/database/mongodb/mongodb_test.go index 50587e29a..c73da46c4 100644 --- a/database/mongodb/mongodb_test.go +++ b/database/mongodb/mongodb_test.go @@ -222,7 +222,7 @@ func TestLockWorks(t *testing.T) { } // disable locking, validate wer can lock twice - mc.config.Locking.UseAdvisoryLocking = false + mc.config.Locking.Enabled = false err = mc.Lock() if err != nil { t.Fatal(err) @@ -234,7 +234,7 @@ func TestLockWorks(t *testing.T) { // re-enable locking, //try to hit a lock conflict - mc.config.Locking.UseAdvisoryLocking = true + mc.config.Locking.Enabled = true mc.config.Locking.Timeout = 1 err = mc.Lock() if err != nil {