Skip to content

Commit

Permalink
Try to fix race again - don't reuse clients for sender/receiver
Browse files Browse the repository at this point in the history
Signed-off-by: Doug Davis <dug@microsoft.com>
  • Loading branch information
duglin committed Oct 27, 2023
1 parent d7f845b commit f5c7061
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions test/integration/kafka_sarama_binding/kafka_test.go
Expand Up @@ -10,7 +10,6 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/google/uuid"
Expand Down Expand Up @@ -91,25 +90,30 @@ func testClient(t testing.TB) sarama.Client {
}

func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
client := testClient(t)

topicName := "test-ce-client-" + uuid.New().String()
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))

// Create a 'client' and 'protocol" for the Receiver side
clientR := testClient(t)
protocolR, err := kafka_sarama.NewProtocolFromClient(clientR, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
require.NoError(t, err)
require.NotNil(t, p)
require.NotNil(t, protocolR)

// Create a 'client' and 'protocol" for the Sender side
clientS := testClient(t)
protocolS, err := kafka_sarama.NewProtocolFromClient(clientS, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
require.NoError(t, err)
require.NotNil(t, protocolS)

go func() {
require.NoError(t, p.OpenInbound(context.TODO()))
require.NoError(t, protocolR.OpenInbound(context.TODO()))
}()

// Not perfect but we need to give OpenInbound() as chance to start
// as it's a race condition. I couldn't find something on 'p' to wait for
time.Sleep(6 * time.Second)

return func() {
require.NoError(t, p.Close(context.TODO()))
require.NoError(t, client.Close())
}, p, p, topicName
require.NoError(t, protocolR.Close(context.TODO()))
require.NoError(t, protocolS.Close(context.TODO()))
require.NoError(t, clientR.Close())
require.NoError(t, clientS.Close())
}, protocolS, protocolR, topicName
}

func BenchmarkSendReceive(b *testing.B) {
Expand Down

0 comments on commit f5c7061

Please sign in to comment.