Skip to content

Commit

Permalink
[ISSUE #292] Add support of transactional message feature (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
duhengforever authored and vongosling committed Jul 15, 2018
1 parent 3ccd6f9 commit b054a9d
Show file tree
Hide file tree
Showing 50 changed files with 2,937 additions and 398 deletions.
104 changes: 85 additions & 19 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@
*/
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
Expand Down Expand Up @@ -61,6 +46,13 @@
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
Expand All @@ -69,12 +61,12 @@
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
Expand All @@ -93,6 +85,22 @@
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -142,6 +150,9 @@ public class BrokerController {
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -405,6 +416,7 @@ public void run() {
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;

@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
Expand All @@ -423,6 +435,7 @@ public void onChanged(String path) {
reloadServerSslContext();
}
}

private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
Expand All @@ -432,11 +445,26 @@ private void reloadServerSslContext() {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
initialTransaction();
}

return result;
}

private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

public void registerProcessor() {
/**
* SendMessageProcessor
Expand Down Expand Up @@ -539,8 +567,9 @@ public long headSlowTimeMills(BlockingQueue<Runnable> q) {
slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
}

if (slowTimeMills < 0)
if (slowTimeMills < 0) {
slowTimeMills = 0;
}

return slowTimeMills;
}
Expand Down Expand Up @@ -700,6 +729,10 @@ public void shutdown() {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}

if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
}

private void unregisterBrokerAll() {
Expand Down Expand Up @@ -768,6 +801,13 @@ public void run() {
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}

if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
Expand Down Expand Up @@ -949,4 +989,30 @@ public void setStoreHost(InetSocketAddress storeHost) {
public Configuration getConfiguration() {
return this.configuration;
}

public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService;
}

public void setTransactionalMessageCheckService(
TransactionalMessageCheckService transactionalMessageCheckService) {
this.transactionalMessageCheckService = transactionalMessageCheckService;
}

public TransactionalMessageService getTransactionalMessageService() {
return transactionalMessageService;
}

public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
this.transactionalMessageService = transactionalMessageService;
}

public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
return transactionalMessageCheckListener;
}

public void setTransactionalMessageCheckListener(
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
Expand All @@ -45,6 +40,12 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;

public class BrokerStartup {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
Expand All @@ -34,10 +39,11 @@ public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();

private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}

Expand Down Expand Up @@ -185,4 +191,33 @@ public void unregisterProducer(final String group, final ClientChannelInfo clien
log.error("", e);
}
}

public Channel getAvaliableChannel(String groupId) {
HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
List<Channel> channelList = new ArrayList<Channel>();
if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
int size = channelList.size();
if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId);
return null;
}

int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();

This comment has been minimized.

Copy link
@walking98

walking98 Aug 23, 2018

move this check logic into the while segment?

while (isOk && count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
index = (++index) % size;
channel = channelList.get(index);
return channel;
}
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,15 @@
package org.apache.rocketmq.broker.client.net;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -47,11 +37,19 @@
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

public class Broker2Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand All @@ -62,34 +60,22 @@ public Broker2Client(BrokerController brokerController) {
}

public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final SelectMappedBufferResult selectMappedBufferResult) {
final MessageExt messageExt) throws Exception {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.markOnewayRPC();

request.setBody(MessageDecoder.encode(messageExt, false));
try {
FileRegion fileRegion =
new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("invokeProducer failed,", future.cause());
}
}
});
} catch (Throwable e) {
log.error("invokeProducer exception", e);
selectMappedBufferResult.release();
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}", group, messageExt.getMsgId(), e.getMessage());
}
}

public RemotingCommand callClient(final Channel channel,
final RemotingCommand request
final RemotingCommand request
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
Expand Down Expand Up @@ -119,7 +105,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
}

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
Expand Down
Loading

0 comments on commit b054a9d

Please sign in to comment.