diff --git a/.mdlrc b/.mdlrc index 08f5ead9..753e7623 100644 --- a/.mdlrc +++ b/.mdlrc @@ -1 +1 @@ -rules "~MD013", "~MD033", "~MD034", "~MD036" +rules "~MD013", "~MD033", "~MD034", "~MD036", "~MD010" diff --git a/CHANGELOG.md b/CHANGELOG.md index 91c60317..9f0718fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## master +- Allowing embedding AnyCable into existing web applications. ([@palkan][]) + +You can now set up an AnyCable instance without an HTTP server and mount AnyCable WebSocket/SSE handlers wherever you like. + - Log format has changed. ([@palkan][]) We've migrated to Go `slog` package and its default text handler format. diff --git a/Makefile b/Makefile index 8507c2d1..bd6f02fd 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ OUTPUT ?= dist/anycable-go GOBENCHDIST ?= dist/gobench +EMBEDDEDDIST ?= dist/embedded-cable export GO111MODULE=on @@ -66,6 +67,9 @@ build: build-gobench: go build -tags "mrb gops" -ldflags $(LD_FLAGS) -o $(GOBENCHDIST) cmd/gobench-cable/main.go +build-embedded: + go build -tags "mrb gops" -ldflags $(LD_FLAGS) -o $(EMBEDDEDDIST) cmd/embedded-cable/main.go + download-mruby: go mod download github.com/mitchellh/go-mruby @@ -114,6 +118,9 @@ run: run-gobench: go run -ldflags $(LD_FLAGS) -tags "mrb gops" ./cmd/gobench-cable/main.go +run-embedded: + go run -ldflags $(LD_FLAGS) -tags "mrb gops" ./cmd/embedded-cable/main.go + build-protos: protoc --proto_path=./etc --go_out=plugins=grpc:./protos --grpchan_out=./protos ./etc/rpc.proto @@ -129,6 +136,9 @@ benchmarks: build tmp/anycable-go-test: go build $(TEST_BUILD_FLAGS) -tags mrb -race -o tmp/anycable-go-test cmd/anycable-go/main.go +tmp/anycable-embedded-test: + go build $(TEST_BUILD_FLAGS) -tags mrb -race -o tmp/anycable-embedded-test cmd/embedded-cable/main.go + test-conformance: tmp/anycable-go-test bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" @@ -160,7 +170,15 @@ test-conformance-broker-redis: tmp/anycable-go-test test-conformance-broker-nats: tmp/anycable-go-test ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb -test-conformance-all: test-conformance test-conformance-ssl test-conformance-http +test-conformance-embedded: tmp/anycable-embedded-test + \ + ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \ + ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \ + ANYCABLE_RPC_HOST=http://localhost:9292/_anycable \ + ANYCABLE_HEADERS=cookie,x-api-token \ + bundle exec anyt -c "tmp/anycable-embedded-test" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb + +test-conformance-all: test-conformance test-conformance-ssl test-conformance-http test-conformance-embedded TESTFILE ?= features/*.testfile test-features: build @@ -194,4 +212,4 @@ licenses: bin/go-licenses @env GOFLAGS="-tags=mrb" $$(go env GOPATH)/bin/go-licenses csv github.com/anycable/anycable-go/cli 2>/dev/null | awk -F',' '{ print $$3 }' | sort | uniq | grep -v "Unknown" @env GOFLAGS="-tags=mrb" $$(go env GOPATH)/bin/go-licenses csv github.com/anycable/anycable-go/cli 2>/dev/null | grep "Unknown" | grep -v "anycable-go" || echo "No unknown licenses 👌" -.PHONY: tmp/anycable-go-test vendor +.PHONY: tmp/anycable-go-test tmp/anycable-embedded-test vendor diff --git a/cli/cli.go b/cli/cli.go index 47919992..145f4f77 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -98,17 +98,19 @@ func (r *Runner) checkAndSetDefaults() error { } } - _, err := logger.InitLogger(r.config.LogFormat, r.config.LogLevel) - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize logger !!!") - } + if r.log == nil { + _, err := logger.InitLogger(r.config.LogFormat, r.config.LogLevel) + if err != nil { + return errorx.Decorate(err, "failed to initialize default logger") + } - r.log = slog.With("context", "main").With("nodeid", r.config.ID) + r.log = slog.With("context", "main").With("nodeid", r.config.ID) + } - err = r.config.LoadPresets(r.log) + err := r.config.LoadPresets(r.log) if err != nil { - return errorx.Decorate(err, "!!! Failed to load configuration presets !!!") + return errorx.Decorate(err, "failed to load configuration presets") } server.SSL = &r.config.SSL @@ -143,7 +145,7 @@ func (r *Runner) checkAndSetDefaults() error { metrics, err := r.initMetrics(&r.config.Metrics) if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize metrics writer !!!") + return errorx.Decorate(err, "failed to initialize metrics writer") } r.metrics = metrics @@ -160,13 +162,72 @@ func (r *Runner) Run() error { r.log.Info(fmt.Sprintf("Starting %s %s%s (pid: %d, open file limit: %s, gomaxprocs: %d)", r.name, version.Version(), mrubySupport, os.Getpid(), utils.OpenFileLimit(), numProcs)) + appNode, err := r.runNode() + + if err != nil { + return err + } + + wsServer, err := server.ForPort(strconv.Itoa(r.config.Port)) + if err != nil { + return errorx.Decorate(err, "failed to initialize WebSocket server at %s:%d", r.config.Host, r.config.Port) + } + + wsHandler, err := r.websocketHandlerFactory(appNode, r.config, r.log) + if err != nil { + return errorx.Decorate(err, "failed to initialize WebSocket handler") + } + + for _, path := range r.config.Path { + wsServer.SetupHandler(path, wsHandler) + r.log.Info(fmt.Sprintf("Handle WebSocket connections at %s%s", wsServer.Address(), path)) + } + + for path, handlerFactory := range r.websocketEndpoints { + handler, err := handlerFactory(appNode, r.config, r.log) + if err != nil { + return errorx.Decorate(err, "failed to initialize WebSocket handler for %s", path) + } + wsServer.SetupHandler(path, handler) + } + + wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler)) + r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath)) + + if r.config.SSE.Enabled { + r.log.Info( + fmt.Sprintf("Handle SSE requests at %s%s", + wsServer.Address(), r.config.SSE.Path), + ) + + sseHandler, err := r.defaultSSEHandler(appNode, wsServer.ShutdownCtx(), r.config) + + if err != nil { + return errorx.Decorate(err, "failed to initialize SSE handler") + } + + wsServer.SetupHandler(r.config.SSE.Path, sseHandler) + } + + go r.startWSServer(wsServer) + go r.metrics.Run() // nolint:errcheck + + // We MUST first stop the server (=stop accepting new connections), then gracefully disconnect active clients + r.shutdownables = append([]Shutdownable{wsServer}, r.shutdownables...) + r.setupSignalHandlers() + + // Wait for an error (or none) + return <-r.errChan +} + +func (r *Runner) runNode() (*node.Node, error) { metrics := r.metrics r.shutdownables = append(r.shutdownables, metrics) controller, err := r.newController(metrics) if err != nil { - return err + return nil, err } appNode := node.NewNode( @@ -190,12 +251,12 @@ func (r *Runner) Run() error { subscriber, err := r.subscriberFactory(appNode, r.config, r.log) if err != nil { - return errorx.Decorate(err, "couldn't configure pub/sub") + return nil, errorx.Decorate(err, "couldn't configure pub/sub") } appBroker, err := r.brokerFactory(subscriber, r.config, r.log) if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize broker !!!") + return nil, errorx.Decorate(err, "failed to initialize broker") } if appBroker != nil { @@ -205,7 +266,7 @@ func (r *Runner) Run() error { disconnector, err := r.disconnectorFactory(appNode, r.config, r.log) if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize disconnector !!!") + return nil, errorx.Decorate(err, "failed to initialize disconnector") } go disconnector.Run() // nolint:errcheck @@ -215,7 +276,7 @@ func (r *Runner) Run() error { service, enatsErr := r.embedNATS(&r.config.EmbeddedNats) if enatsErr != nil { - return errorx.Decorate(enatsErr, "failed to start embedded NATS server") + return nil, errorx.Decorate(enatsErr, "failed to start embedded NATS server") } desc := service.Description() @@ -232,18 +293,18 @@ func (r *Runner) Run() error { err = appNode.Start() if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize application !!!") + return nil, errorx.Decorate(err, "failed to initialize application") } err = subscriber.Start(r.errChan) if err != nil { - return errorx.Decorate(err, "!!! Subscriber failed !!!") + return nil, errorx.Decorate(err, "failed to start subscriber") } if appBroker != nil { err = appBroker.Start(r.errChan) if err != nil { - return errorx.Decorate(err, "!!! Broker failed !!!") + return nil, errorx.Decorate(err, "failed to start broker") } } @@ -253,7 +314,7 @@ func (r *Runner) Run() error { broadcasters, berr := r.broadcastersFactory(appNode, r.config, r.log) if berr != nil { - return errorx.Decorate(err, "couldn't configure broadcasters") + return nil, errorx.Decorate(err, "couldn't configure broadcasters") } for _, broadcaster := range broadcasters { @@ -267,7 +328,7 @@ func (r *Runner) Run() error { err = broadcaster.Start(r.errChan) if err != nil { - return errorx.Decorate(err, "!!! Broadcaster failed !!!") + return nil, errorx.Decorate(err, "failed to start broadcaster") } r.shutdownables = append(r.shutdownables, broadcaster) @@ -276,61 +337,13 @@ func (r *Runner) Run() error { err = controller.Start() if err != nil { - return errorx.Decorate(err, "!!! RPC failed !!!") - } - - wsServer, err := server.ForPort(strconv.Itoa(r.config.Port)) - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize WebSocket server at %s:%d !!!", r.config.Host, r.config.Port) - } - - wsHandler, err := r.websocketHandlerFactory(appNode, r.config, r.log) - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler !!!") - } - - for _, path := range r.config.Path { - wsServer.SetupHandler(path, wsHandler) - r.log.Info(fmt.Sprintf("Handle WebSocket connections at %s%s", wsServer.Address(), path)) + return nil, errorx.Decorate(err, "failed to initialize RPC controller") } - for path, handlerFactory := range r.websocketEndpoints { - handler, err := handlerFactory(appNode, r.config, r.log) - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler for %s !!!", path) - } - wsServer.SetupHandler(path, handler) - } - - wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler)) - r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath)) - - if r.config.SSE.Enabled { - r.log.Info( - fmt.Sprintf("Handle SSE requests at %s%s", - wsServer.Address(), r.config.SSE.Path), - ) - - sseHandler, err := r.defaultSSEHandler(appNode, wsServer.ShutdownCtx(), r.config) - - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize SSE handler !!!") - } - - wsServer.SetupHandler(r.config.SSE.Path, sseHandler) - } - - go r.startWSServer(wsServer) - go r.startMetrics(metrics) - - // We MUST first stop the server (=stop accepting new connections), then gracefully disconnect active clients - r.shutdownables = append([]Shutdownable{wsServer, appNode, appBroker}, r.shutdownables...) + r.shutdownables = append([]Shutdownable{appNode, appBroker}, r.shutdownables...) r.announceGoPools() - r.setupSignalHandlers() - - // Wait for an error (or none) - return <-r.errChan + return appNode, nil } func (r *Runner) setMaxProcs() int { diff --git a/cli/embed.go b/cli/embed.go new file mode 100644 index 00000000..a7a83ca8 --- /dev/null +++ b/cli/embed.go @@ -0,0 +1,72 @@ +package cli + +import ( + "context" + "fmt" + "net/http" + + "github.com/anycable/anycable-go/node" + "github.com/anycable/anycable-go/utils" + "github.com/anycable/anycable-go/version" +) + +// A minimal interface to the underlying Runner for embedding AnyCable into your own Go HTTP application. +type Embedded struct { + n *node.Node + r *Runner +} + +// WebSocketHandler returns an HTTP handler to serve WebSocket connections via AnyCable. +func (e *Embedded) WebSocketHandler() (http.Handler, error) { + wsHandler, err := e.r.websocketHandlerFactory(e.n, e.r.config, e.r.log) + + if err != nil { + return nil, err + } + + return wsHandler, nil +} + +// SSEHandler returns an HTTP handler to serve SSE connections via AnyCable. +// Please, provide your HTTP server's shutdown context to terminate SSE connections gracefully +// on server shutdown. +func (e *Embedded) SSEHandler(ctx context.Context) (http.Handler, error) { + sseHandler, err := e.r.defaultSSEHandler(e.n, ctx, e.r.config) + + if err != nil { + return nil, err + } + + return sseHandler, nil +} + +// Shutdown stops the AnyCable node gracefully. +func (e *Embedded) Shutdown(ctx context.Context) error { + for _, shutdownable := range e.r.shutdownables { + if err := shutdownable.Shutdown(ctx); err != nil { + return err + } + } + + return nil +} + +// Embed starts the application without setting up HTTP handlers, signalts, etc. +// You can use it to embed AnyCable into your own Go HTTP application. +func (r *Runner) Embed() (*Embedded, error) { + r.announceDebugMode() + mrubySupport := r.initMRuby() + + r.log.Info(fmt.Sprintf("Starting embedded %s %s%s (open file limit: %s)", r.name, version.Version(), mrubySupport, utils.OpenFileLimit())) + + appNode, err := r.runNode() + if err != nil { + return nil, err + } + + embed := &Embedded{n: appNode, r: r} + + go r.startMetrics(r.metrics) + + return embed, nil +} diff --git a/cli/options.go b/cli/options.go index f71f64d3..5f6ff811 100644 --- a/cli/options.go +++ b/cli/options.go @@ -192,6 +192,17 @@ Use shutdown_timeout instead.`) return &c, nil, false } +// NewConfig returns a new AnyCable configuration combining default values and values from the environment. +func NewConfig() *config.Config { + c, err, _ := NewConfigFromCLI([]string{}) + + if err != nil { + panic(err) + } + + return c +} + // Flags ordering issue: https://github.com/urfave/cli/pull/1430 const ( diff --git a/cli/runner_options.go b/cli/runner_options.go index 28e4ca0e..90c743ef 100644 --- a/cli/runner_options.go +++ b/cli/runner_options.go @@ -176,9 +176,18 @@ func WithDefaultBroker() Option { }) } +// WithTelemetry enables AnyCable telemetry unless ANYCABLE_DISABLE_TELEMETRY is set func WithTelemetry() Option { return func(r *Runner) error { r.telemetryEnabled = os.Getenv("ANYCABLE_DISABLE_TELEMETRY") != "true" return nil } } + +// WithLogger set ups a logger for the AnyCable app +func WithLogger(logger *slog.Logger) Option { + return func(r *Runner) error { + r.log = logger + return nil + } +} diff --git a/cmd/embedded-cable/main.go b/cmd/embedded-cable/main.go new file mode 100644 index 00000000..6b8310c8 --- /dev/null +++ b/cmd/embedded-cable/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "os" + "time" + + "github.com/anycable/anycable-go/cli" + _ "github.com/anycable/anycable-go/diagnostics" + "github.com/anycable/anycable-go/utils" +) + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{})) + + opts := []cli.Option{ + cli.WithName("AnyCable"), + cli.WithDefaultRPCController(), + cli.WithDefaultBroker(), + cli.WithDefaultSubscriber(), + cli.WithDefaultBroadcaster(), + cli.WithTelemetry(), + cli.WithLogger(logger), + } + + c := cli.NewConfig() + + runner, err := cli.NewRunner(c, opts) + + if err != nil { + fmt.Printf("%+v\n", err) + os.Exit(1) + } + + anycable, err := runner.Embed() + + if err != nil { + fmt.Printf("%+v\n", err) + os.Exit(1) + } + + wsHandler, err := anycable.WebSocketHandler() + + if err != nil { + fmt.Printf("%+v\n", err) + os.Exit(1) + } + + seeHandler, err := anycable.SSEHandler(context.Background()) + + if err != nil { + fmt.Printf("%+v\n", err) + os.Exit(1) + } + + http.Handle("/cable", wsHandler) + http.Handle("/sse", seeHandler) + + go http.ListenAndServe(":8080", nil) // nolint:errcheck,gosec + + // Graceful shutdown (to ensure AnyCable sends disconnect notices) + s := utils.NewGracefulSignals(10 * time.Second) + ch := make(chan error, 1) + + s.HandleForceTerminate(func() { + logger.Warn("Immediate termination requested. Stopped") + ch <- nil + }) + + s.Handle(func(ctx context.Context) error { + logger.Info("Shutting down... (hit Ctrl-C to stop immediately or wait for up to 10s for graceful shutdown)") + return nil + }) + s.Handle(anycable.Shutdown) + s.Handle(func(ctx context.Context) error { + ch <- nil + return nil + }) + + s.Listen() + + <-ch +} diff --git a/docs/library.md b/docs/library.md index 13d9ca33..5c85466c 100644 --- a/docs/library.md +++ b/docs/library.md @@ -11,3 +11,38 @@ Why building a WebSocket application with AnyCable-Go (and not other Go librarie - Bulletproof code, which has been used production for years. To get started with an application development with AnyCable-Go, you can use our template repository: [anycable-go-scaffold](https://github.com/anycable/anycable-go-scaffold). + +## Embedding + +You can also embed AnyCable into your existing web application in case you want to serve AnyCable WebSocket/SSE connections via the same HTTP server as other requests (e.g., if you build a smart reverse-proxy). + +Here is a minimal example Go code (you can find the full and up-to-date version [here](https://github.com/anycable/anycable-go/blob/master/cmd/embedded-cable/main.go)): + +```go +package main + +import ( + "net/http" + + "github.com/anycable/anycable-go/cli" +) + +func main() { + opts := []cli.Option{ + cli.WithName("AnyCable"), + cli.WithDefaultRPCController(), + cli.WithDefaultBroker(), + cli.WithDefaultSubscriber(), + cli.WithDefaultBroadcaster(), + } + + c := cli.NewConfig() + runner, _ := cli.NewRunner(c, opts) + anycable, _ := runner.Embed() + + wsHandler, _ := anycable.WebSocketHandler() + http.Handle("/cable", wsHandler) + + http.ListenAndServe(":8080", nil) +} +``` diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index e87e6dd4..7c2a3a40 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -2,7 +2,6 @@ package telemetry import ( "context" - "fmt" "maps" "os" "runtime" @@ -62,7 +61,7 @@ func NewTracker(instrumenter *metrics.Metrics, c *config.Config, tc *Config) *Tr } func (t *Tracker) Announce() string { - return fmt.Sprintf("Anonymized telemetry is on. Learn more: https://docs.anycable.io/anycable-go/telemetry") + return "Anonymized telemetry is on. Learn more: https://docs.anycable.io/anycable-go/telemetry" } func (t *Tracker) Collect() {