Skip to content

Commit

Permalink
feat: introduce runner.Embed() to embed AnyCable into http servers
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Feb 16, 2024
1 parent c384a9a commit 0a9fcac
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .mdlrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rules "~MD013", "~MD033", "~MD034", "~MD036"
rules "~MD013", "~MD033", "~MD034", "~MD036", "~MD010"
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

- Allowing embedding AnyCable into existing web applications. ([@palkan][])

You can now set up an AnyCable instance without an HTTP server and mount AnyCable WebSocket/SSE handlers wherever you like.

- Log format has changed. ([@palkan][])

We've migrated to Go `slog` package and its default text handler format.
Expand Down
22 changes: 20 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
OUTPUT ?= dist/anycable-go
GOBENCHDIST ?= dist/gobench
EMBEDDEDDIST ?= dist/embedded-cable

export GO111MODULE=on

Expand Down Expand Up @@ -66,6 +67,9 @@ build:
build-gobench:
go build -tags "mrb gops" -ldflags $(LD_FLAGS) -o $(GOBENCHDIST) cmd/gobench-cable/main.go

build-embedded:
go build -tags "mrb gops" -ldflags $(LD_FLAGS) -o $(EMBEDDEDDIST) cmd/embedded-cable/main.go

download-mruby:
go mod download github.com/mitchellh/go-mruby

Expand Down Expand Up @@ -114,6 +118,9 @@ run:
run-gobench:
go run -ldflags $(LD_FLAGS) -tags "mrb gops" ./cmd/gobench-cable/main.go

run-embedded:
go run -ldflags $(LD_FLAGS) -tags "mrb gops" ./cmd/embedded-cable/main.go

build-protos:
protoc --proto_path=./etc --go_out=plugins=grpc:./protos --grpchan_out=./protos ./etc/rpc.proto

Expand All @@ -129,6 +136,9 @@ benchmarks: build
tmp/anycable-go-test:
go build $(TEST_BUILD_FLAGS) -tags mrb -race -o tmp/anycable-go-test cmd/anycable-go/main.go

tmp/anycable-embedded-test:
go build $(TEST_BUILD_FLAGS) -tags mrb -race -o tmp/anycable-embedded-test cmd/embedded-cable/main.go

test-conformance: tmp/anycable-go-test
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"

Expand Down Expand Up @@ -160,7 +170,15 @@ test-conformance-broker-redis: tmp/anycable-go-test
test-conformance-broker-nats: tmp/anycable-go-test
ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-all: test-conformance test-conformance-ssl test-conformance-http
test-conformance-embedded: tmp/anycable-embedded-test
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
ANYCABLE_RPC_HOST=http://localhost:9292/_anycable \
ANYCABLE_HEADERS=cookie,x-api-token \
bundle exec anyt -c "tmp/anycable-embedded-test" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-all: test-conformance test-conformance-ssl test-conformance-http test-conformance-embedded

