Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.subscription.agent;

import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper;
import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
Expand Down Expand Up @@ -100,19 +101,30 @@ public List<SubscriptionCommitContext> commit(

/////////////////////////////// broker ///////////////////////////////

/**
* Caller should ensure that the method is called in the lock {@link
* ConsumerGroupMetaKeeper#acquireWriteLock}.
*/
public boolean isBrokerExist(final String consumerGroupId) {
return consumerGroupIdToSubscriptionBroker.containsKey(consumerGroupId);
}

public synchronized void createBroker(final String consumerGroupId) {
/**
* Caller should ensure that the method is called in the lock {@link
* ConsumerGroupMetaKeeper#acquireWriteLock}.
*/
public void createBroker(final String consumerGroupId) {
final SubscriptionBroker broker = new SubscriptionBroker(consumerGroupId);
consumerGroupIdToSubscriptionBroker.put(consumerGroupId, broker);
}

/**
* @return true -> if drop broker success
* Caller should ensure that the method is called in the lock {@link
* ConsumerGroupMetaKeeper#acquireWriteLock}.
*
* @return {@code true} if drop broker success, {@code false} otherwise
*/
public synchronized boolean dropBroker(final String consumerGroupId) {
public boolean dropBroker(final String consumerGroupId) {
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
Expand All @@ -124,6 +136,7 @@ public synchronized boolean dropBroker(final String consumerGroupId) {
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] is not empty when dropping",
consumerGroupId);
// do nothing
return false;
}
consumerGroupIdToSubscriptionBroker.remove(consumerGroupId);
Expand All @@ -136,7 +149,8 @@ public void bindPrefetchingQueue(final SubscriptionConnectorSubtask subtask) {
final String consumerGroupId = subtask.getConsumerGroupId();
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId);
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
return;
}
broker.bindPrefetchingQueue(subtask.getTopicName(), subtask.getInputPendingQueue());
Expand All @@ -146,7 +160,8 @@ public void unbindPrefetchingQueue(
final String consumerGroupId, final String topicName, final boolean doRemove) {
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId);
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
return;
}
broker.unbindPrefetchingQueue(topicName, doRemove);
Expand All @@ -155,7 +170,8 @@ public void unbindPrefetchingQueue(
public void executePrefetch(final String consumerGroupId, final String topicName) {
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn("Subscription: consumer group [{}] does not exist", consumerGroupId);
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
return;
}
broker.executePrefetch(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ public TPushConsumerGroupMetaRespExceptionMessage handleSingleConsumerGroupMetaC
return null;
} catch (final Exception e) {
final String consumerGroupId = consumerGroupMetaFromCoordinator.getConsumerGroupId();
LOGGER.warn(
"Exception occurred when handling single consumer group meta changes for consumer group {}",
consumerGroupId,
e);
final String exceptionMessage =
String.format(
"Subscription: Failed to handle single consumer group meta changes for consumer group %s, because %s",
consumerGroupId, e);
LOGGER.warn(exceptionMessage);
return new TPushConsumerGroupMetaRespExceptionMessage(
consumerGroupId, exceptionMessage, System.currentTimeMillis());
} finally {
Expand Down Expand Up @@ -138,11 +141,14 @@ public TPushConsumerGroupMetaRespExceptionMessage handleConsumerGroupMetaChanges
return null;
} catch (final Exception e) {
final String consumerGroupId = consumerGroupMetaFromCoordinator.getConsumerGroupId();
LOGGER.warn(
"Exception occurred when handling single consumer group meta changes for consumer group {}",
consumerGroupId,
e);
final String exceptionMessage =
String.format(
"Subscription: Failed to handle single consumer group meta changes for consumer group %s, because %s",
consumerGroupId, e);
LOGGER.warn(exceptionMessage);
return new TPushConsumerGroupMetaRespExceptionMessage(
consumerGroupId, exceptionMessage, System.currentTimeMillis());
}
Expand All @@ -160,10 +166,10 @@ public TPushConsumerGroupMetaRespExceptionMessage handleDropConsumerGroup(
handleDropConsumerGroupInternal(consumerGroupId);
return null;
} catch (final Exception e) {
LOGGER.warn("Exception occurred when dropping consumer group {}", consumerGroupId, e);
final String exceptionMessage =
String.format(
"Subscription: Failed to drop consumer group %s, because %s", consumerGroupId, e);
LOGGER.warn(exceptionMessage);
return new TPushConsumerGroupMetaRespExceptionMessage(
consumerGroupId, exceptionMessage, System.currentTimeMillis());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges(
return null;
} catch (final Exception e) {
final String topicName = topicMetaFromCoordinator.getTopicName();
LOGGER.warn(
"Exception occurred when handling single topic meta changes for topic {}", topicName, e);
final String exceptionMessage =
String.format(
"Subscription: Failed to handle single topic meta changes for topic %s, because %s",
topicName, e);
LOGGER.warn(exceptionMessage);
return new TPushTopicMetaRespExceptionMessage(
topicName, exceptionMessage, System.currentTimeMillis());
} finally {
Expand All @@ -98,11 +99,14 @@ public TPushTopicMetaRespExceptionMessage handleTopicMetaChanges(
handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator);
} catch (final Exception e) {
final String topicName = topicMetaFromCoordinator.getTopicName();
LOGGER.warn(
"Exception occurred when handling single topic meta changes for topic {}",
topicName,
e);
final String exceptionMessage =
String.format(
"Subscription: Failed to handle single topic meta changes for topic %s, because %s",
topicName, e);
LOGGER.warn(exceptionMessage);
return new TPushTopicMetaRespExceptionMessage(
topicName, exceptionMessage, System.currentTimeMillis());
}
Expand All @@ -119,9 +123,9 @@ public TPushTopicMetaRespExceptionMessage handleDropTopic(final String topicName
handleDropTopicInternal(topicName);
return null;
} catch (final Exception e) {
LOGGER.warn("Exception occurred when dropping topic {}", topicName, e);
final String exceptionMessage =
String.format("Subscription: Failed to drop topic %s, because %s", topicName, e);
LOGGER.warn(exceptionMessage);
return new TPushTopicMetaRespExceptionMessage(
topicName, exceptionMessage, System.currentTimeMillis());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,35 @@ public boolean isEmpty() {

public List<SubscriptionEvent> poll(final String consumerId, final Set<String> topicNames) {
final List<SubscriptionEvent> events = new ArrayList<>();
for (final Map.Entry<String, SubscriptionPrefetchingQueue> entry :
topicNameToPrefetchingQueue.entrySet()) {
final String topicName = entry.getKey();
final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
if (topicNames.contains(topicName)) {
// before determining if it is closed
if (prefetchingQueue.isCompleted()) {
LOGGER.info(
"Subscription: prefetching queue bound to topic [{}] is completed, return termination response to client",
topicName);
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
continue;
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName);
continue;
}
final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
if (Objects.nonNull(event)) {
events.add(event);
}
for (final String topicName : topicNames) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
topicName,
brokerId);
continue;
}
// check if completed before closed
if (prefetchingQueue.isCompleted()) {
LOGGER.info(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client",
topicName,
brokerId);
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
continue;
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
continue;
}
final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
if (Objects.nonNull(event)) {
events.add(event);
}
}
return events;
Expand All @@ -96,19 +104,24 @@ public List<SubscriptionEvent> pollTsFile(
if (Objects.isNull(prefetchingQueue)) {
final String errorMessage =
String.format(
"Subscription: prefetching queue bound to topic [%s] does not exist", topicName);
"Subscription: prefetching queue bound to topic [%s] for consumer group [%s] does not exist",
topicName, brokerId);
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
if (!(prefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue)) {
final String errorMessage =
String.format(
"Subscription: prefetching queue bound to topic [%s] is invalid", topicName);
"Subscription: prefetching queue bound to topic [%s] for consumer group [%s] is invalid",
topicName, brokerId);
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName);
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
return Collections.emptyList();
}
final SubscriptionEvent event =
Expand All @@ -132,11 +145,16 @@ public List<SubscriptionCommitContext> commit(
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] does not exist", topicName);
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
topicName,
brokerId);
continue;
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName);
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
continue;
}
if (!nack) {
Expand All @@ -160,7 +178,9 @@ public void bindPrefetchingQueue(
topicNameToPrefetchingQueue.get(topicName);
if (Objects.nonNull(prefetchingQueue)) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] has already existed", topicName);
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] has already existed",
topicName,
brokerId);
return;
}
final String topicFormat = SubscriptionAgent.topic().getTopicFormat(topicName);
Expand All @@ -183,7 +203,10 @@ public void unbindPrefetchingQueue(final String topicName, final boolean doRemov
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", topicName);
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
topicName,
brokerId);
return;
}

