Skip to content

Commit

Permalink
Add advisory locking to mongodb
Browse files Browse the repository at this point in the history
  • Loading branch information
andyN42 committed Sep 21, 2020
1 parent 50439fe commit 0f9cfff
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 10 deletions.
2 changes: 2 additions & 0 deletions database/mongodb/README.md
Expand Up @@ -13,6 +13,8 @@
|------------|---------------------|-------------|
| `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection |
| `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing|
| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking |
| `x-advisory-lock-backoff-seconds` | `15` | The max time that the advisory lock will wait during exponential backoff if the db is already locked. |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `user` | | The user to sign in as. Can be omitted |
| `password` | | The user's password. Can be omitted |
Expand Down
140 changes: 131 additions & 9 deletions database/mongodb/mongodb.go
Expand Up @@ -3,16 +3,20 @@ package mongodb
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/url"
"strconv"

"github.com/cenkalti/backoff/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"io"
"io/ioutil"
"net/url"
os "os"
"strconv"
"time"
)

func init() {
Expand All @@ -23,6 +27,11 @@ func init() {

var DefaultMigrationsCollection = "schema_migrations"

const DefaultLockingCollection = "migrate_advisory_lock" // the collection to use for advisory locking by default.
const LockingKey = "locking_key" // the key to lock on, will have a unique=true index on it
const lockKeyUniqueValue = 0 // the unique value to lock on. If multiple clients try to insert the same key, it will fail (locked).
const LockingBackoffTime = 15 // the default maximum time to wait for a lock to be released

var (
ErrNoDatabaseName = fmt.Errorf("no database name")
ErrNilConfig = fmt.Errorf("no config")
Expand All @@ -31,13 +40,14 @@ var (
type Mongo struct {
client *mongo.Client
db *mongo.Database

config *Config
}

type Config struct {
DatabaseName string
MigrationsCollection string
LockingCollection string
LockingBackoffTime int
TransactionMode bool
}

Expand All @@ -46,6 +56,15 @@ type versionInfo struct {
Dirty bool `bson:"dirty"`
}

type lockObj struct {
Key int `bson:"locking_key"`
Pid int `bson:"pid"`
Name string `bson:"hostname"`
}
type findFilter struct {
Key int `bson:"locking_key"`
}

func WithInstance(instance *mongo.Client, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
Expand All @@ -56,28 +75,43 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro
if len(config.MigrationsCollection) == 0 {
config.MigrationsCollection = DefaultMigrationsCollection
}
if len(config.LockingCollection) == 0 {
config.LockingCollection = DefaultLockingCollection
}
if config.LockingBackoffTime <= 0 {
config.LockingBackoffTime = LockingBackoffTime
}

mc := &Mongo{
client: instance,
db: instance.Database(config.DatabaseName),
config: config,
}

if err := mc.ensureLockTable(); err != nil {
return nil, err
}
if err := mc.ensureVersionTable(); err != nil {
return nil, err
}

return mc, nil
}

func (m *Mongo) Open(dsn string) (database.Driver, error) {
//connsting is experimental package, but it used for parse connection string in mongo.Connect function
//connstring is experimental package, but it used for parse connection string in mongo.Connect function
uri, err := connstring.Parse(dsn)
if err != nil {
return nil, err
}
if len(uri.Database) == 0 {
return nil, ErrNoDatabaseName
}

unknown := url.Values(uri.UnknownOptions)

migrationsCollection := unknown.Get("x-migrations-collection")
lockCollection := unknown.Get("x-advisory-lock-collection")
lockingBackoffTime, _ := strconv.Atoi(unknown.Get("x-advisory-lock-backoff-seconds"))
transactionMode, _ := strconv.ParseBool(unknown.Get("x-transaction-mode"))

client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(dsn))
Expand All @@ -90,6 +124,8 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) {
mc, err := WithInstance(client, &Config{
DatabaseName: uri.Database,
MigrationsCollection: migrationsCollection,
LockingCollection: lockCollection,
LockingBackoffTime: lockingBackoffTime,
TransactionMode: transactionMode,
})
if err != nil {
Expand Down Expand Up @@ -184,10 +220,96 @@ func (m *Mongo) Drop() error {
return m.db.Drop(context.TODO())
}

func (m *Mongo) Lock() error {
// Note that this could possibly have a race condition
// if three migrate processes try to create the index at the exact same time, but it
// takes a while for the first call to build the index (although it's an empty collection, so that may not take long)
// then two of them could return successful, while the index is still building, leading to
// the second and third processes to successfully insert a document (and "acquire" the lock),
// as duplicate keys would be allowed.
//
// This may not be an issue, if the collection is empty, and creating the lock takes next to no time.
//
func (m *Mongo) ensureLockTable() error {
indexes := m.db.Collection(m.config.LockingCollection).Indexes()
indexOptions := options.Index().SetUnique(true).SetName("lock_unique_key")
indexKeys := bsonx.MDoc{
LockingKey: bsonx.Int32(-1),
}
_, err := indexes.CreateOne(context.TODO(), mongo.IndexModel{
Options: indexOptions,
Keys: indexKeys,
})
if err != nil {
return err
}
return nil
}

// ensureVersionTable checks if versions table exists and, if not, creates it.
// Note that this function locks the database, which deviates from the usual
// convention of "caller locks" in the MongoDb type.
func (m *Mongo) ensureVersionTable() (err error) {
if err = m.Lock(); err != nil {
return err
}

defer func() {
if e := m.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()

if err != nil {
return err
}
if _, _, err = m.Version(); err != nil {
return err
}
return nil
}

// Utilizes advisory locking on the config.LockingCollection collection
// This uses a unique index on the `locking_key` field.
func (m *Mongo) Lock() error {
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Can't find hostname. %s", err.Error())
}

newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Name: hostname,
}
operation := func() error {
_, err := m.db.Collection(m.config.LockingCollection).InsertOne(context.TODO(), newLockObj)
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.LockingBackoffTime) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = exponentialBackOff.MaxElapsedTime / 10

err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}
return nil

}
func (m *Mongo) Unlock() error {

filter := findFilter{
Key: lockKeyUniqueValue,
}
_, err := m.db.Collection(m.config.LockingCollection).DeleteMany(context.TODO(), filter)
if err != nil {
return err
}
return nil
}
55 changes: 54 additions & 1 deletion database/mongodb/mongodb_test.go
Expand Up @@ -92,7 +92,7 @@ func Test(t *testing.T) {
}
}()
dt.TestNilVersion(t, d)
//TestLockAndUnlock(t, d) driver doesn't support lock on database level
dt.TestLockAndUnlock(t, d)
dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`)))
dt.TestSetVersion(t, d)
dt.TestDrop(t, d)
Expand Down Expand Up @@ -180,6 +180,59 @@ func TestWithAuth(t *testing.T) {
})
}

func TestLockWorks(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.FirstPort()
if err != nil {
t.Fatal(err)
}

addr := mongoConnectionString(ip, port)
p := &Mongo{}
d, err := p.Open(addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()

dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`)))

mc := d.(*Mongo)

err = mc.Lock()
if err != nil {
t.Fatal(err)
}
err = mc.Unlock()
if err != nil {
t.Fatal(err)
}

err = mc.Lock()
if err != nil {
t.Fatal(err)
}
err = mc.Unlock()
if err != nil {
t.Fatal(err)
}

//try to hit a lock conflict
err = mc.Lock()
if err != nil {
t.Fatal(err)
}
err = mc.Lock()
if err == nil {
t.Fatal("should have failed, mongo should be locked already")
}
})
}

func TestTransaction(t *testing.T) {
transactionSpecs := []dktesting.ContainerSpec{
{ImageName: "mongo:4", Options: dktest.Options{PortRequired: true, ReadyFunc: isReady,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go v1.17.7
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cenkalti/backoff/v4 v4.0.2
github.com/cockroachdb/cockroach-go v0.0.0-20190925194419-606b3d062051
github.com/cznic/mathutil v0.0.0-20180504122225-ca4c9f2c1369 // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20200620013148-b91950f658ec
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -69,6 +69,8 @@ github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAK
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down

0 comments on commit 0f9cfff

Please sign in to comment.