Skip to content

Commit

Permalink
Merge pull request #464 from RichardKnop/fix-mongodb-backend-factory-…
Browse files Browse the repository at this point in the history
…test-issue

fix: fixed a unit test which actually tried to connect to mongo
  • Loading branch information
RichardKnop committed Sep 5, 2019
2 parents 7b1dc90 + a819600 commit 45c722b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 32 deletions.
63 changes: 41 additions & 22 deletions v1/backends/mongo/mongodb.go
Expand Up @@ -6,33 +6,36 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/RichardKnop/machinery/v1/backends/iface"
"github.com/RichardKnop/machinery/v1/common"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// Backend represents a MongoDB result backend
type Backend struct {
common.Backend
client *mongo.Client
tasksCollection *mongo.Collection
groupMetasCollection *mongo.Collection
client *mongo.Client
tc *mongo.Collection
gmc *mongo.Collection
once sync.Once
}

// New creates Backend instance
func New(cnf *config.Config) (iface.Backend, error) {
backend := &Backend{Backend: common.NewBackend(cnf)}
err := backend.connect()
if err != nil {
return nil, err
backend := &Backend{
Backend: common.NewBackend(cnf),
once: sync.Once{},
}

return backend, nil
}

Expand All @@ -43,7 +46,7 @@ func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error {
TaskUUIDs: taskUUIDs,
CreatedAt: time.Now().UTC(),
}
_, err := b.groupMetasCollection.InsertOne(context.Background(), groupMeta)
_, err := b.groupMetasCollection().InsertOne(context.Background(), groupMeta)
return err
}

Expand Down Expand Up @@ -94,7 +97,7 @@ func (b *Backend) TriggerChord(groupUUID string) (bool, error) {
},
}

_, err := b.groupMetasCollection.UpdateOne(context.Background(), query, change, options.Update())
_, err := b.groupMetasCollection().UpdateOne(context.Background(), query, change, options.Update())

