/
watch.go
138 lines (118 loc) · 2.84 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Original source: github.com/micro/go-micro/v3/util/kubernetes/client/watch.go
package client
import (
"bufio"
"context"
"encoding/json"
"errors"
"net/http"
"github.com/micro/micro/v3/service/runtime/kubernetes/api"
)
const (
// EventTypes used
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Error EventType = "ERROR"
)
// Watcher is used to watch for events
type Watcher interface {
// A channel of events
Chan() <-chan Event
// Stop the watcher
Stop()
}
// EventType defines the possible types of events.
type EventType string
// Event represents a single event to a watched resource.
type Event struct {
Type EventType `json:"type"`
Object json.RawMessage `json:"object"`
}
// bodyWatcher scans the body of a request for chunks
type bodyWatcher struct {
results chan Event
cancel func()
stop chan bool
res *http.Response
req *api.Request
}
// Changes returns the results channel
func (wr *bodyWatcher) Chan() <-chan Event {
return wr.results
}
// Stop cancels the request
func (wr *bodyWatcher) Stop() {
select {
case <-wr.stop:
return
default:
// cancel the request
wr.cancel()
// stop the watcher
close(wr.stop)
}
}
func (wr *bodyWatcher) stream() {
reader := bufio.NewReader(wr.res.Body)
go func() {
for {
// read a line
b, err := reader.ReadBytes('\n')
if err != nil {
return
}
// send the event
var event Event
if err := json.Unmarshal(b, &event); err != nil {
continue
}
select {
case <-wr.stop:
return
case wr.results <- event:
}
}
}()
}
// newWatcher creates a k8s body watcher for
// a given http request
func newWatcher(req *api.Request) (Watcher, error) {
// set request context so we can cancel the request
ctx, cancel := context.WithCancel(context.Background())
req.Context(ctx)
// do the raw request
res, err := req.Raw()
if err != nil {
cancel()
return nil, err
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
cancel()
// close the response body
res.Body.Close()
// return an error
return nil, errors.New(res.Request.URL.String() + ": " + res.Status)
}
wr := &bodyWatcher{
results: make(chan Event),
stop: make(chan bool),
cancel: cancel,
req: req,
res: res,
}
go wr.stream()
return wr, nil
}