Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-155] Use enum class for consume position[addendum] #106

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.tubemq.client.config;

import org.apache.tubemq.client.common.TClientConstants;
import org.apache.tubemq.client.consumer.ConsumeModel;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.MasterInfo;
import org.apache.tubemq.corebase.utils.TStringUtils;
Expand All @@ -36,7 +36,7 @@ public class ConsumerConfig extends TubeClientConfig {
* 0: Start from the latest position for the first time. Otherwise start from last consume position.
* 1: Start from the latest consume position.
*/
private ConsumeModel consumeModel = ConsumeModel.CONSUMER_FROM_LATEST_OFFSET;
private ConsumePosition consumePosition = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
private int maxSubInfoReportIntvlTimes =
TClientConstants.MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES;
private long msgNotFoundWaitPeriodMs =
Expand Down Expand Up @@ -84,12 +84,28 @@ public String getConsumerGroup() {
return consumerGroup;
}

public ConsumeModel getConsumeModel() {
return consumeModel;
public ConsumePosition getConsumePosition() {
return consumePosition;
}

public void setConsumeModel(ConsumeModel consumeModel) {
this.consumeModel = consumeModel;
public void setConsumePosition(ConsumePosition consumePosition) {
this.consumePosition = consumePosition;
}

/**
* recommend to use getConsumePosition
*/
@Deprecated
public int getConsumeModel() {
return consumePosition.getCode();
}

/**
* recommend to use setConsumePosition
*/
@Deprecated
public void setConsumeModel(int consumeModel) {
setConsumePosition(ConsumePosition.valueOf(consumeModel));
}

public long getMsgNotFoundWaitPeriodMs() {
Expand Down Expand Up @@ -208,7 +224,7 @@ public String toString() {
return new StringBuilder(512).append("\"ConsumerConfig\":{")
.append("\"consumerGroup\":\"").append(this.consumerGroup)
.append("\",\"maxSubInfoReportIntvlTimes\":").append(this.maxSubInfoReportIntvlTimes)
.append(",\"consumeModel\":").append(this.consumeModel)
.append(",\"consumePosition\":").append(this.consumePosition)
.append(",\"msgNotFoundWaitPeriodMs\":").append(this.msgNotFoundWaitPeriodMs)
.append(",\"shutDownRebalanceWaitPeriodMs\":").append(this.shutDownRebalanceWaitPeriodMs)
.append(",\"pushFetchThreadCnt\":").append(this.pushFetchThreadCnt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,15 +1179,15 @@ private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthoriz

private int getGroupInitReadStatus(boolean isFistReg) {
int readStatus = TBaseConstants.CONSUME_MODEL_READ_NORMAL;
switch (consumerConfig.getConsumeModel()) {
switch (consumerConfig.getConsumePosition()) {
case CONSUMER_FROM_LATEST_OFFSET: {
if (isFistReg) {
readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX;
logger.info("[Consume From Max Offset]" + consumerId);
}
break;
}
case CONSUMER_FROM_MAX_OFFSET: {
case CONSUMER_FROM_MAX_OFFSET_ALWAYS: {
if (isFistReg) {
readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS;
logger.info("[Consume From Max Offset Always]" + consumerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.tubemq.client.consumer;

public enum ConsumeModel {
CONSUMER_FROM_MAX_OFFSET(1, "Always start from the max consume position."),
public enum ConsumePosition {
CONSUMER_FROM_MAX_OFFSET_ALWAYS(1, "Always start from the max consume position."),
CONSUMER_FROM_LATEST_OFFSET(0, "Start from the latest position for the first time. " +
"Otherwise start from last consume position."),
CONSUMER_FROM_FIRST_OFFSET(-1, "Start from 0 for the first time. Otherwise start from last consume position.");

private int code;
private String description;

ConsumeModel(int code, String description) {
ConsumePosition(int code, String description) {
this.code = code;
this.description = description;
}
Expand All @@ -37,4 +37,13 @@ public int getCode() {
public String getDescription() {
return description;
}

public static ConsumePosition valueOf(int code) {
for (ConsumePosition consumePosition : ConsumePosition.values()) {
if (consumePosition.getCode() == code) {
return consumePosition;
}
}
throw new IllegalArgumentException(String.format("unknown ConsumePosition code %s", code));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumeModel;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageV2Listener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
Expand Down Expand Up @@ -68,7 +68,7 @@ public MessageConsumerExample(
int fetchCount
) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.List;
import java.util.Map;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumeModel;
import org.apache.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
Expand Down Expand Up @@ -55,7 +55,7 @@ public MessagePullConsumerExample(
String group
) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
}
Expand Down