Skip to content

Commit

Permalink
Merge pull request #2 from henomis/feat-generic-queue
Browse files Browse the repository at this point in the history
feat: implemented wathable queue in mongo
  • Loading branch information
henomis committed Dec 22, 2021
2 parents 2326202 + 985e7a5 commit 557a28c
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 0 deletions.
91 changes: 91 additions & 0 deletions pkg/watchablequeue/examples/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"fmt"
"log"
"time"

mongowatchablequeue "github.com/henomis/mailqueue-go/pkg/watchablequeue/mongo"
)

type Pippo struct {
Name string `bson:"name"`
Value int64 `bson:"value"`
Sent bool `bson:"sent"`
}

func (p *Pippo) String() string {
return fmt.Sprintf("name: %s value: %d", p.Name, p.Value)
}

func main() {

q, err := mongowatchablequeue.NewMongoQueue(
&mongowatchablequeue.MongoWatchableQueueOptions{
MongoEndpoint: "mongodb+srv://admin:s0n0su4tl4s@cluster0.3jd0r.mongodb.net/?retryWrites=true&w=majority",
MongoDatabase: "prova",
MongoCollection: "test",
MongoCappedSize: 10000,
MongoDocumentFilter: `{"value.sent":false}`,
MongoUpdateOnCommit: `{"$set": {"value.sent": true}}`,
},
)
// , "prova", "test", 10000)
if err != nil {
panic(err)
}

pippo := Pippo{

Name: "pippo3",
Value: time.Now().Unix(),
Sent: false,
}
pippo2 := &mongowatchablequeue.MongoElement{}

container := &mongowatchablequeue.MongoElement{
Value: pippo,
}

// q.Enqueue(&pippo)

// err = q.Dequeue(&pippo2)
// if err != nil {
// panic(err)
// }

ch, err := q.Watch(pippo2)
if err != nil {
panic(err)
}

go func() {
for i := range ch {

g, ok := i.(*mongowatchablequeue.MongoElement)
if ok {

log.Println("dec ", g)
q.Commit(g)
}
// q.Commit()

}
}()

// go func(channel <-chan interface{}) {
// for v := range ch {

// e := v.(*Pippo)
// log.Printf("%+v\n", e)
// }

// }(ch)

time.Sleep(1 * time.Second)
log.Println("equeue")
q.Enqueue(container)

time.Sleep(10 * time.Second)

}
22 changes: 22 additions & 0 deletions pkg/watchablequeue/mongo/element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package mongowatchablequeue

import "fmt"

type MongoElement struct {
ID string `bson:"_id"`
Value interface{} `bson:"value"`
}

func (m *MongoElement) String() string {
return fmt.Sprintf("ID: %s, Value: %+v", m.ID, m.Value)
}

func validateMongoElement(element interface{}) (*MongoElement, error) {

mongoElement, ok := element.(*MongoElement)
if !ok {
return nil, fmt.Errorf("invalid element")
}

return mongoElement, nil
}
20 changes: 20 additions & 0 deletions pkg/watchablequeue/mongo/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mongowatchablequeue

import "sync"

type mongoWatchableQueueFlag struct {
sync.Mutex
watched bool
}

func (f *mongoWatchableQueueFlag) SetWatched(value bool) {
f.Lock()
defer f.Unlock()
f.watched = value
}

func (f *mongoWatchableQueueFlag) IsWatched() bool {
f.Lock()
defer f.Unlock()
return f.watched
}
10 changes: 10 additions & 0 deletions pkg/watchablequeue/mongo/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package mongowatchablequeue

type MongoWatchableQueueOptions struct {
MongoEndpoint string
MongoDatabase string
MongoCollection string
MongoCappedSize int64
MongoDocumentFilter string
MongoUpdateOnCommit string
}
127 changes: 127 additions & 0 deletions pkg/watchablequeue/mongo/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package mongowatchablequeue

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type MongoWatchableQueue struct {
mongoClient *mongo.Client
mongoDatabase *mongo.Database
mongoCollection *mongo.Collection
mongoCollectionCursor *mongo.Cursor
mongoCollectionCursorChannel chan interface{}
mongoCollectionWatchedFlag mongoWatchableQueueFlag

MongoDocumentFilter bson.M
MongoUpdateOnCommit bson.M
}

func NewMongoQueue(mongoOptions *MongoWatchableQueueOptions) (*MongoWatchableQueue, error) {

mongoQueue := &MongoWatchableQueue{}

err := createMongoConnection(mongoQueue, mongoOptions)
if err != nil {
return nil, err
}

err = mongoQueue.selectDatabaseAndCollection(mongoOptions)
if err != nil {
return nil, err
}

err = mongoQueue.setupMongoFilterAndUpdateCommit(mongoOptions)
if err != nil {
return nil, err
}

return mongoQueue, err
}

func (q *MongoWatchableQueue) Enqueue(element interface{}) error {

mongoElement, err := validateMongoElement(element)
if err != nil {
return err
}
mongoElement.ID = randomObjectID()

_, err = q.mongoCollection.InsertOne(context.Background(), mongoElement)
return err
}

func (q *MongoWatchableQueue) Dequeue(element interface{}) error {

if q.mongoClient == nil {
return fmt.Errorf("invalid mongo client")
}

err := q.waitNextMongoDocument()
if err != nil {
return err
}

//waiting limiter
// for {
// if q.Limiter.Allow() {
// break
// }
// //waiting limiter
// time.Sleep(1 * time.Second)
// }

return q.mongoCollectionCursor.Decode(element)

}

func (q *MongoWatchableQueue) Unwatch() {
q.mongoCollectionWatchedFlag.SetWatched(false)
}

func (q *MongoWatchableQueue) Watch(element interface{}) (<-chan interface{}, error) {

if q.mongoCollectionCursorChannel != nil {
return nil, fmt.Errorf("this queue is already watched")
}

q.mongoCollectionCursorChannel = make(chan interface{})
q.mongoCollectionWatchedFlag.SetWatched(true)

go func(mongoQueue *MongoWatchableQueue, queueElement interface{}) {
for mongoQueue.mongoCollectionWatchedFlag.IsWatched() {

err := mongoQueue.Dequeue(queueElement)
if err != nil {
mongoQueue.closeMongoCollectionCursorChannels()
return
}

mongoQueue.mongoCollectionCursorChannel <- queueElement

}
mongoQueue.closeMongoCollectionCursorChannels()

}(q, element)

return q.mongoCollectionCursorChannel, nil

}

//Commit mongodb implementation
func (q *MongoWatchableQueue) Commit(element interface{}) error {

mongoElement, err := validateMongoElement(element)
if err != nil {
return err
}

_, err = q.mongoCollection.UpdateOne(context.Background(), bson.M{"_id": mongoElement.ID}, q.MongoUpdateOnCommit)
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 557a28c

Please sign in to comment.