Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #137

Merged
merged 85 commits into from
Aug 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
d545f86
[ROCKETMQ-111] Fix possible MQClientException when query message befo…
Feb 28, 2017
3940924
[ROCKETMQ-104] Make MQAdmin commands throw exceptions, closes apache/…
shroman Feb 28, 2017
b692a73
[ROCKETMQ-99] Add scripts for Windows, closes apache/incubator-rocket…
lizhanhui Feb 28, 2017
42a3931
Fix Windows script to handle return code properly
lizhanhui Mar 3, 2017
53b98d0
Trivial Changes: Specify maven-assembly-plugin version to 2.8 for rel…
lizhanhui Mar 6, 2017
e3f4251
[ROCKETMQ-119] Add ThreadUtils and shutdown PullMessageService properly
zhouxinyu Mar 6, 2017
c5d9fcb
Corrected spellings.
shroman Mar 9, 2017
a146646
[ROCKETMQ-75] Logging when RemotingCommand header decoding swallows e…
shroman Mar 10, 2017
d7decc8
[ROCKETMQ-139] Degrade the client related modules' JDK version to 1.6
zhouxinyu Mar 10, 2017
e05a445
Logging on exception when producing a response.
shroman Mar 14, 2017
6b7d206
[ROCKETMQ-143][HOTFIX] Update fastjson from 1.2.12 to 1.2.28
zhouxinyu Mar 15, 2017
087d989
Corrected string check for emptiness.
shroman Mar 17, 2017
11653ce
ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
dongeforever Mar 17, 2017
0b39fca
[ROCKETMQ-145][HOTFIX] Resolve concureent issue in HAService and Grou…
zhouxinyu Mar 18, 2017
19f358c
Update community projects illustration
vongosling Mar 16, 2017
72e6def
[ROCKETMQ-148] Polish the contributing guide.
zhouxinyu Mar 19, 2017
3fe7535
[ROCKETMQ-143][HOTFIX] Update fastjson to 1.2.29
zhouxinyu Mar 23, 2017
15af63e
[ROCKETMQ-153][HOTFIX] Fetch name server address dynamically.
zhouxinyu Mar 24, 2017
01a0eb0
Fix possible NullPointerException when retry in send Async way
Jaskey Feb 15, 2017
d933eeb
fix typo in ClientConfig
vesense Mar 24, 2017
08e70ee
[ROCKETMQ-154] Added a newline after help info. closes #83
vesense Mar 28, 2017
203cb30
Trivial simplification of broker initialization by removing unnecessa…
shroman Mar 28, 2017
7e37799
Aggregate packaging specific files to a new sub-module: distribution
lizhanhui Mar 28, 2017
ab01386
[ROCKETMQ-76] Expose IntegrationTestBase to be used by other integrat…
shroman Mar 29, 2017
45a64fd
Include client IP per message queue of consumer progress command output
lizhanhui Mar 29, 2017
a8fa05e
[ROCKETMQ-138] Remove hard coded Aliyun authentication code, closes a…
Jaskey Apr 11, 2017
40a233a
Guard MQVesion methods.
lizhanhui Apr 12, 2017
7c7374e
Fix issue 165
lizhanhui Apr 6, 2017
7bcb3b3
BugFix: WS_DOMAIN_NAME, SUBGROUP default values override custom value…
lizhanhui Apr 12, 2017
c183e0d
[ROCKETMQ-172]log improvement for rocketmq client closes apache/incub…
Jaskey Apr 17, 2017
f508f13
[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when …
Jaskey Apr 19, 2017
deb0820
[ROCKETMQ-176] Use new maven central badge with the newest release ve…
zhouxinyu Apr 20, 2017
42f78c2
[ROCKETMQ-168] Polish the BUILDING guide.
zhouxinyu Apr 20, 2017
58f1574
[ROCKETMQ-121]Support message filtering based on SQL92 closes apache/…
vsair Apr 21, 2017
6a9628b
[ROCKETMQ-179] Fix errors of IT test cases closes apache/incubator-ro…
vsair Apr 21, 2017
6609c86
Add javadoc to NettyRemotingAbstract class and several other trivial …
lizhanhui Apr 25, 2017
8feb88d
Removed unnecessary semicolon.
shroman May 4, 2017
d72addf
Removed author info and formatted.
shroman May 4, 2017
6898d96
Changed list creation at DynaCode() to a singleton.
shroman May 4, 2017
e9814ad
Add javadoc to message store.
lizhanhui May 7, 2017
f5a2ee0
[ROCKETMQ-187] Measure the code coverage for Integration Tests, and a…
dongeforever May 9, 2017
80aac13
[ROCKETMQ-191] Fix socket options
lizhanhui May 10, 2017
1630f27
This is to close issue ROCKETMQ-198: Add language code for Go and PHP
lizhanhui May 16, 2017
1d966b5
[ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha …
zhouxinyu May 24, 2017
ceeef8e
[ROCKETMQ-206] Fix bug when non-1byte character exists in JSON config…
zhouxinyu May 25, 2017
4d12d11
[ROCKETMQ-161] Update runbroker.sh and runserver.sh to support user d…
dongeforever May 25, 2017
c0fe02e
[ROCKETMQ-39] avoid duplicated codes closes apache/incubator-rocketmq#34
wu-sheng May 25, 2017
42826c4
[ROCKETMQ-178] Fix -p -m options closes apache/incubator-rocketmq#93
lizhanhui May 26, 2017
8280388
[ROCKETMQ-175] Consumer may miss messages because of inconsistent sub…
vsair May 26, 2017
99ce40a
[ROCKETMQ-189] Misleading tip on consumeTimestamp and wrong consumeTi…
lindzh May 26, 2017
37fbb7b
[ROCKETMQ-194] log appender support closes apache/incubator-rocketmq#101
lindzh May 26, 2017
e5d01b4
Merge branch 'ROCKETMQ-206' into develop
dongeforever May 26, 2017
0adad6f
Add test case for LocalFileOffsetStore closes apache/incubator-rocket…
kination May 27, 2017
b1fcf1b
[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever cl…
Jaskey May 27, 2017
8c8610f
[ROCKETMQ-188]RemotingExecption is not consistent between invoke asyn…
Jaskey May 27, 2017
c796140
[ROCKETMQ-200]-Cluster name is always missing when fetch ClusterInfo …
Jaskey May 27, 2017
aced0de
[ROCKETMQ-206] Catch the IOException when call the file2String method.
zhouxinyu May 27, 2017
adae162
[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubat…
Jaskey May 27, 2017
04c8925
[ROCKETMQ-160]SendHeartBeat may not be logged in the expected period …
Jaskey May 27, 2017
e57f9ac
Merge remote-tracking branch 'wip/ROCKETMQ-206' into develop
dongeforever May 27, 2017
8d78175
Remove diamond operator for client module with JDK 1.6
dongeforever May 27, 2017
de4c948
[ROCKETMQ-88] Polish the developer list in pom.xml
dongeforever May 27, 2017
0fe9471
Polish github links
dongeforever May 27, 2017
3d1ec32
Fix error tests, producer should wait a while for consumer to be ready
dongeforever Jun 6, 2017
2c28baa
Include guava copyright announcement
dongeforever Jun 6, 2017
96cd2e4
[ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 …
Jaskey Jun 6, 2017
e068ec1
Remove develops from pom
dongeforever Jun 6, 2017
7374914
Release zip too
dongeforever Jun 6, 2017
703ac00
[ROCKETMQ-220] Add IT test for Filter By SQL 92, closes
dongeforever Jun 8, 2017
f45a1bc
[ROCKETMQ-219] Add batch example, closes apache/incubator-rocketmq#112
dongeforever Jun 8, 2017
c4a3e0c
[ROCKETMQ-218] Polish README.md, closes apache/incubator-rocketmq#113
zhoudiqiu Jun 8, 2017
f619e45
Fixed typo.
shroman Jun 15, 2017
738e6d5
Add license for OpenMessaging
dongeforever Jun 8, 2017
5cc6d70
[maven-release-plugin] prepare release rocketmq-all-4.1.0-incubating
dongeforever Jun 8, 2017
824cbc7
[maven-release-plugin] prepare for next development iteration
dongeforever Jun 8, 2017
34b2b81
Update release link in readme
dongeforever Jun 23, 2017
a9b66d5
Update current version to V4_2_0_SNAPSHOT
dongeforever Jun 27, 2017
323eb5c
[ROCKETMQ-234][HOTFIX] Fix Bug, broker will write response twice in b…
Jun 29, 2017
b70b680
[ROCKETMQ-236] Script to merge github pull request
zhouxinyu Jun 29, 2017
9ad9ad0
[ROCKETMQ-238] Catch Throwable to avoid error cancel some key schedul…
zhouxinyu Jul 5, 2017
246be9e
ROCKETMQ-6: Use logger for exceptions instead of e.printStackTrace().
shroman Jul 5, 2017
3672f70
[ROCKETMQ-243] Avoid picking the 1st element in the list of addresses…
shroman Jul 12, 2017
118bdec
[ROCKETMQ-244] Message#putUserProperty uses == for String comparison.
shroman Jul 14, 2017
33a99a4
Add it plugin for separating it from profile configuration, you can u…
vongosling Jul 29, 2017
9bb6eae
Merge remote-tracking branch 'origin/master' into develop
zhouxinyu Aug 1, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public boolean initialize() throws CloneNotSupportedException {
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
e.printStackTrace();
log.error("Failed to initialize", e);
}
}

Expand Down Expand Up @@ -296,7 +296,7 @@ public void run() {
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Exception e) {
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
Expand All @@ -307,7 +307,7 @@ public void run() {
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Exception e) {
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public void start() {
public void run() {
try {
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Exception e) {
log.error("", e);
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.slf4j.LoggerFactory;

public class PullMessageProcessor implements NettyRequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;

Expand All @@ -94,9 +94,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

response.setOpaque(request.getOpaque());

if (LOG.isDebugEnabled()) {
LOG.debug("receive PullMessage request command, {}", request);
}
log.debug("receive PullMessage request command, {}", request);

if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
Expand Down Expand Up @@ -126,7 +124,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
Expand All @@ -141,7 +139,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
LOG.warn(errorInfo);
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
Expand All @@ -162,7 +160,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
assert consumerFilterData != null;
}
} catch (Exception e) {
LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
Expand All @@ -172,7 +170,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
Expand All @@ -187,14 +185,14 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}

if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
Expand All @@ -209,7 +207,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
Expand Down Expand Up @@ -287,12 +285,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.PULL_OFFSET_MOVED);

// XXX: warn and notify me
LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
requestHeader.getQueueId(), //
requestHeader.getConsumerGroup()//
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
requestHeader.getQueueOffset(), //
getMessageResult.getNextBeginOffset(), //
requestHeader.getTopic(), //
requestHeader.getQueueId(), //
requestHeader.getConsumerGroup()//
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
Expand All @@ -307,16 +305,17 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
Expand Down Expand Up @@ -391,12 +390,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
LOG.error("Error occurred when transferring messages from page cache", e);
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

Expand Down Expand Up @@ -437,16 +436,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
LOG.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}

break;
Expand Down Expand Up @@ -525,7 +524,7 @@ private void generateOffsetMovedEvent(final OffsetMovedEvent event) {

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} catch (Exception e) {
LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e);
log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
}
}

Expand All @@ -544,20 +543,21 @@ public void run() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
LOG.error(request.toString());
LOG.error(response.toString());
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
LOG.error("ProcessRequestWrapper process request over, but response failed", e);
LOG.error(request.toString());
LOG.error(response.toString());
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
LOG.error("ExecuteRequestWhenWakeup run", e1);
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
Expand Down