Skip to content

Commit

Permalink
add publisher
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <dusan@dusanmalusev.dev>
  • Loading branch information
CodeLieutenant committed Apr 23, 2024
1 parent d72aeca commit 61cd8f8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 3 deletions.
8 changes: 7 additions & 1 deletion amqpfx/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ func c[T consumer.Message](
}, fx.ResultTags(`name:"`+name+`"`))),
fx.Invoke(fx.Annotate(func(lc fx.Lifecycle, c consumer.Consumer[T]) {
lc.Append(fx.StartStopHook(
c.Start,
func(ctx context.Context) {
go func() {
if err := c.Start(ctx); err != nil {
panic(err)
}
}()
},
c.CloseWithContext,
))
},
Expand Down
46 changes: 46 additions & 0 deletions amqpfx/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package amqpfx

import (
"context"
"fmt"

"go.uber.org/fx"

"github.com/nano-interactive/go-amqp/v3/connection"
"github.com/nano-interactive/go-amqp/v3/publisher"
)

func GetPublisherName(connectionName, exchangeName string) string {
return fmt.Sprintf("amqp-publisher-%s-%s", exchangeName, connectionName)
}

func GetPublisherParamName(connectionName, exchangeName string) string {
return fmt.Sprintf(`name:"amqp-publisher-param-%s-%s"`, exchangeName, connectionName)
}

func PublisherModule[T any](
connectionOptions connection.Config,
exchangeName string,
options ...publisher.Option[T],
) fx.Option {
module := fmt.Sprintf("amqp-publisher-module-%s-%s", exchangeName, connectionOptions.ConnectionName)

return fx.Module(module, fx.Provide(fx.Annotate(func(lc fx.Lifecycle) (*publisher.Publisher[T], error) {
ctx, cancel := context.WithCancel(context.Background())

pub, err := publisher.New(ctx, connectionOptions, exchangeName, options...)
if err != nil {
cancel()
return nil, err
}

lc.Append(fx.StopHook(func(ctx context.Context) error {
cancel()
return pub.CloseWithContext(ctx)
}))

return pub, err
},
fx.ResultTags(GetPublisherParamName(connectionOptions.ConnectionName, exchangeName)), fx.As(new(publisher.Pub[T])),
)))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/invopop/validation v0.3.0
github.com/jackc/pgx/v5 v5.5.5
github.com/nano-interactive/go-amqp/v3 v3.1.0
github.com/nano-interactive/go-amqp/v3 v3.2.0
github.com/nano-interactive/go-utils/v2 v2.0.12
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rs/zerolog v1.32.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nano-interactive/go-amqp/v3 v3.1.0 h1:kMIhAenH3GeQaJJZebGk+3mq2BsvFeGbshPQX7vpFTI=
github.com/nano-interactive/go-amqp/v3 v3.1.0/go.mod h1:WzjCL9Sgj6GsU4Hkbfylusa1L9/1U9tmNonJ4bYN5Z8=
github.com/nano-interactive/go-amqp/v3 v3.2.0 h1:NMbffwrb1Knqy/3cARaIEPUN+xZzhO37mG+WHtyYGdA=
github.com/nano-interactive/go-amqp/v3 v3.2.0/go.mod h1:WzjCL9Sgj6GsU4Hkbfylusa1L9/1U9tmNonJ4bYN5Z8=
github.com/nano-interactive/go-utils/v2 v2.0.12 h1:+useoofopUprRMQVWiid2iHMyzfJ4xqFL0xzJwllQCg=
github.com/nano-interactive/go-utils/v2 v2.0.12/go.mod h1:++tsLNqi3Cbn0za6BGJuKfhVXtB8AmChrxZ1ksvSUi0=
github.com/pelletier/go-toml/v2 v2.2.1 h1:9TA9+T8+8CUCO2+WYnDLCgrYi9+omqKXyjDtosvtEhg=
Expand Down
6 changes: 5 additions & 1 deletion http/fiber/fiberfx/fiberfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ func RunApp(addr, appName string, shutdownTimeout time.Duration) fx.Option {
return fx.Invoke(fx.Annotate(func(app *fiber.App, lc fx.Lifecycle) {
lc.Append(fx.StartStopHook(
func() {
go func() { _ = app.Listen(addr) }()
go func() {
if err := app.Listen(addr); err != nil {
panic(err)
}
}()
},
func(ctx context.Context) error {
newCtx, cancel := context.WithTimeout(ctx, shutdownTimeout)
Expand Down

0 comments on commit 61cd8f8

Please sign in to comment.