Skip to content

Commit

Permalink
Remove the filter server module
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin committed May 15, 2023
1 parent c3ada73 commit c469a60
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
Expand Down Expand Up @@ -44,7 +45,6 @@
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
Expand Down Expand Up @@ -145,7 +145,6 @@ public class BrokerController {
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
Expand Down Expand Up @@ -198,7 +197,6 @@ public BrokerController(
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);

this.slaveSynchronize = new SlaveSynchronize(this);

Expand Down Expand Up @@ -802,10 +800,6 @@ public void shutdown() {

this.consumerOffsetManager.persist();

if (this.filterServerManager != null) {
this.filterServerManager.shutdown();
}

if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
Expand Down Expand Up @@ -879,10 +873,6 @@ public void start() throws Exception {
this.clientHousekeepingService.start();
}

if (this.filterServerManager != null) {
this.filterServerManager.start();
}

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
Expand Down Expand Up @@ -963,7 +953,7 @@ private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
Lists.newArrayList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
Expand Down Expand Up @@ -1034,10 +1024,6 @@ public BlockingQueue<Runnable> getSendThreadPoolQueue() {
return sendThreadPoolQueue;
}

public FilterServerManager getFilterServerManager() {
return filterServerManager;
}

public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public void run() {
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}

public void shutdown() {
Expand All @@ -71,20 +70,17 @@ public void onChannelConnect(String remoteAddr, Channel channel) {
public void onChannelClose(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
Expand Down Expand Up @@ -203,8 +201,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.getConsumerStatus(ctx, request);
case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
return this.queryTopicConsumeByWho(ctx, request);
case RequestCode.REGISTER_FILTER_SERVER:
return this.registerFilterServer(ctx, request);
case RequestCode.QUERY_CONSUME_TIME_SPAN:
return this.queryConsumeTimeSpan(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
Expand Down Expand Up @@ -1092,23 +1088,6 @@ private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx,
return response;
}

private RemotingCommand registerFilterServer(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
final RegisterFilterServerRequestHeader requestHeader =
(RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);

this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());

responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down

0 comments on commit c469a60

Please sign in to comment.