Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
fixing test
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbrinlee1 committed Apr 7, 2023
1 parent 5681279 commit eb08291
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions idk/kafka_sasl/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/featurebasedb/featurebase/v3/idk"
"github.com/featurebasedb/featurebase/v3/idk/common"
"github.com/pkg/errors"
)

Expand All @@ -21,17 +20,6 @@ type Main struct {
ConfigMap *confluent.ConfigMap `flag:"-"`
}

func (m *Main) CopyIn(config idk.ConfluentCommand) {
m.KafkaSaslUsername = config.KafkaSaslUsername
m.KafkaSaslPassword = config.KafkaSaslPassword
m.KafkaSaslMechanism = config.KafkaSaslMechanism
m.KafkaSecurityProtocol = config.KafkaSecurityProtocol
m.KafkaSslKeyPassword = config.KafkaSslKeyPassword
m.KafkaSslCaLocation = config.KafkaSslCaLocation
m.KafkaSslCertificateLocation = config.KafkaSslCertificateLocation
m.KafkaSslKeyLocation = config.KafkaSslKeyLocation
}

func NewMain() (*Main, error) {
var err error
m := Main{}
Expand All @@ -49,20 +37,26 @@ func NewMain() (*Main, error) {
}
m.NewSource = func() (idk.Source, error) {
source := NewSource()
source.KafkaBootstrapServers = m.KafkaBootstrapServers
source.SchemaRegistryURL = m.SchemaRegistryURL
source.Group = m.Group
source.Topics = m.Topics
source.Log = m.Main.Log()
source.Timeout = m.Timeout
source.KafkaSocketTimeoutMs = int(m.Timeout / time.Millisecond)
source.SkipOld = m.SkipOld
source.ConfluentCommand = m.ConfluentCommand
source.SchemaRegistryUsername = m.SchemaRegistryUsername
source.SchemaRegistryPassword = m.SchemaRegistryPassword
source.Verbose = m.Verbose
source.KafkaMaxPollInterval = m.KafkaMaxPollInterval
source.KafkaSessionTimeout = m.KafkaSessionTimeout
source.KafkaGroupInstanceId = m.KafkaGroupInstanceId
source.KafkaDebug = m.KafkaDebug
source.KafkaSocketKeepaliveEnable = m.KafkaSocketKeepaliveEnable
source.Header = m.Header
source.AllowMissingFields = m.AllowMissingFields
source.KafkaBootstrapServers = m.KafkaBootstrapServers
cfg, err := common.SetupConfluent(&m.ConfluentCommand)
if err != nil {
return nil, err
}

source.ConfigMap = cfg
err = source.Open()
if err != nil {
return nil, errors.Wrap(err, "opening source")
Expand Down

0 comments on commit eb08291

Please sign in to comment.