TESTFILE ?= features/*.testfile
test-features: build
Expand Down Expand Up @@ -194,4 +212,4 @@ licenses: bin/go-licenses
@env GOFLAGS="-tags=mrb" $$(go env GOPATH)/bin/go-licenses csv github.com/anycable/anycable-go/cli 2>/dev/null | awk -F',' '{ print $$3 }' | sort | uniq | grep -v "Unknown"
@env GOFLAGS="-tags=mrb" $$(go env GOPATH)/bin/go-licenses csv github.com/anycable/anycable-go/cli 2>/dev/null | grep "Unknown" | grep -v "anycable-go" || echo "No unknown licenses 👌"

.PHONY: tmp/anycable-go-test vendor
.PHONY: tmp/anycable-go-test tmp/anycable-embedded-test vendor
151 changes: 82 additions & 69 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,19 @@ func (r *Runner) checkAndSetDefaults() error {
}
}

_, err := logger.InitLogger(r.config.LogFormat, r.config.LogLevel)
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize logger !!!")
}
if r.log == nil {
_, err := logger.InitLogger(r.config.LogFormat, r.config.LogLevel)
if err != nil {
return errorx.Decorate(err, "failed to initialize default logger")
}

r.log = slog.With("context", "main").With("nodeid", r.config.ID)
r.log = slog.With("context", "main").With("nodeid", r.config.ID)
}

err = r.config.LoadPresets(r.log)
err := r.config.LoadPresets(r.log)

if err != nil {
return errorx.Decorate(err, "!!! Failed to load configuration presets !!!")
return errorx.Decorate(err, "failed to load configuration presets")
}

server.SSL = &r.config.SSL
Expand Down Expand Up @@ -143,7 +145,7 @@ func (r *Runner) checkAndSetDefaults() error {
metrics, err := r.initMetrics(&r.config.Metrics)

if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize metrics writer !!!")
return errorx.Decorate(err, "failed to initialize metrics writer")
}

r.metrics = metrics
Expand All @@ -160,13 +162,72 @@ func (r *Runner) Run() error {

r.log.Info(fmt.Sprintf("Starting %s %s%s (pid: %d, open file limit: %s, gomaxprocs: %d)", r.name, version.Version(), mrubySupport, os.Getpid(), utils.OpenFileLimit(), numProcs))

appNode, err := r.runNode()

if err != nil {
return err
}

wsServer, err := server.ForPort(strconv.Itoa(r.config.Port))
if err != nil {
return errorx.Decorate(err, "failed to initialize WebSocket server at %s:%d", r.config.Host, r.config.Port)
}

wsHandler, err := r.websocketHandlerFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "failed to initialize WebSocket handler")
}

for _, path := range r.config.Path {
wsServer.SetupHandler(path, wsHandler)
r.log.Info(fmt.Sprintf("Handle WebSocket connections at %s%s", wsServer.Address(), path))
}

for path, handlerFactory := range r.websocketEndpoints {
handler, err := handlerFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "failed to initialize WebSocket handler for %s", path)
}
wsServer.SetupHandler(path, handler)
}

wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler))
r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath))

if r.config.SSE.Enabled {
r.log.Info(
fmt.Sprintf("Handle SSE requests at %s%s",
wsServer.Address(), r.config.SSE.Path),
)

sseHandler, err := r.defaultSSEHandler(appNode, wsServer.ShutdownCtx(), r.config)

if err != nil {
return errorx.Decorate(err, "failed to initialize SSE handler")
}

wsServer.SetupHandler(r.config.SSE.Path, sseHandler)
}

go r.startWSServer(wsServer)
go r.metrics.Run() // nolint:errcheck

// We MUST first stop the server (=stop accepting new connections), then gracefully disconnect active clients
r.shutdownables = append([]Shutdownable{wsServer}, r.shutdownables...)
r.setupSignalHandlers()

// Wait for an error (or none)
return <-r.errChan
}

func (r *Runner) runNode() (*node.Node, error) {
metrics := r.metrics

r.shutdownables = append(r.shutdownables, metrics)

controller, err := r.newController(metrics)
if err != nil {
return err
return nil, err
}

appNode := node.NewNode(
Expand All @@ -190,12 +251,12 @@ func (r *Runner) Run() error {
subscriber, err := r.subscriberFactory(appNode, r.config, r.log)

if err != nil {
return errorx.Decorate(err, "couldn't configure pub/sub")
return nil, errorx.Decorate(err, "couldn't configure pub/sub")
}

appBroker, err := r.brokerFactory(subscriber, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize broker !!!")
return nil, errorx.Decorate(err, "failed to initialize broker")
}

if appBroker != nil {
Expand All @@ -205,7 +266,7 @@ func (r *Runner) Run() error {

disconnector, err := r.disconnectorFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize disconnector !!!")
return nil, errorx.Decorate(err, "failed to initialize disconnector")
}

go disconnector.Run() // nolint:errcheck
Expand All @@ -215,7 +276,7 @@ func (r *Runner) Run() error {
service, enatsErr := r.embedNATS(&r.config.EmbeddedNats)

if enatsErr != nil {
return errorx.Decorate(enatsErr, "failed to start embedded NATS server")
return nil, errorx.Decorate(enatsErr, "failed to start embedded NATS server")
}

desc := service.Description()
Expand All @@ -232,18 +293,18 @@ func (r *Runner) Run() error {
err = appNode.Start()

if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize application !!!")
return nil, errorx.Decorate(err, "failed to initialize application")
}

err = subscriber.Start(r.errChan)
if err != nil {
return errorx.Decorate(err, "!!! Subscriber failed !!!")
return nil, errorx.Decorate(err, "failed to start subscriber")
}

if appBroker != nil {
err = appBroker.Start(r.errChan)
if err != nil {
return errorx.Decorate(err, "!!! Broker failed !!!")
return nil, errorx.Decorate(err, "failed to start broker")
}
}

Expand All @@ -253,7 +314,7 @@ func (r *Runner) Run() error {
broadcasters, berr := r.broadcastersFactory(appNode, r.config, r.log)

if berr != nil {
return errorx.Decorate(err, "couldn't configure broadcasters")
return nil, errorx.Decorate(err, "couldn't configure broadcasters")
}

for _, broadcaster := range broadcasters {
Expand All @@ -267,7 +328,7 @@ func (r *Runner) Run() error {

err = broadcaster.Start(r.errChan)
if err != nil {
return errorx.Decorate(err, "!!! Broadcaster failed !!!")
return nil, errorx.Decorate(err, "failed to start broadcaster")
}

r.shutdownables = append(r.shutdownables, broadcaster)
Expand All @@ -276,61 +337,13 @@ func (r *Runner) Run() error {

err = controller.Start()
if err != nil {
return errorx.Decorate(err, "!!! RPC failed !!!")
}

wsServer, err := server.ForPort(strconv.Itoa(r.config.Port))
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize WebSocket server at %s:%d !!!", r.config.Host, r.config.Port)
}

wsHandler, err := r.websocketHandlerFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler !!!")
}

for _, path := range r.config.Path {
wsServer.SetupHandler(path, wsHandler)
r.log.Info(fmt.Sprintf("Handle WebSocket connections at %s%s", wsServer.Address(), path))
return nil, errorx.Decorate(err, "failed to initialize RPC controller")
}

for path, handlerFactory := range r.websocketEndpoints {
handler, err := handlerFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize WebSocket handler for %s !!!", path)
}
wsServer.SetupHandler(path, handler)
}

wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler))
r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath))

if r.config.SSE.Enabled {
r.log.Info(
fmt.Sprintf("Handle SSE requests at %s%s",
wsServer.Address(), r.config.SSE.Path),
)

sseHandler, err := r.defaultSSEHandler(appNode, wsServer.ShutdownCtx(), r.config)

if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize SSE handler !!!")
}

wsServer.SetupHandler(r.config.SSE.Path, sseHandler)
}

go r.startWSServer(wsServer)
go r.startMetrics(metrics)

// We MUST first stop the server (=stop accepting new connections), then gracefully disconnect active clients
r.shutdownables = append([]Shutdownable{wsServer, appNode, appBroker}, r.shutdownables...)
r.shutdownables = append([]Shutdownable{appNode, appBroker}, r.shutdownables...)

r.announceGoPools()
r.setupSignalHandlers()

// Wait for an error (or none)
return <-r.errChan
return appNode, nil
}

func (r *Runner) setMaxProcs() int {
Expand Down
72 changes: 72 additions & 0 deletions cli/embed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cli

import (
"context"
"fmt"
"net/http"

"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/utils"
"github.com/anycable/anycable-go/version"
)

// A minimal interface to the underlying Runner for embedding AnyCable into your own Go HTTP application.
type Embedded struct {
n *node.Node
r *Runner
}

// WebSocketHandler returns an HTTP handler to serve WebSocket connections via AnyCable.
func (e *Embedded) WebSocketHandler() (http.Handler, error) {
wsHandler, err := e.r.websocketHandlerFactory(e.n, e.r.config, e.r.log)

if err != nil {
return nil, err
}

return wsHandler, nil
}

// SSEHandler returns an HTTP handler to serve SSE connections via AnyCable.
// Please, provide your HTTP server's shutdown context to terminate SSE connections gracefully
// on server shutdown.
func (e *Embedded) SSEHandler(ctx context.Context) (http.Handler, error) {
sseHandler, err := e.r.defaultSSEHandler(e.n, ctx, e.r.config)

if err != nil {
return nil, err
}

return sseHandler, nil
}

// Shutdown stops the AnyCable node gracefully.
func (e *Embedded) Shutdown(ctx context.Context) error {
for _, shutdownable := range e.r.shutdownables {
if err := shutdownable.Shutdown(ctx); err != nil {
return err
}
}

return nil
}

// Embed starts the application without setting up HTTP handlers, signalts, etc.
// You can use it to embed AnyCable into your own Go HTTP application.
func (r *Runner) Embed() (*Embedded, error) {
r.announceDebugMode()
mrubySupport := r.initMRuby()

r.log.Info(fmt.Sprintf("Starting embedded %s %s%s (open file limit: %s)", r.name, version.Version(), mrubySupport, utils.OpenFileLimit()))

appNode, err := r.runNode()
if err != nil {
return nil, err
}

embed := &Embedded{n: appNode, r: r}

go r.startMetrics(r.metrics)

return embed, nil
}
Loading

0 comments on commit 0a9fcac

Please sign in to comment.