diff --git a/scheduler/pkg/kafka/config/auth.go b/scheduler/pkg/kafka/config/auth.go
index 2bc85f3bdf..3e27651764 100644
--- a/scheduler/pkg/kafka/config/auth.go
+++ b/scheduler/pkg/kafka/config/auth.go
@@ -63,7 +63,7 @@ func setupSASLSSLAuthentication(config kafka.ConfigMap) error {
 
 func withPasswordAuth(mechanism string, config kafka.ConfigMap) error {
 	// Set the SASL mechanism
-	config["security.protocol"] = "SASL_SSL"
+	config["security.protocol"] = tls.SecurityProtocolSASLSSL
 	config["sasl.mechanism"] = mechanism
 
 	// Set the SASL username and password
diff --git a/scheduler/pkg/kafka/gateway/constants.go b/scheduler/pkg/kafka/gateway/constants.go
index 58812e2d56..f079c03ba7 100644
--- a/scheduler/pkg/kafka/gateway/constants.go
+++ b/scheduler/pkg/kafka/gateway/constants.go
@@ -9,10 +9,18 @@ the Change License after the Change Date as each is defined in accordance with t
 
 package gateway
 
+import "time"
+
 const (
 	HeaderKeyType       = "seldon-infer-type"
 	HeaderValueJsonReq  = "json/inferModelRequest"
 	HeaderValueJsonRes  = "json/inferModelResponse"
 	HeaderValueProtoReq = "proto/InferModelRequest"
 	HeaderValueProtoRes = "proto/InferModelResponse"
+
+	// Topic creation retries
+	TopicCreateTimeout      = time.Minute
+	TopicDescribeTimeout    = time.Second
+	TopicDescribeMaxRetries = 60
+	TopicDescribeRetryDelay = time.Second
 )
diff --git a/scheduler/pkg/kafka/gateway/infer.go b/scheduler/pkg/kafka/gateway/infer.go
index dbf299d3ae..db743537e1 100644
--- a/scheduler/pkg/kafka/gateway/infer.go
+++ b/scheduler/pkg/kafka/gateway/infer.go
@@ -16,6 +16,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/v2/kafka/splunkkafka"
 	log "github.com/sirupsen/logrus"
@@ -194,7 +195,10 @@ func (kc *InferKafkaHandler) GetNumModels() int {
 func (kc *InferKafkaHandler) createTopics(topicNames []string) error {
 	logger := kc.logger.WithField("func", "createTopics")
 	if kc.adminClient == nil {
-		logger.Warnf("Can't create topics %v as no admin client", topicNames)
+		logger.Warnf("no kafka admin client, can't create any of the following topics: %v", topicNames)
+		// An error would normally be returned here, but a missing adminClient typically
+		// indicates we're running tests. Instead of failing tests, we return nil here.
+		// TODO: find a better way of mocking kafka
 		return nil
 	}
 	t1 := time.Now()
@@ -210,18 +214,62 @@
 	results, err := kc.adminClient.CreateTopics(
 		context.Background(),
 		topicSpecs,
-		kafka.SetAdminOperationTimeout(time.Minute),
+		kafka.SetAdminOperationTimeout(TopicCreateTimeout),
 	)
 	if err != nil {
 		return err
 	}
 
+	// Wait for topic creation
+	logFailure := func(err error, delay time.Duration) {
+		logger.WithError(err).Errorf("still waiting for all topics to be created...")
+	}
+
+	logger.Infof("waiting for kafka topic creation")
+	retryPolicy := backoff.WithMaxRetries(
+		backoff.NewConstantBackOff(TopicDescribeRetryDelay),
+		TopicDescribeMaxRetries,
+	)
+	err = backoff.RetryNotify(
+		func() error {
+			return kc.ensureTopicsExist(topicNames)
+		},
+		retryPolicy,
+		logFailure)
+
+	if err != nil {
+		logger.WithError(err).Errorf("some topics not created, giving up")
+		return err
+	} else {
+		logger.Infof("all topics created")
+	}
+
 	for _, result := range results {
-		logger.Debugf("Topic result for %s", result.String())
+		logger.Debugf("topic result for %s", result.String())
 	}
 
 	t2 := time.Now()
-	logger.Infof("Topic created in %d millis", t2.Sub(t1).Milliseconds())
+	logger.Debugf("kafka topics created in %d millis", t2.Sub(t1).Milliseconds())
+
+	return nil
+}
+
+func (kc *InferKafkaHandler) ensureTopicsExist(topicNames []string) error {
+	ctx, cancel := context.WithTimeout(context.Background(), TopicDescribeTimeout)
+	defer cancel()
+	topicsDescResult, err := kc.adminClient.DescribeTopics(
+		ctx,
+		kafka.NewTopicCollectionOfTopicNames(topicNames),
+		kafka.SetAdminOptionIncludeAuthorizedOperations(false))
+	if err != nil {
+		return err
+	}
+
+	for _, topicDescription := range topicsDescResult.TopicDescriptions {
+		if topicDescription.Error.Code() != kafka.ErrNoError {
+			return fmt.Errorf("topic description failure: %s", topicDescription.Error.Error())
+		}
+	}
 
 	return nil
 }
@@ -241,7 +289,7 @@ func (kc *InferKafkaHandler) AddModel(modelName string) error {
 	kc.subscribedTopics[inputTopic] = true
 	err := kc.subscribeTopics()
 	if err != nil {
-		kc.logger.WithError(err).Warn("Failed to subscribe to topics")
+		kc.logger.WithError(err).Errorf("failed to subscribe to topics")
 		return nil
 	}
 	return nil
@@ -255,7 +303,7 @@ func (kc *InferKafkaHandler) RemoveModel(modelName string) error {
 	if len(kc.subscribedTopics) > 0 {
 		err := kc.subscribeTopics()
 		if err != nil {
-			kc.logger.WithError(err).Errorf("Failed to subscribe to topics")
+			kc.logger.WithError(err).Errorf("failed to subscribe to topics")
 			return nil
 		}
 	}
@@ -276,7 +324,7 @@ func (kc *InferKafkaHandler) Serve() {
 	for run {
 		select {
 		case <-kc.done:
-			logger.Infof("Stopping consumer %s", kc.consumer.String())
+			logger.Infof("stopping consumer %s", kc.consumer.String())
 			kc.producerActive.Store(false)
 			run = false
 		default:
diff --git a/scheduler/pkg/kafka/gateway/worker.go b/scheduler/pkg/kafka/gateway/worker.go
index 59dd005c43..37e336e725 100644
--- a/scheduler/pkg/kafka/gateway/worker.go
+++ b/scheduler/pkg/kafka/gateway/worker.go
@@ -243,7 +243,7 @@ func (iw *InferWorker) produce(
 	for k, vs := range headers {
 		for _, v := range vs {
 			if !existsKafkaHeader(kafkaHeaders, k, v) {
-				logger.Infof("Adding header to kafka response %s:%s", k, v)
+				logger.Debugf("Adding header to kafka response %s:%s", k, v)
 				kafkaHeaders = append(kafkaHeaders, kafka.Header{Key: k, Value: []byte(v)})
 			}
 		}
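For reference, the topic-creation wait added above is built from the retry primitives in github.com/cenkalti/backoff/v4: a constant delay (backoff.NewConstantBackOff) capped by backoff.WithMaxRetries and driven by backoff.RetryNotify. Below is a minimal standalone sketch of that pattern; checkTopics, retryDelay, and maxRetries are illustrative stand-ins, not the gateway's actual ensureTopicsExist or its constants.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Illustrative stand-ins for the values kept in gateway/constants.go.
	const (
		retryDelay = 200 * time.Millisecond
		maxRetries = 5
	)

	attempts := 0
	// checkTopics is a hypothetical operation standing in for ensureTopicsExist:
	// it fails a couple of times before the "topics" become visible.
	checkTopics := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("topic not yet visible")
		}
		return nil
	}

	// Constant delay between attempts, giving up after maxRetries retries.
	retryPolicy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryDelay), maxRetries)

	// RetryNotify keeps calling checkTopics until it succeeds or the policy is
	// exhausted; the notify callback runs after each failed attempt, mirroring
	// the logFailure callback in createTopics above.
	err := backoff.RetryNotify(
		checkTopics,
		retryPolicy,
		func(err error, delay time.Duration) {
			fmt.Printf("still waiting (next attempt in %s): %v\n", delay, err)
		},
	)
	if err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Printf("succeeded after %d attempts\n", attempts)
}

RetryNotify returns the last error from the operation once the policy is exhausted, which is why createTopics can log "some topics not created, giving up" and propagate that failure to its caller.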