forked from moby/libnetwork
/
event.go
168 lines (144 loc) · 3.56 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package serf
import (
"fmt"
"net"
"sync"
"time"
)
// EventType enumerates the types of events that may occur and be sent
// along the Serf channel.
type EventType int

// The event types, numbered from zero in declaration order.
// The first five are the member event types handled by MemberEvent;
// EventUser and EventQuery are the types reported by UserEvent and
// Query respectively.
const (
	EventMemberJoin EventType = iota
	EventMemberLeave
	EventMemberFailed
	EventMemberUpdate
	EventMemberReap
	EventUser
	EventQuery
)

// String returns the lowercase, hyphenated name of the event type,
// for example "member-join" or "user".
//
// A value outside the declared constants yields
// "unknown event type: N" instead of panicking, so that formatting or
// logging an unexpected value cannot crash the caller.
func (t EventType) String() string {
	switch t {
	case EventMemberJoin:
		return "member-join"
	case EventMemberLeave:
		return "member-leave"
	case EventMemberFailed:
		return "member-failed"
	case EventMemberUpdate:
		return "member-update"
	case EventMemberReap:
		return "member-reap"
	case EventUser:
		return "user"
	case EventQuery:
		return "query"
	default:
		// Convert to int so the numeric value is printed directly.
		return fmt.Sprintf("unknown event type: %d", int(t))
	}
}
// Event is a generic interface for exposing Serf events.
// It only provides the event's type and a printable form; clients
// will usually need a type switch on the concrete value to get to a
// more useful type.
type Event interface {
	// String returns a printable description of the event.
	fmt.Stringer

	// EventType reports which kind of event this is.
	EventType() EventType
}
// MemberEvent is the struct used for member related events.
// Because Serf coalesces events, an event may contain multiple members.
type MemberEvent struct {
	Type    EventType // Which member event this is; returned by EventType
	Members []Member  // Members the event refers to
}

// EventType returns the Type field of the event.
func (m MemberEvent) EventType() EventType {
	return m.Type
}

// String returns the name of the member event type, for example
// "member-join".
//
// The names are taken from EventType.String so the two cannot drift
// apart. A Type that is not one of the five member event types yields
// "unknown event type: N" instead of panicking, so that formatting or
// logging an unexpected event cannot crash the caller.
func (m MemberEvent) String() string {
	switch m.Type {
	case EventMemberJoin,
		EventMemberLeave,
		EventMemberFailed,
		EventMemberUpdate,
		EventMemberReap:
		return m.Type.String()
	default:
		// Non-member types (EventUser, EventQuery) and undeclared
		// values are not valid for a MemberEvent.
		return fmt.Sprintf("unknown event type: %d", int(m.Type))
	}
}
// UserEvent is the struct used for events that are triggered by the
// user and are not related to members.
type UserEvent struct {
	LTime    LamportTime
	Name     string // Event name; included in the String output
	Payload  []byte
	Coalesce bool // NOTE(review): presumably marks the event as eligible for coalescing; confirm against the coalescing code
}

// EventType always reports EventUser.
func (u UserEvent) EventType() EventType {
	return EventUser
}

// String renders the event as "user-event: " followed by its Name.
func (u UserEvent) String() string {
	return "user-event: " + u.Name
}
// Query is the struct used for EventQuery type events.
// The exported fields describe the query; the unexported fields hold
// what Respond needs to send a response back.
type Query struct {
	LTime   LamportTime
	Name    string // Query name; included in the String output
	Payload []byte

	serf     *Serf
	id       uint32     // ID is not exported, since it may change
	addr     []byte     // Address to respond to
	port     uint16     // Port to respond to
	deadline time.Time  // Must respond by this deadline
	respLock sync.Mutex // Held by Respond for its whole duration
}

// EventType always reports EventQuery.
func (q *Query) EventType() EventType {
	return EventQuery
}

// String renders the query as "query: " followed by its Name.
func (q *Query) String() string {
	return "query: " + q.Name
}

// Deadline reports the time by which a response must be sent.
func (q *Query) Deadline() time.Time {
	return q.deadline
}
// Respond sends buf as the payload of a response to this query,
// addressed to the address and port recorded on the query.
//
// It holds respLock for the whole call. An error is returned when a
// response has already been sent (the deadline is zero), when the
// deadline has passed, when the response cannot be encoded, when the
// encoded response is larger than QueryResponseSizeLimit, or when the
// send itself fails. The deadline is cleared only after a successful
// send, so later calls report that a response was already sent.
func (q *Query) Respond(buf []byte) error {
	q.respLock.Lock()
	defer q.respLock.Unlock()

	switch {
	case q.deadline.IsZero():
		// A zero deadline marks a query that was already answered.
		return fmt.Errorf("Response already sent")
	case time.Now().After(q.deadline):
		// The response deadline has passed.
		return fmt.Errorf("Response is past the deadline")
	}

	// Build and encode the response message.
	encoded, err := encodeMessage(messageQueryResponseType, &messageQueryResponse{
		LTime:   q.LTime,
		ID:      q.id,
		From:    q.serf.config.NodeName,
		Payload: buf,
	})
	if err != nil {
		return fmt.Errorf("Failed to format response: %v", err)
	}

	// Refuse to send anything larger than QueryResponseSizeLimit.
	if len(encoded) > QueryResponseSizeLimit {
		return fmt.Errorf("response exceeds limit of %d bytes", QueryResponseSizeLimit)
	}

	// Send the encoded response to the recorded address and port.
	dest := &net.UDPAddr{IP: q.addr, Port: int(q.port)}
	if sendErr := q.serf.memberlist.SendTo(dest, encoded); sendErr != nil {
		return sendErr
	}

	// Clear the deadline to record that the response has been sent.
	q.deadline = time.Time{}
	return nil
}