Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stop and start consumers #77

Closed
mhmtszr opened this issue Dec 4, 2023 · 4 comments
Closed

feat: stop and start consumers #77

mhmtszr opened this issue Dec 4, 2023 · 4 comments
Assignees
Labels
enhancement New feature or request

Comments

@mhmtszr
Copy link
Member

mhmtszr commented Dec 4, 2023

Introduction
We may need to turn our consumers on and off while the application is running, depending on our needs. Currently, it is not possible to continue consuming again after consumer is closed.

@Abdulsametileri Abdulsametileri added the enhancement New feature or request label Dec 10, 2023
@oguzhaneren
Copy link

oguzhaneren commented Dec 12, 2023

There is already a feature in kafka clients called pause/resume consumer. it will be good implement this feature in consumer mechanism. there are some good benefits except stopping/start consumer:

  • if consumer pauses kafka client sends heartbeats in background thread so kafka will not enter the rebalance status so this will help to process long-running kind of messages. if your handling thread is busy more than 1 second we can call consumer.pause to eliminate rebalancing.
  • another great feature can be implementing "control-flow" or "backpressure" depending on processing time or buffer limit. for example: if client consume 10K message or processing consumed message time greater than X time you can pause to consumer until all received messages processed.
  • we can implement delaying messages using thread block without thinking about rebalancing, because when in pause status kafka client will continue to send heartbeat in background thread. if we implement this kind of delaying mechanism this will support persistent retry mechanism with configurable delay time for each message using kafka headers.

Resources:

@Abdulsametileri
Copy link
Member

Currently, you can workaround this such as

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Concurrency: 1,
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: false,
		ConsumeFn:    consumeFn,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	consumer.Consume()

	go func() {
		fmt.Println("closing consumer")
		consumer.Stop()

		time.Sleep(10 * time.Second)

		newConsumer, _ := kafka.NewConsumer(consumerCfg) // take a fresh instance

		fmt.Println("start new fresh consumer")
		newConsumer.Consume()
	}()

	fmt.Println("Consumer started...!")

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}

func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}

@Abdulsametileri
Copy link
Member

cron.AddFunc(config.StartingTime, func() {
		consumer, err := kafka.NewConsumer(config)
		if err != nil {
			panic("error initializing kafka consumer " + err.Error())
		}

		consumer.Consume()

		time.AfterFunc(config.WorkingDuration, func() {
			consumer.Stop()
		})
	})

Abdulsametileri added a commit that referenced this issue Jan 15, 2024
)

* feat: add functionality pause and resume for consumer

* chore: typo

* refactor: extract to the base

* feat: add unit tests

* feat: add pause/resume integration tests

* chore: add documentation

* refactor: convert pause ch in order to save cpu

* chore: fix tests

* chore: lint
@Abdulsametileri
Copy link
Member

Implemented in v2.1.9; thank you for all your support <3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants