Skip to content

Commit

Permalink
[ISSUE apache#6884] Resolve proxy sending mentality to broker and una…
Browse files Browse the repository at this point in the history
…ble to find ACL configuration related
  • Loading branch information
fengbaichao committed Jun 13, 2023
1 parent ad4cc95 commit 82eb709
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ClusterServiceManager(RPCHook rpcHook) {
this.adminService = new DefaultAdminService(this.operationClientAPIFactory);

this.producerManager = new ProducerManager();
this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout(), rpcHook);

this.transactionClientAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
Expand All @@ -36,9 +37,9 @@ public class ClusterConsumerManager extends ConsumerManager implements StartAndS
protected HeartbeatSyncer heartbeatSyncer;

public ClusterConsumerManager(TopicRouteService topicRouteService, AdminService adminService,
MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener consumerIdsChangeListener, long channelExpiredTimeout) {
MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener consumerIdsChangeListener, long channelExpiredTimeout, RPCHook rpcHook) {
super(consumerIdsChangeListener, channelExpiredTimeout);
this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, this, mqClientAPIFactory);
this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, this, mqClientAPIFactory, rpcHook);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M
protected final TopicRouteService topicRouteService;
protected final AdminService adminService;
protected final MQClientAPIFactory mqClientAPIFactory;
protected final RPCHook rpcHook;
protected DefaultMQPushConsumer defaultMQPushConsumer;

public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, AdminService adminService, MQClientAPIFactory mqClientAPIFactory, RPCHook rpcHook) {
this.topicRouteService = topicRouteService;
this.adminService = adminService;
this.mqClientAPIFactory = mqClientAPIFactory;
this.rpcHook = rpcHook;
}

protected String getSystemMessageProducerId() {
Expand Down Expand Up @@ -84,8 +86,8 @@ protected int getBroadcastTopicQueueNum() {
return 1;
}

protected RPCHook getRpcHook() {
return null;
public RPCHook getRpcHook() {
return rpcHook;
}

protected void sendSystemMessage(Object data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
Expand All @@ -53,8 +54,8 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer {
protected String localProxyId;

public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService,
ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) {
super(topicRouteService, adminService, mqClientAPIFactory);
ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory, RPCHook rpcHook) {
super(topicRouteService, adminService, mqClientAPIFactory, rpcHook);
this.consumerManager = consumerManager;
this.localProxyId = buildLocalProxyId();
this.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testSyncGrpcV2Channel() throws Exception {
.build();
when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings);

HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory, null);
heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
Expand Down Expand Up @@ -240,7 +240,7 @@ public void testSyncRemotingChannel() throws Exception {
4
);

HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory);
HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory, null);
SendResult okSendResult = new SendResult();
okSendResult.setSendStatus(SendStatus.SEND_OK);
{
Expand Down

0 comments on commit 82eb709

Please sign in to comment.