From 4d81a4215d3d545ce6402f4c9ac6fcf4e1ea858f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E5=86=B2?= Date: Wed, 21 Mar 2018 16:05:18 +0800 Subject: [PATCH] #issues-246# delete the unused feature that monitor pull invalid offset --- .../processor/PullMessageProcessor.java | 31 ------------------ .../broker/topic/TopicConfigManager.java | 9 ------ .../org/apache/rocketmq/common/MixAll.java | 1 - .../tools/monitor/MonitorService.java | 32 ------------------- 4 files changed, 73 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 10c0112feb4..606aa3b09a4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -35,7 +35,6 @@ import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; @@ -54,15 +53,12 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; -import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -436,7 +432,6 @@ public void operationComplete(ChannelFuture future) throws Exception { event.setMessageQueue(mq); event.setOffsetRequest(requestHeader.getQueueOffset()); event.setOffsetNew(getMessageResult.getNextBeginOffset()); - this.generateOffsetMovedEvent(event); log.warn( "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), @@ -504,32 +499,6 @@ private byte[] readGetMessageResult(final GetMessageResult getMessageResult, fin return byteBuffer.array(); } - private void generateOffsetMovedEvent(final OffsetMovedEvent event) { - try { - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT); - msgInner.setTags(event.getConsumerGroup()); - msgInner.setDelayTimeLevel(0); - msgInner.setKeys(event.getConsumerGroup()); - msgInner.setBody(event.encode()); - msgInner.setFlag(0); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags())); - - msgInner.setQueueId(0); - msgInner.setSysFlag(0); - msgInner.setBornTimestamp(System.currentTimeMillis()); - msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr())); - msgInner.setStoreHost(msgInner.getBornHost()); - - msgInner.setReconsumeTimes(0); - - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - } catch (Exception e) { - log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); - } - } - public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index cdae66f2aba..5ffe77a0ab8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -115,15 +115,6 @@ public TopicConfigManager(BrokerController brokerController) { topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } - { - // MixAll.OFFSET_MOVED_EVENT - String topic = MixAll.OFFSET_MOVED_EVENT; - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - topicConfig.setReadQueueNums(1); - topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } } public boolean isSystemTopic(final String topic) { diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 49085c3f1e1..7ce3c34aaf2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -68,7 +68,6 @@ public class MixAll { public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP"; public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP"; public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; - public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT"; public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY"; public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION"; public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER"; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 9bf09ad4107..61040a7e78f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -19,7 +19,6 @@ import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map.Entry; import java.util.Random; import java.util.TreeMap; @@ -29,9 +28,6 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; @@ -41,13 +37,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -80,32 +74,6 @@ public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListen this.defaultMQPushConsumer.setInstanceName(instanceName()); this.defaultMQPushConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr()); - try { - this.defaultMQPushConsumer.setConsumeThreadMin(1); - this.defaultMQPushConsumer.setConsumeThreadMax(1); - this.defaultMQPushConsumer.subscribe(MixAll.OFFSET_MOVED_EVENT, "*"); - this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { - try { - OffsetMovedEvent ome = - OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class); - - DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent(); - deleteMsgsEvent.setOffsetMovedEvent(ome); - deleteMsgsEvent.setEventTimestamp(msgs.get(0).getStoreTimestamp()); - - MonitorService.this.monitorListener.reportDeleteMsgsEvent(deleteMsgsEvent); - } catch (Exception e) { - } - - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - } catch (MQClientException e) { - } } public static void main(String[] args) throws MQClientException {