diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index 94086b82d3..3319b6cd3e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -260,8 +260,7 @@ private void startClientGroupConsumer(Session session) throws Exception { } synchronized (lockMap.get(subsystem)) { log.info("readySession session[{}]", session); - ClientGroupWrapper cgw = session.getClientGroupWrapper().get(); - + ClientGroupWrapper cgw = this.getClientGroupMap().get(session.getClient().getGroup()); boolean flag = cgw != null && cgw.addGroupConsumerSession(session); if (!flag) { throw new Exception("addGroupConsumerSession fail"); @@ -279,14 +278,14 @@ private void startClientGroupConsumer(Session session) throws Exception { private void cleanClientGroupWrapperByCloseSub(Session session) throws Exception { cleanSubscriptionInSession(session); - ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(session.getClientGroupWrapper().get()); + ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(this.getClientGroupMap().get(session.getClient().getGroup())); clientGroupWrapper.removeGroupConsumerSession(session); handleUnackMsgsInSession(session); cleanClientGroupWrapperCommon(clientGroupWrapper); } private void cleanClientGroupWrapperByClosePub(Session session) throws Exception { - ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(session.getClientGroupWrapper().get()); + ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(this.getClientGroupMap().get(session.getClient().getGroup())); clientGroupWrapper.removeGroupProducerSession(session); cleanClientGroupWrapperCommon(clientGroupWrapper); } @@ -298,7 +297,7 @@ private void cleanClientGroupWrapperByClosePub(Session session) throws Exception */ private void cleanSubscriptionInSession(Session session) throws Exception { for (SubscriptionItem item : session.getSessionContext().getSubscribeTopics().values()) { - ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(session.getClientGroupWrapper().get()); + ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(this.getClientGroupMap().get(session.getClient().getGroup())); clientGroupWrapper.removeSubscription(item, session); if (!clientGroupWrapper.hasSubscription(item.getTopic())) { clientGroupWrapper.unsubscribe(item); @@ -314,7 +313,7 @@ private void cleanSubscriptionInSession(Session session) throws Exception { private void handleUnackMsgsInSession(Session session) { // key: seq ConcurrentHashMap unAckMsg = session.getPusher().getUnAckMsg(); - ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(session.getClientGroupWrapper().get()); + ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(this.getClientGroupMap().get(session.getClient().getGroup())); if (unAckMsg.size() > 0 && !clientGroupWrapper.getGroupConsumerSessions().isEmpty()) { for (Map.Entry entry : unAckMsg.entrySet()) { DownStreamMsgContext downStreamMsgContext = entry.getValue(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java index ccbb98255b..a0e90ea376 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java @@ -34,6 +34,7 @@ import org.apache.eventmesh.runtime.acl.Acl; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus; @@ -259,8 +260,9 @@ public void onException(OnExceptionContext context) { // retry UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext( session, event, pkg.getHeader(), startTime, taskExecuteTime); + ClientGroupWrapper cgw = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(session.getClient().getGroup()); Objects.requireNonNull( - session.getClientGroupWrapper().get()).getTcpRetryer() + cgw).getTcpRetryer() .newTimeout(upStreamMsgContext, 10, TimeUnit.SECONDS); session.getSender().getFailMsgCount().incrementAndGet();