
Kafka source: Use Segment's kafka-go client #105

Merged
merged 13 commits on Jan 28, 2022
8 changes: 2 additions & 6 deletions Dockerfile
@@ -16,14 +16,10 @@ RUN curl -sL https://deb.nodesource.com/setup_16.x | bash - &&\
# Build the full app binary
WORKDIR /app
COPY . .
# The Kafka plugin currently uses Confluent's Go client for Kafka
# which uses librdkafka, a C library under the hood, so we set CGO_ENABLED=1.
# Soon we should switch to another, CGo-free, client, so we'll be able to set CGO_ENABLED to 0.
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=1 make build
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 make build
Contributor:

I was curious so I checked the builds. This change, combined with changing to alpine from bitnami/minideb, nets a 79 MB reduction in our out-the-door Docker image. Nice! 🎉

Contributor Author:

To be honest, I can't really call it an improvement. I just reverted to what you had put in there before I made things worse with a CGo dependency. :D


# Copy built binaries to production slim image.
# minideb provides glibc, which librdkafka needs.
FROM bitnami/minideb:bullseye AS final
FROM alpine:3.14 AS final
# HTTP API
EXPOSE 8080/tcp
# gRPC API
1 change: 0 additions & 1 deletion go.mod
@@ -12,7 +12,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.24.0
github.com/batchcorp/pgoutput v0.3.2
github.com/bufbuild/buf v1.0.0-rc11
github.com/confluentinc/confluent-kafka-go v1.8.2
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0
github.com/golang/mock v1.6.0
2 changes: 0 additions & 2 deletions go.sum
@@ -156,8 +156,6 @@ github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
17 changes: 9 additions & 8 deletions pkg/plugins/kafka/README.md
@@ -1,16 +1,18 @@
### General
The Conduit Kafka plugin provides both, a destination and source Kafka connector, for Conduit.
The Conduit Kafka plugin provides both a source and a destination Kafka connector for Conduit.

### How it works
Under the hood, the plugin uses [Confluent's Golang Client for Apache Kafka(tm)](https://github.com/confluentinc/confluent-kafka-go).
This client supports a wide range of configuration parameters, which makes it possible to fine tune the plugin.
Under the hood, the plugin uses [Segment's Go Client for Apache Kafka(tm)](https://github.com/segmentio/kafka-go). It was
chosen since it has no CGo dependency, making it possible to build the plugin for a wider range of platforms and architectures.
It also supports contexts, which we will likely use in the future.
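As a hedged illustration (not the connector's actual code), a minimal context-aware read with segmentio/kafka-go could look like the sketch below; the broker address, topic, and group ID are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "example-group",            // placeholder consumer group
		Topic:   "example-topic",            // placeholder topic
	})
	defer r.Close()

	// The context bounds the blocking read; cancelling it aborts the call.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	msg, err := r.ReadMessage(ctx)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("key=%s value=%s\n", msg.Key, msg.Value)
}
```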

#### Source
The Kafka source manages the offsets manually. The main reason for this is that the source connector needs to be able to
"seek" to any offset in a Kafka topic.
A Kafka source connector is represented by a single consumer in a Kafka consumer group. By virtue of that, a source's
logical position is the respective consumer's offset in Kafka. Internally, though, we're not saving the offset as the
position: instead, we're saving the consumer group ID, since that's all that's needed for Kafka to find the offsets for
our consumer.

If a message is not received from a broker within a specified timeout (which is 5 seconds, and defined by `msgTimeout` in `source.go`),
the Kafka source returns a "recoverable error", which indicates to Conduit that it should try reading data after some time again.
A source is associated with a consumer group ID the first time its `Read()` method is called.
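As a rough sketch of this idea (a hypothetical helper, not the plugin's real API): the stored position only needs to carry the consumer group ID, and Kafka resolves the committed offsets when a reader rejoins that group.

```go
package kafka

import (
	"github.com/google/uuid"
	kafkago "github.com/segmentio/kafka-go"
)

// startFrom is a hypothetical illustration of "position == consumer group ID".
func startFrom(brokers []string, topic string, position []byte) (*kafkago.Reader, string) {
	groupID := string(position)
	if groupID == "" {
		// No position yet (first call to Read): start a fresh consumer group.
		groupID = uuid.New().String()
	}
	r := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: brokers,
		GroupID: groupID, // Kafka tracks committed offsets per group ID
		Topic:   topic,
	})
	// The group ID is what gets stored as the record position.
	return r, groupID
}
```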

#### Destination
The destination connector uses **synchronous** writes to Kafka. Proper buffering support which will enable asynchronous
@@ -32,7 +34,6 @@ There's no global, plugin configuration. Each connector instance is configured s
|------|---------|-------------|----------|---------------|
|`servers`|destination, source|A list of bootstrap servers to which the plugin will connect.|true| |
|`topic`|destination, source|The topic to which records will be written.|true| |
|`securityProtocol`|destination, source|Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.|false| |
|`acks`|destination|The number of acknowledgments required before considering a record written to Kafka. Valid values: 0, 1, all|false|`all`|
|`deliveryTimeout`|destination|Message delivery timeout.|false|`10s`|
|`readFromBeginning`|destination|Whether or not to read a topic from the beginning (i.e. existing messages or only new messages).|false|`false`|
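For the Destination section of this README, a hedged sketch of a synchronous write with segmentio/kafka-go (illustrative values, not the connector's code); `WriteMessages` returns only after the broker acknowledges the write according to `RequiredAcks`:

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"), // placeholder broker
		Topic:        "example-topic",             // placeholder topic
		RequiredAcks: kafka.RequireAll,
	}
	defer w.Close()

	// Synchronous write: the call blocks until acknowledged or failed.
	err := w.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	if err != nil {
		log.Fatalf("write failed: %v", err)
	}
}
```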
60 changes: 28 additions & 32 deletions pkg/plugins/kafka/config.go
@@ -16,12 +16,11 @@ package kafka

import (
"strconv"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
skafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)

const (
@@ -39,46 +38,31 @@ var Required = []string{Servers, Topic}
// When changing this struct, please also change the plugin specification (in main.go) as well as the ReadMe.
type Config struct {
// A list of bootstrap servers, which will be used to discover all the servers in a cluster.
// Maps to "bootstrap.servers" in a Kafka consumer's configuration
Servers string
Servers []string
Topic string
// Maps to "security.protocol" in a Kafka consumer's configuration
SecurityProtocol string
// Maps to "acks" in a Kafka consumer's configuration
Acks skafka.RequiredAcks
// Required acknowledgments when writing messages to a topic:
// Can be: 0, 1, -1 (all)
Acks kafka.RequiredAcks

Are we going to make this configurable?

Oh wait, it is already nm

DeliveryTimeout time.Duration
// Read all messages present in a source topic.
// Default value: false (only new messages are read)
ReadFromBeginning bool
}

func (c Config) AsKafkaCfg() *kafka.ConfigMap {
kafkaCfg := &kafka.ConfigMap{
"bootstrap.servers": c.Servers,
"group.id": uuid.New().String(),
// because we want to be able to 'seek' to specific positions in a topic
// we need to manually manage the consumer state.
"enable.auto.commit": false,
"client.id": "conduit-kafka-source",
}

if c.SecurityProtocol != "" {
// nolint:errcheck // returns nil always
kafkaCfg.SetKey("security.protocol", c.SecurityProtocol)
}
return kafkaCfg
}

func Parse(cfg map[string]string) (Config, error) {
err := checkRequired(cfg)
// todo check if values are valid, e.g. hosts are valid etc.
if err != nil {
return Config{}, err
}
// parse servers
servers, err := split(cfg[Servers])
if err != nil {
return Config{}, cerrors.Errorf("invalid servers: %w", err)
}
var parsed = Config{
Servers: cfg[Servers],
Topic: cfg[Topic],
SecurityProtocol: cfg[SecurityProtocol],
Servers: servers,
Topic: cfg[Topic],
}
// parse acknowledgment setting
ack, err := parseAcks(cfg[Acks])
@@ -107,12 +91,12 @@ func Parse(cfg map[string]string) (Config, error) {
return parsed, nil
}

func parseAcks(ack string) (skafka.RequiredAcks, error) {
func parseAcks(ack string) (kafka.RequiredAcks, error) {
// when ack is empty, return default (which is 'all')
if ack == "" {
return skafka.RequireAll, nil
return kafka.RequireAll, nil
}
acks := skafka.RequiredAcks(0)
acks := kafka.RequiredAcks(0)
err := acks.UnmarshalText([]byte(ack))
if err != nil {
return 0, cerrors.Errorf("unknown ack mode: %w", err)
@@ -157,3 +141,15 @@ func checkRequired(cfg map[string]string) error {
func requiredConfigErr(name string) error {
return cerrors.Errorf("%q config value must be set", name)
}

func split(serversString string) ([]string, error) {
split := strings.Split(serversString, ",")
servers := make([]string, 0)
for i, s := range split {
if strings.Trim(s, " ") == "" {
return nil, cerrors.Errorf("empty %d. server", i)
}
servers = append(servers, s)
}
return servers, nil
}
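To tie the new helpers together, here is a hedged, in-package sketch (not part of this change) of how `Parse` handles the comma-separated servers string and the acks value; the accepted ack spellings are whatever kafka-go's `RequiredAcks.UnmarshalText` accepts (e.g. "0", "1", "all"):

```go
package kafka

import (
	"fmt"

	"github.com/segmentio/kafka-go"
)

// exampleParse is illustrative only; it is not part of the connector code.
func exampleParse() error {
	cfg, err := Parse(map[string]string{
		Servers: "host1:1111,host2:2222", // split on commas into a []string
		Topic:   "example-topic",
		Acks:    "all", // parsed via kafka.RequiredAcks.UnmarshalText
	})
	if err != nil {
		return err
	}
	fmt.Println(cfg.Servers)                  // [host1:1111 host2:2222]
	fmt.Println(cfg.Acks == kafka.RequireAll) // true
	return nil
}
```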
48 changes: 41 additions & 7 deletions pkg/plugins/kafka/config_test.go
@@ -40,21 +40,57 @@ func TestParse_ServersMissing(t *testing.T) {
assert.Error(t, err)
}

func TestNewProducer_InvalidServers(t *testing.T) {
testCases := []struct {
name string
config map[string]string
exp string
}{
{
name: "empty server string in the middle",
config: map[string]string{
Servers: "host1:1111,,host2:2222",
Topic: "topic",
},
exp: "invalid servers: empty 1. server",
},
{
name: "single blank server string",
config: map[string]string{
Servers: " ",
Topic: "topic",
},
exp: "invalid servers: empty 0. server",
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
parsed, err := Parse(tc.config)
assert.Equal(t, Config{}, parsed)
assert.Error(t, err)
assert.Equal(t, tc.exp, err.Error())
})
}
}

func TestParse_OneMissing_OnePresent(t *testing.T) {
parsed, err := Parse(map[string]string{
Servers: "localhost:9092"})
Servers: "localhost:9092",
})
assert.Equal(t, Config{}, parsed)
assert.Error(t, err)
}

func TestParse_FullRequired(t *testing.T) {
parsed, err := Parse(map[string]string{
Servers: "localhost:9092",
Topic: "hello-world-topic"})
Topic: "hello-world-topic",
})

assert.Ok(t, err)
assert.True(t, Config{} != parsed, "expected parsed config not to be empty")
assert.Equal(t, "localhost:9092", parsed.Servers)
assert.Equal(t, []string{"localhost:9092"}, parsed.Servers)
assert.Equal(t, "hello-world-topic", parsed.Topic)
}

@@ -99,10 +135,8 @@ func TestParse_Full(t *testing.T) {
})

assert.Ok(t, err)
assert.True(t, Config{} != parsed, "expected parsed config not to be empty")
assert.Equal(t, "localhost:9092", parsed.Servers)
assert.Equal(t, []string{"localhost:9092"}, parsed.Servers)
assert.Equal(t, "hello-world-topic", parsed.Topic)
assert.Equal(t, "SASL_SSL", parsed.SecurityProtocol)
assert.Equal(t, kafka.RequireAll, parsed.Acks)
assert.Equal(t, int64(1002), parsed.DeliveryTimeout.Milliseconds())
assert.Equal(t, true, parsed.ReadFromBeginning)