Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ public class ProxyConfig implements ConfigFile {

private long invisibleTimeMillisWhenClear = 1000L;
private boolean enableProxyAutoRenew = true;
/**
* When enableProxyAutoRenew is true, this controls whether the proxy periodically renews
* (extends) the invisible time of receipt handles for gRPC (GrpcClientChannel) consumers
* via changeInvisibleTime calls to the broker.
* <p>
* When set to false, the proxy will NOT periodically renew handles for gRPC clients. Instead,
* it will use the consumer group's consumeTimeoutMinute as the initial invisible time during
* pop, and only save handles for client-disconnect cleanup (nack). This avoids the
* handle-mapping complexity and makes the client's original receipt handle remain valid
* even if the proxy crashes.
*/
private boolean enableGrpcChannelReceiptHandleRenew = true;
private int maxRenewRetryTimes = 3;
private int renewThreadPoolNums = 2;
private int renewMaxThreadPoolNums = 4;
Expand Down Expand Up @@ -1113,6 +1125,14 @@ public void setEnableProxyAutoRenew(boolean enableProxyAutoRenew) {
this.enableProxyAutoRenew = enableProxyAutoRenew;
}

public boolean isEnableGrpcChannelReceiptHandleRenew() {
return enableGrpcChannelReceiptHandleRenew;
}

public void setEnableGrpcChannelReceiptHandleRenew(boolean enableGrpcChannelReceiptHandleRenew) {
this.enableGrpcChannelReceiptHandleRenew = enableGrpcChannelReceiptHandleRenew;
}

public int getMaxRenewRetryTimes() {
return maxRenewRetryTimes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
Expand All @@ -50,6 +51,7 @@
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

public class ReceiveMessageActivity extends AbstractMessagingActivity {
private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";
Expand Down Expand Up @@ -107,7 +109,11 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration());
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
if (proxyConfig.isEnableGrpcChannelReceiptHandleRenew()) {
actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
} else {
actualInvisibleTime = getConsumeTimeoutInvisibleTime(ctx, group, proxyConfig);
}
} else {
validateInvisibleTime(actualInvisibleTime,
ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
Expand Down Expand Up @@ -229,6 +235,18 @@ protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx,
);
}

private long getConsumeTimeoutInvisibleTime(ProxyContext ctx, String group, ProxyConfig proxyConfig) {
try {
SubscriptionGroupConfig groupConfig = this.messagingProcessor.getSubscriptionGroupConfig(ctx, group);
if (groupConfig != null && groupConfig.getConsumeTimeoutMinute() > 0) {
return TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute());
}
} catch (Exception e) {
// fall through to default
}
return proxyConfig.getDefaultInvisibleTimeMills();
}

protected static class ReceiveMessageQueueSelector implements QueueSelector {

private final String brokerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
Expand Down Expand Up @@ -166,6 +167,20 @@ protected void scheduleRenewTask() {
}

ReceiptHandleGroup group = entry.getValue();

if (!proxyConfig.isEnableGrpcChannelReceiptHandleRenew()
&& key.getChannel() instanceof GrpcClientChannel) {
// When renew is disabled, only clean up expired handles to avoid memory leak
group.scan((msgID, handleStr, v) -> {
ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
if (handle.isExpired()) {
group.computeIfPresent(msgID, handleStr,
messageReceiptHandle -> CompletableFuture.completedFuture(null), 0);
}
});
continue;
}

group.scan((msgID, handleStr, v) -> {
long current = System.currentTimeMillis();
ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
Expand Down
Loading