Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: add MethodHandler #739

Merged
merged 1 commit into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion docs/primitives/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,32 @@ func SendWelcomeEmail(ctx context.Context, event *SignupEvent) error {

Subscriptions can be in the same service as the topic is declared, or in any other service of your application. Each
subscription to a single topic receives the events independently of any other subscriptions to the same topic. This means
that if one subscription is running very slowly, it will grow a backlog of unprocessed events. However, any other subscriptions will still be processing events in real-time as they are published.
that if one subscription is running very slowly, it will grow a backlog of unprocessed events.
However, any other subscriptions will still be processing events in real-time as they are published.

### Method-based handlers

When using [service structs](/docs/primitives/services-and-apis#service-structs) for dependency injection
it's common to want to define the subscription handler as a method on the service struct, to be able to access the
injected dependencies. The pubsub package provides the `pubsub.MethodHandler` function for this purpose:

```go
//encore:service
type Service struct { /* ... */ }

func (s *Service) SendWelcomeEmail(ctx context.Context, event *SignupEvent) error {
// ...
}

var _ = pubsub.NewSubscription(
user.Signups, "send-welcome-email",
pubsub.SubscriptionConfig[*SignupEvent]{
Handler: pubsub.MethodHandler((*Service).SendWelcomeEmail),
},
)
```

Note that `pubsub.MethodHandler` only allows referencing methods on the service struct type, not any other type.

### Subscription configuration

Expand Down
5 changes: 5 additions & 0 deletions e2e-tests/echo_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

qt "github.com/frankban/quicktest"
"go.uber.org/goleak"
Expand Down Expand Up @@ -119,6 +120,10 @@ func doTestEndToEndWithApp(t *testing.T, env []string) {
req := httptest.NewRequest("POST", "/echo.Publish", nil)
run.ServeHTTP(w, req)
c.Assert(w.Code, qt.Equals, 200)

// Wait a bit to allow the message to be consumed.
time.Sleep(100 * time.Millisecond)

stats, err := app.NSQ.Stats()
c.Assert(err, qt.IsNil)
c.Assert(len(stats.Producers), qt.Equals, 1)
Expand Down
52 changes: 52 additions & 0 deletions e2e-tests/testdata/testscript/pubsub_method_handler.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
run
publish pointer '{"Data": "test"}'
publish non-pointer '{"Data": "test"}'
checklog '{"topic": "pointer", "subscription": "pointer", "event": {"Data": "test"}, "message": "pointer method"}'
checklog '{"topic": "non-pointer", "subscription": "non-pointer", "event": {"Data": "test"}, "message": "non-pointer method"}'


-- svc/svc.go --
package svc

import (
"context"
"encore.dev/rlog"
"encore.dev/pubsub"
)

//encore:service
type Service struct{}

func (s *Service) PointerMethod(ctx context.Context, event *Event) error {
rlog.Info("pointer method", "event", event)
return nil
}

func (s Service) NonPointerMethod(ctx context.Context, event *Event) error {
rlog.Info("non-pointer method", "event", event)
return nil
}

type Event struct {
Data string
}

var Pointer = pubsub.NewTopic[*Event]("pointer", pubsub.TopicConfig{
DeliveryGuarantee: pubsub.AtLeastOnce,
})

var NonPointer = pubsub.NewTopic[*Event]("non-pointer", pubsub.TopicConfig{
DeliveryGuarantee: pubsub.AtLeastOnce,
})

var _ = pubsub.NewSubscription(Pointer, "pointer",
pubsub.SubscriptionConfig[*Event]{
Handler: pubsub.MethodHandler((*Service).PointerMethod),
},
)

var _ = pubsub.NewSubscription(NonPointer, "non-pointer",
pubsub.SubscriptionConfig[*Event]{
Handler: pubsub.MethodHandler(Service.NonPointerMethod),
},
)
2 changes: 1 addition & 1 deletion e2e-tests/testscript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func doRun(t *testing.T, experiments []string) {
if topic.Depth == 0 {
break
}
ts.Logf("waiting for queue to be processed, depth: %d", topic.Depth)
ts.Logf("waiting for %q queue to be processed, depth: %d", topic.TopicName, topic.Depth)
time.Sleep(100 * time.Millisecond)
}
}
Expand Down
36 changes: 34 additions & 2 deletions runtime/appruntime/apisdk/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"sync"

"github.com/rs/zerolog"
Expand All @@ -14,7 +15,15 @@ import (

// Initializer is a service initializer.
type Initializer interface {
// ServiceName reports the name of the service.
ServiceName() string

// InitService initializes the service.
InitService() error

// GetDecl returns the service declaration,
// initializing it if necessary.
GetDecl() (any, error)
}

type Decl[T any] struct {
Expand All @@ -39,10 +48,22 @@ func (g *Decl[T]) Get() (*T, error) {
return g.instance, err
}

// GetDecl returns the API Decl, initializing it if necessary.
func (g *Decl[T]) GetDecl() (any, error) {
if err := g.InitService(); err != nil {
return nil, err
}
return g.instance, nil
}

func (g *Decl[T]) InitService() error {
return g.setupOnce.Do(func() error { return doSetupService(Singleton, g) })
}

func (g *Decl[T]) ServiceName() string {
return g.Service
}

func doSetupService[T any](mgr *Manager, decl *Decl[T]) (err error) {
curr := mgr.rt.Current()
if curr.Trace != nil && curr.Req != nil && decl.SetupDefLoc != 0 {
Expand All @@ -67,7 +88,7 @@ func doSetupService[T any](mgr *Manager, decl *Decl[T]) (err error) {
i, err := setupFn()
if err != nil {
mgr.rt.Logger().Error().Err(err).Str("service", decl.Service).Msg("service initialization failed")
return errs.B().Code(errs.Internal).Msg("service initialization failed").Err()
return errs.B().Code(errs.Internal).Msgf("service %s: initialization failed", decl.Service).Err()
}
decl.instance = i

Expand All @@ -85,19 +106,25 @@ type shutdowner interface {
}

func NewManager(rt *reqtrack.RequestTracker, rootLogger zerolog.Logger) *Manager {
return &Manager{rt: rt, rootLogger: rootLogger}
return &Manager{rt: rt, rootLogger: rootLogger, svcMap: make(map[string]Initializer)}
}

type Manager struct {
rt *reqtrack.RequestTracker
rootLogger zerolog.Logger
svcInit []Initializer
svcMap map[string]Initializer

shutdownMu sync.Mutex
shutdownHandlers []shutdowner
}

func (mgr *Manager) RegisterService(i Initializer) {
name := i.ServiceName()
if _, ok := mgr.svcMap[name]; ok {
panic(fmt.Sprintf("service %s: already registered", name))
}
mgr.svcMap[name] = i
mgr.svcInit = append(mgr.svcInit, i)
}

Expand All @@ -122,6 +149,11 @@ func (mgr *Manager) InitializeServices() error {
return nil
}

func (mgr *Manager) GetService(name string) (i Initializer, ok bool) {
i, ok = mgr.svcMap[name]
return i, ok
}

func (mgr *Manager) Shutdown(force context.Context) {
mgr.shutdownMu.Lock()
handlers := mgr.shutdownHandlers
Expand Down
26 changes: 26 additions & 0 deletions runtime/appruntime/apisdk/service/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package service

import (
"fmt"

"encore.dev/appruntime/shared/logging"
"encore.dev/appruntime/shared/reqtrack"
)
Expand All @@ -12,3 +14,27 @@ var Singleton = NewManager(reqtrack.Singleton, logging.RootLogger)
func Register(i Initializer) {
Singleton.RegisterService(i)
}

// Get returns the service initializer with the given name.
// The declaration is cast to the given type T.
func Get[T any](name string) (T, error) {
svc, ok := Singleton.GetService(name)
if !ok {
var zero T
return zero, fmt.Errorf("service.Get(%q): unknown service %s", name)
}

decl, err := svc.GetDecl()
if err != nil {
var zero T
return zero, err
}

s, ok := decl.(T)
if !ok {
var zero T
return zero, fmt.Errorf("service.Get(%q): service is of type %T, not %T", name, decl, zero)
}

return s, nil
}
16 changes: 11 additions & 5 deletions runtime/pubsub/internal/nsq/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,19 @@ func (l *topic) Subscribe(logger *zerolog.Logger, ackDeadline time.Duration, ret
return nil
}))

// connect the consumer to the NSQD
err = consumer.ConnectToNSQD(l.addr)
if err != nil {
panic(fmt.Sprintf("failed to connect %s to nsqd for topic %s: %v", implCfg.EncoreName, l.name, err))
}
// add the consumer to the known consumers
l.consumers[implCfg.EncoreName] = consumer

go func() {
// Allow the rest of the service to initialize before we connect to NSQD.
// This is necessary because NSQD is so fast the receiver can process messages
// before all package-level initialization functions have been called.
time.Sleep(100 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to do this, than "100ms sleep" - this seems like we're just asking for trouble in the future; could there not be a apisdk.WaitForInit() method, which returns true once all services are init'ed

err = consumer.ConnectToNSQD(l.addr)
if err != nil {
panic(fmt.Sprintf("failed to connect %s to nsqd for topic %s: %v", implCfg.EncoreName, l.name, err))
}
}()
}

// PublishMessage publishes a message to an nsq Topic
Expand Down
25 changes: 24 additions & 1 deletion runtime/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Subscription[T any] struct {
//
// var Subscription = pubsub.NewSubscription(MyTopic, "my-subscription", pubsub.SubscriptionConfig[*MyEvent]{
// Handler: HandleEvent,
// RetryPolicy: &pubsub.RetryPolicy { MaxRetries: 10 },
// RetryPolicy: &pubsub.RetryPolicy{MaxRetries: 10},
// })
//
// func HandleEvent(ctx context.Context, event *MyEvent) error {
Expand Down Expand Up @@ -278,3 +279,25 @@ func marshalParams[Resp any](json jsoniter.API, resp Resp) []byte {
data, _ := json.Marshal(resp)
return data
}

// MethodHandler is used to define a subscription Handler that references a service struct method.
//
// Example Usage:
//
// //encore:service
// type Service struct {}
//
// func (s *Service) Method(ctx context.Context, msg *Event) error { /* ... */ }
//
// var _ = pubsub.NewSubscription(Topic, "subscription-name", pubsub.SubscriptionConfig[*Event]{
// Handler: pubsub.MethodHandler((*MyService).MyMethod),
// // ...
// })
func MethodHandler[T, SvcStruct any](handler func(s SvcStruct, ctx context.Context, msg T) error) func(ctx context.Context, msg T) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ Perfect, adding support while keeping type satefy in the IDE

// The use of MethodHandler acts as a sentinel for the code generator,
// which replaces the call with some generated code to initialize the service struct.
// As such this function should never be called in practice.
return func(ctx context.Context, msg T) error {
return fmt.Errorf("pubsub.MethodHandler is not usable in this context")
}
}
15 changes: 11 additions & 4 deletions runtime/pubsub/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@ const extCorrelationIDAttribute = "encore_ext_correlation_id"
// the target cloud. (i.e. ack deadline may be brought within the supported range
// by the target cloud pubsub implementation).
type SubscriptionConfig[T any] struct {
// The function which will be called to process a message
// Handler is the function which will be called to process a message
// sent on the topic.
//
// It is important for this function to block and not return
// To reference a method on an [Encore service struct]
// you can use the [MethodHandler] function. For example:
//
// Handler: pubsub.MethodHandler((*MyService).MyMethod)
//
// It is important for the Handler function to block and not return
// until all processing relating to the message has been completed.
//
// When this function returns a `nil`, the message will be
// When the handler returns a nil error the message will be
// acknowledged (acked) from the topic, and should not be redelivered.
//
// When this function returns an `error`, the message will be
// When this function returns a non-nil error the message will be
// negatively acknowledged (nacked), which will cause a redelivery
// attempt to be made (unless the retry policy's MaxRetries has been reached).
//
// This field is required.
//
// [Encore service struct]: https://encore.dev/docs/primitives/services-and-apis#service-structs
Handler func(ctx context.Context, msg T) error

// Filter is a boolean expression using =, !=, IN, &&
Expand Down
Loading