/
nats_rooms.go
131 lines (115 loc) · 2.59 KB
/
nats_rooms.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package room
import (
"errors"
"github.com/nats-io/nats.go"
"log"
"sync"
)
var _ IRooms = (*NatsRooms)(nil)
// NatsRooms is a rooms by nats implement IRooms interface
// nats is a message queue, so we don't need to use channel to send message
type NatsRooms struct {
NewRoom func(id string, conn *nats.Conn) IRoom
rooms map[string]IRoom
roomsMutex sync.RWMutex
openCh chan string
closeCh chan string
natsConn *nats.Conn
openSubscription *nats.Subscription
closeSubscription *nats.Subscription
}
func NewNatsRooms(natsConn *nats.Conn, newRoom func(id string, conn *nats.Conn) IRoom) *NatsRooms {
return &NatsRooms{NewRoom: newRoom, natsConn: natsConn, roomsMutex: sync.RWMutex{}}
}
func (r *NatsRooms) Init() (err error) {
if r.NewRoom == nil {
return errors.New("new room is nil")
}
r.openSubscription, err = r.natsConn.Subscribe("room_open", func(msg *nats.Msg) {
r.openCh <- string(msg.Data)
})
if err != nil {
return err
}
r.closeSubscription, err = r.natsConn.Subscribe("room_close", func(msg *nats.Msg) {
r.openCh <- string(msg.Data)
})
if err != nil {
return err
}
r.rooms = make(map[string]IRoom)
r.openCh = make(chan string, 1000)
r.closeCh = make(chan string, 1000)
go r.init()
return nil
}
func (r *NatsRooms) init() {
for {
select {
case id := <-r.openCh:
r.roomsMutex.Lock()
r.rooms[id] = r.NewRoom(id, r.natsConn)
err := r.rooms[id].Init()
if err != nil {
log.Printf("init room error: %v", err)
delete(r.rooms, id)
}
r.roomsMutex.Unlock()
case id := <-r.closeCh:
r.roomsMutex.Lock()
err := r.rooms[id].Close()
if err != nil {
log.Printf("close room error: %v", err)
}
delete(r.rooms, id)
r.roomsMutex.Unlock()
}
}
}
func (r *NatsRooms) Room(id string) (IRoom, error) {
r.roomsMutex.RLock()
defer r.roomsMutex.RUnlock()
if room, ok := r.rooms[id]; ok {
return room, nil
}
return nil, errors.New("room is not exist")
}
func (r *NatsRooms) OpenRoom(id string) (IRoom, error) {
err := r.natsConn.Publish("room_open", []byte(id))
if err != nil {
return nil, err
}
for {
room, err := r.Room(id)
if err != nil {
continue
}
return room, nil
}
}
func (r *NatsRooms) CloseRoom(id string) error {
err := r.natsConn.Publish("room_close", []byte(id))
if err != nil {
return err
}
for {
_, err := r.Room(id)
if err == nil {
continue
}
return nil
}
}
func (r *NatsRooms) Close() error {
err := r.openSubscription.Unsubscribe()
if err != nil {
return err
}
err = r.closeSubscription.Unsubscribe()
if err != nil {
return err
}
close(r.openCh)
close(r.closeCh)
return nil
}