Skip to content

Commit

Permalink
better API error handling, various comment fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 15, 2018
1 parent a516b15 commit c5ee04f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 42 deletions.
36 changes: 21 additions & 15 deletions README.md
Expand Up @@ -67,7 +67,7 @@ func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}

// Wait untill program interrupted. When interrupted gracefully shutdown Node.
// Wait until program interrupted. When interrupted gracefully shutdown Node.
func waitExitSignal(n *centrifuge.Node) {
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
Expand All @@ -92,32 +92,33 @@ func main() {
// short. In real app you most probably want authenticate and authorize
// access to server. See godoc and examples in repo for more details.
cfg.ClientInsecure = true
// By default clients can not publish messages into channels. Settings this
// By default clients can not publish messages into channels. Setting this
// option to true we allow them to publish.
cfg.Publish = true

// Node is the core object in Centrifuge library responsible for many useful
// things. Here we initialize new Node instance and pass config to it.
node, _ := centrifuge.New(cfg)

// On().Connect() method is a point where you start connecting Centrifuge
// with your app's business logic. Callback function you pass to On().Connect
// will be called every time new connection established with server. Inside
// this callback function you have to set various event handlers for incoming
// client connection.
// On().Connect() method is a point where you create a binding between
// Centrifuge and your app business logic. Callback function you pass
// to On().Connect will be called every time new connection established
// with server. Inside this callback function you can set various event
// handlers for incoming client connection.
node.On().Connect(func(ctx context.Context, client *centrifuge.Client, e centrifuge.ConnectEvent) centrifuge.ConnectReply {

// Set Subscribe Handler to react on every channel subscribtion attempt
// initiated by client. Here you can theoretically return an Error or
// Disconnect client from server if needed. But now we just accept subscription.
// initiated by client. Here you can theoretically return an error or
// disconnect client from server if needed. But now we just accept
// all subscriptions.
client.On().Subscribe(func(e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
log.Printf("client subscribes on channel %s", e.Channel)
return centrifuge.SubscribeReply{}
})

// Set Publish Handler to react on every channel Publication sent by client.
// Inside this method you can validate client permissions to publish into channel.
// But in our simple chat app we allow everyone to publish into any channel.
// Inside this method you can validate client permissions to publish into
// channel. But in our simple chat app we allow everyone to publish into
// any channel.
client.On().Publish(func(e centrifuge.PublishEvent) centrifuge.PublishReply {
log.Printf("client publishes into channel %s: %s", e.Channel, string(e.Data))
return centrifuge.PublishReply{}
Expand All @@ -129,12 +130,17 @@ func main() {
return centrifuge.DisconnectReply{}
})

log.Printf("client connected via %s", client.Transport().Name())
// In our example transport will always be Websocket but it can also be SockJS.
transportName := client.Transport().Name()
// In our example clients connect with JSON protocol but it can also be Protobuf.
transportEncoding := client.Transport().Encoding()

log.Printf("client connected via %s (%s)", transportName, transportEncoding)
return centrifuge.ConnectReply{}
})

// Centrifuge library exposes logs with different log level. In your app
// you can set special function to handle this log entries in a way you want.
// you can set special function to handle these log entries in a way you want.
node.SetLogHandler(centrifuge.LogLevelDebug, handleLog)

// Run node will start node's underlying Engine, launch several
Expand Down Expand Up @@ -195,7 +201,7 @@ Also create file `index.html` near `main.go` with content:
})
var input = document.getElementById("input");
input.addEventListener('keyup', function(e) {
if (e.keyCode == 13) {
if (e.keyCode == 13) { // ENTER key pressed
sub.publish(this.value);
input.value = '';
}
Expand Down
29 changes: 27 additions & 2 deletions api.go
Expand Up @@ -24,8 +24,14 @@ func (h *apiExecutor) Publish(ctx context.Context, cmd *apiproto.PublishRequest)

resp := &apiproto.PublishResponse{}

if ch == "" || len(data) == 0 {
h.node.logger.log(newLogEntry(LogLevelError, "channel and data required for publish", nil))
if ch == "" {
h.node.logger.log(newLogEntry(LogLevelError, "channel required for publish", nil))
resp.Error = apiproto.ErrorBadRequest
return resp
}

if len(data) == 0 {
h.node.logger.log(newLogEntry(LogLevelError, "data required for publish", nil))
resp.Error = apiproto.ErrorBadRequest
return resp
}
Expand Down Expand Up @@ -124,6 +130,20 @@ func (h *apiExecutor) Unsubscribe(ctx context.Context, cmd *apiproto.Unsubscribe
user := cmd.User
channel := cmd.Channel

if user == "" {
h.node.logger.log(newLogEntry(LogLevelError, "user required for unsubscribe", map[string]interface{}{"channel": channel, "user": user}))
resp.Error = apiproto.ErrorBadRequest
return resp
}

if channel != "" {
_, ok := h.node.ChannelOpts(channel)
if !ok {
resp.Error = apiproto.ErrorNamespaceNotFound
return resp
}
}

err := h.node.Unsubscribe(user, channel)
if err != nil {
h.node.logger.log(newLogEntry(LogLevelError, "error unsubscribing user from channel", map[string]interface{}{"channel": channel, "user": user, "error": err.Error()}))
Expand All @@ -140,6 +160,11 @@ func (h *apiExecutor) Disconnect(ctx context.Context, cmd *apiproto.DisconnectRe
resp := &apiproto.DisconnectResponse{}

user := cmd.User
if user == "" {
h.node.logger.log(newLogEntry(LogLevelError, "user required for disconnect", map[string]interface{}{}))
resp.Error = apiproto.ErrorBadRequest
return resp
}

err := h.node.Disconnect(user, false)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions api_test.go
Expand Up @@ -111,6 +111,29 @@ func TestPresenceStatsAPI(t *testing.T) {
assert.Nil(t, resp.Error)
}

func TestDisconnectAPI(t *testing.T) {
node := nodeWithMemoryEngine()
api := newAPIExecutor(node)
resp := api.Disconnect(context.Background(), &apiproto.DisconnectRequest{})
assert.Equal(t, apiproto.ErrorBadRequest, resp.Error)
resp = api.Disconnect(context.Background(), &apiproto.DisconnectRequest{
User: "test",
})
assert.Nil(t, resp.Error)
}

func TestUnsubscribeAPI(t *testing.T) {
node := nodeWithMemoryEngine()
api := newAPIExecutor(node)
resp := api.Unsubscribe(context.Background(), &apiproto.UnsubscribeRequest{})
assert.Equal(t, apiproto.ErrorBadRequest, resp.Error)
resp = api.Unsubscribe(context.Background(), &apiproto.UnsubscribeRequest{
User: "test",
Channel: "test",
})
assert.Nil(t, resp.Error)
}

func TestChannelsAPI(t *testing.T) {
node := nodeWithMemoryEngine()
api := newAPIExecutor(node)
Expand Down
8 changes: 4 additions & 4 deletions client.go
Expand Up @@ -129,7 +129,7 @@ type Client struct {
eventHub *ClientEventHub
}

// newClient creates new client connection.
// newClient initializes new Client.
func newClient(ctx context.Context, n *Node, t transport) (*Client, error) {
uuidObject, err := uuid.NewV4()
if err != nil {
Expand Down Expand Up @@ -896,7 +896,7 @@ func (c *Client) connectCmd(cmd *proto.ConnectRequest) (*proto.ConnectResponse,
c.exp = credentials.ExpireAt
c.mu.Unlock()
} else if cmd.Token != "" {
// explicit auth Credentials not provided in context, try to look
// Explicit auth Credentials not provided in context, try to look
// for credentials in connect token.
token := cmd.Token

Expand Down Expand Up @@ -1703,8 +1703,8 @@ func (c *Client) presenceCmd(cmd *proto.PresenceRequest) (*proto.PresenceRespons
return resp, nil
}

// presenceStatsCmd handle request to get presence stats – short summary
// about clients in channel.
// presenceStatsCmd handles request to get presence stats – short summary
// about active clients in channel.
func (c *Client) presenceStatsCmd(cmd *proto.PresenceStatsRequest) (*proto.PresenceStatsResponse, *Disconnect) {

ch := cmd.Channel
Expand Down
25 changes: 4 additions & 21 deletions node.go
Expand Up @@ -501,47 +501,30 @@ func (n *Node) nodeCmd(node *controlproto.Node) error {
// Unsubscribe unsubscribes user from channel, if channel is equal to empty
// string then user will be unsubscribed from all channels.
func (n *Node) Unsubscribe(user string, ch string) error {

if user == "" {
return ErrorBadRequest
}

if ch != "" {
_, ok := n.ChannelOpts(ch)
if !ok {
return ErrorNamespaceNotFound
}
}

// First unsubscribe on this node.
err := n.hub.unsubscribe(user, ch)
if err != nil {
return ErrorInternal
return err
}
// Second send unsubscribe control message to other nodes.
err = n.pubUnsubscribe(user, ch)
if err != nil {
return ErrorInternal
return err
}
return nil
}

// Disconnect allows to close all user connections to Centrifugo.
func (n *Node) Disconnect(user string, reconnect bool) error {

if user == "" {
return ErrorBadRequest
}

// first disconnect user from this node
err := n.hub.disconnect(user, reconnect)
if err != nil {
return ErrorInternal
return err
}
// second send disconnect control message to other nodes
err = n.pubDisconnect(user, reconnect)
if err != nil {
return ErrorInternal
return err
}
return nil
}
Expand Down

0 comments on commit c5ee04f

Please sign in to comment.