-
Notifications
You must be signed in to change notification settings - Fork 2k
/
client.go
150 lines (128 loc) · 4.01 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package device
import (
"context"
"io"
"time"
"github.com/LK4D4/joincontext"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/device/proto"
)
// devicePluginClient implements the client side of a remote device plugin, using
// gRPC to communicate to the remote plugin.
type devicePluginClient struct {
// BasePluginClient is embedded to give access to the base plugin methods.
*base.BasePluginClient
// client is the gRPC client through which the Fingerprint, Reserve and Stats
// calls are sent to the plugin.
client proto.DevicePluginClient
// doneCtx is cancelled when the plugin exits. It is joined with the caller's
// context for the streaming calls and used directly as the context for
// Reserve.
doneCtx context.Context
}
// Fingerprint is used to retrieve the set of devices and their health from the
// device plugin. An error may be immediately returned if the fingerprint call
// could not be made or as part of the streaming response. If the context is
// cancelled, the error will be propagated.
//
// On success the returned channel is fed by a background goroutine running
// handleFingerprint, which closes the channel when it exits.
func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) {
	// Join the passed context and the shutdown context so the stream ends when
	// either the caller gives up or the plugin exits.
	joinedCtx, cancel := joincontext.Join(ctx, d.doneCtx)

	var req proto.FingerprintRequest
	stream, err := d.client.Fingerprint(joinedCtx, &req)
	if err != nil {
		// No stream was established, so release the joined context right away
		// instead of leaving it alive until one of its parents finishes.
		cancel()
		return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
	}

	out := make(chan *FingerprintResponse, 1)
	go func() {
		// Release the joined context as soon as the stream handler is done.
		defer cancel()
		d.handleFingerprint(ctx, stream, out)
	}()

	return out, nil
}
// handleFingerprint should be launched in a goroutine and handles converting
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error. The out channel is always closed on exit.
//
// reqCtx is the caller's request context, stream is the gRPC stream to drain,
// and out receives one FingerprintResponse per stream message. A stream error
// other than io.EOF is reported as a final response with only Error set.
func (d *devicePluginClient) handleFingerprint(
	reqCtx context.Context,
	stream proto.DevicePlugin_FingerprintClient,
	out chan *FingerprintResponse) {

	defer close(out)
	for {
		resp, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				failure := &FingerprintResponse{
					Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
				}

				// Deliver the error whenever the channel has room, even if
				// the request context is already cancelled. Only when the
				// send would block do we also watch reqCtx, so that a
				// consumer that cancelled and stopped reading cannot strand
				// this goroutine forever.
				select {
				case out <- failure:
				default:
					select {
					case out <- failure:
					case <-reqCtx.Done():
					}
				}
			}

			// End the stream
			return
		}

		// Send the response
		f := &FingerprintResponse{
			Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()),
		}

		select {
		case <-reqCtx.Done():
			return
		case out <- f:
		}
	}
}
// Reserve sends a Reserve request for the given device IDs to the device
// plugin, using the plugin's shutdown context for the call. On success the
// container reservation carried in the response is converted and returned;
// on failure the gRPC error is passed through grpcutils.HandleGrpcErr.
func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, error) {
	reply, err := d.client.Reserve(d.doneCtx, &proto.ReserveRequest{DeviceIds: deviceIDs})
	if err != nil {
		return nil, grpcutils.HandleGrpcErr(err, d.doneCtx)
	}

	return convertProtoContainerReservation(reply.GetContainerRes()), nil
}
// Stats is used to retrieve device statistics from the device plugin. An error
// may be immediately returned if the stats call could not be made or as part of
// the streaming response. If the context is cancelled, the error will be
// propagated.
//
// interval is forwarded to the plugin as the request's collection interval.
// On success the returned channel is fed by a background goroutine running
// handleStats, which closes the channel when it exits.
func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) (<-chan *StatsResponse, error) {
	// Join the passed context and the shutdown context so the stream ends when
	// either the caller gives up or the plugin exits.
	joinedCtx, cancel := joincontext.Join(ctx, d.doneCtx)

	req := proto.StatsRequest{
		CollectionInterval: ptypes.DurationProto(interval),
	}
	stream, err := d.client.Stats(joinedCtx, &req)
	if err != nil {
		// No stream was established, so release the joined context right away
		// instead of leaving it alive until one of its parents finishes.
		cancel()
		return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx)
	}

	out := make(chan *StatsResponse, 1)
	go func() {
		// Release the joined context as soon as the stream handler is done.
		defer cancel()
		d.handleStats(ctx, stream, out)
	}()

	return out, nil
}
// handleStats should be launched in a goroutine and handles converting
// the gRPC stream to a channel. Exits either when context is cancelled or the
// stream has an error. The out channel is always closed on exit.
//
// reqCtx is the caller's request context, stream is the gRPC stream to drain,
// and out receives one StatsResponse per stream message. A stream error other
// than io.EOF is reported as a final response with only Error set.
func (d *devicePluginClient) handleStats(
	reqCtx context.Context,
	stream proto.DevicePlugin_StatsClient,
	out chan *StatsResponse) {

	defer close(out)
	for {
		resp, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				failure := &StatsResponse{
					Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx),
				}

				// Deliver the error whenever the channel has room, even if
				// the request context is already cancelled. Only when the
				// send would block do we also watch reqCtx, so that a
				// consumer that cancelled and stopped reading cannot strand
				// this goroutine forever.
				select {
				case out <- failure:
				default:
					select {
					case out <- failure:
					case <-reqCtx.Done():
					}
				}
			}

			// End the stream
			return
		}

		// Send the response
		s := &StatsResponse{
			Groups: convertProtoDeviceGroupsStats(resp.GetGroups()),
		}

		select {
		case <-reqCtx.Done():
			return
		case out <- s:
		}
	}
}