Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to stop subscriber goroutine when publisher goroutine is canceled ? #6

Closed
vikramarsid opened this issue Oct 20, 2020 · 1 comment

Comments

@vikramarsid
Copy link

vikramarsid commented Oct 20, 2020

Hello 👋,

Firstly, I would like to thank @autom8ter for bringing out the Machine. This package is super helpful in building highly concurrent Go applications.

In the below code snippet, how can I close subscribers when all the messages are consumed? Currently, the subscriber goroutine is hanging indefinitely.

numRange := []int{1,2,3,4,5,6,7,8,9}

m := machine.New(context.Background(),
		// functions are added to a FIFO channel that will block when active routines == max routines.
		machine.WithMaxRoutines(d.MaxConcurrentSqlExecutions),
		// every function executed by machine.Go will recover from panics
		machine.WithMiddlewares(machine.PanicRecover()),
	)
	defer m.Close()

	channelName := "dedupe"

	// start a goroutine that subscribes to all messages sent to the target channel
	m.Go(func(routine machine.Routine) {
		for {
			select {
			case <-routine.Context().Done():
				return
			default:
				err := routine.Subscribe(channelName, func(obj interface{}) {
					log.Infof("%v | subscription msg received! channel = %v msg = %v stats = %s\n",
						routine.PID(), channelName, obj, m.Stats().String())
				})
				if err != nil {
					log.Error("failed to start goroutines to consume jobs ", err)
					routine.Cancel()
					return
				}
			}
		}
	},
		machine.GoWithTags("subscribe"))

	// start another goroutine that publishes to the target channel every second
	m.Go(func(routine machine.Routine) {
		defer routine.Cancel()
		log.Infof("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
		// publish message to channel
		for _, interval := range numRange {
			err := routine.Publish(channelName, interval)
			if err != nil {
				log.Error("failed to start goroutines to consume jobs ", err)
			}
		}
	},
		machine.GoWithTags("publish"),
	)

	m.Wait()

Thank you!

@autom8ter
Copy link
Owner

Hello, I have added SubscribeUntil(), SubscribeWhile(), & SubscribeN() methods to the routine interface that should let you break the subscription!

        // SubscribeN subscribes to the given channel until it receives N messages or its context is cancelled
	SubscribeN(channel string, n int, handler func(msg interface{})) error
	// SubscribeUntil subscribes to the given channel until the decider returns false for the first time. The subscription breaks when the routine's context is cancelled or the decider returns false.
	SubscribeUntil(channel string, decider func() bool, handler func(msg interface{})) error
	// SubscribeWhile subscribes to the given channel while the decider returns true. The subscription breaks when the routine's context is cancelled.
	SubscribeWhile(channel string, decider func() bool, handler func(msg interface{})) error

Please see the updated README for an example and let me know if this will work for you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants