-
Notifications
You must be signed in to change notification settings - Fork 123
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Kafka async produce context cancellation (#1516)
* Kafka integration to highlight async produce bug Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Fix async produce by removing ctx cancel Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Keep the linter happy Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Drop produce timeout now no longer required Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Migrate to dockertest Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> Fix lint issues Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Add integration tag Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Use helper for free port Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> --------- Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>
- Loading branch information
Showing
7 changed files
with
193 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
// Copyright 2021-2023 Zenauth Ltd. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
//go:build integration | ||
// +build integration | ||
|
||
package kafka_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ory/dockertest/v3" | ||
"github.com/ory/dockertest/v3/docker" | ||
"github.com/stretchr/testify/require" | ||
"github.com/twmb/franz-go/pkg/kadm" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
|
||
auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1" | ||
"github.com/cerbos/cerbos/internal/audit" | ||
_ "github.com/cerbos/cerbos/internal/audit/kafka" | ||
"github.com/cerbos/cerbos/internal/config" | ||
"github.com/cerbos/cerbos/internal/util" | ||
) | ||
|
||
const ( | ||
redpandaImage = "redpandadata/redpanda" | ||
redpandaVersion = "v23.1.5" | ||
|
||
defaultIntegrationTopic = "cerbos" | ||
) | ||
|
||
func TestSyncProduce(t *testing.T) { | ||
t.Parallel() | ||
|
||
ctx := context.Background() | ||
|
||
// setup kafka | ||
uri := newKafkaBroker(t, defaultIntegrationTopic) | ||
log, err := newLog(map[string]any{ | ||
"audit": map[string]any{ | ||
"enabled": true, | ||
"backend": "kafka", | ||
"kafka": map[string]any{ | ||
"brokers": []string{uri}, | ||
"topic": defaultIntegrationTopic, | ||
"produceSync": true, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, err) | ||
|
||
// write audit log entries | ||
err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) { | ||
return &auditv1.AccessLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1", | ||
}, nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) { | ||
return &auditv1.DecisionLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2", | ||
}, nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// validate we see this entries in kafka | ||
records, err := fetchKafkaTopic(uri, defaultIntegrationTopic) | ||
require.NoError(t, err) | ||
require.Len(t, records, 2, "unexpected number of published audit log entries") | ||
} | ||
|
||
func TestAsyncProduce(t *testing.T) { | ||
t.Parallel() | ||
|
||
ctx := context.Background() | ||
|
||
// setup kafka | ||
uri := newKafkaBroker(t, defaultIntegrationTopic) | ||
log, err := newLog(map[string]any{ | ||
"audit": map[string]any{ | ||
"enabled": true, | ||
"backend": "kafka", | ||
"kafka": map[string]any{ | ||
"brokers": []string{uri}, | ||
"topic": defaultIntegrationTopic, | ||
"produceSync": false, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, err) | ||
|
||
// write audit log entries | ||
err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) { | ||
return &auditv1.AccessLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1", | ||
}, nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) { | ||
return &auditv1.DecisionLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2", | ||
}, nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// validate we see this entries in kafka, eventually | ||
require.Eventually(t, func() bool { | ||
records, err := fetchKafkaTopic(uri, defaultIntegrationTopic) | ||
require.NoError(t, err) | ||
return len(records) == 2 | ||
}, 10*time.Second, 100*time.Millisecond, "expected to see audit log entries in kafka") | ||
} | ||
|
||
func newKafkaBroker(t *testing.T, topic string) string { | ||
t.Helper() | ||
|
||
hostPort, err := util.GetFreePort() | ||
require.NoError(t, err, "Unable to get free port") | ||
|
||
pool, err := dockertest.NewPool("") | ||
require.NoError(t, err, "Failed to connect to Docker") | ||
|
||
resource, err := pool.RunWithOptions(&dockertest.RunOptions{ | ||
Repository: redpandaImage, | ||
Tag: redpandaVersion, | ||
Cmd: []string{ | ||
"redpanda", | ||
"start", | ||
"--mode", "dev-container", | ||
// kafka admin client will retrieve the advertised address from the broker | ||
// so we need it to use the same port that is exposed on the container | ||
"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort), | ||
}, | ||
ExposedPorts: []string{ | ||
"9092/tcp", | ||
}, | ||
PortBindings: map[docker.Port][]docker.PortBinding{ | ||
"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}}, | ||
}, | ||
}, func(config *docker.HostConfig) { | ||
config.AutoRemove = true | ||
}) | ||
require.NoError(t, err, "Failed to start container") | ||
|
||
t.Cleanup(func() { | ||
_ = pool.Purge(resource) | ||
}) | ||
|
||
brokerDSN := fmt.Sprintf("localhost:%d", hostPort) | ||
client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN)) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, pool.Retry(func() error { | ||
return client.Ping(context.Background()) | ||
}), "Failed to connect to Kafka") | ||
|
||
// create topic | ||
_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic) | ||
require.NoError(t, err, "Failed to create Kafka topic") | ||
|
||
return brokerDSN | ||
} | ||
|
||
func fetchKafkaTopic(uri, topic string) ([]*kgo.Record, error) { | ||
client, err := kgo.NewClient(kgo.SeedBrokers(uri)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client.AddConsumeTopics(topic) | ||
|
||
fetches := client.PollFetches(context.Background()) | ||
return fetches.Records(), fetches.Err() | ||
} | ||
|
||
func newLog(m map[string]any) (audit.Log, error) { | ||
cfg, err := config.WrapperFromMap(m) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return audit.NewLogFromConf(context.Background(), cfg) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters