Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ROCKETMQ-6] Use logger for exceptions instead of e.printStackTrace(). #3

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public boolean initialize() throws CloneNotSupportedException {
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
} catch (IOException e) {
result = false;
e.printStackTrace();
log.error("Failed to initialize", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.slf4j.LoggerFactory;

public class PullMessageProcessor implements NettyRequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;

Expand All @@ -87,9 +87,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

response.setOpaque(request.getOpaque());

if (LOG.isDebugEnabled()) {
LOG.debug("receive PullMessage request command, {}", request);
}
log.debug("receive PullMessage request command, {}", request);

if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
Expand Down Expand Up @@ -119,7 +117,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
Expand All @@ -132,9 +130,9 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}

if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illagal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
LOG.warn(errorInfo);
String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illegal,Topic :" + requestHeader.getTopic()
+ " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
Expand All @@ -146,7 +144,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getSubscription());
} catch (Exception e) {
LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
log.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
Expand All @@ -156,7 +154,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
Expand All @@ -171,14 +169,14 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}

if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
log.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
Expand Down Expand Up @@ -239,12 +237,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.PULL_OFFSET_MOVED);

// XXX: warn and notify me
LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
requestHeader.getQueueId(), //
requestHeader.getConsumerGroup()//
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
requestHeader.getQueueId(), //
requestHeader.getConsumerGroup()//
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
Expand All @@ -259,16 +257,17 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
Expand Down Expand Up @@ -343,12 +342,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
LOG.error("Error occurred when transferring messages from page cache", e);
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

Expand Down Expand Up @@ -389,16 +388,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
LOG.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}

break;
Expand Down Expand Up @@ -477,7 +476,7 @@ private void generateOffsetMovedEvent(final OffsetMovedEvent event) {

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} catch (Exception e) {
LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e);
log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
}
}

Expand All @@ -496,20 +495,21 @@ public void run() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
LOG.error(request.toString());
LOG.error(response.toString());
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
LOG.error("ProcessRequestWrapper process request over, but response failed", e);
LOG.error(request.toString());
LOG.error(response.toString());
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
LOG.error("ExecuteRequestWhenWakeup run", e1);
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

if (log.isDebugEnabled()) {
log.debug("receive SendMessage request command, {}", request);
}
log.debug("receive SendMessage request command, {}", request);

final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
Expand Down
Loading