-
Notifications
You must be signed in to change notification settings - Fork 217
/
message_test.go
94 lines (87 loc) · 2.46 KB
/
message_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package kafka_sarama_test
import (
"testing"
"github.com/IBM/sarama"
"github.com/stretchr/testify/require"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/test"
)
var (
	// testEvent is the event returned by test.FullEvent(); both fixtures
	// below take their contents from it.
	testEvent = test.FullEvent()

	// structuredConsumerMessage holds testEvent marshaled with format.JSON in
	// the record value, plus a single content-type header set to
	// cloudevents.ApplicationCloudEventsJSON.
	structuredConsumerMessage = &sarama.ConsumerMessage{
		Value: func() []byte {
			b, err := format.JSON.Marshal(&testEvent)
			if err != nil {
				// Fail loudly while the package initializes instead of letting
				// the tests run against an empty payload.
				panic("marshaling test event as JSON: " + err.Error())
			}
			return b
		}(),
		Headers: []*sarama.RecordHeader{{
			Key:   []byte("content-type"),
			Value: []byte(cloudevents.ApplicationCloudEventsJSON),
		}},
	}

	// binaryConsumerMessage holds a plain text payload in the record value and
	// passes the event attributes as ce_-prefixed record headers.
	binaryConsumerMessage = &sarama.ConsumerMessage{
		Value: []byte("hello world!"),
		Headers: mustToSaramaConsumerHeaders(map[string]string{
			"ce_type":            testEvent.Type(),
			"ce_source":          testEvent.Source(),
			"ce_id":              testEvent.ID(),
			"ce_time":            test.Timestamp.String(),
			"ce_specversion":     "1.0",
			"ce_dataschema":      test.Schema.String(),
			"ce_datacontenttype": "text/json",
			"ce_subject":         "receiverTopic",
			"ce_exta":            "someext",
		}),
	}
)
// TestNewMessage builds a message from each consumer-message fixture with
// NewMessageFromConsumerMessage and checks that the result is non-nil and
// that ReadEncoding reports the encoding expected for that fixture.
func TestNewMessage(t *testing.T) {
	type testCase struct {
		name  string
		input *sarama.ConsumerMessage
		want  binding.Encoding
	}

	// A JSON body with a plain application/json content type and no ce_
	// headers; expected to be reported as an unknown encoding.
	plainJSONMessage := &sarama.ConsumerMessage{
		Value: []byte("{}"),
		Headers: []*sarama.RecordHeader{{
			Key:   []byte("content-type"),
			Value: []byte("application/json"),
		}},
	}

	cases := []testCase{
		{
			name:  "Structured encoding",
			input: structuredConsumerMessage,
			want:  binding.EncodingStructured,
		},
		{
			name:  "Binary encoding",
			input: binaryConsumerMessage,
			want:  binding.EncodingBinary,
		},
		{
			name:  "Unknown encoding",
			input: plainJSONMessage,
			want:  binding.EncodingUnknown,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			msg := kafka_sarama.NewMessageFromConsumerMessage(tc.input)
			require.NotNil(t, msg)
			require.Equal(t, tc.want, msg.ReadEncoding())
		})
	}
}
// mustToSaramaConsumerHeaders converts a string map into a slice of sarama
// record headers, one per map entry, with each key and value copied into a
// byte slice. The headers come out in map iteration order, which Go leaves
// unspecified, so callers must not rely on their position.
func mustToSaramaConsumerHeaders(m map[string]string) []*sarama.RecordHeader {
	headers := make([]*sarama.RecordHeader, 0, len(m))
	for key, value := range m {
		headers = append(headers, &sarama.RecordHeader{
			Key:   []byte(key),
			Value: []byte(value),
		})
	}
	return headers
}