diff --git a/Dockerfile b/Dockerfile index 500b07411..0908af62d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,14 +16,10 @@ RUN curl -sL https://deb.nodesource.com/setup_16.x | bash - &&\ # Build the full app binary WORKDIR /app COPY . . -# The Kafka plugin currently uses Confluent's Go client for Kafka -# which uses librdkafka, a C library under the hood, so we set CGO_ENABLED=1. -# Soon we should switch to another, CGo-free, client, so we'll be able to set CGO_ENABLED to 0. -RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=1 make build +RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 make build # Copy built binaries to production slim image. -# minideb provides glibc, which librdkafka needs. -FROM bitnami/minideb:bullseye AS final +FROM alpine:3.14 AS final # HTTP API EXPOSE 8080/tcp # gRPC API diff --git a/go.mod b/go.mod index ef854e1a2..a58687341 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.24.0 github.com/batchcorp/pgoutput v0.3.2 github.com/bufbuild/buf v1.0.0-rc11 - github.com/confluentinc/confluent-kafka-go v1.8.2 github.com/dgraph-io/badger/v3 v3.2103.2 github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0 github.com/golang/mock v1.6.0 diff --git a/go.sum b/go.sum index 499e0818d..9f201b31e 100644 --- a/go.sum +++ b/go.sum @@ -156,8 +156,6 @@ github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWH github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= -github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= -github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/pkg/plugins/kafka/README.md b/pkg/plugins/kafka/README.md index 41a0aed85..2ce178183 100644 --- a/pkg/plugins/kafka/README.md +++ b/pkg/plugins/kafka/README.md @@ -1,16 +1,18 @@ ### General -The Conduit Kafka plugin provides both, a destination and source Kafka connector, for Conduit. +The Conduit Kafka plugin provides both a source and a destination Kafka connector for Conduit. ### How it works? -Under the hood, the plugin uses [Confluent's Golang Client for Apache Kafka(tm)](https://github.com/confluentinc/confluent-kafka-go). -This client supports a wide range of configuration parameters, which makes it possible to fine tune the plugin. +Under the hood, the plugin uses [Segment's Go Client for Apache Kafka(tm)](https://github.com/segmentio/kafka-go). It was +chosen since it has no CGo dependency, making it possible to build the plugin for a wider range of platforms and architectures. +It also supports contexts, which we will likely use in the future. #### Source -The Kafka source manages the offsets manually. The main reason for this is that the source connector needs to be able to -"seek" to any offset in a Kafka topic. +A Kafka source connector is represented by a single consumer in a Kafka consumer group. By virtue of that, a source's +logical position is the respective consumer's offset in Kafka. Internally, though, we're not saving the offset as the +position: instead, we're saving the consumer group ID, since that's all that is needed for Kafka to find the offsets for +our consumer. -If a messages is not received from a broker in a specified timeout (which is 5 seconds, and defined by `msgTimeout` in `source.go`), -the Kafka source returns a "recoverable error", which indicates to Conduit that it should try reading data after some time again. +A source is associated with a consumer group ID the first time the `Read()` method is called. #### Destination The destination connector uses **synchronous** writes to Kafka. Proper buffering support which will enable asynchronous @@ -32,7 +34,6 @@ There's no global, plugin configuration. Each connector instance is configured s |------|---------|-------------|----------|---------------| |`servers`|destination, source|A list of bootstrap servers to which the plugin will connect.|true| | |`topic`|destination, source|The topic to which records will be written to.|true| | -|`securityProtocol`|destination, source|Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.|false| | |`acks`|destination|The number of acknowledgments required before considering a record written to Kafka. Valid values: 0, 1, all|false|`all`| |`deliveryTimeout`|destination|Message delivery timeout.|false|`10s`| |`readFromBeginning`|destination|Whether or not to read a topic from beginning (i.e. existing messages or only new messages).|false|`false`|
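To make the resume semantics described in the README above concrete, here is a minimal sketch of the same pattern using segmentio/kafka-go directly (not part of this diff; the broker address, topic, and group ID are assumed values):

```go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	// A reader with a GroupID lets the broker track offsets for us;
	// reconnecting later with the same GroupID resumes where we left off,
	// which is why the plugin can store just the group ID as the position.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{"localhost:9092"}, // assumed local broker
		Topic:       "example-topic",            // hypothetical topic
		GroupID:     "conduit-example-group",    // hypothetical group ID
		StartOffset: kafka.FirstOffset,          // only applies when the group is new
	})
	defer r.Close()

	msg, err := r.FetchMessage(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("key=%s offset=%d\n", msg.Key, msg.Offset)

	// Committing advances the group's offset; the next reader with the
	// same GroupID continues after this message.
	if err := r.CommitMessages(context.Background(), msg); err != nil {
		panic(err)
	}
}
```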
diff --git a/pkg/plugins/kafka/config.go b/pkg/plugins/kafka/config.go index c973a03ca..1f843723b 100644 --- a/pkg/plugins/kafka/config.go +++ b/pkg/plugins/kafka/config.go @@ -16,12 +16,11 @@ package kafka import ( "strconv" + "strings" "time" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/google/uuid" - skafka "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go" ) const ( @@ -39,46 +38,31 @@ var Required = []string{Servers, Topic} // When changing this struct, please also change the plugin specification (in main.go) as well as the ReadMe. type Config struct { // A list of bootstrap servers, which will be used to discover all the servers in a cluster. - // Maps to "bootstrap.servers" in a Kafka consumer's configuration - Servers string + Servers []string Topic string - // Maps to "security.protocol" in a Kafka consumer's configuration - SecurityProtocol string - // Maps to "acks" in a Kafka consumer's configuration - Acks skafka.RequiredAcks + // Required acknowledgments when writing messages to a topic. + // Valid values: 0, 1, -1 (all). + Acks kafka.RequiredAcks DeliveryTimeout time.Duration // Read all messages present in a source topic. // Default value: false (only new messages are read) ReadFromBeginning bool } -func (c Config) AsKafkaCfg() *kafka.ConfigMap { - kafkaCfg := &kafka.ConfigMap{ - "bootstrap.servers": c.Servers, - "group.id": uuid.New().String(), - // because we wan't to be able to 'seek' to specific positions in a topic - // we need to manually manage the consumer state. - "enable.auto.commit": false, - "client.id": "conduit-kafka-source", - } - - if c.SecurityProtocol != "" { - // nolint:errcheck // returns nil always - kafkaCfg.SetKey("security.protocol", c.SecurityProtocol) - } - return kafkaCfg -} - func Parse(cfg map[string]string) (Config, error) { err := checkRequired(cfg) // todo check if values are valid, e.g. hosts are valid etc.
if err != nil { return Config{}, err } + // parse servers + servers, err := split(cfg[Servers]) + if err != nil { + return Config{}, cerrors.Errorf("invalid servers: %w", err) + } var parsed = Config{ - Servers: cfg[Servers], - Topic: cfg[Topic], - SecurityProtocol: cfg[SecurityProtocol], + Servers: servers, + Topic: cfg[Topic], } // parse acknowledgment setting ack, err := parseAcks(cfg[Acks]) @@ -107,12 +91,12 @@ func Parse(cfg map[string]string) (Config, error) { return parsed, nil } -func parseAcks(ack string) (skafka.RequiredAcks, error) { +func parseAcks(ack string) (kafka.RequiredAcks, error) { // when ack is empty, return default (which is 'all') if ack == "" { - return skafka.RequireAll, nil + return kafka.RequireAll, nil } - acks := skafka.RequiredAcks(0) + acks := kafka.RequiredAcks(0) err := acks.UnmarshalText([]byte(ack)) if err != nil { return 0, cerrors.Errorf("unknown ack mode: %w", err) @@ -157,3 +141,18 @@ func checkRequired(cfg map[string]string) error { func requiredConfigErr(name string) error { return cerrors.Errorf("%q config value must be set", name) } + +// split parses a comma-separated list of servers, rejecting blank entries +// and trimming surrounding whitespace from each server address. +func split(serversString string) ([]string, error) { + split := strings.Split(serversString, ",") + servers := make([]string, 0) + for i, s := range split { + trimmed := strings.TrimSpace(s) + if trimmed == "" { + return nil, cerrors.Errorf("empty %d. server", i) + } + servers = append(servers, trimmed) + } + return servers, nil +}
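A quick usage sketch of the reworked Parse and the new servers splitting (hypothetical values; only the Servers and Topic keys are confirmed by the tests below):

```go
package main

import (
	"fmt"

	"github.com/conduitio/conduit/pkg/plugins/kafka"
)

func main() {
	// Servers is a comma-separated list; blank entries such as "a,,b"
	// are rejected by split() with an "invalid servers" error.
	cfg, err := kafka.Parse(map[string]string{
		kafka.Servers: "host1:9092,host2:9092",
		kafka.Topic:   "orders", // hypothetical topic name
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Servers) // [host1:9092 host2:9092]
	fmt.Println(cfg.Acks)    // defaults to RequireAll when "acks" is unset
}
```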
server", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + parsed, err := Parse(tc.config) + assert.Equal(t, Config{}, parsed) + assert.Error(t, err) + assert.Equal(t, tc.exp, err.Error()) + }) + } +} + func TestParse_OneMissing_OnePresent(t *testing.T) { parsed, err := Parse(map[string]string{ - Servers: "localhost:9092"}) + Servers: "localhost:9092", + }) assert.Equal(t, Config{}, parsed) assert.Error(t, err) } @@ -50,11 +86,11 @@ func TestParse_OneMissing_OnePresent(t *testing.T) { func TestParse_FullRequired(t *testing.T) { parsed, err := Parse(map[string]string{ Servers: "localhost:9092", - Topic: "hello-world-topic"}) + Topic: "hello-world-topic", + }) assert.Ok(t, err) - assert.True(t, Config{} != parsed, "expected parsed config not to be empty") - assert.Equal(t, "localhost:9092", parsed.Servers) + assert.Equal(t, []string{"localhost:9092"}, parsed.Servers) assert.Equal(t, "hello-world-topic", parsed.Topic) } @@ -99,10 +135,8 @@ func TestParse_Full(t *testing.T) { }) assert.Ok(t, err) - assert.True(t, Config{} != parsed, "expected parsed config not to be empty") - assert.Equal(t, "localhost:9092", parsed.Servers) + assert.Equal(t, []string{"localhost:9092"}, parsed.Servers) assert.Equal(t, "hello-world-topic", parsed.Topic) - assert.Equal(t, "SASL_SSL", parsed.SecurityProtocol) assert.Equal(t, kafka.RequireAll, parsed.Acks) assert.Equal(t, int64(1002), parsed.DeliveryTimeout.Milliseconds()) assert.Equal(t, true, parsed.ReadFromBeginning) diff --git a/pkg/plugins/kafka/consumer.go b/pkg/plugins/kafka/consumer.go index 219bccc35..53d910be7 100644 --- a/pkg/plugins/kafka/consumer.go +++ b/pkg/plugins/kafka/consumer.go @@ -17,174 +17,108 @@ package kafka import ( + "context" "fmt" - "time" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/google/uuid" + "github.com/segmentio/kafka-go" ) +// Consumer represents a Kafka consumer in a simplified form, +// with just the functionality which is needed for this plugin. +// A Consumer's offset is being managed by the broker. type Consumer interface { - // Get returns a message from the configured topic, waiting at most 'timeoutMs' milliseconds. - // Returns: - // A message and the client's 'position' in Kafka, if there's no error, OR - // A nil message, the client's position in Kafka, and a nil error, - // if no message was retrieved within the specified timeout, OR - // A nil message, nil position and an error if there was an error while retrieving the message (e.g. broker down). - Get(timeout time.Duration) (*kafka.Message, map[int32]int64, error) + // StartFrom instructs the consumer to connect to a broker and a topic, using the provided consumer group ID. + // The group ID is significant for this consumer's offsets. + // By using the same group ID after a restart, we make sure that the consumer continues from where it left off. + // Returns: An error, if the consumer could not be set to read from the given position, nil otherwise. + StartFrom(config Config, groupID string) error + + // Get returns a message from the configured topic. Waits until a messages is available + // or until it errors out. + // Returns: a message (if available), the consumer group ID and an error (if there was one). + Get(ctx context.Context) (*kafka.Message, string, error) + + Ack() error // Close this consumer and the associated resources (e.g. 
diff --git a/pkg/plugins/kafka/consumer.go b/pkg/plugins/kafka/consumer.go index 219bccc35..53d910be7 100644 --- a/pkg/plugins/kafka/consumer.go +++ b/pkg/plugins/kafka/consumer.go @@ -17,174 +17,109 @@ package kafka import ( + "context" "fmt" - "time" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/google/uuid" + "github.com/segmentio/kafka-go" ) +// Consumer represents a Kafka consumer in a simplified form, +// with just the functionality needed by this plugin. +// A Consumer's offsets are managed by the broker. type Consumer interface { - // Get returns a message from the configured topic, waiting at most 'timeoutMs' milliseconds. - // Returns: - // A message and the client's 'position' in Kafka, if there's no error, OR - // A nil message, the client's position in Kafka, and a nil error, - // if no message was retrieved within the specified timeout, OR - // A nil message, nil position and an error if there was an error while retrieving the message (e.g. broker down). - Get(timeout time.Duration) (*kafka.Message, map[int32]int64, error) + // StartFrom instructs the consumer to connect to a broker and a topic, using the provided consumer group ID. + // The group ID is significant for this consumer's offsets. + // By using the same group ID after a restart, we make sure that the consumer continues from where it left off. + // Returns: An error, if the consumer could not be set to read from the given position, nil otherwise. + StartFrom(config Config, groupID string) error + + // Get returns a message from the configured topic. Waits until a message is available + // or until it errors out. + // Returns: a message (if available), the consumer group ID and an error (if there was one). + Get(ctx context.Context) (*kafka.Message, string, error) + + // Ack acknowledges (commits) the last message returned by Get. + Ack() error // Close this consumer and the associated resources (e.g. connections to the broker) Close() - - // StartFrom reads messages from the given topic, starting from the given positions. - // For new partitions or partitions not found in the 'position', - // the reading behavior is specified by 'readFromBeginning' parameter: - // if 'true', then all messages will be read, if 'false', only new messages will be read. - // Returns: An error, if the consumer could not be set to read from the given position, nil otherwise. - StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error } -type confluentConsumer struct { - Consumer *kafka.Consumer - positions map[int32]int64 +type segmentConsumer struct { + reader *kafka.Reader + lastMsgRead *kafka.Message } -// NewConsumer creates a new Kafka consumer. -// The current implementation uses Confluent's Kafka client. -// Full list of configuration properties is available here: -// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md -func NewConsumer(config Config) (Consumer, error) { - consumer, err := kafka.NewConsumer(config.AsKafkaCfg()) - if err != nil { - return nil, cerrors.Errorf("couldn't create consumer: %w", err) - } - return &confluentConsumer{Consumer: consumer, positions: map[int32]int64{}}, nil +// NewConsumer creates a new Kafka consumer. The consumer needs to be started +// (using the StartFrom method) before actually being used. +func NewConsumer() (Consumer, error) { + return &segmentConsumer{}, nil } -func (c *confluentConsumer) Get(timeout time.Duration) (*kafka.Message, map[int32]int64, error) { - if c.noPositions() { - return nil, nil, cerrors.New("no positions set, call StartFrom first") - } - - endAt := time.Now().Add(timeout) - for timeLeft := -time.Since(endAt); timeLeft > 0; timeLeft = -time.Since(endAt) { - event := c.Consumer.Poll(int(timeLeft.Milliseconds())) - // there are events of other types, but we're not interested in those. - // More info is available here: - // https://docs.confluent.io/5.5.0/clients/confluent-kafka-go/index.html#hdr-Consumer_events - switch v := event.(type) { - case *kafka.Message: - return v, c.updatePosition(v), nil - case kafka.Error: - return nil, nil, cerrors.Errorf("received error from client %v", v) - } +func (c *segmentConsumer) StartFrom(config Config, groupID string) error { + // todo if we can assume that a new Config instance will always be created by calling Parse(), + // and that the instance will not be mutated, then we can leave out these checks.
+ if len(config.Servers) == 0 { + return ErrServersMissing } - // no message, no error - return nil, c.updatePosition(nil), nil -} - -func (c *confluentConsumer) StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error { - defaultOffsets, err := c.defaultOffsets(topic, readFromBeginning) - if err != nil { - return cerrors.Errorf("couldn't get default offsets: %w", err) - } - - completePos := merge(defaultOffsets, position) - partitions, err := toKafkaPositions(&topic, completePos) - if err != nil { - return cerrors.Errorf("couldn't get offsets: %w", err) - } - - err = c.Consumer.Assign(partitions) - if err != nil { - return cerrors.Errorf("couldn't assign partitions: %w", err) + if config.Topic == "" { + return ErrTopicMissing } - - c.positions = completePos + c.reader = newReader(config, groupID) return nil } -func (c *confluentConsumer) defaultOffsets(topic string, readFromBeginning bool) (map[int32]int64, error) { - // to get the number of partitions - partitions, err := c.countPartitions(topic) - if err != nil { - return nil, cerrors.Errorf("couldn't count partitions: %w", err) - } - offsets := map[int32]int64{} - - // get last offset for each partition - for i := 0; i < partitions; i++ { - lo, hi, err := c.Consumer.QueryWatermarkOffsets(topic, int32(i), 5000) - if err != nil { - return nil, cerrors.Errorf("couldn't get default offsets: %w", err) - } - offset := hi - if readFromBeginning { - offset = lo - } - offsets[int32(i)] = offset - } - return offsets, nil +func newReader(cfg Config, groupID string) *kafka.Reader { + readerCfg := kafka.ReaderConfig{ + Brokers: cfg.Servers, + Topic: cfg.Topic, + WatchPartitionChanges: true, + } + // Group ID + if groupID == "" { + readerCfg.GroupID = uuid.NewString() + } else { + readerCfg.GroupID = groupID + } + // StartOffset + if cfg.ReadFromBeginning { + readerCfg.StartOffset = kafka.FirstOffset + } else { + readerCfg.StartOffset = kafka.LastOffset + } + return kafka.NewReader(readerCfg) } -func (c *confluentConsumer) countPartitions(topic string) (int, error) { - metadata, err := c.Consumer.GetMetadata(&topic, false, 10000) +func (c *segmentConsumer) Get(ctx context.Context) (*kafka.Message, string, error) { + msg, err := c.reader.FetchMessage(ctx) if err != nil { - return 0, cerrors.Errorf("couldn't get metadata: %w", err) + return nil, "", cerrors.Errorf("couldn't read message: %w", err) } - return len(metadata.Topics[topic].Partitions), nil + c.lastMsgRead = &msg + return &msg, c.readerID(), nil } -func toKafkaPositions(topic *string, position map[int32]int64) ([]kafka.TopicPartition, error) { - partitions := make([]kafka.TopicPartition, 0, len(position)) - for k, v := range position { - offset, err := kafka.NewOffset(v) - if err != nil { - return nil, cerrors.Errorf("invalid offset: %w", err) - } - partitions = append(partitions, kafka.TopicPartition{Topic: topic, Partition: k, Offset: offset}) - } - return partitions, nil -} - -func (c *confluentConsumer) updatePosition(msg *kafka.Message) map[int32]int64 { - if msg == nil { - return c.positions - } - c.positions[msg.TopicPartition.Partition] = c.increment(msg.TopicPartition.Offset) - return c.positions -} - -func (c *confluentConsumer) increment(offset kafka.Offset) int64 { - switch offset { - case kafka.OffsetBeginning, kafka.OffsetEnd, kafka.OffsetInvalid, kafka.OffsetStored: - panic(cerrors.Errorf("got unexpected offset %v", offset)) - default: - return int64(offset) + 1 +func (c *segmentConsumer) Ack() error { + err := 
c.reader.CommitMessages(context.Background(), *c.lastMsgRead) if err != nil { return cerrors.Errorf("couldn't commit messages: %w", err) } return nil } -func (c *confluentConsumer) Close() { - if c.Consumer == nil { +func (c *segmentConsumer) Close() { + if c.reader == nil { return } - err := c.Consumer.Close() + // this will also stop the fetch loops in the reader's goroutines + err := c.reader.Close() if err != nil { - fmt.Printf("couldn't close consumer due to error: %v\n", err) + fmt.Printf("couldn't close reader: %v\n", err) } } -func (c *confluentConsumer) noPositions() bool { - return len(c.positions) == 0 -} - -func merge(first map[int32]int64, second map[int32]int64) map[int32]int64 { - merged := map[int32]int64{} - for k, v := range first { - merged[k] = v - } - for k, v := range second { - merged[k] = v - } - return merged +func (c *segmentConsumer) readerID() string { + return c.reader.Config().GroupID } diff --git a/pkg/plugins/kafka/consumer_integration_test.go b/pkg/plugins/kafka/consumer_integration_test.go index 0fa4c7ae2..d6b0002bc 100644 --- a/pkg/plugins/kafka/consumer_integration_test.go +++ b/pkg/plugins/kafka/consumer_integration_test.go @@ -14,201 +14,175 @@ //go:build integration -package kafka +package kafka_test import ( "context" "fmt" + "net" "testing" + "time" "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/conduitio/conduit/pkg/plugins/kafka" "github.com/google/uuid" + skafka "github.com/segmentio/kafka-go" ) -func TestConfluentClient_StartFrom_EmptyPosition(t *testing.T) { +func TestConsumer_Get_FromBeginning(t *testing.T) { t.Parallel() - cfg := Config{Topic: "TestConfluentClient_" + uuid.NewString(), Servers: "localhost:9092"} createTopic(t, cfg, 1) - - consumer, err := NewConsumer(cfg) - assert.Ok(t, err) - - err = consumer.StartFrom(cfg.Topic, map[int32]int64{}, true) - defer consumer.Close() - assert.Ok(t, err) -} - -func TestConfluentClient_StartFrom_FromBeginning(t *testing.T) { - t.Parallel() - - cfg := Config{ - Topic: "TestConfluentClient_" + uuid.NewString(), - Servers: "localhost:9092", + cfg := kafka.Config{ + Topic: "TestConsumer_Get_FromBeginning_" + uuid.NewString(), + Servers: []string{"localhost:9092"}, ReadFromBeginning: true, } - // other two partitions should be consumed from beginning - positions := map[int32]int64{0: 1} - - partitions := 3 - createTopic(t, cfg, partitions) - - sendTestMessages(t, cfg, partitions) + createTopic(t, cfg.Topic) + sendTestMessages(t, cfg, 1, 6) - consumer, err := NewConsumer(cfg) + consumer, err := kafka.NewConsumer() defer consumer.Close() assert.Ok(t, err) - err = consumer.StartFrom(cfg.Topic, positions, cfg.ReadFromBeginning) + err = consumer.StartFrom(cfg, uuid.NewString()) assert.Ok(t, err) + // give the consumer group some time to form before reading + time.Sleep(5 * time.Second) - // 1 message from first partition - // +4 messages from 2 partitions which need to be read fully messagesUnseen := map[string]bool{ "test-key-1": true, "test-key-2": true, + "test-key-3": true, "test-key-4": true, "test-key-5": true, "test-key-6": true, } - for i := 1; i <= 5; i++ { - message, _, err := consumer.Get(msgTimeout) + for i := 1; i <= 6; i++ { + message, _, err := waitForMessage(consumer, 200 * time.Millisecond) assert.NotNil(t, message) assert.Ok(t, err) delete(messagesUnseen, string(message.Key)) } assert.Equal(t, 0, len(messagesUnseen)) - - message, updatedPos, err := consumer.Get(msgTimeout) - assert.Ok(t, err) - assert.Nil(t, message) - 
assert.Equal( - t, - map[int32]int64{0: 2, 1: 2, 2: 2}, - updatedPos, - ) } -func TestConfluentClient_StartFrom(t *testing.T) { - cases := []struct { - name string - cfg Config - positions map[int32]int64 - }{ - { - name: "StartFrom: Only new", - cfg: Config{ - Topic: "TestConfluentClient_" + uuid.NewString(), - Servers: "localhost:9092", - ReadFromBeginning: false, - }, - positions: map[int32]int64{0: 1}, - }, - { - name: "StartFrom: Simple test", - cfg: Config{ - Topic: "TestConfluentClient_" + uuid.NewString(), - Servers: "localhost:9092", - }, - positions: map[int32]int64{0: 1, 1: 2, 2: 2}, - }, - } +func TestConsumer_Get_OnlyNew(t *testing.T) { + t.Parallel() - for _, tt := range cases { - // https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - testConfluentClient_StartFrom(t, tt.cfg, tt.positions) - }) + cfg := kafka.Config{ + Topic: "TestConsumer_Get_OnlyNew_" + uuid.NewString(), + Servers: []string{"localhost:9092"}, + ReadFromBeginning: false, } -} - -func testConfluentClient_StartFrom(t *testing.T, cfg Config, positions map[int32]int64) { - partitions := 3 - createTopic(t, cfg, partitions) + createTopic(t, cfg.Topic) + sendTestMessages(t, cfg, 1, 6) - sendTestMessages(t, cfg, partitions) - - consumer, err := NewConsumer(cfg) + consumer, err := kafka.NewConsumer() defer consumer.Close() assert.Ok(t, err) - err = consumer.StartFrom(cfg.Topic, positions, cfg.ReadFromBeginning) + err = consumer.StartFrom(cfg, uuid.NewString()) assert.Ok(t, err) + time.Sleep(4 * time.Second) - message, _, err := consumer.Get(msgTimeout) - assert.NotNil(t, message) - assert.Ok(t, err) - assert.Equal(t, "test-key-6", string(message.Key)) - assert.Equal(t, "test-payload-6", string(message.Value)) + sendTestMessages(t, cfg, 7, 9) - message, updatedPos, err := consumer.Get(msgTimeout) - assert.Ok(t, err) - assert.Nil(t, message) - assert.Equal( - t, - map[int32]int64{0: 2, 1: 2, 2: 2}, - updatedPos, - ) + messagesUnseen := map[string]bool{ + "test-key-7": true, + "test-key-8": true, + "test-key-9": true, + } + for i := 1; i <= 3; i++ { + message, _, err := waitForMessage(consumer, 200 * time.Millisecond) + assert.NotNil(t, message) + assert.Ok(t, err) + delete(messagesUnseen, string(message.Key)) + } + assert.Equal(t, 0, len(messagesUnseen)) } -// partition 0 has messages: 3 and 6 -// partition 1 has messages: 1 and 4 -// partition 2 has messages: 2 and 5 -func sendTestMessages(t *testing.T, cfg Config, partitions int) { - producer, err := kafka.NewProducer(cfg.AsKafkaCfg()) - defer producer.Close() - assert.Ok(t, err) +func waitForMessage(consumer kafka.Consumer, timeout time.Duration) (*skafka.Message, string, error) { + c := make(chan struct { + msg *skafka.Message + pos string + err error + }) + + go func() { + msg, pos, err := consumer.Get(context.Background()) + c <- struct { + msg *skafka.Message + pos string + err error + }{msg: msg, pos: pos, err: err} + }() + + select { + case r := <-c: + return r.msg, r.pos, r.err // completed normally + case <-time.After(timeout): + return nil, "", cerrors.New("timed out while waiting for message") // timed out + } +} - for i := 1; i <= 6; i++ { - err = sendTestMessage( - producer, - cfg.Topic, +func sendTestMessages(t *testing.T, cfg kafka.Config, from int, to int) { + writer := skafka.Writer{ + Addr: skafka.TCP(cfg.Servers...), + Topic: cfg.Topic, + BatchSize: 1, + BatchTimeout: 10 * time.Millisecond, + WriteTimeout: cfg.DeliveryTimeout, + RequiredAcks: 
cfg.Acks, + MaxAttempts: 2, + } + defer writer.Close() + + for i := from; i <= to; i++ { + err := sendTestMessage( + &writer, fmt.Sprintf("test-key-%d", i), fmt.Sprintf("test-payload-%d", i), - int32(i%partitions), ) assert.Ok(t, err) } - unflushed := producer.Flush(5000) - assert.Equal(t, 0, unflushed) } -func sendTestMessage(producer *kafka.Producer, topic string, key string, payload string, partition int32) error { - return producer.Produce( - &kafka.Message{ - Key: []byte(key), - TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: partition}, - Value: []byte(payload), +func sendTestMessage(writer *skafka.Writer, key string, payload string) error { + return writer.WriteMessages( + context.Background(), + skafka.Message{ + Key: []byte(key), + Value: []byte(payload), }, - make(chan kafka.Event, 10), ) } func TestGet_KafkaDown(t *testing.T) { t.Parallel() - cfg := Config{Topic: "client_integration_test_topic", Servers: "localhost:12345"} - consumer, err := NewConsumer(cfg) + cfg := kafka.Config{Topic: "client_integration_test_topic", Servers: []string{"localhost:12345"}} + consumer, err := kafka.NewConsumer() assert.Ok(t, err) - err = consumer.StartFrom(cfg.Topic, map[int32]int64{0: 123}, true) - assert.Error(t, err) - var kerr kafka.Error - if !cerrors.As(err, &kerr) { - t.Fatal("expected kafka.Error") - } - assert.Equal(t, kafka.ErrTransport, kerr.Code()) + err = consumer.StartFrom(cfg, "") + assert.Ok(t, err) + + msg, _, err := consumer.Get(context.Background()) + assert.Nil(t, msg) + var cause *net.OpError + as := cerrors.As(err, &cause) + assert.True(t, as, "expected net.OpError") + assert.Equal(t, "dial", cause.Op) + assert.Equal(t, "tcp", cause.Net) } -func createTopic(t *testing.T, cfg Config, partitions int) { - kafkaCfg := cfg.AsKafkaCfg() - adminClient, _ := kafka.NewAdminClient(kafkaCfg) - defer adminClient.Close() +func createTopic(t *testing.T, topic string) { + c, err := skafka.Dial("tcp", "localhost:9092") + assert.Ok(t, err) + defer c.Close() - _, err := adminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{{Topic: cfg.Topic, NumPartitions: partitions}}) + kt := skafka.TopicConfig{Topic: topic, NumPartitions: 3, ReplicationFactor: 1} + err = c.CreateTopics(kt) assert.Ok(t, err) } diff --git a/pkg/plugins/kafka/destination_integration_test.go b/pkg/plugins/kafka/destination_integration_test.go index a7a6b62ec..4f46b7a7a 100644 --- a/pkg/plugins/kafka/destination_integration_test.go +++ b/pkg/plugins/kafka/destination_integration_test.go @@ -14,7 +14,7 @@ //go:build integration -package kafka +package kafka_test import ( "context" @@ -24,24 +24,21 @@ import ( "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/plugins" + "github.com/conduitio/conduit/pkg/plugins/kafka" "github.com/conduitio/conduit/pkg/record" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/google/uuid" + skafka "github.com/segmentio/kafka-go" ) // todo try optimizing, the test takes 15 seconds to run! 
func TestDestination_Write_Simple(t *testing.T) { - // Kafka is started in Docker - consumer := newKafkaConsumer(t) - defer consumer.Close() - - // prepare test data cfg := newTestConfig(t) - createDestinationTopic(t, consumer, cfg.Settings[Topic]) + createTopic(t, cfg.Settings[kafka.Topic]) record := testRecord() // prepare SUT - underTest := Destination{} + underTest := kafka.Destination{} openErr := underTest.Open(context.Background(), cfg) defer underTest.Teardown() assert.Ok(t, openErr) @@ -51,42 +48,26 @@ func TestDestination_Write_Simple(t *testing.T) { assert.Ok(t, writeErr) assert.Equal(t, record.Position, result) - message := waitForMessage(consumer, 2000, cfg.Settings[Topic]) - assert.NotNil(t, message) + message, err := waitForReaderMessage(cfg.Settings[kafka.Topic], 10*time.Second) + assert.Ok(t, err) assert.Equal(t, record.Payload.Bytes(), message.Value) } -func newTestConfig(t *testing.T) plugins.Config { - return plugins.Config{Settings: map[string]string{ - Servers: "localhost:9092", - Topic: t.Name() + uuid.NewString(), - }} -} - -func waitForMessage(consumer *kafka.Consumer, timeoutMs int, topic string) *kafka.Message { - consumer.SubscribeTopics([]string{topic}, nil) +func waitForReaderMessage(topic string, timeout time.Duration) (skafka.Message, error) { + // Kafka is started in Docker + reader := newKafkaReader(topic) + defer reader.Close() - var message *kafka.Message - waited := 0 - for waited < timeoutMs && message == nil { - event := consumer.Poll(100) - messageMaybe, ok := event.(*kafka.Message) - if ok { - message = messageMaybe - } - } - return message + withTimeout, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return reader.ReadMessage(withTimeout) } -func createDestinationTopic(t *testing.T, consumer *kafka.Consumer, topic string) { - adminClient, _ := kafka.NewAdminClientFromConsumer(consumer) - defer adminClient.Close() - - _, err := adminClient.CreateTopics( - context.Background(), - []kafka.TopicSpecification{{Topic: topic, NumPartitions: 1}}, - ) - assert.Ok(t, err) +func newTestConfig(t *testing.T) plugins.Config { + return plugins.Config{Settings: map[string]string{ + kafka.Servers: "localhost:9092", + kafka.Topic: t.Name() + uuid.NewString(), + }} } func testRecord() record.Record { @@ -100,15 +80,11 @@ } } -func newKafkaConsumer(t *testing.T) (consumer *kafka.Consumer) { - consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": "localhost:9092", - "auto.offset.reset": "earliest", - "enable.auto.commit": false, - "group.id": "None"}) - - if err != nil { - t.Fatalf("Failed to create consumer: %s\n", err) - } - return +func newKafkaReader(topic string) (reader *skafka.Reader) { + return skafka.NewReader(skafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: topic, + StartOffset: skafka.FirstOffset, + GroupID: uuid.NewString(), + }) } diff --git a/pkg/plugins/kafka/destination_test.go b/pkg/plugins/kafka/destination_test.go index c8f193d28..cf24562e1 100644 --- a/pkg/plugins/kafka/destination_test.go +++ b/pkg/plugins/kafka/destination_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
-package kafka +package kafka_test import ( "context" @@ -23,6 +23,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/plugins" + "github.com/conduitio/conduit/pkg/plugins/kafka" "github.com/conduitio/conduit/pkg/plugins/kafka/mock" "github.com/conduitio/conduit/pkg/record" "github.com/golang/mock/gomock" @@ -30,21 +31,21 @@ import ( ) func TestOpen_FailsWhenConfigEmpty(t *testing.T) { - underTest := Destination{} + underTest := kafka.Destination{} err := underTest.Open(context.TODO(), plugins.Config{}) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "config is invalid:"), "incorrect error msg") } func TestOpen_FailsWhenConfigInvalid(t *testing.T) { - underTest := Destination{} + underTest := kafka.Destination{} err := underTest.Open(context.TODO(), plugins.Config{Settings: map[string]string{"foobar": "foobar"}}) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "config is invalid:"), "incorrect error msg") } func TestOpen_KafkaProducerCreated(t *testing.T) { - underTest := Destination{} + underTest := kafka.Destination{} err := underTest.Open(context.TODO(), config()) assert.Ok(t, err) assert.NotNil(t, underTest.Client) @@ -59,7 +60,7 @@ func TestTeardown_ClosesClient(t *testing.T) { Close(). Return() - underTest := Destination{Client: clientMock, Config: connectorCfg()} + underTest := kafka.Destination{Client: clientMock, Config: connectorCfg()} assert.Ok(t, underTest.Teardown()) } @@ -77,15 +78,15 @@ func TestWrite_ClientSendsMessage(t *testing.T) { ). Return(nil) - underTest := Destination{Client: clientMock, Config: connectorCfg()} + underTest := kafka.Destination{Client: clientMock, Config: connectorCfg()} res, err := underTest.Write(context.TODO(), rec) assert.Ok(t, err) assert.NotNil(t, res) } -func connectorCfg() Config { - cfg, _ := Parse(configMap()) +func connectorCfg() kafka.Config { + cfg, _ := kafka.Parse(configMap()) return cfg } @@ -94,7 +95,7 @@ func config() plugins.Config { } func configMap() map[string]string { - return map[string]string{Servers: "localhost:9092", Topic: "test"} + return map[string]string{kafka.Servers: "localhost:9092", kafka.Topic: "test"} } func testRec() record.Record { diff --git a/pkg/plugins/kafka/mock/consumer.go b/pkg/plugins/kafka/mock/consumer.go index 2dbbe3e3c..06fd3aba4 100644 --- a/pkg/plugins/kafka/mock/consumer.go +++ b/pkg/plugins/kafka/mock/consumer.go @@ -5,11 +5,12 @@ package mock import ( + context "context" reflect "reflect" - time "time" - kafka "github.com/confluentinc/confluent-kafka-go/kafka" + kafka "github.com/conduitio/conduit/pkg/plugins/kafka" gomock "github.com/golang/mock/gomock" + kafka0 "github.com/segmentio/kafka-go" ) // Consumer is a mock of Consumer interface. @@ -35,6 +36,20 @@ func (m *Consumer) EXPECT() *ConsumerMockRecorder { return m.recorder } +// Ack mocks base method. +func (m *Consumer) Ack() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ack") + ret0, _ := ret[0].(error) + return ret0 +} + +// Ack indicates an expected call of Ack. +func (mr *ConsumerMockRecorder) Ack() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ack", reflect.TypeOf((*Consumer)(nil).Ack)) +} + // Close mocks base method. func (m *Consumer) Close() { m.ctrl.T.Helper() @@ -48,11 +63,11 @@ func (mr *ConsumerMockRecorder) Close() *gomock.Call { } // Get mocks base method. 
-func (m *Consumer) Get(arg0 time.Duration) (*kafka.Message, map[int32]int64, error) { +func (m *Consumer) Get(arg0 context.Context) (*kafka0.Message, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", arg0) - ret0, _ := ret[0].(*kafka.Message) - ret1, _ := ret[1].(map[int32]int64) + ret0, _ := ret[0].(*kafka0.Message) + ret1, _ := ret[1].(string) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } @@ -64,15 +79,15 @@ func (mr *ConsumerMockRecorder) Get(arg0 interface{}) *gomock.Call { } // StartFrom mocks base method. -func (m *Consumer) StartFrom(arg0 string, arg1 map[int32]int64, arg2 bool) error { +func (m *Consumer) StartFrom(arg0 kafka.Config, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StartFrom", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "StartFrom", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // StartFrom indicates an expected call of StartFrom. -func (mr *ConsumerMockRecorder) StartFrom(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *ConsumerMockRecorder) StartFrom(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartFrom", reflect.TypeOf((*Consumer)(nil).StartFrom), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartFrom", reflect.TypeOf((*Consumer)(nil).StartFrom), arg0, arg1) } diff --git a/pkg/plugins/kafka/producer.go b/pkg/plugins/kafka/producer.go index 9cfd7f62e..da5e8c1c5 100644 --- a/pkg/plugins/kafka/producer.go +++ b/pkg/plugins/kafka/producer.go @@ -18,7 +18,6 @@ package kafka import ( "context" - "strings" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/segmentio/kafka-go" @@ -40,18 +39,15 @@ type segmentProducer struct { // NewProducer creates a new Kafka producer. // The current implementation uses Segment's kafka-go client. func NewProducer(config Config) (Producer, error) { - if config.Servers == "" { + if len(config.Servers) == 0 { return nil, ErrServersMissing } if config.Topic == "" { return nil, ErrTopicMissing } - servers, err := split(config.Servers) - if err != nil { - return nil, cerrors.Errorf("invalid servers: %w", err) - } + writer := &kafka.Writer{ - Addr: kafka.TCP(servers...), + Addr: kafka.TCP(config.Servers...), Topic: config.Topic, BatchSize: 1, WriteTimeout: config.DeliveryTimeout, @@ -63,18 +59,6 @@ func NewProducer(config Config) (Producer, error) { return &segmentProducer{writer: writer}, nil } -func split(serversString string) ([]string, error) { - split := strings.Split(serversString, ",") - servers := make([]string, 0) - for i, s := range split { - if strings.Trim(s, " ") == "" { - return nil, cerrors.Errorf("empty %d. server", i) - } - servers = append(servers, s) - } - return servers, nil -} - func (c *segmentProducer) Send(key []byte, payload []byte) error { err := c.writer.WriteMessages( context.Background(), diff --git a/pkg/plugins/kafka/producer_test.go b/pkg/plugins/kafka/producer_test.go index 87eeecc91..bc2d17e22 100644 --- a/pkg/plugins/kafka/producer_test.go +++ b/pkg/plugins/kafka/producer_test.go @@ -12,75 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package kafka +package kafka_test import ( "testing" "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/plugins/kafka" ) func TestNewProducer_MissingRequired(t *testing.T) { testCases := []struct { name string - config Config + config kafka.Config exp error }{ { name: "servers missing", - config: Config{Topic: "topic"}, - exp: ErrServersMissing, + config: kafka.Config{Topic: "topic"}, + exp: kafka.ErrServersMissing, }, { name: "topic missing", - config: Config{Servers: "irrelevant servers"}, - exp: ErrTopicMissing, + config: kafka.Config{Servers: []string{"irrelevant servers"}}, + exp: kafka.ErrTopicMissing, }, } for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - producer, err := NewProducer(tc.config) + producer, err := kafka.NewProducer(tc.config) assert.Nil(t, producer) assert.Error(t, err) assert.True(t, cerrors.Is(err, tc.exp), "expected "+tc.exp.Error()) }) } } - -func TestNewProducer_InvalidServers(t *testing.T) { - testCases := []struct { - name string - config Config - exp string - }{ - { - name: "empty server string in the middle", - config: Config{ - Servers: "host1:1111,,host2:2222", - Topic: "topic", - }, - exp: "invalid servers: empty 1. server", - }, - { - name: "single blank server string", - config: Config{ - Servers: " ", - Topic: "topic", - }, - exp: "invalid servers: empty 0. server", - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - producer, err := NewProducer(tc.config) - assert.Nil(t, producer) - assert.Error(t, err) - assert.Equal(t, tc.exp, err.Error()) - }) - } -} diff --git a/pkg/plugins/kafka/source.go b/pkg/plugins/kafka/source.go index d9146257e..d046e240a 100644 --- a/pkg/plugins/kafka/source.go +++ b/pkg/plugins/kafka/source.go @@ -17,18 +17,15 @@ package kafka import ( "bytes" "context" - "encoding/json" "fmt" "time" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/plugins" "github.com/conduitio/conduit/pkg/record" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/segmentio/kafka-go" ) -const msgTimeout = time.Second * 5 - type Source struct { Consumer Consumer Config Config @@ -43,7 +40,7 @@ func (s *Source) Open(ctx context.Context, cfg plugins.Config) error { } s.Config = parsed - client, err := NewConsumer(s.Config) + client, err := NewConsumer() if err != nil { return cerrors.Errorf("failed to create Kafka client: %w", err) } @@ -69,14 +66,14 @@ func (s *Source) Read(ctx context.Context, position record.Position) (record.Rec return record.Record{}, cerrors.Errorf("couldn't start from position: %w", err) } - message, positions, err := s.Consumer.Get(msgTimeout) + message, kafkaPos, err := s.Consumer.Get(ctx) if err != nil { return record.Record{}, cerrors.Errorf("failed getting a message %w", err) } if message == nil { return record.Record{}, plugins.ErrEndData } - rec, err := toRecord(message, positions) + rec, err := toRecord(message, kafkaPos) if err != nil { return record.Record{}, cerrors.Errorf("couldn't transform record %w", err) } @@ -85,21 +82,12 @@ func (s *Source) Read(ctx context.Context, position record.Position) (record.Rec } func (s *Source) startFrom(position record.Position) error { - // The check is in place, to avoid instructing the Kafka client to "seek" - // to a position if it's already at it. - // The position is actually a map, but we compare the "raw" byte representations - // to avoid needless parsing. 
- // Note: Map key ordering is not guaranteed, however, - // json.Marshall() orders the keys itself, so using it in comparisons is safe. + // This check is in place to avoid needlessly reconstructing the Kafka consumer. if s.lastPositionRead != nil && bytes.Equal(s.lastPositionRead, position) { return nil } - positionMap, err := toKafkaPosition(position) - if err != nil { - return cerrors.Errorf("invalid position %v %w", string(position), err) - } - err = s.Consumer.StartFrom(s.Config.Topic, positionMap, s.Config.ReadFromBeginning) + err := s.Consumer.StartFrom(s.Config, string(position)) if err != nil { return cerrors.Errorf("couldn't start from given position %v due to %w", string(position), err) } @@ -107,26 +95,9 @@ func Parse(cfg map[string]string) (Config, error) { return parsed, nil } -func toKafkaPosition(position record.Position) (map[int32]int64, error) { - if position == nil || len(position) == 0 { - return map[int32]int64{}, nil - } - - var p map[int32]int64 - err := json.Unmarshal(position, &p) - if err != nil { - return nil, cerrors.Errorf("couldn't deserialize position %w", err) - } - return p, nil -} - -func toRecord(message *kafka.Message, position map[int32]int64) (record.Record, error) { - posBytes, err := json.Marshal(position) - if err != nil { - return record.Record{}, cerrors.Errorf("couldn't serialize position %w", err) - } +func toRecord(message *kafka.Message, position string) (record.Record, error) { return record.Record{ - Position: posBytes, + Position: []byte(position), CreatedAt: time.Time{}, ReadAt: time.Time{}, Key: record.RawData{Raw: message.Key}, @@ -135,5 +106,5 @@ func toRecord(message *kafka.Message, position map[int32]int64) (record.Record, } func (s *Source) Ack(context.Context, record.Position) error { - return nil + return s.Consumer.Ack() } diff --git a/pkg/plugins/kafka/source_test.go b/pkg/plugins/kafka/source_test.go index 88d1be6f1..a0bf945f9 100644 --- a/pkg/plugins/kafka/source_test.go +++ b/pkg/plugins/kafka/source_test.go @@ -12,33 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kafka +package kafka_test import ( "context" - "encoding/json" "strings" "testing" "time" "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/plugins" + "github.com/conduitio/conduit/pkg/plugins/kafka" "github.com/conduitio/conduit/pkg/plugins/kafka/mock" - "github.com/conduitio/conduit/pkg/record" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/golang/mock/gomock" + "github.com/google/uuid" + skafka "github.com/segmentio/kafka-go" ) func TestOpenSource_FailsWhenConfigEmpty(t *testing.T) { - underTest := Source{} - err := underTest.Open(context.TODO(), plugins.Config{}) + underTest := kafka.Source{} + err := underTest.Open(context.Background(), plugins.Config{}) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "config is invalid:"), "incorrect error msg") } func TestOpenSource_FailsWhenConfigInvalid(t *testing.T) { - underTest := Source{} - err := underTest.Open(context.TODO(), plugins.Config{Settings: map[string]string{"foobar": "foobar"}}) + underTest := kafka.Source{} + err := underTest.Open(context.Background(), plugins.Config{Settings: map[string]string{"foobar": "foobar"}}) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "config is invalid:"), "incorrect error msg") } @@ -52,91 +52,56 @@ func TestTeardownSource_ClosesClient(t *testing.T) { Close().
Return() - underTest := Source{Consumer: consumerMock, Config: connectorCfg()} + underTest := kafka.Source{Consumer: consumerMock, Config: connectorCfg()} assert.Ok(t, underTest.Teardown()) } -func TestRead_SpecialPositions(t *testing.T) { - testCases := []struct { - name string - pos record.Position - }{ - { - name: "empty position", - pos: record.Position{}, - }, - { - name: "nil position", - pos: nil, - }, - } - - for _, tt := range testCases { - tt := tt - t.Run(tt.name, func(t *testing.T) { - testReadPosition(t, tt.pos) - }) - } -} - -func testReadPosition(t *testing.T, pos record.Position) { +func TestReadPosition(t *testing.T) { ctrl := gomock.NewController(t) kafkaMsg := testKafkaMsg() cfg := connectorCfg() - expPos := map[int32]int64{} + groupID := uuid.NewString() consumerMock := mock.NewConsumer(ctrl) consumerMock. EXPECT(). - StartFrom(cfg.Topic, map[int32]int64{}, cfg.ReadFromBeginning) + StartFrom(cfg, groupID) consumerMock. EXPECT(). - Get(msgTimeout). - Return(kafkaMsg, expPos, nil) + Get(gomock.Any()). + Return(kafkaMsg, groupID, nil) - underTest := Source{Consumer: consumerMock, Config: cfg} - rec, err := underTest.Read(context.TODO(), pos) + underTest := kafka.Source{Consumer: consumerMock, Config: cfg} + rec, err := underTest.Read(context.Background(), []byte(groupID)) assert.Ok(t, err) assert.Equal(t, rec.Key.Bytes(), kafkaMsg.Key) assert.Equal(t, rec.Payload.Bytes(), kafkaMsg.Value) - var actPos map[int32]int64 - err = json.Unmarshal(rec.Position, &actPos) - assert.Ok(t, err) - assert.Equal(t, expPos, actPos) + assert.Equal(t, groupID, string(rec.Position)) } func TestRead_StartFromCalledOnce(t *testing.T) { ctrl := gomock.NewController(t) cfg := connectorCfg() - pos1 := map[int32]int64{0: 122, 1: 455} - pos1Bytes, _ := json.Marshal(pos1) - - pos2 := map[int32]int64{0: 122, 1: 456} - pos2Bytes, _ := json.Marshal(pos2) + pos := uuid.NewString() consumerMock := mock.NewConsumer(ctrl) consumerMock. EXPECT(). - StartFrom(cfg.Topic, pos1, cfg.ReadFromBeginning) + StartFrom(cfg, pos) consumerMock. EXPECT(). - Get(msgTimeout). - Return(testKafkaMsg(), pos2, nil). - Times(1) - consumerMock. - EXPECT(). - Get(msgTimeout). - Return(nil, pos2, nil). - Times(1) + Get(gomock.Any()). + Return(testKafkaMsg(), pos, nil). + Times(2) - underTest := Source{Consumer: consumerMock, Config: cfg} - _, err := underTest.Read(context.TODO(), pos1Bytes) + underTest := kafka.Source{Consumer: consumerMock, Config: cfg} + _, err := underTest.Read(context.Background(), []byte(pos)) + assert.Ok(t, err) + _, err = underTest.Read(context.Background(), []byte(pos)) assert.Ok(t, err) - _, err = underTest.Read(context.TODO(), pos2Bytes) - assert.True(t, plugins.IsRecoverableError(err), "expected recoverable error") } func TestRead(t *testing.T) { @@ -144,71 +109,33 @@ func TestRead(t *testing.T) { kafkaMsg := testKafkaMsg() cfg := connectorCfg() - startPos := map[int32]int64{0: 122, 1: 455} - startPosBytes, _ := json.Marshal(startPos) - expPos := map[int32]int64{0: 123, 1: 456} + pos := uuid.NewString() consumerMock := mock.NewConsumer(ctrl) consumerMock. EXPECT(). - StartFrom(cfg.Topic, startPos, cfg.ReadFromBeginning) + StartFrom(cfg, pos) consumerMock. EXPECT(). - Get(msgTimeout). - Return(kafkaMsg, expPos, nil) + Get(gomock.Any()). 
+ Return(kafkaMsg, pos, nil) - underTest := Source{Consumer: consumerMock, Config: cfg} - rec, err := underTest.Read(context.TODO(), startPosBytes) + underTest := kafka.Source{Consumer: consumerMock, Config: cfg} + rec, err := underTest.Read(context.Background(), []byte(pos)) assert.Ok(t, err) assert.Equal(t, rec.Key.Bytes(), kafkaMsg.Key) assert.Equal(t, rec.Payload.Bytes(), kafkaMsg.Value) - - var actPos map[int32]int64 - err = json.Unmarshal(rec.Position, &actPos) - assert.Ok(t, err) - assert.Equal(t, expPos, actPos) -} - -func TestRead_InvalidPosition(t *testing.T) { - underTest := Source{} - rec, err := underTest.Read(context.TODO(), []byte("foobar")) - assert.Equal(t, record.Record{}, rec) - assert.Error(t, err) - assert.True( - t, - strings.HasPrefix(err.Error(), "couldn't start from position: invalid position"), - "expected msg to have prefix 'couldn't start from position: invalid position'", - ) -} - -func TestRead_NilMsgReturned(t *testing.T) { - ctrl := gomock.NewController(t) - cfg := connectorCfg() - - consumerMock := mock.NewConsumer(ctrl) - consumerMock. - EXPECT(). - StartFrom(cfg.Topic, map[int32]int64{}, cfg.ReadFromBeginning) - consumerMock. - EXPECT(). - Get(msgTimeout). - Return(nil, map[int32]int64{}, nil) - - underTest := Source{Consumer: consumerMock, Config: cfg} - rec, err := underTest.Read(context.TODO(), record.Position{}) - assert.Equal(t, record.Record{}, rec) - assert.Error(t, err) - assert.True(t, plugins.IsRecoverableError(err), "expected a recoverable error") + assert.Equal(t, pos, string(rec.Position)) } -func testKafkaMsg() *kafka.Message { - return &kafka.Message{ - TopicPartition: kafka.TopicPartition{}, - Value: []byte("test-value"), - Key: []byte("test-key"), - Timestamp: time.Time{}, - TimestampType: 0, - Opaque: nil, - Headers: nil, +func testKafkaMsg() *skafka.Message { + return &skafka.Message{ + Topic: "test", + Partition: 0, + Offset: 123, + HighWaterMark: 234, + Key: []byte("test-key"), + Value: []byte("test-value"), + Time: time.Time{}, } } diff --git a/pkg/plugins/kafka/spec.go b/pkg/plugins/kafka/spec.go index c3c82054a..d0a432d60 100644 --- a/pkg/plugins/kafka/spec.go +++ b/pkg/plugins/kafka/spec.go @@ -25,7 +25,7 @@ func (s Spec) Specify() (plugins.Specification, error) { return plugins.Specification{ Summary: "A Kafka source and destination plugin for Conduit, written in Go.", Description: "", - Version: "v0.5.0", + Version: "v0.1.0", Author: "Meroxa", DestinationParams: map[string]plugins.Parameter{ "servers": { @@ -38,11 +38,6 @@ func (s Spec) Specify() (plugins.Specification, error) { Required: true, Description: "The topic to which records will be written to.", }, - "securityProtocol": { - Default: "", - Required: false, - Description: "Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.", - }, "acks": { Default: "all", Required: false, @@ -65,11 +60,6 @@ func (s Spec) Specify() (plugins.Specification, error) { Required: true, Description: "The topic to which records will be written to.", }, - "securityProtocol": { - Default: "", - Required: false, - Description: "Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.", - }, "readFromBeginning": { Default: "false", Required: false,
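Finally, on the destination side: the README notes that writes are synchronous. With segmentio/kafka-go that falls out of the writer settings mirrored in producer.go (BatchSize: 1 flushes every record). A standalone sketch, assuming a local broker and a hypothetical topic:

```go
package main

import (
	"context"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"), // assumed local broker
		Topic:        "example-topic",             // hypothetical topic
		BatchSize:    1,                           // flush per record: effectively synchronous writes
		WriteTimeout: 10 * time.Second,
		RequiredAcks: kafka.RequireAll,
	}
	defer w.Close()

	// WriteMessages returns only once the batch (here, a single record)
	// has been accepted by the broker, or fails with an error.
	err := w.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("key-1"),
		Value: []byte("payload-1"),
	})
	if err != nil {
		panic(err)
	}
}
```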