-
Notifications
You must be signed in to change notification settings - Fork 107
/
event.go
130 lines (107 loc) · 3.16 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package inmem
import (
"context"
"sync"
"github.com/benbjohnson/wtf"
)
// EventBufferSize is the buffer size of the channel for each subscription.
const EventBufferSize = 16
// Ensure type implements interface.
var _ wtf.EventService = (*EventService)(nil)
// EventService represents a service for managing events in the system.
type EventService struct {
mu sync.Mutex
m map[int]map[*Subscription]struct{} // subscriptions by user ID
}
// NewEventService returns a new instance of EventService.
func NewEventService() *EventService {
return &EventService{
m: make(map[int]map[*Subscription]struct{}),
}
}
// PublishEvent publishes event to all of a user's subscriptions.
//
// If user's channel is full then the user is disconnected. This is to prevent
// slow users from blocking progress.
func (s *EventService) PublishEvent(userID int, event wtf.Event) {
s.mu.Lock()
defer s.mu.Unlock()
// Skip if the user is not subscribed at all.
subs := s.m[userID]
if len(subs) == 0 {
return
}
// Publish event to all subscriptions for the user.
for sub := range subs {
select {
case sub.c <- event:
default:
s.unsubscribe(sub)
}
}
}
// Subscribe creates a new subscription for the currently logged in user.
// Returns EUNAUTHORIZED if user is not logged in.
func (s *EventService) Subscribe(ctx context.Context) (wtf.Subscription, error) {
// Fetch current user's ID.
userID := wtf.UserIDFromContext(ctx)
if userID == 0 {
return nil, wtf.Errorf(wtf.EUNAUTHORIZED, "Must be logged in to subscribe to events.")
}
// Create new subscription for the user.
sub := &Subscription{
service: s,
userID: userID,
c: make(chan wtf.Event, EventBufferSize),
}
// Add to list of user's subscriptions.
// Subscritions are stored as a map for each user so we can easily delete them.
subs, ok := s.m[userID]
if !ok {
subs = make(map[*Subscription]struct{})
s.m[userID] = subs
}
subs[sub] = struct{}{}
return sub, nil
}
// Unsubscribe disconnects sub from the service.
func (s *EventService) Unsubscribe(sub *Subscription) {
s.mu.Lock()
defer s.mu.Unlock()
s.unsubscribe(sub)
}
func (s *EventService) unsubscribe(sub *Subscription) {
// Only close the underlying channel once. Otherwise Go will panic.
sub.once.Do(func() {
close(sub.c)
})
// Find subscription map for user. Exit if one does not exist.
subs, ok := s.m[sub.userID]
if !ok {
return
}
// Remove subscription from map.
delete(subs, sub)
// Stop tracking user if they no longer have any subscriptions.
if len(subs) == 0 {
delete(s.m, sub.userID)
}
}
// Ensure type implements interface.
var _ wtf.Subscription = (*Subscription)(nil)
// Subscription represents a stream of user-related events.
type Subscription struct {
service *EventService // service subscription was created from
userID int // subscribed user
c chan wtf.Event // channel of events
once sync.Once // ensures c only closed once
}
// Close disconnects the subscription from the service it was created from.
func (s *Subscription) Close() error {
s.service.Unsubscribe(s)
return nil
}
// C returns a receive-only channel of user-related events.
func (s *Subscription) C() <-chan wtf.Event {
return s.c
}