-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
server.go
119 lines (98 loc) · 2.41 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package device
import (
"fmt"
"time"
"github.com/golang/protobuf/ptypes"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/device/proto"
context "golang.org/x/net/context"
)
// devicePluginServer wraps a device plugin and exposes it via gRPC.
type devicePluginServer struct {
broker *plugin.GRPCBroker
impl DevicePlugin
}
func (d *devicePluginServer) Fingerprint(req *proto.FingerprintRequest, stream proto.DevicePlugin_FingerprintServer) error {
ctx := stream.Context()
outCh, err := d.impl.Fingerprint(ctx)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case resp, ok := <-outCh:
// The output channel has been closed, end the stream
if !ok {
return nil
}
// Handle any error
if resp.Error != nil {
return resp.Error
}
// Convert the devices
out := convertStructDeviceGroups(resp.Devices)
// Build the response
presp := &proto.FingerprintResponse{
DeviceGroup: out,
}
// Send the devices
if err := stream.Send(presp); err != nil {
return err
}
}
}
}
func (d *devicePluginServer) Reserve(ctx context.Context, req *proto.ReserveRequest) (*proto.ReserveResponse, error) {
resp, err := d.impl.Reserve(req.GetDeviceIds())
if err != nil {
return nil, err
}
// Make the response
presp := &proto.ReserveResponse{
ContainerRes: convertStructContainerReservation(resp),
}
return presp, nil
}
func (d *devicePluginServer) Stats(req *proto.StatsRequest, stream proto.DevicePlugin_StatsServer) error {
ctx := stream.Context()
// Retrieve the collection interval
interval, err := ptypes.Duration(req.CollectionInterval)
if err != nil {
return fmt.Errorf("failed to parse collection interval: %v", err)
}
// Default the duration if we get an invalid duration
if interval.Nanoseconds() == 0 {
interval = time.Second
}
outCh, err := d.impl.Stats(ctx, interval)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case resp, ok := <-outCh:
// The output channel has been closed, end the stream
if !ok {
return nil
}
// Handle any error
if resp.Error != nil {
return resp.Error
}
// Convert the devices
out := convertStructDeviceGroupsStats(resp.Groups)
// Build the response
presp := &proto.StatsResponse{
Groups: out,
}
// Send the devices
if err := stream.Send(presp); err != nil {
return err
}
}
}
}