This repository has been archived by the owner on Feb 1, 2021. It is now read-only.
/
token.go
142 lines (117 loc) · 2.97 KB
/
token.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package token
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/docker/docker/pkg/discovery"
)
const discoveryURL = "https://discovery.hub.docker.com/v1"
// Discovery is exported.
type Discovery struct {
heartbeat time.Duration
ttl time.Duration
url string
token string
}
func init() {
Init()
}
// Init is exported.
func Init() {
discovery.Register("token", &Discovery{})
}
// Initialize is exported.
func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration, ttl time.Duration, _ map[string]string) error {
if i := strings.LastIndex(urltoken, "/"); i != -1 {
s.url = "https://" + urltoken[:i]
s.token = urltoken[i+1:]
} else {
s.url = discoveryURL
s.token = urltoken
}
if s.token == "" {
return errors.New("token is empty")
}
s.heartbeat = heartbeat
s.ttl = ttl
return nil
}
// fetch returns the list of entries for the discovery service at the specified endpoint.
func (s *Discovery) fetch() (discovery.Entries, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var addrs []string
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil {
return nil, fmt.Errorf("Failed to decode response: %v", err)
}
} else {
return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode)
}
return discovery.CreateEntries(addrs)
}
// Watch is exported.
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
ticker := time.NewTicker(s.heartbeat)
errCh := make(chan error)
go func() {
defer close(ch)
defer close(errCh)
// Send the initial entries if available.
currentEntries, err := s.fetch()
if err != nil {
errCh <- err
} else {
ch <- currentEntries
}
// Periodically send updates.
for {
select {
case <-ticker.C:
newEntries, err := s.fetch()
if err != nil {
errCh <- err
continue
}
// Check if the file has really changed.
if !newEntries.Equals(currentEntries) {
ch <- newEntries
}
currentEntries = newEntries
case <-stopCh:
ticker.Stop()
return
}
}
}()
return ch, errCh
}
// Register adds a new entry identified by the into the discovery service.
func (s *Discovery) Register(addr string) error {
buf := strings.NewReader(addr)
resp, err := http.Post(fmt.Sprintf("%s/%s/%s?ttl=%d", s.url,
"clusters", s.token, uint64(s.ttl.Seconds())), "application/json", buf)
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// CreateCluster returns a unique cluster token.
func (s *Discovery) CreateCluster() (string, error) {
resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
if err != nil {
return "", err
}
defer resp.Body.Close()
token, err := ioutil.ReadAll(resp.Body)
return string(token), err
}