Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-16]Support request/response pattern #1422

Merged
merged 29 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c6c699e
[RIP-16]impl rpc support
qqeasonchen Aug 27, 2019
46b2e12
[RIP-16]impl rpc support - code format
qqeasonchen Aug 27, 2019
1d3a508
[RIP-16]fix test case error
qqeasonchen Aug 28, 2019
7e221e5
add example of request-response model
qqeasonchen Sep 11, 2019
cc0a5e3
code format
qqeasonchen Sep 11, 2019
53a6955
[RIP-16]change the way creating reply message
qqeasonchen Sep 12, 2019
ec1f926
optimize request/response examples
qqeasonchen Sep 18, 2019
5e1732a
add unittest of request/response pattern
qqeasonchen Sep 18, 2019
4c4bf10
add unittest of request/response pattern
qqeasonchen Sep 21, 2019
28936fa
add reply interface to consumer
qqeasonchen Sep 23, 2019
8c372b4
optimize ReplyMessageProcessor
qqeasonchen Sep 23, 2019
58aa805
Merge branch 'rocketmq-unittest' of https://github.com/qqeasonchen/ro…
qqeasonchen Sep 24, 2019
c434ff4
Merge branch 'develop' of https://github.com/apache/rocketmq into roc…
qqeasonchen Sep 24, 2019
cc7543f
add reply interface to DefaultLitePullConsumer
qqeasonchen Sep 24, 2019
7772850
fix unit test err
qqeasonchen Sep 25, 2019
0c47fcf
add recommend client configs in rpc mode
qqeasonchen Sep 26, 2019
e7b9169
optimize unit test
qqeasonchen Sep 29, 2019
8353551
add unit test of reply
qqeasonchen Sep 30, 2019
18e44c2
remove reply interface in consumer
qqeasonchen Oct 12, 2019
a8c9fe6
add err code when create reply message exception
qqeasonchen Oct 12, 2019
ebc0ede
optimization exception declare of request
qqeasonchen Oct 16, 2019
9800afb
rename REQUEST_UNIQ_ID to CORRELATION_ID
qqeasonchen Oct 16, 2019
f16fd08
rename REPLY_TO to REPLY_TO_CLIENT
qqeasonchen Oct 16, 2019
4fa263f
code style format
qqeasonchen Oct 16, 2019
b6e4be8
fix unit test
qqeasonchen Oct 16, 2019
01827c8
remove unused code and fix typo
qqeasonchen Oct 21, 2019
119feee
use RemotingUtil to parse address
qqeasonchen Oct 24, 2019
a314e17
remove unused import
qqeasonchen Oct 24, 2019
c6cbab9
resolve Conflicts:
qqeasonchen Oct 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
Expand All @@ -147,6 +149,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
Expand All @@ -163,7 +166,7 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
private Map<Class, AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -194,6 +197,7 @@ public BrokerController(

this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
Expand Down Expand Up @@ -241,7 +245,7 @@ public boolean initialize() throws CloneNotSupportedException {
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
Expand Down Expand Up @@ -277,6 +281,14 @@ public boolean initialize() throws CloneNotSupportedException {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));

this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
Expand Down Expand Up @@ -501,9 +513,9 @@ private void initialAcl() {
return;
}

for (AccessValidator accessValidator: accessValidators) {
for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(),validator);
accessValidatorMap.put(validator.getClass(), validator);
this.registerServerRPCHook(new RPCHook() {

@Override
Expand All @@ -519,14 +531,13 @@ public void doAfterResponse(String remoteAddr, RemotingCommand request, Remoting
}
}


private void initialRpcHooks() {

List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook: rpcHooks) {
for (RPCHook rpcHook : rpcHooks) {
this.registerServerRPCHook(rpcHook);
}
}
Expand All @@ -553,6 +564,17 @@ public void registerProcessor() {
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

/**
* QueryMessageProcessor
*/
Expand Down Expand Up @@ -763,6 +785,10 @@ public void shutdown() {
this.pullMessageExecutor.shutdown();
}

if (this.replyMessageExecutor != null) {
this.replyMessageExecutor.shutdown();
}

if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
Expand Down Expand Up @@ -859,8 +885,6 @@ public void start() throws Exception {
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}



this.registerBrokerAll(true, false, true);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
Expand All @@ -883,7 +907,6 @@ public void run() {
this.brokerFastFailure.start();
}


}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
Expand Down Expand Up @@ -1097,7 +1120,6 @@ public void setTransactionalMessageCheckListener(
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}


public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;

Expand All @@ -1118,8 +1140,7 @@ private void handleSlaveSynchronize(BrokerRole role) {
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
} catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
Expand Down Expand Up @@ -1165,8 +1186,6 @@ public void changeToSlave(int brokerId) {
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}



public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
Expand Down Expand Up @@ -1216,6 +1235,4 @@ private void shutdownProcessorByHa() {
}
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
Expand All @@ -43,7 +42,9 @@ public class ProducerManager {
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();

public ProducerManager() {
}

Expand Down Expand Up @@ -82,6 +83,7 @@ public void scanNotActiveChannel() {
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
Expand Down Expand Up @@ -113,6 +115,7 @@ public void doChannelCloseEvent(final String remoteAddr, final Channel channel)
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
Expand Down Expand Up @@ -146,6 +149,7 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
Expand All @@ -171,6 +175,7 @@ public void unregisterProducer(final String group, final ClientChannelInfo clien
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
Expand Down Expand Up @@ -223,4 +228,8 @@ public Channel getAvaliableChannel(String groupId) {
}
return null;
}

public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
}
Loading