forked from gambol99/go-marathon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster.go
209 lines (178 loc) · 4.67 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/*
Copyright 2014 Rohith All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package marathon
import (
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
const (
marathonNodeUp = 0
marathonNodeDown = 1
)
// Cluster is the interface for the marathon cluster impl
type Cluster interface {
URL() string
// retrieve a member from the cluster
GetMember() (string, error)
// make the last member as down
MarkDown()
// the size of the cluster
Size() int
// the members which are available
Active() []string
// the members which are NOT available
NonActive() []string
}
type marathonCluster struct {
sync.RWMutex
// the cluster url
url string
// a link list of members
members *marathonNode
// the number of members
size int
// the protocol
protocol string
// the current host
active *marathonNode
}
// String returns a string representation of the cluster
func (r *marathonCluster) String() string {
return fmt.Sprintf("url: %s|%s, members: %s, size: %d, active: %s",
r.protocol, r.url, r.members, r.size, r.active)
}
type marathonNode struct {
// the name / ip address of the host
hostname string
// the status of the host
status int
// the next member in the list
next *marathonNode
}
func (member marathonNode) String() string {
status := "UP"
if member.status == marathonNodeDown {
status = "DOWN"
}
return fmt.Sprintf("member: %s:%s", member.hostname, status)
}
func newCluster(marathonURL string) (Cluster, error) {
cluster := new(marathonCluster)
// step: parse the marathon url
marathon, err := url.Parse(marathonURL)
if err != nil {
return nil, ErrInvalidEndpoint
}
// step: check the protocol
if marathon.Scheme != "http" && marathon.Scheme != "https" {
return nil, ErrInvalidEndpoint
}
cluster.protocol = marathon.Scheme
cluster.url = marathonURL
/* step: create a link list of the hosts */
var previous *marathonNode
for index, host := range strings.SplitN(marathon.Host, ",", -1) {
// step: create a new cluster member
node := new(marathonNode)
node.hostname = host
cluster.size++
// step: if the first member
if index == 0 {
cluster.members = node
cluster.active = node
previous = node
} else {
previous.next = node
previous = node
}
}
// step: close the link list
previous.next = cluster.active
return cluster, nil
}
func (r *marathonCluster) URL() string {
return r.url
}
func (r *marathonCluster) Active() []string {
return r.memberStatus(marathonNodeUp)
}
func (r *marathonCluster) NonActive() []string {
return r.memberStatus(marathonNodeDown)
}
func (r *marathonCluster) memberStatus(status int) []string {
var list []string
r.RLock()
defer r.RUnlock()
member := r.members
for i := 0; i < r.size; i++ {
if member.status == status {
list = append(list, member.hostname)
}
member = member.next
}
return list
}
// Retrieve the current member, i.e. the current endpoint in use
func (r *marathonCluster) GetMember() (string, error) {
r.Lock()
defer r.Unlock()
for i := 0; i < r.size; i++ {
if r.active.status == marathonNodeUp {
return r.GetMarathonURL(r.active), nil
}
// move to the next member
if r.active.next != nil {
r.active = r.active.next
} else {
return "", errors.New("no cluster members available at the moment")
}
}
// we reached the end and there were no members available
defer r.MarkDown()
return "", ErrMarathonDown
}
// Retrieves the current marathon url
func (r *marathonCluster) GetMarathonURL(node *marathonNode) string {
return fmt.Sprintf("%s://%s", r.protocol, node.hostname)
}
// MarkDown downs node the current endpoint as down and waits for it to come back only
func (r *marathonCluster) MarkDown() {
r.Lock()
defer r.Unlock()
node := r.active
node.status = marathonNodeDown
// step: create a go-routine to place the member back in
go func() {
for {
response, err := http.Get(r.GetMarathonURL(node) + "/ping")
if err == nil && response.StatusCode == 200 {
node.status = marathonNodeUp
return
}
<-time.After(time.Duration(5 * time.Second))
}
}()
// step: move to the next member
if r.active.next != nil {
r.active = r.active.next
}
}
// Six retrieve the size of the cluster
func (r *marathonCluster) Size() int {
return r.size
}