Skip to content

Commit

Permalink
[ISSUE apache#5348] [RIP-48] Support server-side offset management in…
Browse files Browse the repository at this point in the history
… broadcast consumption mode (apache#5349)

* Support server-side offset management in broadcast consumption mode
* Fix unit test npe and and offset store test
* Fix fast encode decode test

Co-authored-by: 斜阳 <terrance.lzm@alibaba-inc.com>
  • Loading branch information
2 people authored and anurag-harness committed Nov 2, 2022
1 parent 48f8dc6 commit 8825f35
Show file tree
Hide file tree
Showing 11 changed files with 726 additions and 2 deletions.
Expand Up @@ -66,6 +66,7 @@
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
Expand Down Expand Up @@ -168,6 +169,7 @@ public class BrokerController {
private final NettyClientConfig nettyClientConfig;
protected final MessageStoreConfig messageStoreConfig;
protected final ConsumerOffsetManager consumerOffsetManager;
protected final BroadcastOffsetManager broadcastOffsetManager;
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
Expand Down Expand Up @@ -296,6 +298,7 @@ public BrokerController(
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
Expand Down Expand Up @@ -1170,6 +1173,10 @@ public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}

public BroadcastOffsetManager getBroadcastOffsetManager() {
return broadcastOffsetManager;
}

public MessageStoreConfig getMessageStoreConfig() {
return messageStoreConfig;
}
Expand Down Expand Up @@ -1277,6 +1284,10 @@ protected void shutdownBasicService() {
this.fileWatchService.shutdown();
}

if (this.broadcastOffsetManager != null) {
this.broadcastOffsetManager.shutdown();
}

if (this.messageStore != null) {
this.messageStore.shutdown();
}
Expand Down Expand Up @@ -1503,6 +1514,10 @@ protected void startBasicService() throws Exception {
this.brokerFastFailure.start();
}

if (this.broadcastOffsetManager != null) {
this.broadcastOffsetManager.start();
}

if (this.escapeBridge != null) {
this.escapeBridge.start();
}
Expand Down
@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.ServiceThread;

/**
* manage the offset of broadcast.
* now, use this to support switch remoting client between proxy and broker
*/
public class BroadcastOffsetManager extends ServiceThread {
private static final String TOPIC_GROUP_SEPARATOR = "@";
private final BrokerController brokerController;
private final BrokerConfig brokerConfig;

/**
* k: topic@groupId
* v: the pull offset of all client of all queue
*/
protected final ConcurrentHashMap<String /* topic@groupId */, BroadcastOffsetData> offsetStoreMap =
new ConcurrentHashMap<>();

public BroadcastOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
this.brokerConfig = brokerController.getBrokerConfig();
}

public void updateOffset(String topic, String group, int queueId, long offset, String clientId, boolean fromProxy) {
BroadcastOffsetData broadcastOffsetData = offsetStoreMap.computeIfAbsent(
buildKey(topic, group), key -> new BroadcastOffsetData(topic, group));

broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdKey, broadcastTimedOffsetStore) -> {
if (broadcastTimedOffsetStore == null) {
broadcastTimedOffsetStore = new BroadcastTimedOffsetStore(fromProxy);
}

broadcastTimedOffsetStore.timestamp = System.currentTimeMillis();
broadcastTimedOffsetStore.fromProxy = fromProxy;
broadcastTimedOffsetStore.offsetStore.updateOffset(queueId, offset, true);
return broadcastTimedOffsetStore;
});
}

/**
* the time need init offset
* 1. client connect to proxy -> client connect to broker
* 2. client connect to broker -> client connect to proxy
* 3. client connect to proxy at the first time
*
* @return -1 means no init offset, use the queueOffset in pullRequestHeader
*/
public Long queryInitOffset(String topic, String groupId, int queueId, String clientId, long requestOffset,
boolean fromProxy) {

BroadcastOffsetData broadcastOffsetData = offsetStoreMap.get(buildKey(topic, groupId));
if (broadcastOffsetData == null) {
if (fromProxy && requestOffset < 0) {
return getOffset(null, topic, groupId, queueId);
} else {
return -1L;
}
}

final AtomicLong offset = new AtomicLong(-1L);
broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdK, offsetStore) -> {
if (offsetStore == null) {
offsetStore = new BroadcastTimedOffsetStore(fromProxy);
}

if (offsetStore.fromProxy && requestOffset < 0) {
// when from proxy and requestOffset is -1
// means proxy need a init offset to pull message
offset.set(getOffset(offsetStore, topic, groupId, queueId));
return offsetStore;
}

if (offsetStore.fromProxy == fromProxy) {
return offsetStore;
}

offset.set(getOffset(offsetStore, topic, groupId, queueId));
return offsetStore;
});
return offset.get();
}

