-
Notifications
You must be signed in to change notification settings - Fork 12k
[ISSUE #6205] LitePullConsumer support async pull message #6278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I don't know if I'm thinking wrong. |
| this.taskTable.put(messageQueue, pullTask); | ||
| this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); | ||
| if (!this.messageQueueTable.containsKey(messageQueue)) { | ||
| // PullTaskImpl pullTask = new PullTaskImpl(messageQueue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's suggested to remove the comment line.
| // Dummy value to associate with an Object in the backing Map | ||
| private static final Object PRESENT = new Object(); | ||
|
|
||
| private final ConcurrentMap<MessageQueue, Object> messageQueueTable = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's quite strange to use Object as the map value. Are you attempting to create a concurrent set? If that is the case, you may use the keySet of a ConcurrentMap instead.
| } | ||
| } | ||
|
|
||
| public void pullMessage(final MessageQueue messageQueue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to abstract a method for the purpose of code reuse rather than duplicating it?
Lines 896 to 1020 in a3228ad
| public void run() { | |
| if (!this.isCancelled()) { | |
| this.currentThread = Thread.currentThread(); | |
| if (assignedMessageQueue.isPaused(messageQueue)) { | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); | |
| log.debug("Message Queue: {} has been paused!", messageQueue); | |
| return; | |
| } | |
| ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); | |
| if (null == processQueue || processQueue.isDropped()) { | |
| log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); | |
| return; | |
| } | |
| processQueue.setLastPullTimestamp(System.currentTimeMillis()); | |
| if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); | |
| if ((consumeRequestFlowControlTimes++ % 1000) == 0) { | |
| log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); | |
| } | |
| return; | |
| } | |
| long cachedMessageCount = processQueue.getMsgCount().get(); | |
| long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); | |
| if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); | |
| if ((queueFlowControlTimes++ % 1000) == 0) { | |
| log.warn( | |
| "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", | |
| defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); | |
| } | |
| return; | |
| } | |
| if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); | |
| if ((queueFlowControlTimes++ % 1000) == 0) { | |
| log.warn( | |
| "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", | |
| defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); | |
| } | |
| return; | |
| } | |
| if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); | |
| if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { | |
| log.warn( | |
| "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", | |
| processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes); | |
| } | |
| return; | |
| } | |
| long offset = 0L; | |
| try { | |
| offset = nextPullOffset(messageQueue); | |
| } catch (Exception e) { | |
| log.error("Failed to get next pull offset", e); | |
| scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); | |
| return; | |
| } | |
| if (this.isCancelled() || processQueue.isDropped()) { | |
| return; | |
| } | |
| long pullDelayTimeMills = 0; | |
| try { | |
| SubscriptionData subscriptionData; | |
| String topic = this.messageQueue.getTopic(); | |
| if (subscriptionType == SubscriptionType.SUBSCRIBE) { | |
| subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); | |
| } else { | |
| String subExpression4Assign = topicToSubExpression.get(topic); | |
| subExpression4Assign = subExpression4Assign == null ? SubscriptionData.SUB_ALL : subExpression4Assign; | |
| subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign); | |
| } | |
| PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); | |
| if (this.isCancelled() || processQueue.isDropped()) { | |
| return; | |
| } | |
| switch (pullResult.getPullStatus()) { | |
| case FOUND: | |
| final Object objLock = messageQueueLock.fetchLockObject(messageQueue); | |
| synchronized (objLock) { | |
| if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) { | |
| processQueue.putMessage(pullResult.getMsgFoundList()); | |
| submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); | |
| } | |
| } | |
| break; | |
| case OFFSET_ILLEGAL: | |
| log.warn("The pull request offset illegal, {}", pullResult.toString()); | |
| break; | |
| default: | |
| break; | |
| } | |
| updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); | |
| } catch (InterruptedException interruptedException) { | |
| log.warn("Polling thread was interrupted.", interruptedException); | |
| } catch (Throwable e) { | |
| if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { | |
| pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL; | |
| } else { | |
| pullDelayTimeMills = pullTimeDelayMillsWhenException; | |
| } | |
| log.error("An error occurred in pull message process.", e); | |
| } | |
| if (!this.isCancelled()) { | |
| scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS); | |
| } else { | |
| log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your reply, I don't have a good idea, my thinking is to add a parameter to judge sync or async, and then if... else handle differently
|
@drpmma sorry, I clicked review by accident |
Make sure set the target branch to
developWhat is the purpose of the change
fix #6205
Brief changelog
LitePullConsumer support async pull message
Verifying this change
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.[ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyleto make sure basic checks pass. Runmvn clean install -DskipITsto make sure unit-test pass. Runmvn clean test-compile failsafe:integration-testto make sure integration-test pass.