Expand Down Expand Up @@ -214,11 +237,17 @@ public void executePrefetch(final String topicName) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", topicName);
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
topicName,
brokerId);
return;
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is closed", topicName);
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
return;
}
prefetchingQueue.executePrefetch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,16 @@ public SubscriptionEvent poll(final String consumerId) {
try {
event.fetchNextResponse();
} catch (final Exception e) {
LOGGER.warn(
"Exception occurred when SubscriptionPrefetchingTsFileQueue {} transferring TsFile (with event {}) to consumer {}",
this,
event,
consumerId,
e);
final String errorMessage =
String.format(
"Exception occurred when SubscriptionPrefetchingTsFileQueue %s transferring TsFile (with event %s) to consumer %s: %s",
this, event, consumerId, e);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,15 @@ public ByteBuffer getCurrentResponseByteBuffer() throws IOException {

public void resetResponseByteBuffer(final boolean resetAll) {
if (resetAll) {
SubscriptionEventBinaryCache.getInstance().invalidateAll(Arrays.asList(responses));
SubscriptionEventBinaryCache.getInstance()
.invalidateAll(
Arrays.stream(responses).filter(Objects::nonNull).collect(Collectors.toList()));
// maybe friendly for gc
Arrays.fill(byteBuffers, null);
} else {
SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]);
if (Objects.nonNull(responses[currentResponseIndex])) {
SubscriptionEventBinaryCache.getInstance().invalidate(responses[currentResponseIndex]);
}
// maybe friendly for gc
byteBuffers[currentResponseIndex] = null;
}
Expand Down
Loading