Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple channel rules from library core #129

Merged
merged 23 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 66 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

**This library has no v1 release yet, API still evolves. Use with strict versioning.**

Centrifuge library is a real-time core of [Centrifugo](https://github.com/centrifugal/centrifugo) server. It's also supposed to be a general purpose real-time messaging library for Go programming language. The library is based on a strict client-server protocol based on Protobuf schema and solves several problems developer may come across when building complex real-time applications – like scalability (millions of connections), proper connection management, fast reconnect with message recovery, fallback option.
Centrifuge library is a real-time core of [Centrifugo](https://github.com/centrifugal/centrifugo) server. It's also supposed to be a general purpose real-time messaging library for Go programming language. The library built on top of strict client-server protocol schema and exposes various real-time oriented primitives for a developer. Centrifuge solves several problems a developer may come across when building complex real-time applications – like scalability (millions of connections), proper persistent connection management and invalidation, fast reconnect with message recovery, WebSocket fallback option.

Library highlights:

Expand All @@ -14,13 +14,14 @@ Library highlights:
* SockJS polyfill library support for browsers where WebSocket not available (JSON only)
* Built-in horizontal scalability with Redis PUB/SUB, consistent Redis sharding, Sentinel and Redis Cluster for HA
* Possibility to register custom PUB/SUB broker, history and presence storage implementations
* Native authentication over HTTP middleware or JWT-based
* Native authentication over HTTP middleware or token-based
* Bidirectional asynchronous message communication and RPC calls
* Channel (room) concept to broadcast message to all channel subscribers
* Channel concept to broadcast message to active subscribers
* Client-side and server-side subscriptions
* Presence information for channels (show all active clients in channel)
* History information for channels (last messages published into channel)
* Join/leave events for channels (aka client goes online/offline)
* Message recovery mechanism for channels to survive short network disconnects or node restart
* Message recovery mechanism for channels to survive PUB/SUB delivery problems, short network disconnects or node restart
* Prometheus instrumentation
* Client libraries for main application environments (see below)

Expand Down Expand Up @@ -55,119 +56,105 @@ Create file `main.go` with the following code:
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

// Import this library.
"github.com/centrifugal/centrifuge"
)

// Function to handle Centrifuge internal logs.
func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}

// Wait until program interrupted. When interrupted gracefully shutdown Node.
func waitExitSignal(n *centrifuge.Node) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_ = n.Shutdown(ctx)
done <- true
}()
<-done
// Authentication middleware. Centrifuge expects Credentials with current user ID.
func auth(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Put authentication credentials into request Context. Since we don't
// have any session backend here we simply set user ID as empty string.
// Users with empty ID called anonymous users, in real app you should
// decide whether anonymous users allowed to connect to your server
// or not. There is also another way to set Credentials - returning them
// from ConnectingHandler which is called after client sent first command
// to server called Connect. Without provided Credentials connection won't
// be accepted.
cred := &centrifuge.Credentials{
UserID: "",
}
newCtx := centrifuge.SetCredentials(ctx, cred)
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}

