Skip to content

Commit

Permalink
3rd - watermill
Browse files Browse the repository at this point in the history
  • Loading branch information
darjun committed Mar 1, 2020
1 parent 974c4aa commit 4a4b40a
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
13. [email](https://darjun.github.io/2020/02/16/godailylib/email)
14. [dig](https://darjun.github.io/2020/02/22/godailylib/dig)
15. [gojsonq](https://darjun.github.io/2020/02/24/godailylib/gojsonq)
16. [message-bus](https://darjun.github.io/2020/02/26/godailylib/message-bus)
16. [message-bus](https://darjun.github.io/2020/02/26/godailylib/message-bus)
17. [watermill](https://darjun.github.io/2020/03/01/godailylib/watermill)
49 changes: 49 additions & 0 deletions watermill/get-started/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"context"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg.Ack()
}
}
64 changes: 64 additions & 0 deletions watermill/middleware/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)

router.AddMiddleware(middleware.Duplicator)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}
77 changes: 77 additions & 0 deletions watermill/middleware/self-defined/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)

router.AddMiddleware(myMiddleware{Name: "dj"}.Middleware)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}

type myMiddleware struct {
Name string
}

func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
64 changes: 64 additions & 0 deletions watermill/persistent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{
Persistent: true,
}, logger)
go publishMessages(pubSub)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}
59 changes: 59 additions & 0 deletions watermill/rabbitmq/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Sources for https://watermill.io/docs/getting-started/
package main

import (
"context"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://localhost:5672/"

func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}

messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}

publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
62 changes: 62 additions & 0 deletions watermill/router/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}

0 comments on commit 4a4b40a

Please sign in to comment.