-
Notifications
You must be signed in to change notification settings - Fork 197
/
Copy patheventbridge_test.go
127 lines (108 loc) · 4.42 KB
/
eventbridge_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package eventbridge
import (
"context"
"fmt"
"testing"
"time"
"github.com/containerd/containerd/api/events"
eventtypes "github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/typeurl/v2"
"github.com/stretchr/testify/require"
)
const (
namespace = "tests"
)
func TestEventBridgeAttach(t *testing.T) {
ctx, cancel := context.WithCancel(namespaces.WithNamespace(context.Background(), namespace))
source := exchange.NewExchange()
sink := exchange.NewExchange()
getter := NewGetterService(ctx, source)
attachErrCh := Attach(ctx, getter, sink)
verifyPublishAndReceive(ctx, t, source, sink)
cancel()
timeout := time.Second
select {
case err := <-attachErrCh:
if err != context.Canceled {
require.NoError(t, err, "unexpected Attach error")
}
case <-time.After(timeout):
require.Fail(t, "timeout", "waiting for Attach to return timed out after %s", timeout.String())
}
}
func TestEventBridgeRepublish(t *testing.T) {
ctx, cancel := context.WithCancel(namespaces.WithNamespace(context.Background(), namespace))
source := exchange.NewExchange()
sink := exchange.NewExchange()
republishErrCh := Republish(ctx, source, sink)
verifyPublishAndReceive(ctx, t, source, sink)
cancel()
timeout := time.Second
select {
case err := <-republishErrCh:
if err != context.Canceled {
require.NoError(t, err, "unexpected Republish error")
}
case <-time.After(timeout):
require.Fail(t, "timeout", "waiting for Republish to return timed out after %s", timeout.String())
}
}
func verifyPublishAndReceive(ctx context.Context, t *testing.T, source eventtypes.Publisher, sink eventtypes.Subscriber) {
topic := "/just/container/things"
sinkEventCh, sinkErrorCh := sink.Subscribe(ctx, fmt.Sprintf(`topic=="%s"`, topic))
const taskLimit uint32 = 100
for i := range taskLimit {
taskExitEvent := &events.TaskExit{
ContainerID: fmt.Sprintf("container-%d", i),
ID: fmt.Sprintf("id-%d", i),
Pid: i,
ExitStatus: i + 1,
ExitedAt: protobuf.ToTimestamp(time.Now().UTC()),
}
err := source.Publish(ctx, topic, taskExitEvent)
require.NoError(t, err, "error while publishing to source")
timeout := time.Second
select {
case <-time.After(timeout):
require.Fail(t, "timeout", "waiting for published message timed out after %s", timeout.String())
case envelope := <-sinkEventCh:
require.Equal(t, topic, envelope.Topic, "received expected envelope topic")
require.Equal(t, namespace, envelope.Namespace, "received expected envelope namespace")
require.WithinDuration(t, time.Now().UTC(), envelope.Timestamp, timeout, "received expected envelope timestamp")
receivedEvent, err := typeurl.UnmarshalAny(envelope.Event)
require.NoError(t, err, "failed to unmarshal event from topic %s", envelope.Topic)
switch receivedTaskExitEvent := receivedEvent.(type) {
case *events.TaskExit:
require.Equal(t, taskExitEvent.ContainerID, receivedTaskExitEvent.ContainerID, "received expected ContainerID")
require.Equal(t, taskExitEvent.ID, receivedTaskExitEvent.ID, "received expected ID")
require.Equal(t, taskExitEvent.Pid, receivedTaskExitEvent.Pid, "received expected Pid")
require.Equal(t, taskExitEvent.ExitStatus, receivedTaskExitEvent.ExitStatus, "received expected ExitStatus")
require.Equal(t, taskExitEvent.ExitedAt.AsTime(), receivedTaskExitEvent.ExitedAt.AsTime(), "received expected ExitedAt")
default:
require.Fail(t, "unexpected event", "received unexpected event type on topic %s", envelope.Topic)
}
case err := <-sinkErrorCh:
require.Fail(t, "unexpected error", "unexpectedly received on sink error chan: %v", err)
}
select {
case err := <-sinkErrorCh:
require.Fail(t, "unexpected error", "unexpectedly received on sink error chan: %v", err)
default:
}
}
}