/
stream.go
116 lines (99 loc) · 2.87 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package xds
import (
"context"
"errors"
"io"
envoy_service_discovery "github.com/cilium/proxy/go/envoy/service/discovery/v3"
"github.com/cilium/cilium/pkg/time"
)
// Stream is the subset of the gRPC bi-directional stream types which is used
// by Server. It exposes only the two message-level operations: sending a
// discovery response and receiving a discovery request.
type Stream interface {
// Send sends a xDS response back to the client. A non-nil error reports
// that the response was not sent.
Send(*envoy_service_discovery.DiscoveryResponse) error
// Recv receives a xDS request from the client. A non-nil error reports
// that no request was received.
Recv() (*envoy_service_discovery.DiscoveryRequest, error)
}
// MockStream is a mock implementation of Stream used for testing.
//
// It is backed by two channels: requests pushed with SendRequest are returned
// by Recv, and responses passed to Send are returned by RecvResponse. Every
// operation is bounded by a timeout derived from ctx.
type MockStream struct {
// ctx is the parent of the per-call timeout contexts created by each
// operation.
ctx context.Context
// recv carries requests from SendRequest to Recv.
recv chan *envoy_service_discovery.DiscoveryRequest
// sent carries responses from Send to RecvResponse.
sent chan *envoy_service_discovery.DiscoveryResponse
// recvTimeout bounds each Recv and SendRequest call.
recvTimeout time.Duration
// sentTimeout bounds each Send and RecvResponse call.
sentTimeout time.Duration
}
// NewMockStream creates a new mock Stream for testing.
//
// ctx is the parent context for every operation on the returned stream.
// recvSize and sentSize are the capacities of the request and response
// channels respectively. recvTimeout bounds Recv and SendRequest, while
// sentTimeout bounds Send and RecvResponse.
func NewMockStream(ctx context.Context, recvSize, sentSize int, recvTimeout, sentTimeout time.Duration) *MockStream {
	requests := make(chan *envoy_service_discovery.DiscoveryRequest, recvSize)
	responses := make(chan *envoy_service_discovery.DiscoveryResponse, sentSize)

	stream := new(MockStream)
	stream.ctx = ctx
	stream.recv = requests
	stream.sent = responses
	stream.recvTimeout = recvTimeout
	stream.sentTimeout = sentTimeout
	return stream
}
// Send queues resp on the sent channel, from which RecvResponse reads.
//
// It blocks until the response has been handed to the channel or the
// per-call context (the stream's context limited by sentTimeout) is done.
// A canceled context is reported as io.EOF; any other context error, such as
// an exceeded deadline, is returned unchanged.
//
// NOTE(review): sending on a closed channel panics in Go, so calling Send
// after Close will panic.
func (s *MockStream) Send(resp *envoy_service_discovery.DiscoveryResponse) error {
	timeoutCtx, stop := context.WithTimeout(s.ctx, s.sentTimeout)
	defer stop()

	select {
	case s.sent <- resp:
		return nil
	case <-timeoutCtx.Done():
	}

	// The context ended before the response could be queued.
	if err := timeoutCtx.Err(); !errors.Is(err, context.Canceled) {
		return err
	}
	return io.EOF
}
// Recv returns the next request from the recv channel, which SendRequest
// writes to.
//
// It blocks until a request is available or the per-call context (the
// stream's context limited by recvTimeout) is done. A canceled context is
// reported as io.EOF; any other context error, such as an exceeded deadline,
// is returned unchanged.
//
// NOTE(review): receiving from a closed channel yields the zero value, so
// after Close this returns a nil request together with a nil error.
func (s *MockStream) Recv() (*envoy_service_discovery.DiscoveryRequest, error) {
	timeoutCtx, stop := context.WithTimeout(s.ctx, s.recvTimeout)
	defer stop()

	select {
	case request := <-s.recv:
		return request, nil
	case <-timeoutCtx.Done():
	}

	// The context ended before any request arrived.
	if err := timeoutCtx.Err(); !errors.Is(err, context.Canceled) {
		return nil, err
	}
	return nil, io.EOF
}
// SendRequest queues a request to be received by calling Recv.
//
// It blocks until the request has been handed to the recv channel or the
// per-call context (the stream's context limited by recvTimeout) is done.
// A canceled context is reported as io.EOF; any other context error, such as
// an exceeded deadline, is returned unchanged.
//
// NOTE(review): sending on a closed channel panics in Go, so calling
// SendRequest after Close will panic.
func (s *MockStream) SendRequest(req *envoy_service_discovery.DiscoveryRequest) error {
	timeoutCtx, stop := context.WithTimeout(s.ctx, s.recvTimeout)
	defer stop()

	select {
	case s.recv <- req:
		return nil
	case <-timeoutCtx.Done():
	}

	// The context ended before the request could be queued.
	if err := timeoutCtx.Err(); !errors.Is(err, context.Canceled) {
		return err
	}
	return io.EOF
}
// RecvResponse receives a response that was queued by calling Send.
//
// It blocks until a response is available on the sent channel or the
// per-call context (the stream's context limited by sentTimeout) is done.
// A canceled context is reported as io.EOF; any other context error, such as
// an exceeded deadline, is returned unchanged.
//
// NOTE(review): receiving from a closed channel yields the zero value, so
// after Close this returns a nil response together with a nil error.
func (s *MockStream) RecvResponse() (*envoy_service_discovery.DiscoveryResponse, error) {
	timeoutCtx, stop := context.WithTimeout(s.ctx, s.sentTimeout)
	defer stop()

	select {
	case response := <-s.sent:
		return response, nil
	case <-timeoutCtx.Done():
	}

	// The context ended before any response arrived.
	if err := timeoutCtx.Err(); !errors.Is(err, context.Canceled) {
		return nil, err
	}
	return nil, io.EOF
}
// Close closes the resources used by this MockStream, namely the recv and
// sent channels.
//
// Closing an already-closed channel panics in Go, so Close must be called at
// most once.
func (s *MockStream) Close() {
// Close the request channel first, then the response channel.
close(s.recv)
close(s.sent)
}