Skip to content

Commit

Permalink
[ISSUE 7117] check message is in memory or not when init consumer off…
Browse files Browse the repository at this point in the history
…set for pop
  • Loading branch information
xdkxlk committed Aug 9, 2023
1 parent 6bc2c84 commit 110f06a
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
System.out.printf("put ack to store %s", ackMsg);
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,20 +639,7 @@ private long getPopOffset(String topic, String group, int queueId, int initMode,

long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId);
if (offset < 0) {
if (ConsumeInitMode.MIN == initMode) {
offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
} else {
// pop last one,then commit offset.
offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
// max & no consumer offset
if (offset < 0) {
offset = 0;
}
if (init) {
this.brokerController.getConsumerOffsetManager().commitOffset(
"getPopOffset", group, topic, queueId, offset);
}
}
offset = this.getInitOffset(topic, group, queueId, initMode, init);
}

if (checkResetOffset) {
Expand All @@ -670,6 +657,31 @@ private long getPopOffset(String topic, String group, int queueId, int initMode,
}
}

private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) {
long offset;
if (ConsumeInitMode.MIN == initMode) {
return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
} else {
if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() &&
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 &&
this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, queueId, 0, 1)) {
offset = 0;
} else {
// pop last one,then commit offset.
offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
// max & no consumer offset
if (offset < 0) {
offset = 0;
}
}
if (init) {
this.brokerController.getConsumerOffsetManager().commitOffset(
"getPopOffset", group, topic, queueId, offset);
}
}
return offset;
}

public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity {
private int popCkOffsetMaxQueueSize = 20000;
private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;
private boolean initPopOffsetByCheckMsgInMem = true;

private boolean realTimeNotifyConsumerChange = true;

Expand Down Expand Up @@ -1264,6 +1265,14 @@ public void setEnableNotifyAfterPopOrderLockRelease(boolean enableNotifyAfterPop
this.enableNotifyAfterPopOrderLockRelease = enableNotifyAfterPopOrderLockRelease;
}

public boolean isInitPopOffsetByCheckMsgInMem() {
return initPopOffsetByCheckMsgInMem;
}

public void setInitPopOffsetByCheckMsgInMem(boolean initPopOffsetByCheckMsgInMem) {
this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
}

public boolean isRealTimeNotifyConsumerChange() {
return realTimeNotifyConsumerChange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) {
protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) {
if (isTopicRouteValid(topicRouteData)) {
MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp);
log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp);
return tmp;
}
return MessageQueueView.WRAPPED_EMPTY_QUEUE;
Expand Down

0 comments on commit 110f06a

Please sign in to comment.