forked from mongodb/mongo-go-driver
/
unified_spec_runner.go
376 lines (318 loc) · 12.8 KB
/
unified_spec_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package unified
import (
"context"
"fmt"
"io/ioutil"
"path"
"strings"
"testing"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/spectest"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
)
var (
skippedTestDescriptions = map[string]string{
// GODRIVER-1773: This test runs a "find" with limit=4 and batchSize=3. It expects batchSize values of three for
// the "find" and one for the "getMore", but we send three for both.
"A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773",
// TODO(GODRIVER-2843): Fix and unskip these test cases.
"Find operation with snapshot": "Test fails frequently. See GODRIVER-2843",
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
// TODO(GODRIVER-3043): Avoid Appending Write/Read Concern in Atlas Search
// Index Helper Commands.
"dropSearchIndex ignores read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",
"listSearchIndexes ignores read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",
"updateSearchIndex ignores the read and write concern": "Sync GODRIVER-3074, but skip testing bug GODRIVER-3043",
}
logMessageValidatorTimeout = 10 * time.Millisecond
lowHeartbeatFrequency = 50 * time.Millisecond
)
// TestCase holds and runs a unified spec test case
type TestCase struct {
Description string `bson:"description"`
RunOnRequirements []mtest.RunOnBlock `bson:"runOnRequirements"`
SkipReason *string `bson:"skipReason"`
Operations []*operation `bson:"operations"`
ExpectedEvents []*expectedEvents `bson:"expectEvents"`
ExpectLogMessages []*clientLogMessages `bson:"expectLogMessages"`
Outcome []*collectionData `bson:"outcome"`
initialData []*collectionData
createEntities []map[string]*entityOptions
killAllSessions bool
schemaVersion string
entities *EntityMap
loopDone chan struct{}
}
func (tc *TestCase) performsDistinct() bool {
return tc.performsOperation("distinct")
}
func (tc *TestCase) setsFailPoint() bool {
return tc.performsOperation("failPoint")
}
func (tc *TestCase) startsTransaction() bool {
return tc.performsOperation("startTransaction")
}
func (tc *TestCase) performsOperation(name string) bool {
for _, op := range tc.Operations {
if op.Name == name {
return true
}
}
return false
}
// TestFile holds the contents of a unified spec test file
type TestFile struct {
Description string `bson:"description"`
SchemaVersion string `bson:"schemaVersion"`
RunOnRequirements []mtest.RunOnBlock `bson:"runOnRequirements"`
CreateEntities []map[string]*entityOptions `bson:"createEntities"`
InitialData []*collectionData `bson:"initialData"`
TestCases []*TestCase `bson:"tests"`
}
// runTestDirectory runs the files in the given directory, which must be in the unified spec format, with
// expectValidFail determining whether the tests should expect to pass or fail
func runTestDirectory(t *testing.T, directoryPath string, expectValidFail bool) {
for _, filename := range spectest.FindJSONFilesInDir(t, directoryPath) {
t.Run(filename, func(t *testing.T) {
runTestFile(t, path.Join(directoryPath, filename), expectValidFail)
})
}
}
// runTestFile runs the tests in the given file, with expectValidFail determining whether the tests should expect to pass or fail
func runTestFile(t *testing.T, filepath string, expectValidFail bool, opts ...*Options) {
content, err := ioutil.ReadFile(filepath)
assert.Nil(t, err, "ReadFile error for file %q: %v", filepath, err)
fileReqs, testCases := ParseTestFile(t, content, expectValidFail, opts...)
mtOpts := mtest.NewOptions().
RunOn(fileReqs...).
CreateClient(false)
mt := mtest.New(t, mtOpts)
for _, testCase := range testCases {
mtOpts := mtest.NewOptions().
RunOn(testCase.RunOnRequirements...).
CreateClient(false)
mt.RunOpts(testCase.Description, mtOpts, func(mt *mtest.T) {
defer func() {
// catch panics from looking up elements and fail if it's unexpected
if r := recover(); r != nil {
if !expectValidFail {
mt.Fatal(r)
}
}
}()
err := testCase.Run(mt)
if expectValidFail {
if err != nil {
return
}
mt.Fatalf("expected test to error, got nil")
}
if err != nil {
mt.Fatal(err)
}
})
}
}
func parseTestFile(testJSON []byte, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase, error) {
var testFile TestFile
if err := bson.UnmarshalExtJSON(testJSON, false, &testFile); err != nil {
return nil, nil, err
}
op := MergeOptions(opts...)
for _, testCase := range testFile.TestCases {
testCase.initialData = testFile.InitialData
testCase.createEntities = testFile.CreateEntities
testCase.schemaVersion = testFile.SchemaVersion
testCase.entities = newEntityMap()
testCase.loopDone = make(chan struct{})
testCase.killAllSessions = *op.RunKillAllSessions
}
return testFile.RunOnRequirements, testFile.TestCases, nil
}
// ParseTestFile create an array of TestCases from the testJSON json blob
func ParseTestFile(t *testing.T, testJSON []byte, expectValidFail bool, opts ...*Options) ([]mtest.RunOnBlock, []*TestCase) {
t.Helper()
runOnRequirements, testCases, err := parseTestFile(testJSON, opts...)
if !expectValidFail {
assert.NoError(t, err, "error parsing test file")
}
return runOnRequirements, testCases
}
// GetEntities returns a pointer to the EntityMap for the TestCase. This should not be called until after
// the test is run
func (tc *TestCase) GetEntities() *EntityMap {
return tc.entities
}
// EndLoop will cause the runner to stop a loop operation if one is included in the test. If the test has finished
// running, this will panic
func (tc *TestCase) EndLoop() {
tc.loopDone <- struct{}{}
}
// LoggerSkipper is passed to TestCase.Run to allow it to perform logging and skipping operations
type LoggerSkipper interface {
Log(args ...interface{})
Logf(format string, args ...interface{})
Skip(args ...interface{})
Skipf(format string, args ...interface{})
}
// skipTestError indicates that a test must be skipped because the runner cannot execute it (e.g. the test requires
// an operation or option that the driver does not support).
type skipTestError struct {
reason string
}
// Error implements the error interface.
func (s skipTestError) Error() string {
return fmt.Sprintf("test must be skipped: %q", s.reason)
}
func newSkipTestError(reason string) error {
return &skipTestError{reason}
}
func isSkipTestError(err error) bool {
return err != nil && strings.Contains(err.Error(), "test must be skipped")
}
// Run runs the TestCase and returns an error if it fails
func (tc *TestCase) Run(ls LoggerSkipper) error {
if tc.SkipReason != nil {
ls.Skipf("skipping for reason: %q", *tc.SkipReason)
}
if skipReason, ok := skippedTestDescriptions[tc.Description]; ok {
ls.Skipf("skipping due to known failure: %v", skipReason)
}
// Validate that we support the schema declared by the test file before attempting to use its contents.
if err := checkSchemaVersion(tc.schemaVersion); err != nil {
return fmt.Errorf("schema version %q not supported: %v", tc.schemaVersion, err)
}
testCtx := newTestContext(context.Background(), tc.entities, tc.ExpectLogMessages, tc.setsFailPoint())
defer func() {
// If anything fails while doing test cleanup, we only log the error because the actual test may have already
// failed and that failure should be preserved.
for _, err := range disableUntargetedFailPoints(testCtx) {
ls.Log(err)
}
for _, err := range disableTargetedFailPoints(testCtx) {
ls.Log(err)
}
for _, err := range entities(testCtx).close(testCtx) {
ls.Log(err)
}
// Tests that started a transaction should terminate any sessions left open on the server. This is required even
// if the test attempted to commit/abort the transaction because an abortTransaction command can fail if it's
// sent to a mongos that isn't aware of the transaction.
if tc.startsTransaction() && tc.killAllSessions {
if err := terminateOpenSessions(context.Background()); err != nil {
ls.Logf("error terminating open transactions after failed test: %v", err)
}
}
close(tc.loopDone)
}()
// Set up collections based on the file-level initialData field.
for _, collData := range tc.initialData {
if err := collData.createCollection(testCtx); err != nil {
return fmt.Errorf("error setting up collection %q: %v", collData.namespace(), err)
}
}
// Set up entities based on the file-level createEntities field. For client entities, if the test will configure
// a fail point, set a low heartbeatFrequencyMS value into the URI options map if one is not already present.
// This speeds up recovery time for the client if the fail point forces the server to return a state change
// error.
for idx, entity := range tc.createEntities {
for entityType, entityOptions := range entity {
if entityType == "client" && hasOperationalFailpoint(testCtx) {
entityOptions.setHeartbeatFrequencyMS(lowHeartbeatFrequency)
}
if err := tc.entities.addEntity(testCtx, entityType, entityOptions); err != nil {
if isSkipTestError(err) {
ls.Skip(err)
}
return fmt.Errorf("error creating entity at index %d: %v", idx, err)
}
}
}
// Work around SERVER-39704.
if mtest.ClusterTopologyKind() == mtest.Sharded && tc.performsDistinct() {
if err := performDistinctWorkaround(testCtx); err != nil {
return fmt.Errorf("error performing \"distinct\" workaround: %v", err)
}
}
for idx, operation := range tc.Operations {
if err := operation.execute(testCtx, tc.loopDone); err != nil {
if isSkipTestError(err) {
ls.Skip(err)
}
return fmt.Errorf("error running operation %q at index %d: %v", operation.Name, idx, err)
}
}
// Create a validator for log messages and start the workers that will
// observe log messages as they occur operationally.
logMessageValidator := newLogMessageValidator(tc)
go startLogValidators(testCtx, logMessageValidator)
for _, client := range tc.entities.clients() {
client.stopListeningForEvents()
}
// One of the bulkWrite spec tests expects update and updateMany to be grouped together into a single batch,
// but this isn't the case because of GODRIVER-1157. To work around this, we skip event verification for this test.
// This guard should be removed when GODRIVER-1157 is done.
if tc.Description != "BulkWrite on server that doesn't support arrayFilters with arrayFilters on second op" {
for idx, expectedEvents := range tc.ExpectedEvents {
if err := verifyEvents(testCtx, expectedEvents); err != nil {
return fmt.Errorf("events verification failed at index %d: %v", idx, err)
}
}
}
for idx, collData := range tc.Outcome {
if err := collData.verifyContents(testCtx); err != nil {
return fmt.Errorf("error verifying outcome for collection %q at index %d: %v",
collData.namespace(), idx, err)
}
}
{
// Create a context with a deadline to use for log message
// validation. This will prevent any blocking from test cases
// with N messages where only N - K (0 < K < N) messages are
// observed.
ctx, cancel := context.WithTimeout(testCtx, logMessageValidatorTimeout)
defer cancel()
// For each client, verify that all expected log messages were
// received.
if err := stopLogValidators(ctx, logMessageValidator); err != nil {
return fmt.Errorf("error verifying log messages: %w", err)
}
}
return nil
}
func disableUntargetedFailPoints(ctx context.Context) []error {
var errs []error
for fpName, client := range failPoints(ctx) {
if err := disableFailPointWithClient(ctx, fpName, client); err != nil {
errs = append(errs, fmt.Errorf("error disabling fail point %q: %v", fpName, err))
}
}
return errs
}
func disableTargetedFailPoints(ctx context.Context) []error {
var errs []error
for fpName, host := range targetedFailPoints(ctx) {
commandFn := func(ctx context.Context, client *mongo.Client) error {
return disableFailPointWithClient(ctx, fpName, client)
}
if err := runCommandOnHost(ctx, host, commandFn); err != nil {
errs = append(errs, fmt.Errorf("error disabling targeted fail point %q on host %q: %v", fpName, host, err))
}
}
return errs
}
func disableFailPointWithClient(ctx context.Context, fpName string, client *mongo.Client) error {
cmd := bson.D{
{"configureFailPoint", fpName},
{"mode", "off"},
}
return client.Database("admin").RunCommand(ctx, cmd).Err()
}