private long getOffset(BroadcastTimedOffsetStore offsetStore, String topic, String groupId, int queueId) {
long storeOffset = -1;
if (offsetStore != null) {
storeOffset = offsetStore.offsetStore.readOffset(queueId);
}
if (storeOffset < 0) {
storeOffset =
brokerController.getConsumerOffsetManager().queryOffset(broadcastGroupId(groupId), topic, queueId);
}
if (storeOffset < 0) {
if (!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(topic, queueId, 0)) {
storeOffset = 0;
} else {
storeOffset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, true);
}
}
return storeOffset;
}

/**
* 1. scan expire offset
* 2. calculate the min offset of all client of one topic@group,
* and then commit consumer offset by group@broadcast
*/
protected void scanOffsetData() {
for (String k : offsetStoreMap.keySet()) {
BroadcastOffsetData broadcastOffsetData = offsetStoreMap.get(k);
if (broadcastOffsetData == null) {
continue;
}

Map<Integer, Long> queueMinOffset = new HashMap<>();

for (String clientId : broadcastOffsetData.clientOffsetStore.keySet()) {
broadcastOffsetData.clientOffsetStore
.computeIfPresent(clientId, (clientIdKey, broadcastTimedOffsetStore) -> {
long interval = System.currentTimeMillis() - broadcastTimedOffsetStore.timestamp;
boolean clientIsOnline = brokerController.getConsumerManager().findChannel(broadcastOffsetData.group, clientId) != null;
if (clientIsOnline || interval < Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
Set<Integer> queueSet = broadcastTimedOffsetStore.offsetStore.queueList();
for (Integer queue : queueSet) {
long offset = broadcastTimedOffsetStore.offsetStore.readOffset(queue);
offset = Math.min(queueMinOffset.getOrDefault(queue, offset), offset);
queueMinOffset.put(queue, offset);
}
}
if (clientIsOnline && interval >= Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireMaxSecond()).toMillis()) {
return null;
}
if (!clientIsOnline && interval >= Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
return null;
}
return broadcastTimedOffsetStore;
});
}

offsetStoreMap.computeIfPresent(k, (key, broadcastOffsetDataVal) -> {
if (broadcastOffsetDataVal.clientOffsetStore.isEmpty()) {
return null;
}
return broadcastOffsetDataVal;
});

queueMinOffset.forEach((queueId, offset) ->
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
}
}

private String buildKey(String topic, String group) {
return topic + TOPIC_GROUP_SEPARATOR + group;
}

/**
* @param group group of users
* @return the groupId used to commit offset
*/
private static String broadcastGroupId(String group) {
return group + TOPIC_GROUP_SEPARATOR + "broadcast";
}

@Override
public String getServiceName() {
return "BroadcastOffsetManager";
}

@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(Duration.ofSeconds(5).toMillis());
}
}

@Override
protected void onWaitEnd() {
this.scanOffsetData();
}

public static class BroadcastOffsetData {
private final String topic;
private final String group;
private final ConcurrentHashMap<String /* clientId */, BroadcastTimedOffsetStore> clientOffsetStore;

public BroadcastOffsetData(String topic, String group) {
this.topic = topic;
this.group = group;
this.clientOffsetStore = new ConcurrentHashMap<>();
}
}

public static class BroadcastTimedOffsetStore {

/**
* the timeStamp of last update occurred
*/
private volatile long timestamp;

/**
* mark the offset of this client is updated by proxy or not
*/
private volatile boolean fromProxy;

/**
* the pulled offset of each queue
*/
private final BroadcastOffsetStore offsetStore;

public BroadcastTimedOffsetStore(boolean fromProxy) {
this.timestamp = System.currentTimeMillis();
this.fromProxy = fromProxy;
this.offsetStore = new BroadcastOffsetStore();
}
}
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.MixAll;

public class BroadcastOffsetStore {

private final ConcurrentMap<Integer, AtomicLong> offsetTable = new ConcurrentHashMap<>();

public void updateOffset(int queueId, long offset, boolean increaseOnly) {
AtomicLong offsetOld = this.offsetTable.get(queueId);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(queueId, new AtomicLong(offset));
}

if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}

public long readOffset(int queueId) {
AtomicLong offset = this.offsetTable.get(queueId);
if (offset != null) {
return offset.get();
}
return -1L;
}

public Set<Integer> queueList() {
return offsetTable.keySet();
}
}
Expand Up @@ -71,6 +71,10 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
final MessageFilter messageFilter,
RemotingCommand response) {

PullMessageProcessor processor = brokerController.getPullMessageProcessor();
processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());

final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();

switch (response.getCode()) {
Expand Down

0 comments on commit 8825f35

Please sign in to comment.