Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub/ AWS MSK Support #3398

Closed
alok87 opened this issue Mar 9, 2024 · 6 comments
Closed

pubsub/ AWS MSK Support #3398

alok87 opened this issue Mar 9, 2024 · 6 comments

Comments

@alok87
Copy link

alok87 commented Mar 9, 2024

Does pubsub support AWS MSK? (aws serveless kafka)

AWS MSK requires these special token providers https://github.com/aws/aws-msk-iam-sasl-signer-go

@vangent
Copy link
Contributor

vangent commented Mar 9, 2024

As far as I know, it should. The Go CDK constructors take a sarama.Config, and it looks like the required token providers are provided on that.

@vangent vangent closed this as completed Mar 9, 2024
@vangent
Copy link
Contributor

vangent commented Mar 9, 2024

The default URL opener will probably not work, as it uses a MinimalConfig and there's no way to override anything from the URL. However, you can create your own URLOpener with the saram.Config set:

https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L152

OpenTopic is here:

https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L232

@alok87
Copy link
Author

alok87 commented Mar 9, 2024

Did not get? will the following work or not!

  • OpenTopics
  • Subscription.Receive()

my code is doing below, wont it work?

package main

func main() {
        config := cfg.New(appEnv)

        config.Pub.Kafka.Sarama = kafkapubsub.MinimalConfig()
	config.Pub.Kafka.Sarama.Net.SASL.Enable = true
	config.Pub.Kafka.Sarama.Net.SASL.Mechanism = sarama.SASLTypeOAuth
	config.Pub.Kafka.Sarama.Net.SASL.TokenProvider = pubsub.NewMSKAccessTokenProvider("ap-south-1")
	log.Infof(ctx, "MSK Bootstrap server: %v", config.Pub.Kafka.Address)


        pubService, err := pub.New(
		ctx,
		config.Pub,
		[]string{"mytopic"},
	)

        pubService.Publish()
}
package pub

import (
	"context"
	"fmt"
	"sync"

	"github.com/IBM/sarama"
	gopubsub "gocloud.dev/pubsub"
	"gocloud.dev/pubsub/kafkapubsub"

	"github.com/myorg/myworld/pkg/log"
)

// Pub is a wrapper over gocloud pubsub
// to reduce code duplication across modules
type Pub struct {
	// openTopics keeps a list of open topics
	openTopics map[string]*gopubsub.Topic

	// config can be used later also
	config Config
}

// Config has the pubsub configuration
type Config struct {
	// Kafka configurations
	Kafka *KafkaConfig
}

// KafkaConfig keeps all kafka related config
type KafkaConfig struct {
	// Sarama kafka library configurations
	Sarama *sarama.Config
	// Address where kafka is present
	Address []string
}

// New creates a new instance of pubsub, opens subscriptions, and creates topics.
func New(
	ctx context.Context,
	config Config,
	openTopics []string,
) (*Pub, error) {
	if config.Kafka == nil {
		return nil, fmt.Errorf("only kafka supported currently")
	}

	openedTopics := make(map[string]*gopubsub.Topic)
	for _, topicName := range openTopics {
		topic, err := kafkapubsub.OpenTopic(
			config.Kafka.Address,
			config.Kafka.Sarama,
			topicName,
			nil,
		)
		if err != nil {
			return nil, fmt.Errorf(
				"could not create/establish conn with topic: %+v, err:%v",
				topicName,
				err,
			)
		}
		openedTopics[topicName] = topic
	}

	return &Pub{
		openTopics: openedTopics,
		config:     config,
	}, nil
}

// Close closes all open subscriptions and topics.
func (ps *Pub) Close(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	<-ctx.Done()
	for _, topic := range ps.openTopics {
		err := topic.Shutdown(ctx)
		if err != nil {
			log.Errorf(ctx, "error closing pubsub topic: %v", err)
		}
	}
}

// OpenTopic creates and registers a new topic.
func (ps *Pub) OpenTopic(
	ctx context.Context,
	topicName string,
) error {
	if _, exists := ps.openTopics[topicName]; exists {
		return fmt.Errorf("topic: %v already registered", topicName)
	}

	topic, err := kafkapubsub.OpenTopic(
		ps.config.Kafka.Address,
		ps.config.Kafka.Sarama,
		topicName,
		nil,
	)
	if err != nil {
		return fmt.Errorf(
			"could not create/establish conn with topic: %+v, err:%v",
			topicName,
			err,
		)
	}
	ps.openTopics[topicName] = topic

	return nil
}

// Publish writes to the topic.
func (ps *Pub) Publish(
	ctx context.Context,
	topicName string,
	message []byte,
) error {
	topic, ok := ps.openTopics[topicName]
	if !ok || topic == nil {
		return fmt.Errorf("topic: %v unregistered", topic)
	}

	return topic.Send(
		ctx,
		&gopubsub.Message{
			Body: message,
		},
	)
}

I am currently testing only above using go-cloud, only opening topics and publishing.

@vangent
Copy link
Contributor

vangent commented Mar 9, 2024

As far as I know if you set up the config.Sarama correctly for AWS MSK, it should work with Go CDK. If you are having problems I suggest you try it using the native AWS MSK to make sure your config is correct.

@alok87
Copy link
Author

alok87 commented Mar 11, 2024

Getting this kafka: client has run out of available brokers to talk to)

I debugged also

It is failing at: https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L247-L250

brokers configuration is correct, i tested with

sh-4.2# ./kafka-topics.sh --bootstrap-server $BS --command-config client.properties --list

@alok87
Copy link
Author

alok87 commented Mar 11, 2024

Working, TLS config was issue, thanks gocloud works with MSK 👍🏽

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants