
BUG for shutdown? #741

Closed
always-waiting opened this issue Nov 4, 2021 · 7 comments


always-waiting commented Nov 4, 2021

[screenshot: pprof output showing the goroutine count growing continuously]

When repeatedly creating a new PushConsumer in a loop, calling Start and then Shutdown each time, I found that the number of goroutines keeps growing. After reading the source code, I believe there is a bug in Shutdown. The specific problem is as follows:

msgs := pq.getMessages()

Even though the PushConsumer has been shut down, the getMessages function is still waiting for a value from a channel, so the goroutine hangs forever.

Below is a test case. After Shutdown, some goroutines are never cleaned up; the result is shown in the image above.

package main

import (
	"context"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/rlog"
)

func main() {
	go http.ListenAndServe("0.0.0.0:6565", nil)
	rlog.SetLogLevel("error")
	consumer.ShutDownStatis()
	for {
		conn := createPushConsumer()
		if err := conn.Start(); err != nil {
			panic(err)
		}
		time.Sleep(5 * time.Second)
		if err := conn.Shutdown(); err != nil {
			panic(err)
		}
		fmt.Println("the goroutine count keeps growing......")
	}
}

func createPushConsumer() rocketmq.PushConsumer {
	mqAddrs := []string{"127.0.0.1:9876"}
	consumeGroupName := "testing"
	topics := []string{"yacos"}
	opts := []consumer.Option{
		consumer.WithConsumerModel(consumer.BroadCasting),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(mqAddrs)),
		consumer.WithGroupName(consumeGroupName),
	}
	var conn rocketmq.PushConsumer
	var err error
	if conn, err = rocketmq.NewPushConsumer(opts...); err != nil {
		return nil
	}
	f := func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		return consumer.ConsumeSuccess, nil
	}
	for _, topic := range topics {
		t := topic
		if err = conn.Subscribe(t, consumer.MessageSelector{}, f); err != nil {
			return nil
		}
	}
	return conn
}

always-waiting (Author) commented

@180909 Could you help confirm whether this is a bug?

180909 (Contributor) commented Nov 4, 2021

> @180909 Could you help confirm whether this is a bug?

I took a quick look. Indeed, the number of goroutines keeps increasing over a very short period of time.

always-waiting (Author) commented

@180909 I read the source code, and my judgment is that at

msgs := pq.getMessages()
this receive keeps waiting for the channel to return data, so the goroutine never stops. Somewhere there needs to be a place that closes that channel.

The problem is that sending to a closed channel causes a panic, so I cannot determine exactly where the channel should be closed. This needs help from someone more familiar with the code.

Could you help fix this bug?

maixiaohai (Contributor) commented

#614 — same issue? Try the related patch.

always-waiting (Author) commented

@maixiaohai I am using v2.1.0. Could the problem be that my version is too old?


tt-live commented Dec 3, 2021

@wenfengwang Could you take a look at whether this chan should be handled when doRebalance updates the queue information: when a queue is removed, close the chan at the same time.

func (pq *processQueue) WithDropped(dropped bool) {
	if dropped {
		close(pq.msgCh)
	}
	pq.dropped.Store(dropped)
}

Normally the chan should be closed at the moment the queue is removed; otherwise goroutines keep leaking. I also looked at the master branch: there the chan is additionally closed when the service shuts down. That logic should be deleted, otherwise restarting or shutting down the service will panic by closing an already-closed chan.

func (dc *defaultConsumer) shutdown() error {
	atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))

	mqs := make([]*primitive.MessageQueue, 0)
	dc.processQueueTable.Range(func(key, value interface{}) bool {
		k := key.(primitive.MessageQueue)
		pq := value.(*processQueue)
		pq.WithDropped(true)
		// close msg channel under the mutex to make sure nothing is still writing to it
		pq.mutex.Lock()
		// close(pq.msgCh)  // <- this line should be deleted
		pq.mutex.Unlock()
		mqs = append(mqs, &k)
		return true
	})
	dc.stat.ShutDownStat()
	dc.storage.persist(mqs)
	dc.client.Shutdown()
	return nil
}

wenfengwang linked a pull request Dec 16, 2021 that will close this issue

stale bot commented Apr 16, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 14 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Apr 16, 2022
@stale stale bot closed this as completed Apr 30, 2022