Trace topic's routeInfoData can never be updated #1007

Open
tt67wq opened this issue Feb 16, 2023 · 4 comments · May be fixed by #1095
Labels
bug Something isn't working good first issue Good for newcomers trace

Comments

@tt67wq

tt67wq commented Feb 16, 2023

BUG REPORT

  1. Please describe the issue you observed:

    • What did you do (the steps to reproduce)?
      I disabled write permission on a broker.

    • What did you expect to see?
      No queue belonging to this broker should accept writes.

    • What did you see instead?
      Queues of the topic RMQ_SYS_TRACE_TOPIC on this broker still receive messages.

After reading trace.go for a while, I found that routeInfoData for the topic RMQ_SYS_TRACE_TOPIC is never updated after the first boot.
Maybe we should add a timer that runs an UpdateTopicRouteInfo job every few seconds.

@tt67wq tt67wq changed the title Trace topic's routeInfoData can never update Trace topic's routeInfoData can never be updated Feb 16, 2023
@francisoliverlee francisoliverlee added the question Further information is requested label Feb 21, 2023
@francisoliverlee
Member

I found that the logic for automatically updating routes does exist; you could check whether it takes effect.
The main flow is as follows:

  1. Initialize rmqClient, which maintains a producerMap:
    producerMap sync.Map

    Then start the ticker that updates topic routes:
    // schedule update route info
    go primitive.WithRecover(func() {
    	// delay
    	op := func() {
    		c.UpdateTopicRouteInfo()
    	}
    	time.Sleep(10 * time.Millisecond)
    	op()
    	ticker := time.NewTicker(_PullNameServerInterval)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ticker.C:
    			op()
    		case <-c.done:
    			rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
    				"clientID": c.ClientID(),
    			})
    			return
    		}
    	}
    })
  2. Before the first message is sent, look up the existing route for the topic; if there is none, pull it from the namesrv and call UpdatePublishInfo() to save it in the corresponding producer.
  3. Send the message as usual.
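
Step 2 above can be illustrated with a minimal, self-contained sketch. It is a toy model, not the library's code: the `nameServer`, `producer`, `route`, and `findPublishInfo` names are hypothetical stand-ins chosen for illustration, and the name server is just an in-memory map.

```go
package main

import (
	"fmt"
	"sync"
)

// route is a simplified, hypothetical stand-in for topic route data.
type route struct {
	brokers []string
}

// nameServer is an in-memory stand-in for the namesrv (assumption).
type nameServer struct {
	mu     sync.Mutex
	routes map[string]route
	pulls  int // how many lookups were served, for illustration only
}

func (n *nameServer) query(topic string) (route, bool) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.pulls++
	r, ok := n.routes[topic]
	return r, ok
}

// producer caches publish info per topic.
type producer struct {
	publishInfo sync.Map // topic -> route
	ns          *nameServer
}

// findPublishInfo returns the cached route for a topic. On a cache miss it
// pulls the route from the name server and stores it for later sends.
func (p *producer) findPublishInfo(topic string) (route, error) {
	if v, ok := p.publishInfo.Load(topic); ok {
		return v.(route), nil
	}
	r, ok := p.ns.query(topic)
	if !ok {
		return route{}, fmt.Errorf("no route for topic %q", topic)
	}
	p.publishInfo.Store(topic, r)
	return r, nil
}

func main() {
	ns := &nameServer{routes: map[string]route{
		"orders": {brokers: []string{"broker-a"}},
	}}
	p := &producer{ns: ns}
	for i := 0; i < 3; i++ {
		r, err := p.findPublishInfo("orders")
		fmt.Println(r.brokers, err)
	}
	// Only the first send reaches the name server; later sends hit the cache.
	fmt.Println("name server pulls:", ns.pulls)
}
```

The point of the sketch is that a topic only enters the producer's cache through this lookup path, which is also what makes it visible to a periodic refresh job.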
  3. 正常发送消息

@tt67wq
Author

tt67wq commented Feb 24, 2023

The topic RMQ_SYS_TRACE_TOPIC never goes through tryToFindTopicPublishInfo. Trace data is sent like this:

func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
	traceTopic := td.traceTopic
	if td.access == primitive.Cloud {
		traceTopic = td.traceTopic + regionID
	}
	msg := primitive.NewMessage(traceTopic, []byte(data))
	msg.WithKeys(keySet.slice())

	mq, addr := td.findMq(regionID)
	if mq == nil {
		return
	}

	var req = td.buildSendRequest(mq, msg)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
		cancel()
		resp := primitive.NewSendResult()
		if e != nil {
			rlog.Info("send trace data error.", map[string]interface{}{
				"traceData": data,
			})
		} else {
			td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
			rlog.Debug("send trace data success:", map[string]interface{}{
				"SendResult": resp,
				"traceData":  data,
			})
		}
	})
...
}

Here findMq does not add RMQ_SYS_TRACE_TOPIC to publishInfo, so the client never automatically runs UpdatePublishInfo for it.
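
The effect described here can be reproduced in a small toy model. This is a sketch under stated assumptions, not the library's implementation: `client`, `traceDispatcher`, and `updateTopicRouteInfo` are simplified hypothetical types, and the name server is an in-memory map. The refresh job only touches topics registered in `publishInfo`, so a route copied once at startup stays stale.

```go
package main

import "fmt"

const traceTopic = "RMQ_SYS_TRACE_TOPIC"

// client is a toy model: the refresh job only sees topics in publishInfo.
type client struct {
	nameServer  map[string][]string // topic -> writable brokers (source of truth)
	publishInfo map[string][]string // topics registered for periodic refresh
}

// updateTopicRouteInfo plays the role of one tick of the scheduled job.
func (c *client) updateTopicRouteInfo() {
	for topic := range c.publishInfo {
		c.publishInfo[topic] = c.nameServer[topic]
	}
}

// traceDispatcher keeps its own copy of the route, fetched once at startup
// and never registered with the client.
type traceDispatcher struct {
	brokers []string
}

func main() {
	c := &client{
		nameServer: map[string][]string{
			"orders":   {"broker-a", "broker-b"},
			traceTopic: {"broker-a", "broker-b"},
		},
		publishInfo: map[string][]string{},
	}
	// A normal topic is registered on first send.
	c.publishInfo["orders"] = c.nameServer["orders"]
	// The trace dispatcher bypasses publishInfo entirely.
	td := &traceDispatcher{brokers: c.nameServer[traceTopic]}

	// broker-b loses write permission; the name server drops it from both topics.
	c.nameServer["orders"] = []string{"broker-a"}
	c.nameServer[traceTopic] = []string{"broker-a"}
	c.updateTopicRouteInfo()

	fmt.Println("orders:", c.publishInfo["orders"])
	fmt.Println("trace: ", td.brokers)
}
```

After the refresh, the route for `orders` no longer contains `broker-b`, while the dispatcher's copy still does, which matches the behaviour reported in this issue.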

@francisoliverlee
Member

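Your analysis is entirely correct; this is a bug. You are very welcome to submit a PR to fix it.

A normal producer registers itself in the client's producerMap when it starts producing. On its first send it pulls the route and updates the route info on its instance in producerMap. After that, a scheduled task checks the route info for the topics of every producer in client.producerMap.

When traceDispatcher is initialized, it neither initializes a producer nor registers one in producerMap. So the scheduled route check exists, but it has no topic to check.

Fix: follow the Java logic and initialize a real producer to send trace data, instead of calling the remoting layer's invokeSync directly. That way all topics of every producer get their routes updated normally.

The Java trace producer:
https://github.com/apache/rocketmq/blob/06f2208a34907211591114f6b0d327168c250fb3/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java#L62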
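
The shape of the proposed fix can be sketched in a toy model. This is only an illustration under assumptions, not the actual patch and not the library's API: `client`, `producer`, `traceDispatcher`, `newProducer`, and `send` are hypothetical names, and the name server is an in-memory map. The dispatcher sends through a producer that is registered with the client, so the trace topic lands in that producer's publish info and the periodic refresh covers it.

```go
package main

import "fmt"

const traceTopic = "RMQ_SYS_TRACE_TOPIC"

// client is a toy model of the shared client.
type client struct {
	nameServer map[string][]string // topic -> writable brokers
	producers  []*producer         // stands in for producerMap
}

// producer caches routes per topic and is known to the client.
type producer struct {
	cli         *client
	publishInfo map[string][]string
}

// newProducer creates a producer and registers it with the client.
func (c *client) newProducer() *producer {
	p := &producer{cli: c, publishInfo: map[string][]string{}}
	c.producers = append(c.producers, p)
	return p
}

// updateTopicRouteInfo plays the role of one tick of the scheduled job:
// it refreshes every topic of every registered producer.
func (c *client) updateTopicRouteInfo() {
	for _, p := range c.producers {
		for topic := range p.publishInfo {
			p.publishInfo[topic] = c.nameServer[topic]
		}
	}
}

// send looks up the route (pulling it on first use) and returns the broker
// it would write to. It picks the last broker only to keep the demo deterministic.
func (p *producer) send(topic string) (string, error) {
	brokers, ok := p.publishInfo[topic]
	if !ok {
		brokers = p.cli.nameServer[topic]
		p.publishInfo[topic] = brokers
	}
	if len(brokers) == 0 {
		return "", fmt.Errorf("no writable broker for topic %q", topic)
	}
	return brokers[len(brokers)-1], nil
}

// traceDispatcher delegates to a registered producer instead of
// talking to the remoting layer directly.
type traceDispatcher struct {
	p *producer
}

func (td *traceDispatcher) sendTrace() (string, error) {
	return td.p.send(traceTopic)
}

func main() {
	c := &client{nameServer: map[string][]string{
		traceTopic: {"broker-a", "broker-b"},
	}}
	td := &traceDispatcher{p: c.newProducer()}

	before, _ := td.sendTrace()

	// broker-b loses write permission, then the scheduled job runs once.
	c.nameServer[traceTopic] = []string{"broker-a"}
	c.updateTopicRouteInfo()

	after, _ := td.sendTrace()
	fmt.Println("before:", before, "after:", after)
}
```

Because the trace topic now goes through the same lookup path as any other topic, the dispatcher stops writing to `broker-b` once the refresh has run.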

@francisoliverlee francisoliverlee added bug Something isn't working good first issue Good for newcomers trace and removed question Further information is requested labels Feb 27, 2023
xiaolibuzai-ovo added a commit to xiaolibuzai-ovo/rocketmq-client-go that referenced this issue Sep 10, 2023
@xiaolibuzai-ovo

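Hi, I have attempted to fix this bug. Please review the PR. This is my first contribution to this project, so please point out anything that does not follow the conventions.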
