New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one #1275
[ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one #1275
Conversation
Hi @duhenglucky @ShannonDing @jonnxu please help to review this pr,together. |
|
||
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { | ||
public AsyncTraceDispatcher(String producerOrConsumerGroup, String traceTopicName, RPCHook rpcHook) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be better if the variable "producerOrConsumerGroup" change to be "group" name.
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
Outdated
Show resolved
Hide resolved
@@ -159,6 +161,10 @@ private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { | |||
return traceProducerInstance; | |||
} | |||
|
|||
private String makeGroupNameForTrace() { | |||
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.producerOrConsumerGroup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same as above mentioned.
@@ -159,6 +161,10 @@ private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { | |||
return traceProducerInstance; | |||
} | |||
|
|||
private String makeGroupNameForTrace() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be better if makeGroupNameForTrace method name change to be genGroupNameForTrace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All variables were renamed as your advice.
If users start the mutiple producer or consumer instances in the same process,the instance name is the same one.You can set different instance name in the start method when the traceProducer is started. @ahuazhu |
LGTM |
If set different instance name for trace producer, multiple client instances will be created. |
@@ -159,6 +161,10 @@ private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { | |||
return traceProducerInstance; | |||
} | |||
|
|||
private String genGroupNameForTrace() { | |||
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it can trace when the multi consumers or producers with the same group name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it can trace when the multi consumers or producers with the same group name?
Only the first traceProducer can trace, and the other will be start failed. If specify a instanceName for each group, it will create multi client instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonnxu
Thank you for the reminder. As far as I know, in RocketMQ two producers for different topics can't share the same ProduceGroup, and consumers also. But a consumer and a producer can have the same group. Unfortunately,the trace will failure in such scenes. To fix this problem, client type(consume or produce) will be appended to the trace-group to distinguish the consumer and producer.
private String genGroupNameForTrace() {
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
}
When construct AsyncTraceDispatcher.type
will be set as "PRODUCE" or "CONSUME".
This PR has been updated.
And my test code as below:
public class SameProcessSameTopicTest {
private static String topic1 = "TopicTest1";
private static String topic2 = "TopicTest2";
// private static final String CONSUMER_GROUP1 = "test_c_group1";
// private static final String CONSUMER_GROUP2 = "test_c_group2";
//
// private static final String PRODUCER_GROUP1 = "test_p_group1";
// private static final String PRODUCER_GROUP2 = "test_p_group2";
private static final String CONSUMER_GROUP1 = "test__group1";
private static final String CONSUMER_GROUP2 = "test__group2";
private static final String PRODUCER_GROUP1 = "test__group1";
private static final String PRODUCER_GROUP2 = "test__group2";
private static final String NAMR_SRV = "";
public static void startConsume(final String id, final String group, final String topic) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, true);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(topic, "*");
consumer.setNamesrvAddr(NAMR_SRV);
consumer.setInstanceName(id);
final AtomicInteger i = new AtomicInteger(0);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new Date() + ":" + new String(msg.getBody()) + " consume success! " + msg.getMsgId() + " " + topic);
}
i.getAndAdd(msgs.size());
System.err.println(id + " Consumed " + i.get() + " messages for " + topic);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
public static void startProduce(String id, String group, String topic) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(group,true);
producer.setNamesrvAddr(NAMR_SRV);
producer.start();
producer.setInstanceName(id);
AtomicInteger c = new AtomicInteger(0);
for (int i = 0; i < 10 ;) {
try {
Message msg = new Message(topic /* Topic */, null /* Tag */,
("trace message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.send(msg);
Thread.sleep(50);
i++;
System.out.println(id + " send " + c.getAndIncrement() + " messages to " + topic);
} catch (Exception e) {
// e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException, MQClientException {
Thread t1 = new Thread() {
@Override
public void run() {
try {
startConsume("C1", CONSUMER_GROUP1, topic1);
} catch (MQClientException e) {
e.printStackTrace();
}
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
startConsume("C2", CONSUMER_GROUP2, topic2);
} catch (MQClientException e) {
e.printStackTrace();
}
}
};
Thread t3 = new Thread() {
@Override
public void run() {
try {
startProduce("P1", PRODUCER_GROUP1, topic1);
} catch (MQClientException e) {
e.printStackTrace();
}
}
};
//
Thread t4 = new Thread() {
@Override
public void run() {
try {
startProduce("P2", PRODUCER_GROUP2, topic2);
} catch (MQClientException e) {
e.printStackTrace();
}
}
};
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.printf("Consumer Started.%n");
}
}
LGTM |
@duhenglucky when release?Thanks! |
…in the same process can trace only one (apache#1275) (apache#1303) * fix trace problem when multi produce/consumer in the same process * uniform parameter manner * variable rename * consumer groups may be same with the producer group Co-authored-by: zhengwen zhu <ahuazhu@gmail.com>
…in the same process can trace only one (apache#1275) (apache#1303) * fix trace problem when multi produce/consumer in the same process * uniform parameter manner * variable rename * consumer groups may be same with the producer group Co-authored-by: zhengwen zhu <ahuazhu@gmail.com>
…in the same process can trace only one (apache#1275) (apache#1303) * fix trace problem when multi produce/consumer in the same process * uniform parameter manner * variable rename * consumer groups may be same with the producer group Co-authored-by: zhengwen zhu <ahuazhu@gmail.com>
What is the purpose of the change
Fix the bug when there are more than one producer (or consumer) , the Tracer can not show all trace infomation.
ISSUE 1188
Brief changelog
Set different ProducerGroup for different TraceProducer, the ProducerGroup will like
_INNER_TRACE_PRODUCER-user-specified-producer-group
Verifying this change
Produce and consume message for a same topic in the same process, the console show both produce and consume information.