Skip to content

Commit

Permalink
Merge pull request #6 from henomis/feat-5-add-limiter-to-whatchablequeue
Browse files Browse the repository at this point in the history
feat: add limiter to watchablequeue
  • Loading branch information
henomis committed Dec 23, 2021
2 parents 654f964 + 810086a commit 1c238c2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
5 changes: 4 additions & 1 deletion pkg/watchablequeue/examples/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"time"

"github.com/henomis/mailqueue-go/pkg/limiter"
mongowatchablequeue "github.com/henomis/mailqueue-go/pkg/watchablequeue/mongo"
)

Expand All @@ -28,6 +29,8 @@ func main() {
panic(err)
}

limiter := limiter.NewDefaultLimiter(3, 1*time.Minute, &limiter.RealSleeper{})

q, err := mongowatchablequeue.NewMongoQueue(
&mongowatchablequeue.MongoWatchableQueueOptions{
MongoEndpoint: os.Getenv("MONGO_ENDPOINT"),
Expand All @@ -37,6 +40,7 @@ func main() {
MongoDocumentFilter: `{"value.sent":false}`,
MongoUpdateOnCommit: `{"$set": {"value.sent": true}}`,
},
limiter,
)

if err != nil {
Expand Down Expand Up @@ -68,7 +72,6 @@ func main() {
log.Println("dec ", g)
q.Commit(g)
}
// q.Commit()

}
}()
Expand Down
21 changes: 13 additions & 8 deletions pkg/watchablequeue/mongo/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package mongowatchablequeue
import (
"context"
"fmt"
"time"

"github.com/henomis/mailqueue-go/pkg/limiter"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
Expand All @@ -15,12 +17,13 @@ type MongoWatchableQueue struct {
mongoCollectionCursor *mongo.Cursor
mongoCollectionCursorChannel chan interface{}
mongoCollectionWatchedFlag mongoWatchableQueueFlag
mongoQueueLimiter limiter.Limiter

MongoDocumentFilter bson.M
MongoUpdateOnCommit bson.M
}

func NewMongoQueue(mongoOptions *MongoWatchableQueueOptions) (*MongoWatchableQueue, error) {
func NewMongoQueue(mongoOptions *MongoWatchableQueueOptions, queueLimiter limiter.Limiter) (*MongoWatchableQueue, error) {

mongoQueue := &MongoWatchableQueue{}

Expand All @@ -39,6 +42,8 @@ func NewMongoQueue(mongoOptions *MongoWatchableQueueOptions) (*MongoWatchableQue
return nil, err
}

mongoQueue.mongoQueueLimiter = queueLimiter

return mongoQueue, err
}

Expand Down Expand Up @@ -66,13 +71,13 @@ func (q *MongoWatchableQueue) Dequeue(element interface{}) error {
}

//waiting limiter
// for {
// if q.Limiter.Allow() {
// break
// }
// //waiting limiter
// time.Sleep(1 * time.Second)
// }
for {
if q.mongoQueueLimiter.Allow() {
break
}
//waiting limiter
time.Sleep(1 * time.Second)
}

return q.mongoCollectionCursor.Decode(element)

Expand Down

0 comments on commit 1c238c2

Please sign in to comment.