forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache_test_helpers.go
224 lines (184 loc) · 6.33 KB
/
cache_test_helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package integration
import (
"bytes"
"context"
"os"
"strings"
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v3"
"github.com/benthosdev/benthos/v4/internal/component/cache"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/filepath/ifs"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/manager"
)
// CacheTestConfigVars exposes the variables injected into template configs for
// cache tests. RenderConfig substitutes the placeholders $ID, $PORT, $VAR1 and
// $VAR2 in the config template with the corresponding fields below.
type CacheTestConfigVars struct {
// A unique identifier for separating this test configuration from others.
// Usually used to access a different topic, consumer group, directory, etc.
// newCacheTestEnvironment populates it with a freshly generated UUID (v4).
ID string
// A Port to use in connector URLs. Allowing tests to override this
// potentially enables tests that check for faulty connections by bridging.
// Empty unless set by CacheTestOptPort or by a pre-test hook.
Port string
// Generic variables with no fixed meaning, set by CacheTestOptVarOne and
// CacheTestOptVarTwo respectively.
Var1 string
Var2 string
}
// CachePreTestFn is an optional closure to be called before a test is run.
// This is an opportunity to mutate test config variables and mess with the
// environment. It receives the test handle, the test context, the unique test
// ID, and a pointer to the config variables of the environment, so changes
// made through vars are seen by the test that follows.
type CachePreTestFn func(t testing.TB, ctx context.Context, testID string, vars *CacheTestConfigVars)
// cacheTestEnvironment carries everything a single cache test needs: the
// config template and its variables, an optional pre-test hook, and the
// context, logger and metrics handed to the components under test.
type cacheTestEnvironment struct {
// Config template containing $ID, $PORT, $VAR1 and $VAR2 placeholders.
configTemplate string
// Values substituted into configTemplate by RenderConfig.
configVars CacheTestConfigVars
// Optional hook invoked before the test body; nil when not configured.
preTest CachePreTestFn
// Timeout applied to ctx by CacheTestList.Run; 90 seconds by default.
timeout time.Duration
// Context passed to the pre-test hook and used when accessing the cache.
ctx context.Context
// Logger given to the resource manager; a no-op logger by default.
log log.Modular
// Metrics given to the resource manager; no-op metrics by default.
stats *metrics.Namespaced
}
// newCacheTestEnvironment builds a test environment around confTemplate with
// default settings: a freshly generated UUID (v4) as the test ID, a 90 second
// timeout, a background context, and no-op logging and metrics. The test is
// failed immediately if the UUID cannot be generated.
func newCacheTestEnvironment(t *testing.T, confTemplate string) cacheTestEnvironment {
	t.Helper()

	testID, err := uuid.NewV4()
	require.NoError(t, err)

	env := cacheTestEnvironment{
		configTemplate: confTemplate,
		timeout:        90 * time.Second,
		ctx:            context.Background(),
		log:            log.Noop(),
		stats:          metrics.Noop(),
	}
	// Only the ID is pre-populated; the remaining variables stay empty until
	// an option or pre-test hook sets them.
	env.configVars.ID = testID.String()
	return env
}
// RenderConfig returns the config template with the placeholders $ID, $PORT,
// $VAR1 and $VAR2 replaced by the environment's config variables. Replacement
// happens in a single pass, so substituted values are not expanded again.
func (e cacheTestEnvironment) RenderConfig() string {
	vars := e.configVars
	placeholders := strings.NewReplacer(
		"$ID", vars.ID,
		"$PORT", vars.Port,
		"$VAR1", vars.Var1,
		"$VAR2", vars.Var2,
	)
	return placeholders.Replace(e.configTemplate)
}
//------------------------------------------------------------------------------
// CacheTestOptFunc is an opt func for customizing the behaviour of cache tests,
// these are useful for things that are integration environment specific, such
// as the port of the service being interacted with. Each option mutates the
// test environment it is given; CacheTestList.Run applies the options in the
// order they were passed.
type CacheTestOptFunc func(*cacheTestEnvironment)
// CacheTestOptTimeout returns an option that overrides the timeout of a test
// environment. CacheTestList.Run uses that timeout to bound the context of
// each test it runs.
func CacheTestOptTimeout(timeout time.Duration) CacheTestOptFunc {
	setTimeout := func(e *cacheTestEnvironment) {
		e.timeout = timeout
	}
	return setTimeout
}
// CacheTestOptLogging returns an option that makes components log to stdout at
// the given log level, which is useful for diagnosing issues. Applying the
// option panics if the logger cannot be constructed.
func CacheTestOptLogging(level string) CacheTestOptFunc {
	return func(e *cacheTestEnvironment) {
		conf := log.NewConfig()
		conf.LogLevel = level

		logger, err := log.New(os.Stdout, ifs.OS(), conf)
		e.log = logger
		if err != nil {
			panic(err)
		}
	}
}
// CacheTestOptPort returns an option that sets the port of the integration
// service, made available to config templates as $PORT.
func CacheTestOptPort(port string) CacheTestOptFunc {
	setPort := func(e *cacheTestEnvironment) {
		e.configVars.Port = port
	}
	return setPort
}
// CacheTestOptVarOne returns an option that sets the first arbitrary test
// variable, made available to config templates as $VAR1.
func CacheTestOptVarOne(v string) CacheTestOptFunc {
	setVar := func(e *cacheTestEnvironment) {
		e.configVars.Var1 = v
	}
	return setVar
}
// CacheTestOptVarTwo returns an option that sets the second arbitrary test
// variable, made available to config templates as $VAR2.
func CacheTestOptVarTwo(v string) CacheTestOptFunc {
	setVar := func(e *cacheTestEnvironment) {
		e.configVars.Var2 = v
	}
	return setVar
}
// CacheTestOptPreTest returns an option that registers fn as the closure to be
// executed before each test. It replaces any previously registered closure.
func CacheTestOptPreTest(fn CachePreTestFn) CacheTestOptFunc {
	setHook := func(e *cacheTestEnvironment) {
		e.preTest = fn
	}
	return setHook
}
//------------------------------------------------------------------------------
// cacheTestDefinitionFn is the body of a cache test. It receives the test
// handle and the environment the test should run against.
type cacheTestDefinitionFn func(*testing.T, *cacheTestEnvironment)
// CacheTestDefinition encompasses a test to be executed against an
// integration environment. These tests are generic and can be run against any
// config template; helpers such as initCache expect the rendered config to
// provide a cache resource that can be accessed under the name "testcache".
type CacheTestDefinition struct {
// fn runs the test against the supplied environment.
fn func(*testing.T, *cacheTestEnvironment)
}
// CacheTestList is a list of cache test definitions that can be run with a
// single template and function args via its Run method.
type CacheTestList []CacheTestDefinition
// CacheTests bundles the given test definitions into a CacheTestList,
// preserving their order.
func CacheTests(tests ...CacheTestDefinition) CacheTestList {
	return CacheTestList(tests)
}
// Run executes every test in the list against configTemplate. Each test gets
// its own environment with the options applied in the order given, and a
// context bounded by the environment's timeout that is cancelled when t is
// cleaned up. Tests created with namedCacheTest run as parallel subtests.
func (i CacheTestList) Run(t *testing.T, configTemplate string, opts ...CacheTestOptFunc) {
	for idx := range i {
		env := newCacheTestEnvironment(t, configTemplate)
		for _, apply := range opts {
			apply(&env)
		}

		// The timeout is read after the options ran so that
		// CacheTestOptTimeout takes effect.
		ctx, cancel := context.WithTimeout(env.ctx, env.timeout)
		env.ctx = ctx
		t.Cleanup(cancel)

		i[idx].fn(t, &env)
	}
}
//------------------------------------------------------------------------------
// namedCacheTest wraps test in a definition that runs it as a parallel subtest
// called name. If the environment has a pre-test hook it is invoked inside the
// subtest, before the test body, with the environment's context, test ID and
// config variables.
func namedCacheTest(name string, test cacheTestDefinitionFn) CacheTestDefinition {
	runSubtest := func(t *testing.T, env *cacheTestEnvironment) {
		t.Run(name, func(st *testing.T) {
			st.Parallel()
			if hook := env.preTest; hook != nil {
				hook(st, env.ctx, env.configVars.ID, &env.configVars)
			}
			test(st, env)
		})
	}
	return CacheTestDefinition{fn: runSubtest}
}
//------------------------------------------------------------------------------
// initCache renders the environment's config, strictly decodes and lints it,
// builds a resource manager from its resource section and returns the cache
// registered under the name "testcache". Decode, lint and manager errors fail
// the test immediately; reported lints mark the test as failed but do not
// stop it.
func initCache(t *testing.T, env *cacheTestEnvironment) cache.V1 {
	t.Helper()

	rawConf := []byte(env.RenderConfig())

	// Unknown fields are rejected so that typos in a template surface here.
	conf := config.New()
	decoder := yaml.NewDecoder(bytes.NewReader(rawConf))
	decoder.KnownFields(true)
	require.NoError(t, decoder.Decode(&conf))

	lints, lintErr := config.LintBytes(docs.NewLintConfig(), rawConf)
	require.NoError(t, lintErr)
	assert.Empty(t, lints)

	mgr, mgrErr := manager.New(conf.ResourceConfig, manager.OptSetLogger(env.log), manager.OptSetMetrics(env.stats))
	require.NoError(t, mgrErr)

	var found cache.V1
	accessErr := mgr.AccessCache(env.ctx, "testcache", func(v cache.V1) {
		found = v
	})
	require.NoError(t, accessErr)
	return found
}
// closeCache closes the given cache using a background context and fails the
// test immediately if Close returns an error.
func closeCache(t *testing.T, c cache.V1) {
	// Marked as a helper, like initCache, so that a failure is reported at the
	// caller's line rather than inside this function.
	t.Helper()
	require.NoError(t, c.Close(context.Background()))
}