Skip to content

Commit

Permalink
pubsub: add MethodHandler
Browse files Browse the repository at this point in the history
This adds support for having Pub/Sub handlers
that reference a method on a service struct.
  • Loading branch information
eandre committed Jun 2, 2023
1 parent 93e54f1 commit 89cee3e
Show file tree
Hide file tree
Showing 16 changed files with 492 additions and 27 deletions.
5 changes: 5 additions & 0 deletions e2e-tests/echo_app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

qt "github.com/frankban/quicktest"
"go.uber.org/goleak"
Expand Down Expand Up @@ -119,6 +120,10 @@ func doTestEndToEndWithApp(t *testing.T, env []string) {
req := httptest.NewRequest("POST", "/echo.Publish", nil)
run.ServeHTTP(w, req)
c.Assert(w.Code, qt.Equals, 200)

// Wait a bit to allow the message to be consumed.
time.Sleep(100 * time.Millisecond)

stats, err := app.NSQ.Stats()
c.Assert(err, qt.IsNil)
c.Assert(len(stats.Producers), qt.Equals, 1)
Expand Down
52 changes: 52 additions & 0 deletions e2e-tests/testdata/testscript/pubsub_method_handler.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
run
publish pointer '{"Data": "test"}'
publish non-pointer '{"Data": "test"}'
checklog '{"topic": "pointer", "subscription": "pointer", "event": {"Data": "test"}, "message": "pointer method"}'
checklog '{"topic": "non-pointer", "subscription": "non-pointer", "event": {"Data": "test"}, "message": "non-pointer method"}'


-- svc/svc.go --
package svc

import (
"context"
"encore.dev/rlog"
"encore.dev/pubsub"
)

//encore:service
type Service struct{}

func (s *Service) PointerMethod(ctx context.Context, event *Event) error {
rlog.Info("pointer method", "event", event)
return nil
}

func (s Service) NonPointerMethod(ctx context.Context, event *Event) error {
rlog.Info("non-pointer method", "event", event)
return nil
}

type Event struct {
Data string
}

var Pointer = pubsub.NewTopic[*Event]("pointer", pubsub.TopicConfig{
DeliveryGuarantee: pubsub.AtLeastOnce,
})

var NonPointer = pubsub.NewTopic[*Event]("non-pointer", pubsub.TopicConfig{
DeliveryGuarantee: pubsub.AtLeastOnce,
})

var _ = pubsub.NewSubscription(Pointer, "pointer",
pubsub.SubscriptionConfig[*Event]{
Handler: pubsub.MethodHandler((*Service).PointerMethod),
},
)

var _ = pubsub.NewSubscription(NonPointer, "non-pointer",
pubsub.SubscriptionConfig[*Event]{
Handler: pubsub.MethodHandler(Service.NonPointerMethod),
},
)
2 changes: 1 addition & 1 deletion e2e-tests/testscript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func doRun(t *testing.T, experiments []string) {
if topic.Depth == 0 {
break
}
ts.Logf("waiting for queue to be processed, depth: %d", topic.Depth)
ts.Logf("waiting for %q queue to be processed, depth: %d", topic.TopicName, topic.Depth)
time.Sleep(100 * time.Millisecond)
}
}
Expand Down
36 changes: 34 additions & 2 deletions runtime/appruntime/apisdk/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"fmt"
"sync"

