forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fake_loggerevent_streamingclient.go
129 lines (111 loc) · 3.44 KB
/
fake_loggerevent_streamingclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package fakevtctlclient
import (
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/youtube/vitess/go/vt/logutil"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// FakeLoggerEventStreamingClient is the base for the fakes for the vtctlclient and vtworkerclient.
// It allows to register a (multi-)line string for a given command and return the result as channel which streams it back.
type FakeLoggerEventStreamingClient struct {
results map[string]*result
// mu guards all fields of the structs.
mu sync.Mutex
}
// NewFakeLoggerEventStreamingClient creates a new fake.
func NewFakeLoggerEventStreamingClient() *FakeLoggerEventStreamingClient {
return &FakeLoggerEventStreamingClient{results: make(map[string]*result)}
}
// generateKey returns a map key for a []string.
// ([]string is not supported as map key.)
func generateKey(args []string) string {
return strings.Join(args, " ")
}
// result contains the result the fake should respond for a given command.
type result struct {
output string
err error
// count is the number of times this result is registered for the same
// command. With each stream of this result, count will be decreased by one.
count int
}
func (r1 result) Equals(r2 result) bool {
return r1.output == r2.output &&
((r1.err == nil && r2.err == nil) ||
(r1.err != nil && r2.err != nil && r1.err.Error() == r2.err.Error()))
}
// RegisterResult registers for a given command (args) the result which the fake should return.
// Once the result was returned, it will be automatically deregistered.
func (f *FakeLoggerEventStreamingClient) RegisterResult(args []string, output string, err error) error {
f.mu.Lock()
defer f.mu.Unlock()
k := generateKey(args)
v := result{output, err, 1}
if result, ok := f.results[k]; ok {
if result.Equals(v) {
result.count++
return nil
}
return fmt.Errorf("A different result (%v) is already registered for command: %v", result, args)
}
f.results[k] = &v
return nil
}
// RegisteredCommands returns a list of commands which are currently registered.
// This is useful to check that all registered results have been consumed.
func (f *FakeLoggerEventStreamingClient) RegisteredCommands() []string {
f.mu.Lock()
defer f.mu.Unlock()
var commands []string
for k := range f.results {
commands = append(commands, k)
}
return commands
}
type streamResultAdapter struct {
lines []string
index int
err error
}
func (s *streamResultAdapter) Recv() (*logutilpb.Event, error) {
if s.index < len(s.lines) {
result := &logutilpb.Event{
Time: logutil.TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: "fakevtctlclient",
Line: -1,
Value: s.lines[s.index],
}
s.index++
return result, nil
}
if s.err == nil {
return nil, io.EOF
}
return nil, s.err
}
// StreamResult returns an EventStream which streams back a registered result as logging events.
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (logutil.EventStream, error) {
f.mu.Lock()
defer f.mu.Unlock()
k := generateKey(args)
result, ok := f.results[k]
if !ok {
return nil, fmt.Errorf("No response was registered for args: %v", args)
}
result.count--
if result.count == 0 {
delete(f.results, k)
}
return &streamResultAdapter{
lines: strings.Split(result.output, "\n"),
index: 0,
err: result.err,
}, nil
}