-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka source: Use Segment's kafka-go client #105
Merged
Merged
Changes from 11 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
88482ca
Kafka source: Use Segment's kafka-go client
hariso 4c9d65f
update docs, cleanup, fix test
hariso 28a8ef8
tweak tests
hariso 09ef8a6
linter
hariso 8c81882
tweak test
hariso d471998
tweak timeout
hariso b957b3a
Merge branch 'main' into haris/segment-consumer
hariso f2ed77f
Merge branch 'main' into haris/segment-consumer
hariso e8c81b5
Merge branch 'main' into haris/segment-consumer
hariso c0fd30f
use context.Background
hariso fc17906
Merge branch 'haris/segment-consumer' of github.com:ConduitIO/conduit…
hariso 32b964c
Merge branch 'main' into haris/segment-consumer
hariso 8765efe
Merge branch 'main' into haris/segment-consumer
hariso File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,11 @@ package kafka | |
|
||
import ( | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/conduitio/conduit/pkg/foundation/cerrors" | ||
"github.com/confluentinc/confluent-kafka-go/kafka" | ||
"github.com/google/uuid" | ||
skafka "github.com/segmentio/kafka-go" | ||
"github.com/segmentio/kafka-go" | ||
) | ||
|
||
const ( | ||
|
@@ -39,46 +38,31 @@ var Required = []string{Servers, Topic} | |
// When changing this struct, please also change the plugin specification (in main.go) as well as the ReadMe. | ||
type Config struct { | ||
// A list of bootstrap servers, which will be used to discover all the servers in a cluster. | ||
// Maps to "bootstrap.servers" in a Kafka consumer's configuration | ||
Servers string | ||
Servers []string | ||
Topic string | ||
// Maps to "security.protocol" in a Kafka consumer's configuration | ||
SecurityProtocol string | ||
// Maps to "acks" in a Kafka consumer's configuration | ||
Acks skafka.RequiredAcks | ||
// Required acknowledgments when writing messages to a topic: | ||
// Can be: 0, 1, -1 (all) | ||
Acks kafka.RequiredAcks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we going to make this configurable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh wait, it is already nm |
||
DeliveryTimeout time.Duration | ||
// Read all messages present in a source topic. | ||
// Default value: false (only new messages are read) | ||
ReadFromBeginning bool | ||
} | ||
|
||
func (c Config) AsKafkaCfg() *kafka.ConfigMap { | ||
kafkaCfg := &kafka.ConfigMap{ | ||
"bootstrap.servers": c.Servers, | ||
"group.id": uuid.New().String(), | ||
// because we wan't to be able to 'seek' to specific positions in a topic | ||
// we need to manually manage the consumer state. | ||
"enable.auto.commit": false, | ||
"client.id": "conduit-kafka-source", | ||
} | ||
|
||
if c.SecurityProtocol != "" { | ||
// nolint:errcheck // returns nil always | ||
kafkaCfg.SetKey("security.protocol", c.SecurityProtocol) | ||
} | ||
return kafkaCfg | ||
} | ||
|
||
func Parse(cfg map[string]string) (Config, error) { | ||
err := checkRequired(cfg) | ||
// todo check if values are valid, e.g. hosts are valid etc. | ||
if err != nil { | ||
return Config{}, err | ||
} | ||
// parse servers | ||
servers, err := split(cfg[Servers]) | ||
if err != nil { | ||
return Config{}, cerrors.Errorf("invalid servers: %w", err) | ||
} | ||
var parsed = Config{ | ||
Servers: cfg[Servers], | ||
Topic: cfg[Topic], | ||
SecurityProtocol: cfg[SecurityProtocol], | ||
Servers: servers, | ||
Topic: cfg[Topic], | ||
} | ||
// parse acknowledgment setting | ||
ack, err := parseAcks(cfg[Acks]) | ||
|
@@ -107,12 +91,12 @@ func Parse(cfg map[string]string) (Config, error) { | |
return parsed, nil | ||
} | ||
|
||
func parseAcks(ack string) (skafka.RequiredAcks, error) { | ||
func parseAcks(ack string) (kafka.RequiredAcks, error) { | ||
// when ack is empty, return default (which is 'all') | ||
if ack == "" { | ||
return skafka.RequireAll, nil | ||
return kafka.RequireAll, nil | ||
} | ||
acks := skafka.RequiredAcks(0) | ||
acks := kafka.RequiredAcks(0) | ||
err := acks.UnmarshalText([]byte(ack)) | ||
if err != nil { | ||
return 0, cerrors.Errorf("unknown ack mode: %w", err) | ||
|
@@ -157,3 +141,15 @@ func checkRequired(cfg map[string]string) error { | |
func requiredConfigErr(name string) error { | ||
return cerrors.Errorf("%q config value must be set", name) | ||
} | ||
|
||
func split(serversString string) ([]string, error) { | ||
split := strings.Split(serversString, ",") | ||
servers := make([]string, 0) | ||
for i, s := range split { | ||
if strings.Trim(s, " ") == "" { | ||
return nil, cerrors.Errorf("empty %d. server", i) | ||
} | ||
servers = append(servers, s) | ||
} | ||
return servers, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was curious so I checked the builds. This change, combined with changing to alpine from bitnami/minideb, nets a 79mb reduction in our out-the-door docker image. Nice! 🎉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest, I can't really call it an improvement. I reverted to what you put in there, before I made things worse with a CGo dependency.:D