Skip to content

Commit

Permalink
Merge pull request #36 from gibsn/bug/issue_35_chat_refactoring
Browse files Browse the repository at this point in the history
chat: major refactoring
  • Loading branch information
Kirill Alekseev committed Jul 20, 2018
2 parents 3bab791 + 90d57a2 commit 603ad62
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 138 deletions.
194 changes: 110 additions & 84 deletions src/technochat/chat/chat.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package chat

import (
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"

"technochat/chat/message"
"technochat/chat/user"
)

const (
Expand All @@ -18,8 +22,8 @@ const (
)

const (
pingTimer time.Duration = 30 * time.Second
pingTimeout time.Duration = 1 * time.Second
incomingBufferSize = 10
broadcastBufferSize = 10
)

var Upgrader = websocket.Upgrader{
Expand All @@ -30,10 +34,19 @@ type Chat struct {
ID string
ChatNames ChatNames

broadcast chan WSMessage
terminate chan struct{}
triggerShutdown sync.Once
triggerShutdownChan chan struct{}
shutdownChan chan struct{}
WG sync.WaitGroup

userConnectedChan chan *user.User
userDisconnectedChan chan *user.User
usersWG sync.WaitGroup

incomingChan chan *message.WSMessage
broadcastChan chan *message.WSMessage

corresps map[int]*User
corresps map[int]*user.User
correspsMx sync.RWMutex

restJoins int
Expand All @@ -45,116 +58,129 @@ type NewChatOpts struct {
}

func NewChat(opts NewChatOpts) *Chat {
return &Chat{
ID: opts.ID,
corresps: make(map[int]*User),
broadcast: make(chan WSMessage),
restJoins: opts.MaxJoins,
ChatNames: NewChatNames(),
c := &Chat{
ID: opts.ID,
corresps: make(map[int]*user.User),
incomingChan: make(chan *message.WSMessage, incomingBufferSize),
broadcastChan: make(chan *message.WSMessage, broadcastBufferSize),
restJoins: opts.MaxJoins,
ChatNames: NewChatNames(),
triggerShutdownChan: make(chan struct{}),
shutdownChan: make(chan struct{}),
userConnectedChan: make(chan *user.User),
userDisconnectedChan: make(chan *user.User),
}

c.WG.Add(2)
go c.handleUsers()
go c.handleCommunication()

return c
}

func (c *Chat) RestJoins() int {
return c.restJoins
}

func (c *Chat) AddUser(ws *websocket.Conn) *User {
if c.restJoins <= 0 {
return nil
func (c *Chat) SendServerNotify(str string) {
msg := &message.WSMessage{
Type: message.WSMsgTypeMessage,
Name: "server",
Data: str,
}

usr := NewUser()
usr.WS = ws
usr.Name, usr.ID = c.ChatNames.GenerateNameID()

c.correspsMx.Lock()
c.corresps[usr.ID] = usr
c.correspsMx.Unlock()
c.restJoins--

go c.HandleUserSending(usr)

return usr
if err := c.Broadcast(msg); err != nil {
log.Printf("error: chat: could not send server notification in chat %s: %v", c.ID, err)
}
}

func (c *Chat) DelUser(id int) {
func (c *Chat) broadcast(msg *message.WSMessage) {
c.correspsMx.RLock()
corr := c.corresps[id]
c.correspsMx.RUnlock()

if corr == nil {
return
}

log.Printf("info: chat: deleting user id=%d name=%s", id, corr.Name)

corr.terminateSend <- struct{}{}
corr.WS.Close()
defer c.correspsMx.RUnlock()

c.correspsMx.Lock()
delete(c.corresps, id)
c.correspsMx.Unlock()
for _, usr := range c.corresps {
c.correspsMx.RUnlock()
if err := usr.SendMessage(msg); err != nil {
log.Printf("errof: chat: could not send a message to user %s in chat %s: %v",
usr.Name, c.ID, err)
}

if len(c.corresps) == 0 && c.restJoins == 0 {
DelChat(c.ID)
c.correspsMx.RLock()
}
}

func (c *Chat) SendServerNotify(str string) {
c.SendAll(WSMessage{
Type: WSMsgTypeMessage,
Name: "server",
Data: str,
})
}
func (c *Chat) Broadcast(msg *message.WSMessage) error {
select {
case c.broadcastChan <- msg:
default:
return fmt.Errorf("queue is full")
}

func (c *Chat) SendAll(msg WSMessage) {
c.broadcast <- msg
return nil
}

func (c *Chat) HandleChatBroadcast() {
func (c *Chat) handleUsers() {
defer c.WG.Done()

for {
select {
case <-c.terminate:
case <-c.shutdownChan:
log.Printf("info: chat: closing users goroutine for chat [%s]", c.ID)
return
case msg := <-c.broadcast:
c.correspsMx.RLock()
for _, usr := range c.corresps {
c.correspsMx.RUnlock()
select {
case usr.send <- msg:
default:
log.Printf("error: chat: cant send broadcast msg to user %s", usr.Name)
}
c.correspsMx.RLock()

case connectedUser := <-c.userConnectedChan:
connectedUser.SendEvent(message.EventConnInitOk, connectedUser.Name)
c.SubscribeUser(connectedUser)
c.SendServerNotify("user " + connectedUser.Name + " has joined")

case disconnectedUser := <-c.userDisconnectedChan:
c.SendServerNotify("user " + disconnectedUser.Name + " has left")
if len(c.corresps) == 0 && c.restJoins == 0 {
log.Printf("info: chat: no users left in chat %s", c.ID)
c.TriggerShutdown()
}
c.correspsMx.RUnlock()
case <-time.After(ChatAFKLifetime):
log.Printf("chat: no activity in chat for %s. chat will be terminated", ChatAFKLifetime)
DelChat(c.ID)
}
}
}

func (c *Chat) HandleUserSending(usr *User) {
func (c *Chat) handleCommunication() {
defer c.WG.Done()

for {
select {
case <-usr.terminateSend:
case <-c.shutdownChan:
log.Printf("info: chat: closing communication goroutine for chat [%s]", c.ID)
return
case msg := <-usr.send:
err := usr.WS.WriteJSON(msg)
if err != nil {
log.Printf("error: chat: cant send a message to user %s: %v", usr.Name, err)
c.DelUser(usr.ID)
return
}
case <-time.After(pingTimer):
err := usr.WS.WriteControl(websocket.PingMessage, nil, time.Now().Add(pingTimeout))
if err != nil {
log.Printf("error: chat: cant send a ping message to user %s: %v", usr.Name, err)
c.DelUser(usr.ID)
return
}

case msg := <-c.incomingChan:
c.broadcast(msg)

case msg := <-c.broadcastChan:
c.broadcast(msg)

//TODO use NewTimer
case <-time.After(ChatAFKLifetime):
log.Printf("info: chat: no activity in chat %s for %s, shutting down", c.ID, ChatAFKLifetime)
c.SendServerNotify("closing chat due to inactivity for " + ChatAFKLifetime.String())
c.TriggerShutdown()
}
}
}

func (c *Chat) TriggerShutdown() {
c.triggerShutdown.Do(func() {
close(c.triggerShutdownChan)
})
}

func (c *Chat) Routine() {
select {
case <-c.triggerShutdownChan:
log.Printf("info: chat: triggered shutdown for chat [%s]", c.ID)
}

c.ShutdownUsers()

close(c.shutdownChan)
c.WG.Wait()
}
19 changes: 10 additions & 9 deletions src/technochat/chat/chatslist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,30 @@ func AddChat(c *Chat) {
chatsList.mx.Lock()
chatsList.chats[c.ID] = c
chatsList.mx.Unlock()
go c.HandleChatBroadcast()
}

func GetChat(id string) *Chat {
chatsList.mx.Lock()
defer chatsList.mx.Unlock()

return chatsList.chats[id]
}

func DelChat(id string) {
chatsList.mx.Lock()
c := chatsList.chats[id]
chatsList.mx.Unlock()
defer chatsList.mx.Unlock()

if c == nil {
c, ok := chatsList.chats[id]
if !ok {
return
}

log.Printf("info: chat: deleting chat id=%s", id)
log.Printf("info: chat: deleting chat [%s]", c.ID)
delete(chatsList.chats, c.ID)
}

c.terminate <- struct{}{}
func HandleChat(c *Chat) {
c.Routine()

chatsList.mx.Lock()
delete(chatsList.chats, id)
chatsList.mx.Unlock()
DelChat(c.ID)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chat
package message

type TypeID int

Expand Down
74 changes: 52 additions & 22 deletions src/technochat/chat/user.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,67 @@
package chat

import (
"log"

"github.com/gorilla/websocket"
)

const (
userSendBufferSize = 10
"technochat/chat/user"
)

type User struct {
WS *websocket.Conn
Name string
ID int
send chan WSMessage
terminateSend chan struct{}
func (c *Chat) AddUser(ws *websocket.Conn) *user.User {
if c.restJoins <= 0 {
return nil
}

usr := user.NewUser(ws)
usr.Name, usr.ID = c.ChatNames.GenerateNameID()

log.Printf("info: chat: new user [%d %s] in chat %s", usr.ID, usr.Name, c.ID)

c.correspsMx.Lock()
c.corresps[usr.ID] = usr
c.correspsMx.Unlock()
c.restJoins--

c.usersWG.Add(1)
c.userConnectedChan <- usr

return usr
}

func NewUser() *User {
return &User{
send: make(chan WSMessage, userSendBufferSize),
func (c *Chat) DelUser(id int) {
c.correspsMx.Lock()

usr, ok := c.corresps[id]
if !ok {
c.correspsMx.Unlock()
return
}

log.Printf("info: chat: deleting user [%d, %s] in chat %s", usr.ID, usr.Name, c.ID)

delete(c.corresps, id)
c.correspsMx.Unlock()

c.userDisconnectedChan <- usr
c.usersWG.Done()
}

func (u *User) SendEvent(event EventID, i interface{}) {
u.WS.WriteJSON(WSMessage{
Type: WSMsgTypeService,
Data: Event{event, i},
})
func (c *Chat) SubscribeUser(usr *user.User) {
go func() {
for msg := range usr.ReadStream() {
msg.Name = usr.Name
c.incomingChan <- msg
}
}()
}

func SendEvent(ws *websocket.Conn, event EventID, i interface{}) {
ws.WriteJSON(WSMessage{
Type: WSMsgTypeService,
Data: Event{event, i},
})
func (c *Chat) ShutdownUsers() {
c.correspsMx.Lock()
for _, usr := range c.corresps {
usr.TriggerShutdown()
}
c.correspsMx.Unlock()

c.usersWG.Wait()
}
Loading

0 comments on commit 603ad62

Please sign in to comment.