Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(audit): implement sink for kafka #3204

Merged
merged 1 commit into from
Jul 5, 2024
Merged

Conversation

erka
Copy link
Collaborator

@erka erka commented Jun 22, 2024

related #3146

  • find the right place for event.proto and data type for payload
  • add support for Schema Registry
  • improve test coverage

@erka
Copy link
Collaborator Author

erka commented Jun 22, 2024

While all the steps are straightforward, the main issue lies with the Payload of Event. Defining a schema for any in protobuf is challenging. Althoughavro or json schema might be more flexible in this regard, I haven't researched it yet

Copy link
Collaborator

@markphelps markphelps left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great @erka ! one comment, otherwise lgtm

Anything you need from us?

rpc/flipt/audit/event.proto Outdated Show resolved Hide resolved
@erka erka force-pushed the audit-kafka-sink branch 3 times, most recently from d540f8e to e465ecd Compare June 25, 2024 20:26
@erka erka force-pushed the audit-kafka-sink branch 15 times, most recently from 8d41b4f to a0ecc19 Compare July 1, 2024 14:30
@erka
Copy link
Collaborator Author

erka commented Jul 1, 2024

I'm a bit stuck with Dagger and Redpanda Schema Registry. Redpanda provides a great development Docker image that includes Kafka with Kraft, Schema Registry, and other related services. When I run it locally with Docker, all my tests pass without issue. However, when I run them with Dagger, it fails with the error Client.Timeout exceeded while awaiting headers.

There is an issue report dagger/dagger/issues/6673 suggesting that multiple ports may not be bound, but I tested this additional task and confirmed that there are no issues with the connections.

func (m *Dagger) SchemaRegistry(ctx context.Context) (string, error) {
	k := dag.Container().From("redpandadata/redpanda").
		WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
			Description: "kafka endpoint",
		}).
		WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
			Description: "schema registry",
		}).
		WithEnvVariable("REDPANDA_ADVERTISE_KAFKA_ADDRESS", "kafka:9092").
		WithExec(nil).AsService()

	return dag.Container().From("alpine").WithServiceBinding("kafka", k).
		// WithExec([]string{"nc", "-v", "kafka", "9092"}).
		WithExec([]string{"wget", "http://kafka:8081"}).
		Stdout(ctx)
}

If anyone can spot where I might be going wrong, I would greatly appreciate any hints.

@markphelps
Copy link
Collaborator

I'm a bit stuck with Dagger and Redpanda Schema Registry. Redpanda provides a great development Docker image that includes Kafka with Kraft, Schema Registry, and other related services. When I run it locally with Docker, all my tests pass without issue. However, when I run them with Dagger, it fails with the error Client.Timeout exceeded while awaiting headers.

There is an issue report dagger/dagger/issues/6673 suggesting that multiple ports may not be bound, but I tested this additional task and confirmed that there are no issues with the connections.

func (m *Dagger) SchemaRegistry(ctx context.Context) (string, error) {
	k := dag.Container().From("redpandadata/redpanda").
		WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
			Description: "kafka endpoint",
		}).
		WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
			Description: "schema registry",
		}).
		WithEnvVariable("REDPANDA_ADVERTISE_KAFKA_ADDRESS", "kafka:9092").
		WithExec(nil).AsService()

	return dag.Container().From("alpine").WithServiceBinding("kafka", k).
		// WithExec([]string{"nc", "-v", "kafka", "9092"}).
		WithExec([]string{"wget", "http://kafka:8081"}).
		Stdout(ctx)
}

If anyone can spot where I might be going wrong, I would greatly appreciate any hints.

cc @levlaz @GeorgeMac for any suggestions. I will take a look this evening myself as well

@levlaz
Copy link
Contributor

levlaz commented Jul 1, 2024

When I run it locally with Docker, all my tests pass without issue.

Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?

The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.

I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)

Looking at the docker-compose.yml file is the best way to translate to Dagger https://docs.redpanda.com/current/get-started/quick-start/#deploy-redpanda - but the config is very verbose so i'd love to hear how you are running it locally to see if there is a shortcut we can take with a few env vars.

Lastly, I don't think it makes any difference but this does not seem to be necessary WithExec(nil)

@erka erka force-pushed the audit-kafka-sink branch 2 times, most recently from 08a42c5 to 3a58d1e Compare July 2, 2024 05:50
@erka
Copy link
Collaborator Author

erka commented Jul 2, 2024

Thank you @levlaz for the support.

Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?

docker run -it -p 9092:9092 -p 8081:8081 --privileged -h kafka redpandadata/redpanda

Also I had /etc/hosts modified with kafka pointed to 127.0.0.1.

The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.

I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)

I saw it in the workflow of go kafka client. There is another place docker-composer.yml
which helped me to solve this.

I used kafka as the DNS name to promote the Kafka address, and it works fine for the Flipt container with WithServiceBinding. Inside the Kafka container, the Schema Registry service used the same DNS, and the Kafka container has no records about kafka host so the timeout appears. I hadn't realized that with the dagger alias from WithServiceBinding, it will only be in one container and not in both of them.

Thank you @markphelps @levlaz for help. I could move forward now

@erka erka marked this pull request as ready for review July 3, 2024 11:53
@erka erka requested a review from a team as a code owner July 3, 2024 11:53
@erka erka added the needs docs Requires documentation updates label Jul 3, 2024
Copy link
Collaborator

@markphelps markphelps left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one comment around deleting generated files that don't need to be there (i don't think), otherwise looks great to me! thank you for taking this on @erka !!

sdk/go/audit.sdk.gen.go Outdated Show resolved Hide resolved
Copy link

codecov bot commented Jul 4, 2024

Codecov Report

Attention: Patch coverage is 73.98844% with 45 lines in your changes missing coverage. Please review.

Project coverage is 48.37%. Comparing base (f997fb9) to head (3128e6a).
Report is 465 commits behind head on main.

Files Patch % Lines
internal/server/audit/events.go 0.00% 15 Missing ⚠️
internal/server/audit/kafka/kafka.go 83.11% 7 Missing and 6 partials ⚠️
internal/server/audit/kafka/protobuf.go 81.08% 4 Missing and 3 partials ⚠️
internal/cmd/grpc.go 0.00% 5 Missing and 1 partial ⚠️
internal/server/audit/kafka/avro.go 85.00% 2 Missing and 1 partial ⚠️
internal/server/audit/audit.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##             main    #3204       +/-   ##
===========================================
- Coverage   70.78%   48.37%   -22.41%     
===========================================
  Files          91      162       +71     
  Lines        8729    13105     +4376     
===========================================
+ Hits         6179     6340      +161     
- Misses       2165     6297     +4132     
- Partials      385      468       +83     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com>
Copy link
Collaborator

@markphelps markphelps left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing 🤩. Thank you for all the hard work on this

@markphelps markphelps added the automerge Used by Kodiak bot to automerge PRs label Jul 5, 2024
@kodiakhq kodiakhq bot merged commit dae061c into flipt-io:main Jul 5, 2024
33 of 34 checks passed
@erka erka deleted the audit-kafka-sink branch July 5, 2024 13:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
automerge Used by Kodiak bot to automerge PRs needs docs Requires documentation updates
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants