Closed
Description
Steps to reproduce
This is my test code:
package main
import (
"context"
stdSQL "database/sql"
"fmt"
"log"
"log/slog"
"math/rand"
"sync"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
driver "github.com/go-sql-driver/mysql"
"google.golang.org/protobuf/types/known/timestamppb"
)
// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
//
// In CQRS, one command must be handled by only one handler.
// When another handler with this command is added to command processor, error will be returned.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10
slog.Info(
"Booked room",
"room_id", cmd.RoomId,
"guest_name", cmd.GuestName,
"start_date", time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
"end_date", time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
// in future RoomBooked may be handled by multiple event handler
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked is an event handler, which handles RoomBooked event and emits OrderBeer command.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, event *RoomBooked) error {
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
panic("test")
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler is a command handler, which handles OrderBeer command and emits BeerOrdered.
// BeerOrdered is not handled by any event handler, but we may use persistent Pub/Sub to handle it in the future.
type OrderBeerHandler struct {
eventBus *cqrs.EventBus
}
func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
if rand.Int63n(10) == 0 {
// sometimes there is no beer left, command will be retried
return fmt.Errorf("no beer left for room %s, please try later", cmd.RoomId)
}
if err := o.eventBus.Publish(ctx, &BeerOrdered{
RoomId: cmd.RoomId,
Count: cmd.Count,
}); err != nil {
return err
}
slog.Info(fmt.Sprintf("%d beers ordered to room %s", cmd.Count, cmd.RoomId))
return nil
}
// BookingsFinancialReport is a read model, which calculates how much money we may earn from bookings.
// Like OrderBeerOnRoomBooked, it listens for RoomBooked event.
//
// This implementation is just writing to the memory. In production, you will probably will use some persistent storage.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) error {
// Handle may be called concurrently, so it need to be thread safe.
b.lock.Lock()
defer b.lock.Unlock()
// When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
// GoChannel Pub/Sub provides exactly-once delivery,
// but let's make this example ready for other Pub/Sub implementations.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
slog.Info(fmt.Sprintf(">>> Already booked rooms for $%d\n", b.totalCharge))
return nil
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "xxxx:3306"
conf.DBName = "watermill"
conf.Passwd = "xxxxx"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func main() {
logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{
slog.LevelInfo: slog.LevelDebug,
})
cqrsMarshaler := cqrs.JSONMarshaler{
// It will generate topic names based on the event/command type.
// So for example, for "RoomBooked" name will be "RoomBooked".
//
// This value is used to generate topic names with "generateEventsTopic" and "generateCommandsTopic" functions.
GenerateName: cqrs.StructName,
}
generateEventsTopic := func(eventName string) string {
return "events." + eventName
}
generateCommandsTopic := func(commandName string) string {
return "commands." + commandName
}
db := createDB()
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be sent to queue, because they need to be consumed once.
// commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
AutoInitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
// 创建 CQRS 事件总线
eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
return "events." + params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
// 创建 CQRS 命令总线
commandBus, err := cqrs.NewCommandBusWithConfig(
publisher,
cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
return "commands." + params.CommandName, nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
log.Fatalf("Failed to create command bus: %v", err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
return generateCommandsTopic(params.CommandName), nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return subscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
router,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return generateEventsTopic(params.EventName), nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return subscriber, nil
},
OnHandle: func(params cqrs.EventProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
cqrs.NewCommandHandler("BookRoomHandler", BookRoomHandler{eventBus}.Handle),
cqrs.NewCommandHandler("OrderBeerHandler", OrderBeerHandler{eventBus}.Handle),
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlers(
cqrs.NewEventHandler(
"OrderBeerOnRoomBooked",
OrderBeerOnRoomBooked{commandBus}.Handle,
),
cqrs.NewEventHandler(
"LogBeerOrdered",
func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
},
),
cqrs.NewEventHandler(
"BookingsFinancialReport",
NewBookingsFinancialReport().Handle,
),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
// go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
func publishCommands(commandBus *cqrs.CommandBus) {
i := 0
for {
i++
startDate := timestamppb.New(time.Now())
endDate := timestamppb.New(time.Now().Add(time.Hour * 24 * 3))
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
docker-compose.yml
services:
golang:
image: golang:1.23
restart: unless-stopped
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
command: go run .
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
Expected behavior
If the Handler panics, the event ends immediately and does not keep retrying.
Actual behavior
In fact, although I did not set up the retry middleware, the event is still retrying
Possible solution
When using MySQL as pubsub, how can I set the corresponding configuration to meet my needs: do not retry even if a panic occurs