Skip to content

Commit

Permalink
add kafka topic/partition/offset to the extension of event (#896)
Browse files Browse the repository at this point in the history
* add kafka partition/offset to the extension of event

Signed-off-by: myan <myan@redhat.com>

* initialize header with kafka information

Signed-off-by: myan <myan@redhat.com>

* fixed the integration test

Signed-off-by: myan <myan@redhat.com>

* fixed the sarama binding test issue

Signed-off-by: myan <myan@redhat.com>

* add check empty in the integration test

Signed-off-by: myan <myan@redhat.com>

* verfiy the kafka topic value

Signed-off-by: myan <myan@redhat.com>

* clarify the test code

Signed-off-by: myan <myan@redhat.com>

---------

Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed May 25, 2023
1 parent bc9170f commit b7a65db
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
12 changes: 9 additions & 3 deletions protocol/kafka_sarama/v2/message.go
Expand Up @@ -8,6 +8,7 @@ package kafka_sarama
import (
"bytes"
"context"
"strconv"
"strings"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -35,22 +36,27 @@ type Message struct {
}

// Check if http.Message implements binding.Message
var _ binding.Message = (*Message)(nil)
var _ binding.MessageMetadataReader = (*Message)(nil)
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

// NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage.
// The returned binding.Message *can* be read several times safely
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message {
var contentType string
headers := make(map[string][]byte, len(cm.Headers))
headers := make(map[string][]byte, len(cm.Headers)+3)
for _, r := range cm.Headers {
k := strings.ToLower(string(r.Key))
if k == contentTypeHeader {
contentType = string(r.Value)
}
headers[k] = r.Value
}
headers[prefix+"kafkaoffset"] = []byte(strconv.FormatInt(cm.Offset, 10))
headers[prefix+"kafkapartition"] = []byte(strconv.FormatInt(int64(cm.Partition), 10))
headers[prefix+"kafkatopic"] = []byte(cm.Topic)
return NewMessage(cm.Value, contentType, headers)
}

Expand Down
23 changes: 18 additions & 5 deletions test/integration/kafka_sarama/kafka_test.go
Expand Up @@ -21,16 +21,30 @@ import (
)

const (
TEST_GROUP_ID = "test_group_id"
TEST_GROUP_ID = "test_group_id"
KAFKA_OFFSET = "kafkaoffset"
KAFKA_PARTITION = "kafkapartition"
KAFKA_TOPIC = "kafkatopic"
)

var TopicName = "test-ce-client-" + uuid.New().String()

func TestSendEvent(t *testing.T) {
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) {
eventIn = test.ConvertEventExtensionsToString(t, eventIn)
clienttest.SendReceive(t, func() interface{} {
return protocolFactory(t)
}, eventIn, func(e event.Event) {
test.AssertEventEquals(t, eventIn, test.ConvertEventExtensionsToString(t, e))
}, eventIn, func(eventOut event.Event) {
eventOut = test.ConvertEventExtensionsToString(t, eventOut)

require.Equal(t, TopicName, eventOut.Extensions()[KAFKA_TOPIC])
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])

test.AllOf(
test.HasExactlyAttributesEqualTo(eventIn.Context),
test.HasData(eventIn.Data()),
)
})
})
}
Expand Down Expand Up @@ -60,11 +74,10 @@ func testClient(t testing.TB) sarama.Client {
func protocolFactory(t testing.TB) *kafka_sarama.Protocol {
client := testClient(t)

topicName := "test-ce-client-" + uuid.New().String()
options := []kafka_sarama.ProtocolOptionFunc{
kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID),
}
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, options...)
p, err := kafka_sarama.NewProtocolFromClient(client, TopicName, TopicName, options...)
require.NoError(t, err)

return p
Expand Down
26 changes: 19 additions & 7 deletions test/integration/kafka_sarama_binding/kafka_test.go
Expand Up @@ -27,11 +27,14 @@ import (
)

const (
TEST_GROUP_ID = "test_group_id"
TEST_GROUP_ID = "test_group_id"
KAFKA_OFFSET = "kafkaoffset"
KAFKA_PARTITION = "kafkapartition"
KAFKA_TOPIC = "kafkatopic"
)

func TestSendStructuredMessageToStructured(t *testing.T) {
close, s, r := testSenderReceiver(t)
close, s, r, _ := testSenderReceiver(t)
defer close()
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
eventIn = ConvertEventExtensionsToString(t, eventIn)
Expand All @@ -46,7 +49,7 @@ func TestSendStructuredMessageToStructured(t *testing.T) {
}

func TestSendBinaryMessageToBinary(t *testing.T) {
close, s, r := testSenderReceiver(t)
close, s, r, topicName := testSenderReceiver(t)
defer close()
EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) {
eventIn = ConvertEventExtensionsToString(t, eventIn)
Expand All @@ -55,7 +58,16 @@ func TestSendBinaryMessageToBinary(t *testing.T) {
test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), binding.EncodingBinary), in, s, r, func(out binding.Message) {
eventOut := MustToEvent(t, context.Background(), out)
assert.Equal(t, binding.EncodingBinary, out.ReadEncoding())
AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut))
eventOut = ConvertEventExtensionsToString(t, eventOut)

require.Equal(t, topicName, eventOut.Extensions()[KAFKA_TOPIC])
require.NotNil(t, eventOut.Extensions()[KAFKA_PARTITION])
require.NotNil(t, eventOut.Extensions()[KAFKA_OFFSET])

AllOf(
HasExactlyAttributesEqualTo(eventIn.Context),
HasData(eventIn.Data()),
)
})
})
}
Expand All @@ -82,7 +94,7 @@ func testClient(t testing.TB) sarama.Client {
return client
}

func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver) {
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
client := testClient(t)

topicName := "test-ce-client-" + uuid.New().String()
Expand All @@ -97,11 +109,11 @@ func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receive
return func() {
require.NoError(t, p.Close(context.TODO()))
require.NoError(t, client.Close())
}, p, p
}, p, p, topicName
}

func BenchmarkSendReceive(b *testing.B) {
c, s, r := testSenderReceiver(b)
c, s, r, _ := testSenderReceiver(b)
defer c() // Cleanup
test.BenchmarkSendReceive(b, s, r)
}

0 comments on commit b7a65db

Please sign in to comment.