Example for integration with confluentinc/confluent-kafka-go #12

Open
1 of 2 tasks
bitdean opened this issue Nov 29, 2023 · 2 comments

Comments

@bitdean

bitdean commented Nov 29, 2023

Describe the feature

Since confluentinc/confluent-kafka-go is the most generic Go client and uses the underlying librdkafka natively, would it be possible for someone to provide an example of how to integrate with it?

Use Case

  • An application using confluentinc/confluent-kafka-go needs the AWS MSK IAM SASL signer to connect to an AWS MSK cluster via an IAM role
  • Provide an example of how to connect when using confluentinc/confluent-kafka-go

Proposed Solution

No response

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

aws-msk-iam-sasl-signer-go Module Versions Used

1.0.0

Go version used

1.21

@nboergerotto

nboergerotto commented Mar 11, 2024

I got it running with the following code:

package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"time"
)

func main() {
	bearerToken := createToken()

	producer := createProducerWithToken(bearerToken)
	defer producer.Close()

	produceMessagesToTopic(producer)

	flush(producer)
}

func createToken() kafka.OAuthBearerToken {
	token, tokenExpirationTime, err := signer.GenerateAuthToken(context.TODO(), "YOUR-AWS-REGION")
	if err != nil {
		panic(err)
	}
	seconds := tokenExpirationTime / 1000
	nanoseconds := (tokenExpirationTime % 1000) * 1000000
	bearerToken := kafka.OAuthBearerToken{
		TokenValue: token,
		Expiration: time.Unix(seconds, nanoseconds),
	}

	return bearerToken
}

func createProducerWithToken(bearerToken kafka.OAuthBearerToken) *kafka.Producer {
	kafkaConfig := &kafka.ConfigMap{
		"bootstrap.servers": "YOUR-BROKER-URL",
		"client.id":         "golang-test-producer",
		"security.protocol": "sasl_ssl",
		"sasl.mechanism":    "OAUTHBEARER",
	}
	producer, err := kafka.NewProducer(kafkaConfig)
	if err != nil {
		panic(err)
	}

	err = producer.SetOAuthBearerToken(bearerToken)
	if err != nil {
		panic(err)
	}
	return producer
}

func produceMessagesToTopic(producer *kafka.Producer) {
	topic := "YOUR-TOPIC-NAME"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		err := producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
		// Produce is asynchronous: a nil error only means the message was enqueued.
		if err != nil {
			fmt.Printf("Failed to produce: %v\n", err)
			continue
		}
		fmt.Printf("Enqueued word %s\n", word)
	}
}

func flush(producer *kafka.Producer) {
	producer.Flush(15 * 1000)
}

You have to replace the following strings in my code: YOUR-BROKER-URL, YOUR-AWS-REGION, YOUR-TOPIC-NAME

For running the code in an ECS-Task, I'm using the following Dockerfile:

# syntax=docker/dockerfile:1

FROM --platform=linux/amd64 golang:1.22 AS builder
WORKDIR /app
COPY . .
RUN go mod download
RUN GOOS=linux go build -o /kafka-producer-golang-confluent

FROM --platform=linux/amd64 debian:bookworm-slim AS runner
COPY --from=builder /kafka-producer-golang-confluent /kafka-producer-golang-confluent
RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates
CMD ["/kafka-producer-golang-confluent"]

To get it running, I had to install ca-certificates ("RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates"), since debian:bookworm-slim doesn't include them by default. Depending on your Docker base image, you may not need this step.

This exact code successfully produces events to my Kafka topic once the client policies are set correctly, as described in the AWS documentation.

@dixon14

dixon14 commented Sep 24, 2024

Hi, I encountered a problem when connecting a consumer to the brokers using the IAM signer and the confluent-kafka-go client.

I set up the code similarly to the example above. When I try to ReadMessage from a topic, it returns KafkaErrTimeOut. The debug logs show that no brokers are available for coordination, but my brokers are healthy. Can anyone help?

type MessageProcessor interface {
	Process(msg *confluentkafka.Message) error
}

type Consumer struct {
	config       config.Config
	consumer     *confluentkafka.Consumer
	processorMap map[string]MessageProcessor
}

func New(
	ctx context.Context,
	config config.Config,
	processorMap map[string]MessageProcessor,
) *Consumer {
	configMap := &confluentkafka.ConfigMap{
		"bootstrap.servers":  config.KafkaBootstrapServers,
		"group.id":           config.KafkaGroupID,
		"security.protocol":  "SASL_SSL",
		"sasl.mechanism":     "OAUTHBEARER",
		"auto.offset.reset":  "latest",
		"enable.auto.commit": true,
		"session.timeout.ms": 60000,
		// "heartbeat.interval.ms": 60000,
		// "max.poll.interval.ms":  300000,
		// "debug":                 "all",
	}

	consumer, err := confluentkafka.NewConsumer(configMap)
	if err != nil {
		return nil
	}

	token, err := createToken(ctx)
	if err != nil {
		return nil
	}

	if err := consumer.SetOAuthBearerToken(token); err != nil {
		return nil
	}

	var topics []string
	for topic := range processorMap {
		topics = append(topics, topic)
	}

	// Subscribe to multiple topics
	if err = consumer.SubscribeTopics(topics, nil); err != nil {
		log.Err(err).Msg("Unable to subscribe to topics.")
		return nil
	}

	return &Consumer{
		config:       config,
		consumer:     consumer,
		processorMap: processorMap,
	}
}

// Start indicates that Kafka consumer will start polling messages
// from the subscribed topics in the kafka bootstrap servers
func (c *Consumer) Start() error {
	for {
		msg, err := c.consumer.ReadMessage(100 * time.Millisecond)
		if err != nil {
			return err
		}
		// fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
		// 	*msg.TopicPartition.Topic, string(msg.Key), string(msg.Value))
		topic := *msg.TopicPartition.Topic
		processor := c.processorMap[topic]

		if err := processor.Process(msg); err != nil {
			log.Err(err).Msgf("Error in processing message from topic %s", topic)
			return err
		}
	}
}

func createToken(ctx context.Context) (confluentkafka.OAuthBearerToken, error) {
	token, tokenExpirationTimeMs, err := signer.GenerateAuthToken(ctx, "us-east-1")
	if err != nil {
		return confluentkafka.OAuthBearerToken{}, err
	}

	seconds := tokenExpirationTimeMs / 1000
	nanoseconds := (tokenExpirationTimeMs % 1000) * 1000000
	bearerToken := confluentkafka.OAuthBearerToken{
		TokenValue: token,
		Expiration: time.Unix(seconds, nanoseconds),
	}

	return bearerToken, nil
}
