From 0d874f041981d6ad25997609dc6f0f6122121930 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 16 Jul 2024 14:01:41 +0800 Subject: [PATCH 1/2] setup --- .../agent/SubscriptionConsumerAgent.java | 12 ++- .../agent/SubscriptionTopicAgent.java | 10 ++- .../SubscriptionPrefetchingTsFileQueue.java | 7 +- .../subscription/event/SubscriptionEvent.java | 8 +- .../receiver/SubscriptionReceiverV1.java | 90 ++++++++++++------- 5 files changed, 87 insertions(+), 40 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java index 6d96decbe69b5..eee14943df05a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java @@ -73,11 +73,14 @@ public TPushConsumerGroupMetaRespExceptionMessage handleSingleConsumerGroupMetaC return null; } catch (final Exception e) { final String consumerGroupId = consumerGroupMetaFromCoordinator.getConsumerGroupId(); + LOGGER.warn( + "Exception occurred when handling single consumer group meta changes for consumer group {}", + consumerGroupId, + e); final String exceptionMessage = String.format( "Subscription: Failed to handle single consumer group meta changes for consumer group %s, because %s", consumerGroupId, e); - LOGGER.warn(exceptionMessage); return new TPushConsumerGroupMetaRespExceptionMessage( consumerGroupId, exceptionMessage, System.currentTimeMillis()); } finally { @@ -138,11 +141,14 @@ public TPushConsumerGroupMetaRespExceptionMessage handleConsumerGroupMetaChanges return null; } catch (final Exception e) { final String consumerGroupId = consumerGroupMetaFromCoordinator.getConsumerGroupId(); + LOGGER.warn( + "Exception occurred when handling single consumer group meta changes for consumer group {}", + consumerGroupId, + e); final String exceptionMessage = String.format( "Subscription: Failed to handle single consumer group meta changes for consumer group %s, because %s", consumerGroupId, e); - LOGGER.warn(exceptionMessage); return new TPushConsumerGroupMetaRespExceptionMessage( consumerGroupId, exceptionMessage, System.currentTimeMillis()); } @@ -160,10 +166,10 @@ public TPushConsumerGroupMetaRespExceptionMessage handleDropConsumerGroup( handleDropConsumerGroupInternal(consumerGroupId); return null; } catch (final Exception e) { + LOGGER.warn("Exception occurred when dropping consumer group {}", consumerGroupId, e); final String exceptionMessage = String.format( "Subscription: Failed to drop consumer group %s, because %s", consumerGroupId, e); - LOGGER.warn(exceptionMessage); return new TPushConsumerGroupMetaRespExceptionMessage( consumerGroupId, exceptionMessage, System.currentTimeMillis()); } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java index 9bde571aa346c..4c2bf5d02176a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java @@ -71,11 +71,12 @@ public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges( return null; } catch (final Exception e) { final String topicName = topicMetaFromCoordinator.getTopicName(); + LOGGER.warn( + "Exception occurred when handling single topic meta changes for topic {}", topicName, e); final String exceptionMessage = String.format( "Subscription: Failed to handle single topic meta changes for topic %s, because %s", topicName, e); - LOGGER.warn(exceptionMessage); return new TPushTopicMetaRespExceptionMessage( topicName, exceptionMessage, System.currentTimeMillis()); } finally { @@ -98,11 +99,14 @@ public TPushTopicMetaRespExceptionMessage handleTopicMetaChanges( handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator); } catch (final Exception e) { final String topicName = topicMetaFromCoordinator.getTopicName(); + LOGGER.warn( + "Exception occurred when handling single topic meta changes for topic {}", + topicName, + e); final String exceptionMessage = String.format( "Subscription: Failed to handle single topic meta changes for topic %s, because %s", topicName, e); - LOGGER.warn(exceptionMessage); return new TPushTopicMetaRespExceptionMessage( topicName, exceptionMessage, System.currentTimeMillis()); } @@ -119,9 +123,9 @@ public TPushTopicMetaRespExceptionMessage handleDropTopic(final String topicName handleDropTopicInternal(topicName); return null; } catch (final Exception e) { + LOGGER.warn("Exception occurred when dropping topic {}", topicName, e); final String exceptionMessage = String.format("Subscription: Failed to drop topic %s, because %s", topicName, e); - LOGGER.warn(exceptionMessage); return new TPushTopicMetaRespExceptionMessage( topicName, exceptionMessage, System.currentTimeMillis()); } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index b0aed50b8bf45..6bdf327e10a65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -244,11 +244,16 @@ public SubscriptionEvent poll(final String consumerId) { try { event.fetchNextResponse(); } catch (final Exception e) { + LOGGER.warn( + "Exception occurred when SubscriptionPrefetchingTsFileQueue {} transferring TsFile (with event {}) to consumer {}", + this, + event, + consumerId, + e); final String errorMessage = String.format( "Exception occurred when SubscriptionPrefetchingTsFileQueue %s transferring TsFile (with event %s) to consumer %s: %s", this, event, consumerId, e); - LOGGER.warn(errorMessage); return generateSubscriptionPollErrorResponse(errorMessage); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index b8926a80316bc..0f30aa65ab5c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -299,11 +299,15 @@ public ByteBuffer getCurrentResponseByteBuffer() throws IOException { public void resetResponseByteBuffer(final boolean resetAll) { if (resetAll) { - SubscriptionEventBinaryCache.getInstance().invalidateAll(Arrays.asList(responses)); + SubscriptionEventBinaryCache.getInstance() + .invalidateAll( + Arrays.stream(responses).filter(Objects::nonNull).collect(Collectors.toList())); // maybe friendly for gc Arrays.fill(byteBuffers, null); } else { - SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]); + if (Objects.nonNull(responses[currentResponseIndex])) { + SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]); + } // maybe friendly for gc byteBuffers[currentResponseIndex] = null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 1da7a78a94539..69f60f593bd32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -152,11 +152,12 @@ public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) { private TPipeSubscribeResp handlePipeSubscribeHandshake(final PipeSubscribeHandshakeReq req) { try { return handlePipeSubscribeHandshakeInternal(req); - } catch (final SubscriptionException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when handshaking with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when handshaking: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when handshaking with request %s: %s", + req, e); return PipeSubscribeHandshakeResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, exceptionMessage), -1, @@ -217,11 +218,12 @@ private TPipeSubscribeResp handlePipeSubscribeHandshakeInternal( private TPipeSubscribeResp handlePipeSubscribeHeartbeat(final PipeSubscribeHeartbeatReq req) { try { return handlePipeSubscribeHeartbeatInternal(req); - } catch (final SubscriptionException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when heartbeat with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when heartbeat: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when heartbeat with request %s: %s", + req, e); return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR, exceptionMessage)); } @@ -246,18 +248,19 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal( private TPipeSubscribeResp handlePipeSubscribeSubscribe(final PipeSubscribeSubscribeReq req) { try { return handlePipeSubscribeSubscribeInternal(req); - } catch (final SubscriptionException | IOException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when subscribing with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when subscribing: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when subscribing with request %s: %s", + req, e); return PipeSubscribeSubscribeResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR, exceptionMessage)); } } private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal( - final PipeSubscribeSubscribeReq req) throws IOException { + final PipeSubscribeSubscribeReq req) throws SubscriptionException, IOException { // check consumer config thread local final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get(); if (Objects.isNull(consumerConfig)) { @@ -283,12 +286,12 @@ private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal( private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(final PipeSubscribeUnsubscribeReq req) { try { return handlePipeSubscribeUnsubscribeInternal(req); - } catch (final SubscriptionException | IOException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when unsubscribing with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when unsubscribing: %s, req: %s", - e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when unsubscribing with request %s: %s", + req, e); return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR, exceptionMessage)); } @@ -397,11 +400,12 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re }) .filter(Objects::nonNull) .collect(Collectors.toList())); - } catch (final SubscriptionException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when polling with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when polling: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when polling with request %s: %s", + req, e); return PipeSubscribePollResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, exceptionMessage), Collections.emptyList()); @@ -439,11 +443,12 @@ private List handlePipeSubscribePollTsFileInternal( private TPipeSubscribeResp handlePipeSubscribeCommit(final PipeSubscribeCommitReq req) { try { return handlePipeSubscribeCommitInternal(req); - } catch (final SubscriptionException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when committing with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when committing: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when committing with request %s: %s", + req, e); return PipeSubscribeCommitResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, exceptionMessage)); } @@ -485,11 +490,12 @@ private TPipeSubscribeResp handlePipeSubscribeCommitInternal(final PipeSubscribe private TPipeSubscribeResp handlePipeSubscribeClose(final PipeSubscribeCloseReq req) { try { return handlePipeSubscribeCloseInternal(req); - } catch (final SubscriptionException e) { + } catch (final Exception e) { + LOGGER.warn("Exception occurred when closing with request {}", req, e); final String exceptionMessage = String.format( - "Subscription: something unexpected happened when closing: %s, req: %s", e, req); - LOGGER.warn(exceptionMessage); + "Subscription: something unexpected happened when closing with request %s: %s", + req, e); return PipeSubscribeCloseResp.toTPipeSubscribeResp( RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_CLOSE_ERROR, exceptionMessage)); } @@ -547,19 +553,22 @@ private void createConsumer(final ConsumerConfig consumerConfig) throws Subscrip .setConsumerAttributes(consumerConfig.getAttribute()); final TSStatus tsStatus = configNodeClient.createConsumer(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Unexpected status code {} when creating consumer {} in config node", + tsStatus, + consumerConfig); final String exceptionMessage = String.format( "Subscription: Failed to create consumer %s in config node, status is %s.", consumerConfig, tsStatus); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } catch (final ClientManagerException | TException e) { + LOGGER.warn("Exception occurred when creating consumer {} in config node", consumerConfig, e); final String exceptionMessage = String.format( "Subscription: Failed to create consumer %s in config node, exception is %s.", consumerConfig, e); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } @@ -573,19 +582,22 @@ private void dropConsumer(final ConsumerConfig consumerConfig) throws Subscripti .setConsumerGroupId(consumerConfig.getConsumerGroupId()); final TSStatus tsStatus = configNodeClient.closeConsumer(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Unexpected status code {} when closing consumer {} in config node", + tsStatus, + consumerConfig); final String exceptionMessage = String.format( "Subscription: Failed to close consumer %s in config node, status is %s.", consumerConfig, tsStatus); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } catch (final ClientManagerException | TException e) { + LOGGER.warn("Exception occurred when closing consumer {} in config node", consumerConfig, e); final String exceptionMessage = String.format( "Subscription: Failed to close consumer %s in config node, exception is %s.", consumerConfig, e); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } @@ -603,19 +615,27 @@ private void subscribe(final ConsumerConfig consumerConfig, final Set to .setTopicNames(topicNames); final TSStatus tsStatus = configNodeClient.createSubscription(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Unexpected status code {} when subscribing topics {} for consumer {} in config node", + tsStatus, + topicNames, + consumerConfig); final String exceptionMessage = String.format( "Subscription: Failed to subscribe topics %s for consumer %s in config node, status is %s.", topicNames, consumerConfig, tsStatus); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } catch (final ClientManagerException | TException e) { + LOGGER.warn( + "Exception occurred when subscribing topics {} for consumer {} in config node", + topicNames, + consumerConfig, + e); final String exceptionMessage = String.format( "Subscription: Failed to subscribe topics %s for consumer %s in config node, exception is %s.", topicNames, consumerConfig, e); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } @@ -631,19 +651,27 @@ private void unsubscribe(final ConsumerConfig consumerConfig, final Set .setTopicNames(topicNames); final TSStatus tsStatus = configNodeClient.dropSubscription(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + LOGGER.warn( + "Unexpected status code {} when unsubscribing topics {} for consumer {} in config node", + tsStatus, + topicNames, + consumerConfig); final String exceptionMessage = String.format( "Subscription: Failed to unsubscribe topics %s for consumer %s in config node, status is %s.", topicNames, consumerConfig, tsStatus); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } catch (final ClientManagerException | TException e) { + LOGGER.warn( + "Exception occurred when unsubscribing topics {} for consumer {} in config node", + topicNames, + consumerConfig, + e); final String exceptionMessage = String.format( "Subscription: Failed to unsubscribe topics %s for consumer %s in config node, exception is %s.", topicNames, consumerConfig, e); - LOGGER.warn(exceptionMessage); throw new SubscriptionException(exceptionMessage); } } From e6972a2ea3d3a357cb3061fe66e9ae0d6c7c5a80 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 16 Jul 2024 14:40:26 +0800 Subject: [PATCH 2/2] improve --- .../agent/SubscriptionBrokerAgent.java | 28 ++++-- .../broker/SubscriptionBroker.java | 89 ++++++++++++------- 2 files changed, 81 insertions(+), 36 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index f47f7cba0b2be..87c7edfaf8aae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.agent; +import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper; import org.apache.iotdb.db.subscription.broker.SubscriptionBroker; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask; @@ -100,19 +101,30 @@ public List commit( /////////////////////////////// broker /////////////////////////////// + /** + * Caller should ensure that the method is called in the lock {@link + * ConsumerGroupMetaKeeper#acquireWriteLock}. + */ public boolean isBrokerExist(final String consumerGroupId) { return consumerGroupIdToSubscriptionBroker.containsKey(consumerGroupId); } - public synchronized void createBroker(final String consumerGroupId) { + /** + * Caller should ensure that the method is called in the lock {@link + * ConsumerGroupMetaKeeper#acquireWriteLock}. + */ + public void createBroker(final String consumerGroupId) { final SubscriptionBroker broker = new SubscriptionBroker(consumerGroupId); consumerGroupIdToSubscriptionBroker.put(consumerGroupId, broker); } /** - * @return true -> if drop broker success + * Caller should ensure that the method is called in the lock {@link + * ConsumerGroupMetaKeeper#acquireWriteLock}. + * + * @return {@code true} if drop broker success, {@code false} otherwise */ - public synchronized boolean dropBroker(final String consumerGroupId) { + public boolean dropBroker(final String consumerGroupId) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { LOGGER.warn( @@ -124,6 +136,7 @@ public synchronized boolean dropBroker(final String consumerGroupId) { LOGGER.warn( "Subscription: broker bound to consumer group [{}] is not empty when dropping", consumerGroupId); + // do nothing return false; } consumerGroupIdToSubscriptionBroker.remove(consumerGroupId); @@ -136,7 +149,8 @@ public void bindPrefetchingQueue(final SubscriptionConnectorSubtask subtask) { final String consumerGroupId = subtask.getConsumerGroupId(); final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { - LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId); + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return; } broker.bindPrefetchingQueue(subtask.getTopicName(), subtask.getInputPendingQueue()); @@ -146,7 +160,8 @@ public void unbindPrefetchingQueue( final String consumerGroupId, final String topicName, final boolean doRemove) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { - LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId); + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return; } broker.unbindPrefetchingQueue(topicName, doRemove); @@ -155,7 +170,8 @@ public void unbindPrefetchingQueue( public void executePrefetch(final String consumerGroupId, final String topicName) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { - LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId); + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return; } broker.executePrefetch(topicName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 57cfca226c898..b9ae1ad7197a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -60,27 +60,35 @@ public boolean isEmpty() { public List poll(final String consumerId, final Set topicNames) { final List events = new ArrayList<>(); - for (final Map.Entry entry : - topicNameToPrefetchingQueue.entrySet()) { - final String topicName = entry.getKey(); - final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue(); - if (topicNames.contains(topicName)) { - // before determining if it is closed - if (prefetchingQueue.isCompleted()) { - LOGGER.info( - "Subscription: prefetching queue bound to topic [{}] is completed, return termination response to client", - topicName); - events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse()); - continue; - } - if (prefetchingQueue.isClosed()) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName); - continue; - } - final SubscriptionEvent event = prefetchingQueue.poll(consumerId); - if (Objects.nonNull(event)) { - events.add(event); - } + for (final String topicName : topicNames) { + final SubscriptionPrefetchingQueue prefetchingQueue = + topicNameToPrefetchingQueue.get(topicName); + if (Objects.isNull(prefetchingQueue)) { + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", + topicName, + brokerId); + continue; + } + // check if completed before closed + if (prefetchingQueue.isCompleted()) { + LOGGER.info( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client", + topicName, + brokerId); + events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse()); + continue; + } + if (prefetchingQueue.isClosed()) { + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", + topicName, + brokerId); + continue; + } + final SubscriptionEvent event = prefetchingQueue.poll(consumerId); + if (Objects.nonNull(event)) { + events.add(event); } } return events; @@ -96,19 +104,24 @@ public List pollTsFile( if (Objects.isNull(prefetchingQueue)) { final String errorMessage = String.format( - "Subscription: prefetching queue bound to topic [%s] does not exist", topicName); + "Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist", + topicName, brokerId); LOGGER.warn(errorMessage); throw new SubscriptionException(errorMessage); } if (!(prefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue)) { final String errorMessage = String.format( - "Subscription: prefetching queue bound to topic [%s] is invalid", topicName); + "Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid", + topicName, brokerId); LOGGER.warn(errorMessage); throw new SubscriptionException(errorMessage); } if (prefetchingQueue.isClosed()) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName); + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", + topicName, + brokerId); return Collections.emptyList(); } final SubscriptionEvent event = @@ -132,11 +145,16 @@ public List commit( topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { LOGGER.warn( - "Subscription: prefetching queue bound to topic [{}] does not exist", topicName); + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", + topicName, + brokerId); continue; } if (prefetchingQueue.isClosed()) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName); + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", + topicName, + brokerId); continue; } if (!nack) { @@ -160,7 +178,9 @@ public void bindPrefetchingQueue( topicNameToPrefetchingQueue.get(topicName); if (Objects.nonNull(prefetchingQueue)) { LOGGER.warn( - "Subscription: prefetching queue bound to topic [{}] has already existed", topicName); + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] has already existed", + topicName, + brokerId); return; } final String topicFormat = SubscriptionAgent.topic().getTopicFormat(topicName); @@ -183,7 +203,10 @@ public void unbindPrefetchingQueue(final String topicName, final boolean doRemov final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", topicName); + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", + topicName, + brokerId); return; } @@ -214,11 +237,17 @@ public void executePrefetch(final String topicName) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", topicName); + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", + topicName, + brokerId); return; } if (prefetchingQueue.isClosed()) { - LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName); + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", + topicName, + brokerId); return; } prefetchingQueue.executePrefetch();