Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Unified Logger name.
  • Loading branch information
shroman committed Dec 23, 2016
1 parent bff5367 commit cba6a0f
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
* @author shijia.wxr
*/
public class PullMessageProcessor implements NettyRequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;

Expand Down Expand Up @@ -89,8 +89,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

response.setOpaque(request.getOpaque());

if (LOG.isDebugEnabled()) {
LOG.debug("receive PullMessage request command, " + request);
if (log.isDebugEnabled()) {
log.debug("receive PullMessage request command, " + request);
}


Expand Down Expand Up @@ -126,7 +126,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(
"topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
Expand All @@ -144,7 +144,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
+ " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
LOG.warn(errorInfo);
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
Expand All @@ -157,7 +157,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getSubscription());
} catch (Exception e) {
LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
log.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
Expand All @@ -167,7 +167,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
Expand All @@ -182,15 +182,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}


if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
log.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
Expand Down Expand Up @@ -252,7 +252,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.PULL_OFFSET_MOVED);

// XXX: warn and notify me
LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
Expand All @@ -272,15 +272,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
log.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
+ getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
Expand Down Expand Up @@ -356,12 +356,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
log.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
LOG.error("transfer many message by pagecache exception", e);
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

Expand Down Expand Up @@ -403,14 +403,14 @@ public void operationComplete(ChannelFuture future) throws Exception {
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
LOG.warn(
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
Expand Down Expand Up @@ -493,7 +493,7 @@ private void generateOffsetMovedEvent(final OffsetMovedEvent event) {

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} catch (Exception e) {
LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
}
}

Expand All @@ -512,21 +512,21 @@ public void run() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
log.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
future.cause());
LOG.error(request.toString());
LOG.error(response.toString());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
LOG.error("processRequestWrapper process request over, but response failed", e);
LOG.error(request.toString());
LOG.error(response.toString());
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
LOG.error("excuteRequestWhenWakeup run", e1);
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @author shijia.wxr
*/
public class TopicConfigManager extends ConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock();

Expand All @@ -56,11 +56,9 @@ public class TopicConfigManager extends ConfigManager {
private final Set<String> systemTopicList = new HashSet<String>();
private transient BrokerController brokerController;


public TopicConfigManager() {
}


public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
{
Expand Down Expand Up @@ -193,17 +191,17 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
log.warn("create new topic failed, because the default topic[" + defaultTopic
+ "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+ remoteAddress);
}
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
log.warn("create new topic failed, because the default topic[" + defaultTopic
+ "] not exist." + " producer: " + remoteAddress);
}

if (topicConfig != null) {
LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
log.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+ " producer: " + remoteAddress);

this.topicConfigTable.put(topic, topicConfig);
Expand All @@ -219,7 +217,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
}
}
} catch (InterruptedException e) {
LOG.error("createTopicInSendMessageMethod exception", e);
log.error("createTopicInSendMessageMethod exception", e);
}

if (createNew) {
Expand Down Expand Up @@ -253,7 +251,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);

LOG.info("create new topic {}", topicConfig);
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(topic, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
Expand All @@ -263,7 +261,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
}
}
} catch (InterruptedException e) {
LOG.error("createTopicInSendMessageBackMethod exception", e);
log.error("createTopicInSendMessageBackMethod exception", e);
}

if (createNew) {
Expand All @@ -284,7 +282,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
}

LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());

this.topicConfigTable.put(topic, topicConfig);
Expand All @@ -304,7 +302,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
}

LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
topicConfig.getTopicSysFlag());

this.topicConfigTable.put(topic, topicConfig);
Expand All @@ -319,9 +317,9 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
LOG.info("update topic config, old: " + old + " new: " + topicConfig);
log.info("update topic config, old: " + old + " new: " + topicConfig);
} else {
LOG.info("create new topic, " + topicConfig);
log.info("create new topic, " + topicConfig);
}

this.dataVersion.nextVersion();
Expand All @@ -340,7 +338,7 @@ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
if (topicConfig != null && !topicConfig.isOrder()) {
topicConfig.setOrder(true);
isChange = true;
LOG.info("update order topic config, topic={}, order={}", topic, true);
log.info("update order topic config, topic={}, order={}", topic, true);
}
}

Expand All @@ -351,7 +349,7 @@ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
if (topicConfig.isOrder()) {
topicConfig.setOrder(false);
isChange = true;
LOG.info("update order topic config, topic={}, order={}", topic, false);
log.info("update order topic config, topic={}, order={}", topic, false);
}
}
}
Expand All @@ -375,11 +373,11 @@ public boolean isOrderTopic(final String topic) {
public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
LOG.info("delete topic config OK, topic: " + old);
log.info("delete topic config OK, topic: " + old);
this.dataVersion.nextVersion();
this.persist();
} else {
LOG.warn("delete topic config failed, topic: " + topic + " not exist");
log.warn("delete topic config failed, topic: " + topic + " not exist");
}
}

Expand Down Expand Up @@ -426,7 +424,7 @@ private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, TopicConfig> next = it.next();
LOG.info("load exist local topic, {}", next.getValue().toString());
log.info("load exist local topic, {}", next.getValue().toString());
}
}

Expand Down
Loading

0 comments on commit cba6a0f

Please sign in to comment.