diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java index 2bc70804900..827fd5dd454 100644 --- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java +++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java @@ -172,19 +172,17 @@ private static PullMessageConsumer createPullConsumer( // 2. build session factory object // find and initialize TubeMultiSessionFactory object according to the Master cluster information MasterInfo masterInfo = consumerConfig.getMasterInfo(); - MessageSessionFactory sessionFactory = - multSessFtyMap.get(masterInfo.getMasterClusterStr()); - if (sessionFactory == null) { - MessageSessionFactory tmpSessionFactory = - new TubeMultiSessionFactory(consumerConfig); - sessionFactory = multSessFtyMap.putIfAbsent( - masterInfo.getMasterClusterStr(), tmpSessionFactory); + + MessageSessionFactory sessionFactory; + synchronized (multSessFtyMap) { + sessionFactory = + multSessFtyMap.get(masterInfo.getMasterClusterStr()); if (sessionFactory == null) { - sessionFactory = tmpSessionFactory; - } else { - tmpSessionFactory.shutdown(); + sessionFactory = new TubeMultiSessionFactory(consumerConfig); + multSessFtyMap.put(masterInfo.getMasterClusterStr(), sessionFactory); } } + // 3. Create and get the PullMessageConsumer object return sessionFactory.createPullConsumer(consumerConfig); }