-
Notifications
You must be signed in to change notification settings - Fork 134
/
client.go
141 lines (116 loc) · 3.36 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//go:generate moq -out client_moq.go . JobEventReader
package events
import (
"context"
"sync"
"google.golang.org/grpc"
"github.com/gogo/protobuf/types"
"github.com/armadaproject/armada/internal/common/grpc/grpcpool"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client"
)
// JobEventReader is the interface for retrieving job set event messages.
// EventClient and PooledEventClient in this file both provide these methods.
type JobEventReader interface {
// GetJobEventMessage returns the event stream client for the job set
// described by jobReq, or an error if the stream could not be obtained.
GetJobEventMessage(ctx context.Context, jobReq *api.JobSetRequest) (api.Event_GetJobSetEventsClient, error)
// Health returns the health check response reported by the api.
Health(ctx context.Context, empty *types.Empty) (*api.HealthCheckResponse, error)
// Close releases any connection the implementation holds; for
// implementations that hold none it may do nothing.
Close()
}
// EventClient is the local struct for retrieving events from the api using the grpc client.
// It holds a single connection that is created on first use and dropped by Close.
type EventClient struct {
// config is the connection detail passed to client.CreateApiConnection.
config *client.ApiConnectionDetails
// conn is the api connection; it is nil until one has been established
// and is reset to nil by Close.
conn *grpc.ClientConn
// mux is held while conn is being created or closed.
mux *sync.Mutex
}
// NewEventClient builds an EventClient for the given connection details.
// No connection is opened here; conn stays nil until a method needs it.
func NewEventClient(config *client.ApiConnectionDetails) *EventClient {
	ec := new(EventClient)
	ec.config = config
	ec.mux = new(sync.Mutex)
	return ec
}
// GetJobEventMessage makes sure an api connection exists and then requests
// the job set event stream for jobReq over it.
//
// It returns a nil stream together with the error if either the connection
// or the stream request fails.
func (ec *EventClient) GetJobEventMessage(ctx context.Context, jobReq *api.JobSetRequest) (api.Event_GetJobSetEventsClient, error) {
	if connErr := ec.ensureApiConnection(); connErr != nil {
		return nil, connErr
	}

	events, err := api.NewEventClient(ec.conn).GetJobSetEvents(ctx, jobReq)
	if err != nil {
		return nil, err
	}
	return events, nil
}
// Health makes sure an api connection exists and then forwards the health
// check request over it, returning the response and error unchanged.
func (ec *EventClient) Health(ctx context.Context, empty *types.Empty) (*api.HealthCheckResponse, error) {
	if connErr := ec.ensureApiConnection(); connErr != nil {
		return nil, connErr
	}
	return api.NewEventClient(ec.conn).Health(ctx, empty)
}
// Close shuts down the api connection if one has been established and
// clears it, so a later call can establish a fresh one. It holds the mutex
// for the duration. The error returned by conn.Close is discarded.
func (ec *EventClient) Close() {
	ec.mux.Lock()
	defer ec.mux.Unlock()

	if !ec.hasConn() {
		return
	}
	ec.conn.Close()
	ec.conn = nil
}
// hasConn reports whether the client currently holds an api connection.
// It does no locking of its own.
func (ec *EventClient) hasConn() bool {
	connected := ec.conn != nil
	return connected
}
// ensureApiConnection establishes the api connection if the client does not
// already hold one.
//
// The mutex is taken before conn is inspected. Checking first and locking
// afterwards would let two concurrent callers both see a nil conn and each
// create a connection; the second assignment would overwrite the first and
// that connection would never be closed.
//
// It returns the error from client.CreateApiConnection, in which case conn is
// left unset, or nil once a connection is in place.
func (ec *EventClient) ensureApiConnection() error {
	ec.mux.Lock()
	defer ec.mux.Unlock()

	if ec.hasConn() {
		return nil
	}

	conn, err := client.CreateApiConnection(ec.config)
	if err != nil {
		return err
	}
	ec.conn = conn
	return nil
}
// PooledEventClient retrieves events from the api using connections obtained
// from a grpcpool.Pool instead of holding a connection of its own.
type PooledEventClient struct {
// pool supplies the connection used for each call.
pool *grpcpool.Pool
}
// NewPooledEventClient returns a PooledEventClient that takes its
// connections from pool.
func NewPooledEventClient(pool *grpcpool.Pool) *PooledEventClient {
	pec := new(PooledEventClient)
	pec.pool = pool
	return pec
}
// GetJobEventMessage obtains a connection from the pool and requests the job
// set event stream for jobReq over it.
//
// It returns a nil stream together with the error if either the pool or the
// stream request fails.
func (pec *PooledEventClient) GetJobEventMessage(ctx context.Context, jobReq *api.JobSetRequest) (api.Event_GetJobSetEventsClient, error) {
	pooled, poolErr := pec.pool.Get(ctx)
	if poolErr != nil {
		return nil, poolErr
	}
	// Close runs when this function returns, i.e. before the caller reads
	// from the stream.
	// NOTE(review): presumably Close hands the connection back to the pool
	// rather than tearing it down — confirm against grpcpool.
	defer pooled.Close()

	events, err := api.NewEventClient(pooled.ClientConn).GetJobSetEvents(ctx, jobReq)
	if err != nil {
		return nil, err
	}
	return events, nil
}
// Health obtains a connection from the pool and sends the health check
// request over it. If the request fails, the connection is marked unhealthy
// before it is closed and the error is returned with a nil response.
func (pec *PooledEventClient) Health(ctx context.Context, empty *types.Empty) (*api.HealthCheckResponse, error) {
	pooled, poolErr := pec.pool.Get(ctx)
	if poolErr != nil {
		return nil, poolErr
	}
	defer pooled.Close()

	resp, err := api.NewEventClient(pooled.ClientConn).Health(ctx, empty)
	if err == nil {
		return resp, nil
	}

	// Flag the connection before the deferred Close runs.
	pooled.Unhealthy()
	return nil, err
}
// Close does nothing: each call closes the connection it took from the pool
// before returning, so the client holds no connection of its own.
func (pec *PooledEventClient) Close() {}