Skip to content

Commit

Permalink
Add free list
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhavp committed Feb 3, 2016
1 parent ee3df3b commit ca7e689
Showing 1 changed file with 86 additions and 22 deletions.
108 changes: 86 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package wsevent

import (
"log"
"reflect"
"sync"
)
Expand Down Expand Up @@ -37,7 +38,7 @@ func (s *Server) call(client *Client, f reflect.Value, data []byte) (interface{}
return out[0].Interface(), nil
}

//Server
//Server represents an RPC server
type Server struct {
//maps room string to a list of clients in it
rooms map[string]([]*Client)
Expand All @@ -56,12 +57,63 @@ type Server struct {

codec ServerCodec
defaultHandler reflect.Value

reqMu *sync.Mutex
freeReq *request

replyMu *sync.Mutex
freeRep *reply
}

func (s *Server) getRequest() *request {
s.reqMu.Lock()
defer s.reqMu.Unlock()
req := s.freeReq
if req == nil {
req = new(request)
} else {
s.freeReq = req.next
*req = request{}
}

return req
}

func (s *Server) freeRequest(req *request) {
s.reqMu.Lock()
defer s.reqMu.Unlock()

req.next = s.freeReq
s.freeReq = req
}

func (s *Server) getReply() *reply {
s.replyMu.Lock()
defer s.replyMu.Unlock()
rep := s.freeRep
if rep == nil {
rep = new(reply)
} else {
s.freeRep = rep.next
*rep = reply{}
}
return rep
}

func (s *Server) freeReply(reply *reply) {
s.replyMu.Lock()
defer s.replyMu.Unlock()

reply.next = s.freeRep
s.freeRep = reply
}

//Return a new server object
//NewServer returns a new server
func NewServer(codec ServerCodec, defaultHandler interface{}) *Server {
value := reflect.ValueOf(defaultHandler)
verifyHandler(value, reflect.TypeOf(defaultHandler).Name(), "NewServer")
if !validHandler(value, reflect.TypeOf(defaultHandler).Name()) {
panic("NewServer: invalid default handler")
}

s := &Server{
rooms: make(map[string]([]*Client)),
Expand All @@ -76,13 +128,16 @@ func NewServer(codec ServerCodec, defaultHandler interface{}) *Server {

codec: codec,
defaultHandler: reflect.ValueOf(defaultHandler),

reqMu: new(sync.Mutex),
replyMu: new(sync.Mutex),
}

return s
}

//Add a client c to room r
func (s *Server) AddClient(c *Client, r string) {
//Join adds a client to the given room
func (s *Server) Join(c *Client, r string) {
s.joinedRoomsMu.RLock()
for _, room := range s.joinedRooms[c.ID] {
if r == room {
Expand All @@ -103,8 +158,8 @@ func (s *Server) AddClient(c *Client, r string) {
//log.Printf("Added %s to room %s", c.id, r)
}

//Remove client c from room r
func (s *Server) RemoveClient(client *Client, r string) {
//Leave removes the client from the given room
func (s *Server) Leave(client *Client, r string) {
s.roomsMu.Lock()
for i, joinedClient := range s.rooms[r] {
if client.ID == joinedClient.ID {
Expand Down Expand Up @@ -133,7 +188,7 @@ func (s *Server) RemoveClient(client *Client, r string) {

}

//Send all clients in room room data
//Broadcast given data to all clients in the given room
func (s *Server) Broadcast(room string, data string) {
s.roomsMu.RLock()
for _, client := range s.rooms[room] {
Expand All @@ -145,18 +200,22 @@ func (s *Server) Broadcast(room string, data string) {
s.roomsMu.RUnlock()
}

//BroadcastJSON broadcasts the json encoding of v to all clients in room
func (s *Server) BroadcastJSON(room string, v interface{}) {
s.roomsMu.RLock()
for _, client := range s.rooms[room] {
//log.Printf("sending to %s %s\n", client.id, room)
go func(c *Client) {
c.EmitJSON(v)
err := c.EmitJSON(v)
if err != nil {
log.Println(err)
}
}(client)
}
s.roomsMu.RUnlock()
}

//Returns a map of room name -> number of clients
//Rooms returns a map of room name -> number of clients
func (s *Server) Rooms() map[string]int {
rooms := make(map[string]int)

Expand All @@ -169,7 +228,7 @@ func (s *Server) Rooms() map[string]int {
return rooms
}

//Returns an array of rooms the client c has been added to
//RoomsJoined returns an array of rooms the client c has been added to
func (s *Server) RoomsJoined(id string) []string {
rooms := make([]string, len(s.joinedRooms[id]))
s.joinedRoomsMu.RLock()
Expand All @@ -180,24 +239,27 @@ func (s *Server) RoomsJoined(id string) []string {
return rooms
}

//Registers a callback for the event string. The callback must take 2 arguments,
//The client from which the message was received and the string message itself.
//On Registers a callback for the event string. It panics if the callback isn't
//valid
func (s *Server) On(event string, f interface{}) {
value := reflect.ValueOf(f)

verifyHandler(value, reflect.TypeOf(f).Name(), "On")
if !validHandler(value, reflect.TypeOf(f).Name()) {
panic("On: invalid callback for event " + event)
}

s.handlersLock.Lock()
s.handlers[event] = value
s.handlersLock.Unlock()
}

//A Receiver interface implements the Name method, which returns a name for the
//event, given a Handler's name
//event, given a registered function's name.
type Receiver interface {
Name(string) string
}

//Similar to net/rpc's Register, expect that rcvr needs to implement the
//Register is similar to net/rpc's Register, expect that rcvr needs to implement the
//Receiver interface
func (s *Server) Register(rcvr Receiver) {
rvalue := reflect.ValueOf(rcvr)
Expand All @@ -210,17 +272,19 @@ func (s *Server) Register(rcvr Receiver) {
continue
}

verifyHandler(method, name, "Register")
if !validHandler(method, name) {
continue
}

s.handlersLock.Lock()
s.handlers[rcvr.Name(name)] = method
s.handlersLock.Unlock()
}
}

func verifyHandler(method reflect.Value, name, prefix string) bool {
return method.Type().NumIn() != 2 &&
method.Type().NumOut() != 1 &&
method.Type().In(0) != reflect.TypeOf(&Client{}) &&
method.Type().In(1).Kind() != reflect.Struct
func validHandler(method reflect.Value, name string) bool {
return method.Type().NumIn() == 2 &&
method.Type().NumOut() == 1 &&
method.Type().In(0) == reflect.TypeOf(&Client{}) &&
method.Type().In(1).Kind() == reflect.Struct
}

0 comments on commit ca7e689

Please sign in to comment.