if err != nil {
if err == mongo.ErrNoDocuments {
Expand Down Expand Up @@ -175,7 +178,7 @@ func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error
// GetState returns the latest task state
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
state := &tasks.TaskState{}
err := b.tasksCollection.FindOne(context.Background(), bson.M{"_id": taskUUID}).Decode(state)
err := b.tasksCollection().FindOne(context.Background(), bson.M{"_id": taskUUID}).Decode(state)

if err != nil {
return nil, err
Expand All @@ -185,13 +188,13 @@ func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {

// PurgeState deletes stored task state
func (b *Backend) PurgeState(taskUUID string) error {
_, err := b.tasksCollection.DeleteOne(context.Background(), bson.M{"_id": taskUUID})
_, err := b.tasksCollection().DeleteOne(context.Background(), bson.M{"_id": taskUUID})
return err
}

// PurgeGroupMeta deletes stored group meta data
func (b *Backend) PurgeGroupMeta(groupUUID string) error {
_, err := b.groupMetasCollection.DeleteOne(context.Background(), bson.M{"_id": groupUUID})
_, err := b.groupMetasCollection().DeleteOne(context.Background(), bson.M{"_id": groupUUID})
return err
}

Expand All @@ -207,15 +210,15 @@ func (b *Backend) lockGroupMeta(groupUUID string) error {
},
}

_, err := b.groupMetasCollection.UpdateOne(context.Background(), query, change, options.Update().SetUpsert(true))
_, err := b.groupMetasCollection().UpdateOne(context.Background(), query, change, options.Update().SetUpsert(true))

return err
}

// unlockGroupMeta releases lock on groupUUID document
func (b *Backend) unlockGroupMeta(groupUUID string) error {
update := bson.M{"$set": bson.M{"lock": false}}
_, err := b.groupMetasCollection.UpdateOne(context.Background(), bson.M{"_id": groupUUID}, update, options.Update())
_, err := b.groupMetasCollection().UpdateOne(context.Background(), bson.M{"_id": groupUUID}, update, options.Update())
return err
}

Expand All @@ -224,7 +227,7 @@ func (b *Backend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
groupMeta := &tasks.GroupMeta{}
query := bson.M{"_id": groupUUID}

err := b.groupMetasCollection.FindOne(context.Background(), query).Decode(groupMeta)
err := b.groupMetasCollection().FindOne(context.Background(), query).Decode(groupMeta)
if err != nil {
return nil, err
}
Expand All @@ -234,7 +237,7 @@ func (b *Backend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
// getStates returns multiple task states
func (b *Backend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
states := make([]*tasks.TaskState, 0, len(taskUUIDs))
cur, err := b.tasksCollection.Find(context.Background(), bson.M{"_id": bson.M{"$in": taskUUIDs}})
cur, err := b.tasksCollection().Find(context.Background(), bson.M{"_id": bson.M{"$in": taskUUIDs}})
if err != nil {
return nil, err
}
Expand All @@ -256,10 +259,26 @@ func (b *Backend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
// updateState saves current task state
func (b *Backend) updateState(signature *tasks.Signature, update bson.M) error {
update = bson.M{"$set": update}
_, err := b.tasksCollection.UpdateOne(context.Background(), bson.M{"_id": signature.UUID}, update, options.Update().SetUpsert(true))
_, err := b.tasksCollection().UpdateOne(context.Background(), bson.M{"_id": signature.UUID}, update, options.Update().SetUpsert(true))
return err
}

func (b *Backend) tasksCollection() *mongo.Collection {
b.once.Do(func() {
b.connect()
})

return b.tc
}

func (b *Backend) groupMetasCollection() *mongo.Collection {
b.once.Do(func() {
b.connect()
})

return b.gmc
}

// connect creates the underlying mgo connection if it doesn't exist
// creates required indexes for our collections
func (b *Backend) connect() error {
Expand All @@ -275,8 +294,8 @@ func (b *Backend) connect() error {
database = b.GetConfig().MongoDB.Database
}

b.tasksCollection = b.client.Database(database).Collection("tasks")
b.groupMetasCollection = b.client.Database(database).Collection("group_metas")
b.tc = b.client.Database(database).Collection("tasks")
b.gmc = b.client.Database(database).Collection("group_metas")

err = b.createMongoIndexes(database)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion v1/factories.go
Expand Up @@ -20,10 +20,10 @@ import (
amqpbackend "github.com/RichardKnop/machinery/v1/backends/amqp"
dynamobackend "github.com/RichardKnop/machinery/v1/backends/dynamodb"
eagerbackend "github.com/RichardKnop/machinery/v1/backends/eager"
nullbackend "github.com/RichardKnop/machinery/v1/backends/null"
backendiface "github.com/RichardKnop/machinery/v1/backends/iface"
memcachebackend "github.com/RichardKnop/machinery/v1/backends/memcache"
mongobackend "github.com/RichardKnop/machinery/v1/backends/mongo"
nullbackend "github.com/RichardKnop/machinery/v1/backends/null"
redisbackend "github.com/RichardKnop/machinery/v1/backends/redis"
)

Expand Down
19 changes: 10 additions & 9 deletions v1/factories_test.go
Expand Up @@ -17,6 +17,7 @@ import (
redisbroker "github.com/RichardKnop/machinery/v1/brokers/redis"
sqsbroker "github.com/RichardKnop/machinery/v1/brokers/sqs"

mongobackend "github.com/RichardKnop/machinery/v1/backends/mongo"
amqpbackend "github.com/RichardKnop/machinery/v1/backends/amqp"
memcachebackend "github.com/RichardKnop/machinery/v1/backends/memcache"
redisbackend "github.com/RichardKnop/machinery/v1/backends/redis"
Expand Down Expand Up @@ -374,16 +375,16 @@ func TestBackendFactory(t *testing.T) {
ResultsExpireIn: 30,
}

_, err = machinery.BackendFactory(&cnf)
actual, err = machinery.BackendFactory(&cnf)
if assert.NoError(t, err) {
//expected, err := mongobackend.New(&cnf)
//if assert.NoError(t, err) {
// assert.True(
// t,
// reflect.DeepEqual(actual, expected),
// fmt.Sprintf("conn = %v, want %v", actual, expected),
// )
//}
expected, err := mongobackend.New(&cnf)
if assert.NoError(t, err) {
assert.True(
t,
reflect.DeepEqual(actual, expected),
fmt.Sprintf("conn = %v, want %v", actual, expected),
)
}
}
}

Expand Down

0 comments on commit 45c722b

Please sign in to comment.