diff --git a/functional_test.go b/functional_test.go index 0e0c9e216..2391725a4 100644 --- a/functional_test.go +++ b/functional_test.go @@ -5,7 +5,6 @@ package sarama import ( "context" "fmt" - toxiproxy "github.com/Shopify/toxiproxy/client" "io" "log" "net" @@ -18,6 +17,8 @@ import ( "strings" "testing" "time" + + toxiproxy "github.com/Shopify/toxiproxy/client" ) const ( @@ -27,19 +28,19 @@ const ( var ( testTopicDetails = map[string]*TopicDetail{ "test.1": { - NumPartitions: 1, + NumPartitions: 1, ReplicationFactor: 3, }, "test.4": { - NumPartitions: 4, + NumPartitions: 4, ReplicationFactor: 3, }, "test.64": { - NumPartitions: 64, + NumPartitions: 64, ReplicationFactor: 3, }, "uncommitted-topic-test-4": { - NumPartitions: 1, + NumPartitions: 1, ReplicationFactor: 3, }, } @@ -80,10 +81,10 @@ func testMain(m *testing.M) int { if !usingExisting { err := prepareDockerTestEnvironment(ctx, &env) if err != nil { - tearDownDockerTestEnvironment(ctx, &env) + _ = tearDownDockerTestEnvironment(ctx, &env) panic(err) } - defer tearDownDockerTestEnvironment(ctx, &env) + defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck } if err := prepareTestTopics(ctx, &env); err != nil { panic(err) @@ -126,7 +127,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion) } - c := exec.Command("docker-compose", "up", "-d") c.Stdout = os.Stdout c.Stderr = os.Stderr @@ -143,14 +143,14 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err proxyName := fmt.Sprintf("kafka%d", i) proxy, err := env.ToxiproxyClient.CreateProxy( proxyName, - fmt.Sprintf("0.0.0.0:%d", 29090 + i), - fmt.Sprintf("kafka-%d:%d", i, 29090 + i), + fmt.Sprintf("0.0.0.0:%d", 29090+i), + fmt.Sprintf("kafka-%d:%d", i, 29090+i), ) if err != nil { return fmt.Errorf("failed to create toxiproxy: %w", err) } env.Proxies[proxyName] = proxy - env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i)) + env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i)) } // the mapping of confluent platform docker image vesions -> kafka versions can be @@ -175,7 +175,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err brokersOk := make([]bool, len(env.KafkaBrokerAddrs)) retryLoop: for j, addr := range env.KafkaBrokerAddrs { - client, err := NewClient([]string{addr},config) + client, err := NewClient([]string{addr}, config) if err != nil { continue } @@ -267,7 +267,7 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er func prepareTestTopics(ctx context.Context, env *testEnvironment) error { Logger.Println("creating test topics") var testTopicNames []string - for topic, _ := range testTopicDetails { + for topic := range testTopicDetails { testTopicNames = append(testTopicNames, topic) } @@ -296,7 +296,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { // Start by deleting the test topics (if they already exist) deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{ - Topics: testTopicNames, + Topics: testTopicNames, Timeout: 30 * time.Second, }) if err != nil { @@ -334,7 +334,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { // now create the topics empty createRes, err := controller.CreateTopics(&CreateTopicsRequest{ TopicDetails: testTopicDetails, - Timeout: 30 * time.Second, + Timeout: 30 * time.Second, }) if err != nil { return fmt.Errorf("failed to create test topics: %w", err) @@ -360,7 +360,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err) } defer res.Body.Close() - jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644) + jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) if err != nil { return fmt.Errorf("failed opening the uncomitted msg jar: %w", err) }