Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubSubConn.Receive() is insufficient #650

Closed
Fruchtgummi opened this issue Jul 7, 2023 · 1 comment
Closed

pubSubConn.Receive() is insufficient #650

Fruchtgummi opened this issue Jul 7, 2023 · 1 comment

Comments

@Fruchtgummi
Copy link

Fruchtgummi commented Jul 7, 2023

My Function:

func InitPubSub(p *redis.Pool, chn map[string]map[string]chan *models.ChatMessage, chn2 map[string]map[string]chan *models.OnlineUsers, mux *sync.Mutex) (context.Context, context.CancelFunc) {
	defer func() {
		if r := recover(); r != nil {
			log.Println("Recovered in InitPubSub", r)
		}
		log.Println("defer")
	}()

	pubSubConn := redis.PubSubConn{Conn: p.Get()}
	err := pubSubConn.Subscribe("message_chatglobal", "joindeduser_chatglobal")
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	go func(ctx context.Context) {
		ticker := time.NewTicker(time.Second * 60)
		for {
			select {
			case <-ticker.C:
				err := pubSubConn.Ping("")
				if err != nil {
					log.Println("Ping failed: ", err)
					pubSubConn = redis.PubSubConn{Conn: p.Get()}
					err = pubSubConn.Subscribe("message_chatglobal", "joindeduser_chatglobal")
					if err != nil {
						log.Println("Error re-establishing connection: ", err)
					}
				}
			case <-ctx.Done():
				ticker.Stop()
				return
			}
		}
	}(ctx)

	go func(ctx context.Context) {
		defer func() {
			if r := recover(); r != nil {
				log.Println("Recovered in goroutine", r)
				pubSubConn = redis.PubSubConn{Conn: p.Get()}
				err := pubSubConn.Subscribe("message_chatglobal", "joindeduser_chatglobal")
				if err != nil {
					log.Println("Error re-establishing connection: ", err)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				return
			default:
				resultCh := make(chan interface{})
				go func() {
					p := pubSubConn.Receive()
					resultCh <- p
				}()

				select {
				case p := <-resultCh:
					// Ihre vorhandene Logik...
				case <-time.After(5 * time.Second):
					log.Println("Receive operation timed out")
					pubSubConn = redis.PubSubConn{Conn: p.Get()}
					err = pubSubConn.Subscribe("message_chatglobal", "joindeduser_chatglobal")
					if err != nil {
						log.Println("Error re-establishing connection: ", err)
					}
				}
			}
		}
	}(ctx)

	return ctx, cancel
}

The goroutine containing the Receive() call hangs because Receive() is blocked and waits for a response. There is no way to directly stop a blocking function. Therefore, this goroutine will still exist in memory even if the surrounding goroutine has been stopped by the context.

In Go, it is generally difficult to terminate goroutines cleanly, especially if they perform blocking operations. A possible solution would be for the Receive method itself to accept a context, but this is not possible in this case

It must be possible that Recevie() does not expire somewhere in its state, because I cannot cancel it. And that's exactly what happens here. It waits and waits and my goroutines remain.

@stevenh
Copy link
Collaborator

stevenh commented Jul 9, 2023

Use ReceiveContext instead.

@stevenh stevenh closed this as completed Jul 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants