Skip to content

Commit

Permalink
Merge pull request #3047 from qingjiyuji/develop
Browse files Browse the repository at this point in the history
[ISSUE #3046]:Polish lite pull consumer code style
  • Loading branch information
duhenglucky committed Jul 16, 2021
2 parents 36333ce + 893e8fd commit 6aabf77
Showing 1 changed file with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private enum SubscriptionType {

private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";

private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/**
* the type of subscription
*/
Expand Down Expand Up @@ -195,19 +195,21 @@ public void executeHookAfter(final ConsumeMessageContext context) {
}

private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING)
if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
}
}

public void updateNameServerAddr(String newAddresses) {
this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
}

private synchronized void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE)
if (this.subscriptionType == SubscriptionType.NONE) {
this.subscriptionType = type;
else if (this.subscriptionType != type)
throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE);
} else if (this.subscriptionType != type) {
throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
}
}

private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
Expand Down Expand Up @@ -464,7 +466,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || topic.equals("")) {
if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
Expand All @@ -483,7 +485,7 @@ public synchronized void subscribe(String topic, String subExpression) throws MQ

public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try {
if (topic == null || topic.equals("")) {
if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
Expand Down Expand Up @@ -533,8 +535,9 @@ private void maybeAutoCommit() {
public synchronized List<MessageExt> poll(long timeout) {
try {
checkServiceState();
if (timeout < 0)
if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
}

if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
Expand All @@ -546,8 +549,9 @@ public synchronized List<MessageExt> poll(long timeout) {
if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0)
if (endTime - System.currentTimeMillis() <= 0) {
break;
}
}
}

Expand Down Expand Up @@ -671,8 +675,9 @@ private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientExcept
public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2)
if (offset == -2) {
throw new MQClientException("Fetch consume offset from broker exception", null);
}
return offset;
}

Expand All @@ -683,8 +688,9 @@ private void clearMessageQueueInCache(MessageQueue messageQueue) {
}
Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
while (iter.hasNext()) {
if (iter.next().getMessageQueue().equals(messageQueue))
if (iter.next().getMessageQueue().equals(messageQueue)) {
iter.remove();
}
}
}

Expand Down Expand Up @@ -735,10 +741,11 @@ public void run() {
return;
}

if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
return;
}

Expand Down Expand Up @@ -790,11 +797,10 @@ public void run() {
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}

Expand Down

0 comments on commit 6aabf77

Please sign in to comment.