func main() {
// We use default config here as starting point. Default config contains
// reasonable values for available options.
cfg := centrifuge.DefaultConfig
// In this example we want client to do all possible actions with server
// without any authentication and authorization. Insecure flag DISABLES
// many security related checks in library. This is only to make example
// short. In real app you most probably want authenticate and authorize
// access to server. See godoc and examples in repo for more details.
cfg.ClientInsecure = true
// By default clients can not publish messages into channels. Setting this
// option to true we allow them to publish.
cfg.Publish = true

// Centrifuge library exposes logs with different log level. In your app
// you can set special function to handle these log entries in a way you want.
cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog

// Node is the core object in Centrifuge library responsible for many useful
// things. Here we initialize new Node instance and pass config to it.
// things. Here we initialize new Node instance and pass Config to it.
node, _ := centrifuge.New(cfg)

// ClientConnected node event handler is a point where you generally create a
// binding between Centrifuge and your app business logic. Callback function you
// pass here will be called every time new connection established with server.
// Inside this callback function you can set various event handlers for connection.
node.On().ClientConnected(func(ctx context.Context, client *centrifuge.Client) {
// Set Subscribe Handler to react on every channel subscription attempt
// initiated by client. Here you can theoretically return an error or
// disconnect client from server if needed. But now we just accept
// all subscriptions.
client.On().Subscribe(func(e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
log.Printf("client subscribes on channel %s", e.Channel)
return centrifuge.SubscribeReply{}
})

// Set Publish Handler to react on every channel Publication sent by client.
// Inside this method you can validate client permissions to publish into
// channel. But in our simple chat app we allow everyone to publish into
// any channel.
client.On().Publish(func(e centrifuge.PublishEvent) centrifuge.PublishReply {
log.Printf("client publishes into channel %s: %s", e.Channel, string(e.Data))
return centrifuge.PublishReply{}
})

// Set Disconnect Handler to react on client disconnect events.
client.On().Disconnect(func(e centrifuge.DisconnectEvent) centrifuge.DisconnectReply {
log.Printf("client disconnected")
return centrifuge.DisconnectReply{}
})

// Set ConnectHandler called when client successfully connected to Node.
node.On().Connect(func(c *centrifuge.Client) {
// In our example transport will always be Websocket but it can also be SockJS.
transportName := client.Transport().Name()
transportName := c.Transport().Name()
// In our example clients connect with JSON protocol but it can also be Protobuf.
transportEncoding := client.Transport().Encoding()

transportEncoding := c.Transport().Encoding()
log.Printf("client connected via %s (%s)", transportName, transportEncoding)
})

// Run node.
// Set SubscribeHandler to react on every channel subscription attempt
// initiated by client. Here you can theoretically return an error or
// disconnect client from server if needed. But now we just accept
// all subscriptions to all channels. In real life you may use a more
// complex permission check here.
node.On().Subscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
log.Printf("client subscribes on channel %s", e.Channel)
return centrifuge.SubscribeReply{}
})

// By default, clients can not publish messages into channels. By setting
// PublishHandler we tell Centrifuge that publish from client side is possible.
// Now each time client calls publish method this handler will be called and
// you have a possibility to validate publication request before message will
// be published into channel and reach active subscribers. In our simple chat
// app we allow everyone to publish into any channel.
node.On().Publish(func(c *centrifuge.Client, e centrifuge.PublishEvent) centrifuge.PublishReply {
log.Printf("client publishes into channel %s: %s", e.Channel, string(e.Data))
return centrifuge.PublishReply{}
})

// Set Disconnect handler to react on client disconnect events.
node.On().Disconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
log.Printf("client disconnected")
})

// Run node. This method does not block.
if err := node.Run(); err != nil {
panic(err)
}

// Configure http routes.
// Now configure HTTP routes.

// The first route is for handling Websocket connections.
http.Handle("/connection/websocket", centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))
// Serve Websocket connections using WebsocketHandler.
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})
http.Handle("/connection/websocket", auth(wsHandler))

// The second route is for serving index.html file.
http.Handle("/", http.FileServer(http.Dir("./")))

// Start HTTP server.
go func() {
if err := http.ListenAndServe(":8000", nil); err != nil {
panic(err)
}
}()