"github.com/rs/zerolog"
Expand All @@ -14,7 +15,15 @@ import (

// Initializer is a service initializer.
type Initializer interface {
// ServiceName reports the name of the service.
ServiceName() string

// InitService initializes the service.
InitService() error

// GetDecl returns the service declaration,
// initializing it if necessary.
GetDecl() (any, error)
}

type Decl[T any] struct {
Expand All @@ -39,10 +48,22 @@ func (g *Decl[T]) Get() (*T, error) {
return g.instance, err
}

// GetDecl returns the API Decl, initializing it if necessary.
func (g *Decl[T]) GetDecl() (any, error) {
if err := g.InitService(); err != nil {
return nil, err
}
return g.instance, nil
}

func (g *Decl[T]) InitService() error {
return g.setupOnce.Do(func() error { return doSetupService(Singleton, g) })
}

func (g *Decl[T]) ServiceName() string {
return g.Service
}

func doSetupService[T any](mgr *Manager, decl *Decl[T]) (err error) {
curr := mgr.rt.Current()
if curr.Trace != nil && curr.Req != nil && decl.SetupDefLoc != 0 {
Expand All @@ -67,7 +88,7 @@ func doSetupService[T any](mgr *Manager, decl *Decl[T]) (err error) {
i, err := setupFn()
if err != nil {
mgr.rt.Logger().Error().Err(err).Str("service", decl.Service).Msg("service initialization failed")
return errs.B().Code(errs.Internal).Msg("service initialization failed").Err()
return errs.B().Code(errs.Internal).Msgf("service %s: initialization failed", decl.Service).Err()
}
decl.instance = i

Expand All @@ -85,19 +106,25 @@ type shutdowner interface {
}

func NewManager(rt *reqtrack.RequestTracker, rootLogger zerolog.Logger) *Manager {
return &Manager{rt: rt, rootLogger: rootLogger}
return &Manager{rt: rt, rootLogger: rootLogger, svcMap: make(map[string]Initializer)}
}

type Manager struct {
rt *reqtrack.RequestTracker
rootLogger zerolog.Logger
svcInit []Initializer
svcMap map[string]Initializer

shutdownMu sync.Mutex
shutdownHandlers []shutdowner
}

func (mgr *Manager) RegisterService(i Initializer) {
name := i.ServiceName()
if _, ok := mgr.svcMap[name]; ok {
panic(fmt.Sprintf("service %s: already registered", name))
}
mgr.svcMap[name] = i
mgr.svcInit = append(mgr.svcInit, i)
}

Expand All @@ -122,6 +149,11 @@ func (mgr *Manager) InitializeServices() error {
return nil
}

func (mgr *Manager) GetService(name string) (i Initializer, ok bool) {
i, ok = mgr.svcMap[name]
return i, ok
}

func (mgr *Manager) Shutdown(force context.Context) {
mgr.shutdownMu.Lock()
handlers := mgr.shutdownHandlers
Expand Down
26 changes: 26 additions & 0 deletions runtime/appruntime/apisdk/service/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package service

import (
"fmt"

"encore.dev/appruntime/shared/logging"
"encore.dev/appruntime/shared/reqtrack"
)
Expand All @@ -12,3 +14,27 @@ var Singleton = NewManager(reqtrack.Singleton, logging.RootLogger)
func Register(i Initializer) {
Singleton.RegisterService(i)
}

// Get returns the service initializer with the given name.
// The declaration is cast to the given type T.
func Get[T any](name string) (T, error) {
svc, ok := Singleton.GetService(name)
if !ok {
var zero T
return zero, fmt.Errorf("service.Get(%q): unknown service %s", name)
}

decl, err := svc.GetDecl()
if err != nil {
var zero T
return zero, err
}

s, ok := decl.(T)
if !ok {
var zero T
return zero, fmt.Errorf("service.Get(%q): service is of type %T, not %T", name, decl, zero)
}

return s, nil
}
16 changes: 11 additions & 5 deletions runtime/pubsub/internal/nsq/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,19 @@ func (l *topic) Subscribe(logger *zerolog.Logger, ackDeadline time.Duration, ret
return nil
}))

// connect the consumer to the NSQD
err = consumer.ConnectToNSQD(l.addr)
if err != nil {
panic(fmt.Sprintf("failed to connect %s to nsqd for topic %s: %v", implCfg.EncoreName, l.name, err))
}
// add the consumer to the known consumers
l.consumers[implCfg.EncoreName] = consumer

go func() {
// Allow the rest of the service to initialize before we connect to NSQD.
// This is necessary because NSQD is so fast the receiver can process messages
// before all package-level initialization functions have been called.
time.Sleep(100 * time.Millisecond)
err = consumer.ConnectToNSQD(l.addr)
if err != nil {
panic(fmt.Sprintf("failed to connect %s to nsqd for topic %s: %v", implCfg.EncoreName, l.name, err))
}
}()
}

// PublishMessage publishes a message to an nsq Topic
Expand Down
25 changes: 24 additions & 1 deletion runtime/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Subscription[T any] struct {
//
// var Subscription = pubsub.NewSubscription(MyTopic, "my-subscription", pubsub.SubscriptionConfig[*MyEvent]{
// Handler: HandleEvent,
// RetryPolicy: &pubsub.RetryPolicy { MaxRetries: 10 },
// RetryPolicy: &pubsub.RetryPolicy{MaxRetries: 10},
// })
//
// func HandleEvent(ctx context.Context, event *MyEvent) error {
Expand Down Expand Up @@ -278,3 +279,25 @@ func marshalParams[Resp any](json jsoniter.API, resp Resp) []byte {
data, _ := json.Marshal(resp)
return data
}

// MethodHandler is used to define a subscription Handler that references a service struct method.
//
// Example Usage:
//
// //encore:service
// type Service struct {}
//
// func (s *Service) Method(ctx context.Context, msg *Event) error { /* ... */ }
//
// var _ = pubsub.NewSubscription(Topic, "subscription-name", pubsub.SubscriptionConfig[*Event]{
// Handler: pubsub.MethodHandler((*MyService).MyMethod),
// // ...
// })
func MethodHandler[T, SvcStruct any](handler func(s SvcStruct, ctx context.Context, msg T) error) func(ctx context.Context, msg T) error {
// The use of MethodHandler acts as a sentinel for the code generator,
// which replaces the call with some generated code to initialize the service struct.
// As such this function should never be called in practice.
return func(ctx context.Context, msg T) error {
return fmt.Errorf("pubsub.MethodHandler is not usable in this context")
}
}
15 changes: 11 additions & 4 deletions runtime/pubsub/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@ const extCorrelationIDAttribute = "encore_ext_correlation_id"
// the target cloud. (i.e. ack deadline may be brought within the supported range
// by the target cloud pubsub implementation).
type SubscriptionConfig[T any] struct {
// The function which will be called to process a message
// Handler is the function which will be called to process a message
// sent on the topic.
//
// It is important for this function to block and not return
// To reference a method on an [Encore service struct]
// you can use the [MethodHandler] function. For example:
//
// Handler: pubsub.MethodHandler((*MyService).MyMethod)
//
// It is important for the Handler function to block and not return
// until all processing relating to the message has been completed.
//
// When this function returns a `nil`, the message will be
// When the handler returns a nil error the message will be
// acknowledged (acked) from the topic, and should not be redelivered.
//
// When this function returns an `error`, the message will be
// When this function returns a non-nil error the message will be
// negatively acknowledged (nacked), which will cause a redelivery
// attempt to be made (unless the retry policy's MaxRetries has been reached).
//
// This field is required.
//
// [Encore service struct]: https://encore.dev/docs/primitives/services-and-apis#service-structs
Handler func(ctx context.Context, msg T) error

// Filter is a boolean expression using =, !=, IN, &&
Expand Down
22 changes: 22 additions & 0 deletions v2/app/validate_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

"encr.dev/pkg/errors"
"encr.dev/pkg/option"
"encr.dev/pkg/paths"
"encr.dev/v2/internals/parsectx"
"encr.dev/v2/internals/pkginfo"
"encr.dev/v2/parser"
"encr.dev/v2/parser/apis/api"
"encr.dev/v2/parser/apis/servicestruct"
"encr.dev/v2/parser/infra/pubsub"
)

Expand All @@ -19,6 +21,7 @@ func (d *Desc) validatePubSub(pc *parsectx.Context, result *parser.Result) {
}
topics := make(map[string]topic)
topicsByBinding := make(map[pkginfo.QualifiedName]string)
serviceStructs := make(map[paths.Pkg]*servicestruct.ServiceStruct)

var subs []*pubsub.Subscription

Expand Down Expand Up @@ -55,6 +58,9 @@ func (d *Desc) validatePubSub(pc *parsectx.Context, result *parser.Result) {

case *pubsub.Subscription:
subs = append(subs, res)

case *servicestruct.ServiceStruct:
serviceStructs[res.Decl.File.Pkg.ImportPath] = res
}
}

Expand Down Expand Up @@ -103,6 +109,22 @@ func (d *Desc) validatePubSub(pc *parsectx.Context, result *parser.Result) {
}
}

// Do we have a method handler?
if method, ok := sub.MethodHandler.Get(); ok {
// Make sure the type is a service struct
if _, ok := serviceStructs[method.Decl.File.Pkg.ImportPath]; !ok {
pc.Errs.Add(
pubsub.ErrMethodHandlerTypeNotServiceStruct.
AtGoNode(sub.Handler, errors.AsError("handler specified here")),
)
} else if method.Decl.File.Pkg.ImportPath != sub.File.Pkg.ImportPath {
pc.Errs.Add(
pubsub.ErrMethodHandlerDifferentPackage.
AtGoNode(sub.Handler, errors.AsError("handler specified here")),
)
}
}

if !handlerIsAPIEndpoint {
qn, ok := sub.File.Names().ResolvePkgLevelRef(sub.Handler)
if ok {
Expand Down
Loading

0 comments on commit 89cee3e

Please sign in to comment.