Skip to content

Commit

Permalink
Fix E2E test
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Oct 28, 2022
1 parent eba77f2 commit a5f0006
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"encoding/json"
"sync"
"testing"
"time"
Expand All @@ -9,23 +10,37 @@ import (
"github.com/stretchr/testify/require"
)

const e2eTestTopic = "e2eTestTopic"

type testMessage struct {
key string
value map[string]string
headers map[string]string
}

func TestE2EPubSub(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}

const e2eTestTopic = "e2eTestTopic"

producer := NewKafkaProducer(e2eTestTopic)
require.NoError(t, producer.ConnectivityCheck())

consumedMessagesLock := &sync.RWMutex{}
consumedMessages := []Message{}
consumedMessages := []testMessage{}

messageHandler := func(message Message) {
consumedMessagesLock.Lock()
consumedMessages = append(consumedMessages, message)
consumedMessagesLock.Unlock()
defer consumedMessagesLock.Unlock()

var value map[string]string
require.NoError(t, json.Unmarshal(message.Value, &value))

consumedMessages = append(consumedMessages, testMessage{
key: message.Key,
value: value,
headers: message.Headers,
})
}

consumer := NewKafkaConsumer(e2eTestTopic)
Expand All @@ -35,25 +50,29 @@ func TestE2EPubSub(t *testing.T) {

require.NoError(t, consumer.ConnectivityCheck())

producedMessages := []Message{
producedMessages := []testMessage{
{
Key: "id1",
Value: []byte(`{"data":"message 1"}`),
Headers: map[string]string{
key: "id1",
value: map[string]string{
"data": "message 1",
},
headers: map[string]string{
"key1": "value1",
},
},
{
Key: "id2",
Value: []byte(`{"data":"message 2"}`),
Headers: map[string]string{
key: "id2",
value: map[string]string{
"data": "message 2",
},
headers: map[string]string{
"key2": "value2",
"key3": "value3",
},
},
}
for _, message := range producedMessages {
assert.NoError(t, producer.SendMessage(message.Key, message.Value, message.Headers))
assert.NoError(t, producer.SendMessage(message.key, message.value, message.headers))
}

time.Sleep(time.Second) // Let message handling take place.
Expand Down

0 comments on commit a5f0006

Please sign in to comment.