Skip to content

Commit

Permalink
Merge pull request #2 from apache/develop
Browse files Browse the repository at this point in the history
Update project
  • Loading branch information
caojiele committed Mar 8, 2019
2 parents b52738c + de34ce7 commit a8597ec
Show file tree
Hide file tree
Showing 139 changed files with 12,537 additions and 365 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ devenv
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
localbin
nohup.out
.DS_Store
localbin
nohup.out
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ It offers a variety of features:
* Various message filter mechanics such as SQL and Tag
* Docker images for isolated testing and cloud isolated clusters
* Feature-rich administrative dashboard for configuration, metrics and monitoring
* Access control list
* Message trace


----------
Expand All @@ -46,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
----------
## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation


Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public AccessResource parse(RemotingCommand request, String remoteAddr) {
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}

if (request.getExtFields() == null) {
throw new AclException("request's extFields value is null");
}

accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedA

if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class PlainAccessValidatorTest {
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
Expand Down Expand Up @@ -115,6 +116,22 @@ public void validateSendMessageV2Test() {
plainAccessValidator.validate(accessResource);
}

@Test(expected = AclException.class)
public void validateForAdminCommandWithOutAclRPCHook() {
RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
plainAccessValidator.parse(consumerOffsetAdminRequest, "192.168.0.1:9876");

RemotingCommand subscriptionGroupAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
plainAccessValidator.parse(subscriptionGroupAdminRequest, "192.168.0.1:9876");

RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
plainAccessValidator.parse(delayOffsetAdminRequest, "192.168.0.1:9876");

RemotingCommand allTopicConfigAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
plainAccessValidator.parse(allTopicConfigAdminRequest, "192.168.0.1:9876");

}

@Test
public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ public void checkPerm() {

}
@Test(expected = AclException.class)
public void checkErrorPerm() {
public void checkErrorPermDefaultValueNotMatch() {

plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.SUB);
plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
Expand Down
183 changes: 148 additions & 35 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -39,6 +40,7 @@
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
Expand Down Expand Up @@ -99,6 +101,7 @@
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

Expand Down Expand Up @@ -158,6 +161,7 @@ public class BrokerController {
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;


public BrokerController(
Expand Down Expand Up @@ -234,6 +238,10 @@ public boolean initialize() throws CloneNotSupportedException {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
Expand Down Expand Up @@ -396,37 +404,26 @@ public void run() {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
Expand Down Expand Up @@ -855,6 +852,13 @@ public void start() throws Exception {
this.filterServerManager.start();
}

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}



this.registerBrokerAll(true, false, true);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
Expand All @@ -877,12 +881,7 @@ public void run() {
this.brokerFastFailure.start();
}

if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}

}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
Expand Down Expand Up @@ -1101,4 +1100,118 @@ public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;

}



private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}

public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}

//handle the transactional service
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}

//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);

try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {

}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}



public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

//handle the slave synchronise
handleSlaveSynchronize(role);

//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}

//handle the transactional service
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}

//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);

try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {

}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}

private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}

private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}



}
Loading

0 comments on commit a8597ec

Please sign in to comment.