// Run until interrupted.
waitExitSignal(node)
log.Printf("Starting server, visit http://localhost:8000")
if err := http.ListenAndServe(":8000", nil); err != nil {
panic(err)
}
}
```

Expand Down Expand Up @@ -222,6 +209,6 @@ go run main.go

Open several browser tabs with http://localhost:8000 and see chat in action.

This example is only the top of an iceberg. Though it should give you an insight on library API.
This example is only the top of an iceberg. Though it should give you an insight on library API.

Keep in mind that Centrifuge library is not a framework to build chat apps. It's a general purpose real-time transport for your messages with some helpful primitives. You can build many kinds of real-time apps on top of this library including chats but depending on application you may need to write business logic yourself.
51 changes: 21 additions & 30 deletions _examples/benchmarks/benchmark_gobwas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,37 @@ func main() {
flag.Parse()

cfg := centrifuge.DefaultConfig
cfg.Publish = true
cfg.LogLevel = centrifuge.LogLevelError
cfg.LogHandler = handleLog
cfg.ClientInsecure = true

node, _ := centrifuge.New(cfg)

node.On().ClientConnected(func(ctx context.Context, client *centrifuge.Client) {
node.On().Connecting(func(ctx context.Context, e centrifuge.ConnectEvent) centrifuge.ConnectReply {
return centrifuge.ConnectReply{
Credentials: &centrifuge.Credentials{
UserID: "bench",
},
}
})

client.On().Subscribe(func(e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
return centrifuge.SubscribeReply{}
})
node.On().Connect(func(c *centrifuge.Client) {})

client.On().Unsubscribe(func(e centrifuge.UnsubscribeEvent) centrifuge.UnsubscribeReply {
return centrifuge.UnsubscribeReply{}
})
node.On().Subscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
return centrifuge.SubscribeReply{}
})

client.On().Publish(func(e centrifuge.PublishEvent) centrifuge.PublishReply {
// Do not log here - lots of publications expected.
return centrifuge.PublishReply{}
})
node.On().Publish(func(c *centrifuge.Client, e centrifuge.PublishEvent) centrifuge.PublishReply {
return centrifuge.PublishReply{}
})

client.On().Message(func(e centrifuge.MessageEvent) centrifuge.MessageReply {
// Do not log here - lots of messages expected.
err := client.Send(e.Data)
if err != nil {
if err != io.EOF {
log.Fatalln("error senfing to client:", err.Error())
}
node.On().Message(func(c *centrifuge.Client, e centrifuge.MessageEvent) centrifuge.MessageReply {
err := c.Send(e.Data)
if err != nil {
if err != io.EOF {
log.Fatalln("error sending to client:", err.Error())
}
return centrifuge.MessageReply{}
})

client.On().Disconnect(func(e centrifuge.DisconnectEvent) centrifuge.DisconnectReply {
log.Printf("user %s disconnected", client.UserID())
return centrifuge.DisconnectReply{}
})

log.Printf("user %s connected via %s with encoding: %s", client.UserID(), client.Transport().Name(), client.Transport().Encoding())
}
return centrifuge.MessageReply{}
})

if err := node.Run(); err != nil {
Expand Down
52 changes: 21 additions & 31 deletions _examples/benchmarks/benchmark_gorilla/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ func main() {
log.Printf("NumCPU: %d", runtime.NumCPU())

cfg := centrifuge.DefaultConfig
cfg.Publish = true
cfg.LogLevel = centrifuge.LogLevelError
cfg.LogHandler = handleLog
cfg.ClientInsecure = true

node, _ := centrifuge.New(cfg)

Expand All @@ -61,40 +59,32 @@ func main() {
node.SetEngine(engine)
}

node.On().ClientConnected(func(ctx context.Context, client *centrifuge.Client) {
node.On().Connecting(func(ctx context.Context, e centrifuge.ConnectEvent) centrifuge.ConnectReply {
return centrifuge.ConnectReply{
Credentials: &centrifuge.Credentials{
UserID: "bench",
},
}
})

client.On().Subscribe(func(e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
return centrifuge.SubscribeReply{}
})
node.On().Connect(func(c *centrifuge.Client) {})

client.On().Unsubscribe(func(e centrifuge.UnsubscribeEvent) centrifuge.UnsubscribeReply {
// log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
return centrifuge.UnsubscribeReply{}
})
node.On().Subscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
return centrifuge.SubscribeReply{}
})

client.On().Publish(func(e centrifuge.PublishEvent) centrifuge.PublishReply {
// Do not log here - lots of publications expected.
return centrifuge.PublishReply{}
})
node.On().Publish(func(c *centrifuge.Client, e centrifuge.PublishEvent) centrifuge.PublishReply {
return centrifuge.PublishReply{}
})

client.On().Message(func(e centrifuge.MessageEvent) centrifuge.MessageReply {
// Do not log here - lots of messages expected.
err := client.Send(e.Data)
if err != nil {
if err != io.EOF {
log.Fatalln("error senfing to client:", err.Error())
}
node.On().Message(func(c *centrifuge.Client, e centrifuge.MessageEvent) centrifuge.MessageReply {
err := c.Send(e.Data)
if err != nil {
if err != io.EOF {
log.Fatalln("error sending to client:", err.Error())
}
return centrifuge.MessageReply{}
})

client.On().Disconnect(func(e centrifuge.DisconnectEvent) centrifuge.DisconnectReply {
log.Printf("user %s disconnected", client.UserID())
return centrifuge.DisconnectReply{}
})

log.Printf("user %s connected via %s with encoding: %s", client.UserID(), client.Transport().Name(), client.Transport().Encoding())
}
return centrifuge.MessageReply{}
})

if err := node.Run(); err != nil {
Expand Down
Loading