Skip to content

Commit

Permalink
Merge pull request #11 from composer22/feature/server
Browse files Browse the repository at this point in the history
Feature/server - chat mngr; cleanup tests; robust close
  • Loading branch information
composer22 committed Apr 27, 2015
2 parents 96a7700 + e259042 commit 61e93e4
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 467 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# chattypantz
[![License MIT](https://img.shields.io/npm/l/express.svg)](http://opensource.org/licenses/MIT)
[![Build Status](https://travis-ci.org/composer22/chattypantz.svg?branch=master)](http://travis-ci.org/composer22/chattypantz)
[![Current Release](https://img.shields.io/badge/release-v0.1.1-brightgreen.svg)](https://github.com/composer22/chattypantz/releases/tag/v0.1.1)
[![Current Release](https://img.shields.io/badge/release-v0.1.2-brightgreen.svg)](https://github.com/composer22/chattypantz/releases/tag/v0.1.2)
[![Coverage Status](https://coveralls.io/repos/composer22/chattypantz/badge.svg?branch=master)](https://coveralls.io/r/composer22/chattypantz?branch=master)

![chattypantz-logo](assets/img/chattypantz.png)
Expand Down
163 changes: 163 additions & 0 deletions server/chat_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package server

import (
"errors"
"fmt"
"sync"

"golang.org/x/net/websocket"
)

// ChatManager represents a control hub of chat rooms and chatters for the server.
type ChatManager struct {
rooms map[string]*ChatRoom // A list of rooms on the server.
chatters map[*Chatter]bool // A list of chatters on the server.
done chan bool // Shut down chatters and rooms
maxRooms int // Maximum number of rooms allowed to be created.
maxIdle int // Maximum idle time allowed for a ws connection.
log *ChatLogger // Application log for events.
wg sync.WaitGroup // Synchronizer for manager reqq.
mu sync.Mutex // Lock for update.
}

// ChatManagerNew is a factory function that returns a new instance of a chat manager.
func ChatManagerNew(n int, mi int, cl *ChatLogger) *ChatManager {
return &ChatManager{
rooms: make(map[string]*ChatRoom),
chatters: make(map[*Chatter]bool),
done: make(chan bool),
maxRooms: n,
maxIdle: mi,
log: cl,
}
}

// list returns a list of chat room names.
func (m *ChatManager) list() []string {
m.mu.Lock()
defer m.mu.Unlock()
var names []string
for n := range m.rooms {
names = append(names, n)
}
return names
}

// find will find a chat room for a given name.
func (m *ChatManager) find(n string) (*ChatRoom, error) {
m.mu.Lock()
r, ok := m.rooms[n]
m.mu.Unlock()
if !ok {
return nil, errors.New(fmt.Sprintf(`Chatroom "%s" not found.`, n))
}
return r, nil
}

// findCreate returns a chat room for a given name or create a new one.
func (m *ChatManager) findCreate(n string) (*ChatRoom, error) {
r, err := m.find(n)
if err != nil {
mr := m.MaxRooms()
m.mu.Lock() // cover rooms
if mr > 0 && mr == len(m.rooms) {
m.mu.Unlock()
return nil, errors.New("Maximum number of rooms reached. Cannot create new room.")
}
r = ChatRoomNew(n, m.done, m.log, &m.wg)
m.rooms[n] = r
m.wg.Add(1)
go r.Run()
m.mu.Unlock()
}
return r, nil
}

// removeChatterAllRooms sends a broadcast to all rooms to release the chatter.
func (m *ChatManager) removeChatterAllRooms(c *Chatter) {
m.mu.Lock()
defer m.mu.Unlock()
for _, r := range m.rooms {
if q, err := ChatRequestNew(c, r.name, ChatReqTypeLeave, ""); err == nil {
r.reqq <- q
}
}
}

// getRoomStats returns statistics from each room.
func (m *ChatManager) getRoomStats() []*ChatRoomStats {
m.mu.Lock()
defer m.mu.Unlock()
var s = []*ChatRoomStats{}
for _, r := range m.rooms {
s = append(s, r.ChatRoomStatsNew())
}
return s
}

// registerChatter registers a new chatter with the chat manager.
func (m *ChatManager) registerNewChatter(ws *websocket.Conn) *Chatter {
m.mu.Lock()
defer m.mu.Unlock()
c := ChatterNew(m, ws, m.log)
m.chatters[c] = true
return c
}

// getChatterStats returns statistics from all chatters
func (m *ChatManager) getChatterStats() []*ChatterStats {
m.mu.Lock()
defer m.mu.Unlock()
var s = []*ChatterStats{}
for c := range m.chatters {
s = append(s, c.ChatterStatsNew())
}
return s
}

// unregisterChatter removes a new chatter from the chat manager.
func (m *ChatManager) unregisterChatter(c *Chatter) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.chatters[c]; ok {
delete(m.chatters, c)
}
}

// Shuts down the chatters and the rooms. Used by server on quit.
func (m *ChatManager) shutdownAll() {
close(m.done)
m.wg.Wait()
m.mu.Lock()
m.rooms = make(map[string]*ChatRoom)
m.chatters = make(map[*Chatter]bool)
m.mu.Unlock()
}

// MaxRooms returns the current maximum number of rooms allowed on the server.
func (m *ChatManager) MaxRooms() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.maxRooms
}

// SetMaxRooms sets the maximum number of rooms allowed on the server.
func (m *ChatManager) SetMaxRooms(mr int) {
m.mu.Lock()
defer m.mu.Unlock()
m.maxRooms = mr
}

// MaxIdle returns the current maximum idle time for a connection.
func (m *ChatManager) MaxIdle() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.maxIdle
}

// SetMaxIdle sets the maximum idle time for a connection.
func (m *ChatManager) SetMaxIdle(mi int) {
m.mu.Lock()
defer m.mu.Unlock()
m.maxIdle = mi
}
40 changes: 18 additions & 22 deletions server/chat_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ type ChatRoom struct {
reqCount uint64 // Total requests received.
rspCount uint64 // Total responses sent.
reqq chan *ChatRequest // Channel to receive requests.
done chan bool // Channel to receive signal to shutdown now.
log *ChatLogger // Application log for events.
mu sync.Mutex // Lock against stats.
wg *sync.WaitGroup // Wait group for the run from the chat room manager.
}

// ChatRoomNew is a factory function that returns a new instance of a chat room.
func ChatRoomNew(n string, cl *ChatLogger, g *sync.WaitGroup) *ChatRoom {
func ChatRoomNew(n string, d chan bool, cl *ChatLogger, g *sync.WaitGroup) *ChatRoom {
return &ChatRoom{
name: n,
chatters: make(map[*Chatter]bool),
reqq: make(chan *ChatRequest, maxChatRoomReq),
done: d,
log: cl,
wg: g,
}
Expand All @@ -43,6 +45,8 @@ func (r *ChatRoom) Run() {
r.start = time.Now()
for {
select {
case <-r.done: // Server signal quit
return
case req, ok := <-r.reqq:
if !ok { // Assume ch closed and shutdown notification
return
Expand Down Expand Up @@ -219,34 +223,26 @@ func (r *ChatRoom) isMemberName(n string) bool {

// sendResponse sends a message to a single chatter in the room.
func (r *ChatRoom) sendResponse(c *Chatter, rt int, ct string, l []string) {
if c.isConnected() {
if l == nil {
l = []string{}
}
if rsp, err := ChatResponseNew(r.name, rt, ct, l); err == nil {
r.mu.Lock()
r.lastRsp = time.Now()
r.rspCount++
r.mu.Unlock()
c.rspq <- rsp
}
if l == nil {
l = []string{}
}
c.sendResponse(r.name, rt, ct, l)
r.mu.Lock()
r.lastRsp = time.Now()
r.rspCount++
r.mu.Unlock()
}

// sendResponseAll sends a message to all chatters in the room.
func (r *ChatRoom) sendResponseAll(rt int, ct string, l []string) {
if l == nil {
l = []string{}
}
if rsp, err := ChatResponseNew(r.name, rt, ct, l); err == nil {
r.mu.Lock()
for c := range r.chatters {
if c.isConnected() {
r.lastRsp = time.Now()
r.rspCount++
c.rspq <- rsp
}
}
r.mu.Unlock()
r.mu.Lock()
for c := range r.chatters {
c.sendResponse(r.name, rt, ct, l)
r.lastRsp = time.Now()
r.rspCount++
}
r.mu.Unlock()
}
100 changes: 0 additions & 100 deletions server/chat_room_manager.go

This file was deleted.

38 changes: 0 additions & 38 deletions server/chat_room_manager_test.go

This file was deleted.

Loading

0 comments on commit 61e93e4

Please sign in to comment.