Kafka source: Use Segment's kafka-go client #105
Conversation
I was able to build and run the Docker image locally and received messages into my local Kafka from a file source. While building the Docker image, I compared the image size of main to this version: this PR reduces the final build image size by ~79 MB. 👍 🍾
```diff
-# which uses librdkafka, a C library under the hood, so we set CGO_ENABLED=1.
-# Soon we should switch to another, CGo-free, client, so we'll be able to set CGO_ENABLED to 0.
-RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=1 make build
+RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 make build
```
I was curious, so I checked the builds. This change, combined with switching from bitnami/minideb to alpine, nets a 79 MB reduction in our out-the-door Docker image. Nice! 🎉
To be honest, I can't really call it an improvement. I just reverted to what you had put in there before I made things worse with a CGo dependency. :D
```diff
 	"github.com/google/uuid"
+	skafka "github.com/segmentio/kafka-go"
 )
```
```go
// todo try optimizing, the test takes 15 seconds to run!
```
I had some tests fail locally and found that it was because I had `-timeout` set to a pretty aggressive `10s`; when I went digging for the failure, I found this. Do we just have to wait on Kafka?
That appears to be the case. From what I could understand from the logs, tearing down (i.e. simply closing the client) takes ca. 5 seconds. :S
pkg/plugins/kafka/source_test.go (Outdated)
```diff
-underTest := Source{Consumer: consumerMock, Config: cfg}
-rec, err := underTest.Read(context.TODO(), pos)
+underTest := kafka.Source{Consumer: consumerMock, Config: cfg}
+rec, err := underTest.Read(context.TODO(), []byte(groupID))
```
Are these `context.TODO`s going to be changed in the future, or should they be `Background`s instead?
Good point, I'll change this.
```diff
-Acks skafka.RequiredAcks
+// Required acknowledgments when writing messages to a topic:
+// Can be: 0, 1, -1 (all)
+Acks kafka.RequiredAcks
```
Are we going to make this configurable?
Oh wait, it is already. Never mind.
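For context on the `0, 1, -1` values: kafka-go models the acks setting as an integer type. The sketch below mirrors those values with a local, stdlib-only copy, purely for illustration; I believe the real type and constants (`RequireNone`, `RequireOne`, `RequireAll`) live in `github.com/segmentio/kafka-go`:

```go
package main

import "fmt"

// RequiredAcks mirrors the semantics of kafka-go's acks setting.
// This local type is only for illustration, not the library's definition.
type RequiredAcks int

const (
	RequireNone RequiredAcks = 0  // fire-and-forget: no broker acknowledgment
	RequireOne  RequiredAcks = 1  // the partition leader must acknowledge the write
	RequireAll  RequiredAcks = -1 // all in-sync replicas must acknowledge the write
)

func main() {
	fmt.Println(RequireNone, RequireOne, RequireAll) // 0 1 -1
}
```

Higher values trade write latency for durability: `RequireAll` survives a leader failure, `RequireNone` can silently lose messages.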
I'm confused by how the GroupID is being generated/handled here. The GroupID should be configurable (since a customer might want to use a specific consumer group ID). If it's not set, then we can generate one internally, but it needs to remain consistent throughout the entire lifespan of the connector.
We talked about this offline, but for the rest: the group ID is currently generated the first time a source is read, and then re-used throughout a connector's lifespan. Ali mentioned that some Kafka deployments will use fixed consumer groups (e.g. DevOps will explicitly create groups with certain ACLs etc.), so I'll make the group ID configurable.
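A rough sketch of that behavior, with a configured ID taking precedence and a generated one created once and re-used (all names here are illustrative, not the actual connector code):

```go
package main

import (
	"crypto/rand"
	"fmt"
	"sync"
)

// Source sketches a Kafka source whose consumer group ID is either
// taken from configuration or lazily generated exactly once.
type Source struct {
	configuredGroupID string // set from connector config, may be empty

	once    sync.Once
	groupID string
}

// GroupID returns the configured group ID if present; otherwise it
// generates a random one on first use and re-uses it for the
// connector's lifespan.
func (s *Source) GroupID() string {
	s.once.Do(func() {
		if s.configuredGroupID != "" {
			s.groupID = s.configuredGroupID
			return
		}
		b := make([]byte, 8)
		_, _ = rand.Read(b) // crypto/rand.Read does not fail in practice
		s.groupID = fmt.Sprintf("conduit-%x", b)
	})
	return s.groupID
}

func main() {
	s := &Source{}
	fmt.Println(s.GroupID() == s.GroupID()) // true: stable across calls
}
```

`sync.Once` makes the lazy generation safe even if `Read` is called concurrently.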
LGTM 👍
Description
List of major changes:
- The tests were moved to package `kafka_test`. This is because the tests (which were in the `kafka` package) used the `mock` package, which now imports something from the `kafka` package, which created an import cycle Go complained about.
- Changes around the `Read()` method, which should make it possible to read a record from an arbitrary position. To achieve that with Confluent's Kafka client, we were manually managing offsets. Practically, though, we never needed to read records from arbitrary positions. Furthermore, in the new plugin interfaces, we won't have to do that at all: the plugin will stream messages to Conduit.

I'm about to update the plugin docs in a bit, and also do a bit more cleanup.
Fixes part of #52
Quick checks: