Skip to content

Two consumer in one jvm , shutdown the two client may throw error in unregisterConsumer  #576

@Ah39

Description

@Ah39

The two client have the same MQClientInstance, if A client shutdown , may shutdown MQClientInstance and netty client , so if B shutdown , call unregisterClient , MQClientAPIImpl have shutdowned before .

if consumer.setInstanceName(); The two client have diffrent MQClientAPIImpl so ,dont throw error

image

`public class TestClientShutdown {

public static void main(String[] args) {
	TestClientShutdown testShutdown = new TestClientShutdown();
	try {
		testShutdown.testShutdown();
	} catch (MQClientException e) {
		e.printStackTrace();
	}
	System.exit(0);
}

// @Test
public void testShutdown() throws MQClientException {
	System.out.println("start");
	DefaultMQPushConsumer consumer1 = createConsumer("hellogroup1","topic_test1");
	DefaultMQPushConsumer consumer2 = createConsumer("hellogroup2","topic_test2");
	Object obj = new Object();
	Thread thread = new Thread() {
		public void run() {
			synchronized (obj) {
				try {
					obj.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println("helloworld shutdown consumer1");
			consumer1.shutdown();
		}
	};

	Thread thread2 = new Thread() {
		public void run() {
			synchronized (obj) {
				try {
					obj.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println("helloworld shutdown consumer2");
			consumer2.shutdown();
		}
	};
	thread.start();
	thread2.start();
	try {
		System.out.println("**************** start sleep **************************");
		Thread.sleep(20000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}

	synchronized (obj) {
		obj.notifyAll();
	}
	try {
		Thread.sleep(20000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	//consumer1.shutdown();
	//consumer2.shutdown(); //如果是一个线程顺序shutdown,则是没有问题,如果是两个线程,
	//分别shutdown consumer 则会出现报错。如果是consumer.setInstanceName(consumerGroup);
	//则不会报错,因为内部有一个单例变量,setIntance 就是两个变量了

	System.out.println("start shutdown *******************");
}

public static DefaultMQPushConsumer createConsumer(String consumerGroup,String topicName) throws MQClientException {
	final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
	consumer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");

	consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    //topic_test 
	consumer.subscribe(topicName, "*");
	//consumer.setInstanceName(consumerGroup);
	consumer.setMessageListener(new MessageListenerConcurrently() {

		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			Iterator<MessageExt> it = msgs.iterator();
			while (it.hasNext()) {
				MessageExt msgExt = it.next();
				String str = new String(msgExt.getBody());
				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + str);
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

		}

	});

	consumer.start();
	return consumer;
}

public static DefaultMQProducer GetProducer() throws MQClientException {
	DefaultMQProducer producer = new DefaultMQProducer("helloproducer22");
	producer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");
	producer.setSendMsgTimeout(800);
	producer.start();
	return producer;
}

}`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions