Skip to content

Commit

Permalink
golint and two new tests (#8)
Browse files Browse the repository at this point in the history
* chg: golint

* chg: "Logging" unhandled errors

* new: More invocation/streaminvocation tests

* chg: golint

* chg: public JsonHubProtocol

* chg: fixed doc comments
  • Loading branch information
philippseith authored and davidfowl committed Jan 3, 2020
1 parent b0cfb0c commit cd24f0c
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 28 deletions.
1 change: 1 addition & 0 deletions signalr-go-server/pkg/signalr/clientproxy.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package signalr

//ClientProxy allows the hub to send messages to one or more of its clients
type ClientProxy interface {
Send(target string, args ...interface{})
}
Expand Down
1 change: 1 addition & 0 deletions signalr-go-server/pkg/signalr/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package signalr

import "io"

// Connection describes a connection between signalR client and Server
type Connection interface {
io.Reader
io.Writer
Expand Down
1 change: 1 addition & 0 deletions signalr-go-server/pkg/signalr/groupmanager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package signalr

// GroupManager manages the client groups of the hub
type GroupManager interface {
AddToGroup(groupName string, connectionID string)
RemoveFromGroup(groupName string, connectionID string)
Expand Down
7 changes: 7 additions & 0 deletions signalr-go-server/pkg/signalr/hub.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package signalr

// HubInterface is a hubs interface
type HubInterface interface {
Initialize(hubContext HubContext)
OnConnected(connectionID string)
OnDisconnected(connectionID string)
}

// Hub is a base class for hubs
type Hub struct {
context HubContext
}

// Initialize initializes a hub with a HubContext
func (h *Hub) Initialize(ctx HubContext) {
h.context = ctx
}

// Clients returns the clients of this hub
func (h *Hub) Clients() HubClients {
return h.context.Clients()
}

// Groups returns the client groups of this hub
func (h *Hub) Groups() GroupManager {
return h.context.Groups()
}

// OnConnected is called when the hub is connected
func (h *Hub) OnConnected(string) {}

//OnDisconnected is called when the hub is disconnected
func (h *Hub) OnDisconnected(string) {}
4 changes: 4 additions & 0 deletions signalr-go-server/pkg/signalr/hubclients.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package signalr

// HubClients gives the hub access to various client groups
// All() gets a ClientProxy that can be used to invoke methods on all clients connected to the hub
// Client() gets a ClientProxy that can be used to invoke methods on the specified client connection
// Group() gets a ClientProxy that can be used to invoke methods on all connections in the specified group
type HubClients interface {
All() ClientProxy
Client(connectionID string) ClientProxy
Expand Down
27 changes: 19 additions & 8 deletions signalr-go-server/pkg/signalr/hubconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package signalr

import (
"bytes"
"fmt"
"sync/atomic"
)

Expand Down Expand Up @@ -46,7 +47,9 @@ func (c *defaultHubConnection) Close(error string) {
Error: error,
AllowReconnect: true,
}
c.Protocol.WriteMessage(closeMessage, c.Connection)
if err := c.Protocol.WriteMessage(closeMessage, c.Connection); err != nil {
fmt.Printf("cannot close connection %v: %v", c.GetConnectionID(), err)
}
}

func (c *defaultHubConnection) GetConnectionID() string {
Expand All @@ -60,15 +63,19 @@ func (c *defaultHubConnection) SendInvocation(target string, args []interface{})
Arguments: args,
}

c.Protocol.WriteMessage(invocationMessage, c.Connection)
if err := c.Protocol.WriteMessage(invocationMessage, c.Connection); err != nil {
fmt.Printf("cannot send invocation %v %v over connection %v: %v", target, args, c.GetConnectionID(), err)
}
}

func (c *defaultHubConnection) Ping() {
var pingMessage = hubMessage{
Type: 6,
}

c.Protocol.WriteMessage(pingMessage, c.Connection)
if err := c.Protocol.WriteMessage(pingMessage, c.Connection); err != nil {
fmt.Printf("cannot ping over connection %v: %v", c.GetConnectionID(), err)
}
}

func (c *defaultHubConnection) Receive() (interface{}, error) {
Expand All @@ -80,10 +87,10 @@ func (c *defaultHubConnection) Receive() (interface{}, error) {
// Partial message, need more data
// ReadMessage read data out of the buf, so its gone there: refill
buf.Write(data[:n])
if n, err = c.Connection.Read(data); err != nil {
return nil, err
} else {
if n, err = c.Connection.Read(data); err == nil {
buf.Write(data[:n])
} else {
return nil, err
}
} else {
return message, err
Expand All @@ -99,7 +106,9 @@ func (c *defaultHubConnection) Completion(id string, result interface{}, error s
Error: error,
}

c.Protocol.WriteMessage(completionMessage, c.Connection)
if err := c.Protocol.WriteMessage(completionMessage, c.Connection); err != nil {
fmt.Printf("cannot send completion for invocation %v over connection %v: %v", id, c.GetConnectionID(), err)
}
}

func (c *defaultHubConnection) StreamItem(id string, item interface{}) {
Expand All @@ -109,5 +118,7 @@ func (c *defaultHubConnection) StreamItem(id string, item interface{}) {
Item: item,
}

c.Protocol.WriteMessage(streamItemMessage, c.Connection)
if err := c.Protocol.WriteMessage(streamItemMessage, c.Connection); err != nil {
fmt.Printf("cannot send stream item for invocation %v over connection %v: %v", id, c.GetConnectionID(), err)
}
}
3 changes: 3 additions & 0 deletions signalr-go-server/pkg/signalr/hubcontext.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package signalr

// HubContext is a context abstraction for a hub
// Clients() gets a HubClients that can be used to invoke methods on clients connected to the hub
// Groups() gets a GroupManager that can be used to add and remove connections to named groups
type HubContext interface {
Clients() HubClients
Groups() GroupManager
Expand Down
8 changes: 8 additions & 0 deletions signalr-go-server/pkg/signalr/hublifetimemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ package signalr

import "sync"

// HubLifetimeManager is a lifetime manager abstraction for hub instances
// OnConnected() is called when a connection is started
// OnDisconnected() is called when a connection is finished
// InvokeAll() sends an invocation message to all hub connections
// InvokeClient() sends an invocation message to a specified hub connection
// InvokeGroup() sends an invocation message to a specified group of hub connections
// AddToGroup() adds a connection to the specified group
// RemoveFromGroup() removes a connection from the specified group
type HubLifetimeManager interface {
OnConnected(conn hubConnection)
OnDisconnected(conn hubConnection)
Expand Down
16 changes: 16 additions & 0 deletions signalr-go-server/pkg/signalr/invocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,20 @@ var _ = Describe("Invocation", func() {
})
})
})

Describe("Missing method invocation", func() {
conn := connect(&invocationHub{})
Context("When a missing server method invoked by the client", func() {
It("should return an error", func() {
_, err := conn.clientSend(`{"type":1,"invocationId": "0000","target":"missing"}`)
Expect(err).To(BeNil())
recv := (<-conn.received).(completionMessage)
Expect(recv).NotTo(BeNil())
Expect(recv.InvocationID).To(Equal("0000"))
Expect(recv.Result).To(BeNil())
Expect(recv.Error).NotTo(BeNil())
Expect(len(recv.Error)).To(BeNumerically(">", 0))
})
})
})
})
6 changes: 2 additions & 4 deletions signalr-go-server/pkg/signalr/jsonhubprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package signalr
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
)
Expand All @@ -26,14 +27,11 @@ func (j *JsonHubProtocol) UnmarshalArgument(argument interface{}, value interfac
func (j *JsonHubProtocol) ReadMessage(buf *bytes.Buffer) (interface{}, bool, error) {
data, err := parseTextMessageFormat(buf)
switch {
case err == io.EOF:
case errors.Is(err, io.EOF):
return nil, false, err
case err != nil:
return nil, true, err
}
if err != nil {
return nil, true, err
}

message := hubMessage{}
err = json.Unmarshal(data, &message)
Expand Down
11 changes: 7 additions & 4 deletions signalr-go-server/pkg/signalr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ import (
"time"
)

// Server is a SignalR server for one type of hub
type Server struct {
hub HubInterface
lifetimeManager HubLifetimeManager
defaultHubClients defaultHubClients
defaultHubClients HubClients
groupManager GroupManager
}

// NewServer creates a new server for one type of hub
func NewServer(hub HubInterface) *Server {
lifetimeManager := defaultHubLifetimeManager{}
return &Server{
hub: hub,
lifetimeManager: &lifetimeManager,
defaultHubClients: defaultHubClients{
defaultHubClients: &defaultHubClients{
lifetimeManager: &lifetimeManager,
allCache: allClientProxy{lifetimeManager: &lifetimeManager},
},
Expand All @@ -33,6 +35,7 @@ func NewServer(hub HubInterface) *Server {
}
}

// Run runs the server on one connection. The same server might be run on different connections in parallel
func (s *Server) Run(conn Connection) {
if protocol, err := processHandshake(conn); err != nil {
fmt.Println(err)
Expand Down Expand Up @@ -65,7 +68,7 @@ func (s *Server) Run(conn Connection) {
// argument build failed
hubConn.Completion(invocation.InvocationID, nil, err.Error())
} else if clientStreaming {
// let the receiving method run idependently
// let the receiving method run independently
go func() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -127,7 +130,7 @@ type hubInfo struct {
func (s *Server) newHubInfo() *hubInfo {

s.hub.Initialize(&defaultHubContext{
clients: &s.defaultHubClients,
clients: s.defaultHubClients,
groups: s.groupManager,
})

Expand Down
2 changes: 1 addition & 1 deletion signalr-go-server/pkg/signalr/signalr_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ func TestSignalr(t *testing.T) {
func connect(hubProto HubInterface) *testingConnection {
server := NewServer(hubProto)
conn := newTestingConnection()
go server.messageLoop(conn)
go server.Run(conn)
return conn
}
25 changes: 25 additions & 0 deletions signalr-go-server/pkg/signalr/streaminvocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func (s *streamHub) SimpleStream() <-chan int {
return r
}

func (s *streamHub) SimpleInt() int {
streamInvocationQueue <- "SimpleInt()"
return -1
}

var _ = Describe("Streaminvocation", func() {

Describe("Simple stream invocation", func() {
Expand Down Expand Up @@ -79,4 +84,24 @@ var _ = Describe("Streaminvocation", func() {
})
})
})

Describe("Stream invocation of method with no stream result", func() {
conn := connect(&streamHub{})
Context("When invoked by the client", func() {
It("should be invoked on the server, return one stream item with the \"no stream\" result and a final completion without result", func() {
_, err := conn.clientSend(`{"type":4,"invocationId": "yyy","target":"simpleint"}`)
Expect(err).To(BeNil())
Expect(<-streamInvocationQueue).To(Equal("SimpleInt()"))
sRecv := (<-conn.received).(streamItemMessage)
Expect(sRecv).NotTo(BeNil())
Expect(sRecv.InvocationID).To(Equal("yyy"))
Expect(sRecv.Item).To(Equal(float64(-1)))
cRecv := (<-conn.received).(completionMessage)
Expect(cRecv).NotTo(BeNil())
Expect(cRecv.InvocationID).To(Equal("yyy"))
Expect(cRecv.Result).To(BeNil())
Expect(cRecv.Error).To(Equal(""))
})
})
})
})
14 changes: 9 additions & 5 deletions signalr-go-server/pkg/signalr/testingconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package signalr
import (
"bytes"
"encoding/json"
"fmt"
"github.com/onsi/ginkgo"
"io"
)

Expand Down Expand Up @@ -37,9 +39,11 @@ func newTestingConnection() *testingConnection {
}
// Send initial Handshake
go func() {
conn.clientSend(`{"protocol": "json","version": 1}`)
if _, err := conn.clientSend(`{"protocol": "json","version": 1}`); err != nil {
ginkgo.Fail(fmt.Sprint(err))
}
}()
conn.received = make(chan interface{}, 20)
conn.received = make(chan interface{}, 0)
go func() {
for {
if message, err := conn.clientReceive(); err == nil {
Expand Down Expand Up @@ -75,10 +79,10 @@ func (t *testingConnection) clientReceive() (string, error) {
for {
if message, err := buf.ReadString(30); err != nil {
buf.Write(data[:n])
if n, err = t.cliReader.Read(data); err != nil {
return "", err
} else {
if n, err = t.cliReader.Read(data); err == nil {
buf.Write(data[:n])
} else{
return "", err
}
} else {
return message[:len(message)-1], nil
Expand Down
8 changes: 6 additions & 2 deletions signalr-go-server/pkg/signalr/websocketServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ func negotiateHandler(w http.ResponseWriter, req *http.Request) {
},
}

json.NewEncoder(w).Encode(response)
if err := json.NewEncoder(w).Encode(response); err != nil {
fmt.Println(err)
}
}

func getConnectionID() string {
bytes := make([]byte, 16)
rand.Read(bytes)
if _, err := rand.Read(bytes); err != nil {
fmt.Println(err)
}
return base64.StdEncoding.EncodeToString(bytes)
}
6 changes: 2 additions & 4 deletions signalr-go-server/pkg/signalr/websocketconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ func (w *webSocketConnection) Read(p []byte) (n int, err error) {
var data []byte
if err = websocket.Message.Receive(w.ws, &data); err != nil {
return 0, err
} else {
w.r = bytes.NewReader(data)
return w.r.Read(p)
}
} else {
w.r = bytes.NewReader(data)
return w.r.Read(p)
}
return w.r.Read(p)
}

0 comments on commit cd24f0c

Please sign in to comment.