From 2176261a4eabca4c24154e80423960986b6341d1 Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 25 Dec 2018 16:35:52 +0800 Subject: [PATCH 1/4] fixed typo --- include/DefaultMQPushConsumer.h | 2 +- src/consumer/DefaultMQPushConsumer.cpp | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 9a3948457..4051c772b 100755 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -138,7 +138,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { OffsetStore* m_pOffsetStore; Rebalance* m_pRebalance; PullAPIWrapper* m_pPullAPIWrapper; - ConsumeMsgService* m_consumerServeice; + ConsumeMsgService* m_consumerService; MQMessageListener* m_pMessageListener; int m_consumeMessageBatchMaxSize; int m_maxMsgCacheSize; diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index afa44f0fe..70c9b8888 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -230,7 +230,7 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() { deleteAndZero(m_pRebalance); deleteAndZero(m_pOffsetStore); deleteAndZero(m_pPullAPIWrapper); - deleteAndZero(m_consumerServeice); + deleteAndZero(m_consumerService); PullMAP::iterator it = m_PullCallback.begin(); for (; it != m_PullCallback.end(); ++it) { deleteAndZero(it->second); @@ -308,14 +308,14 @@ void DefaultMQPushConsumer::start() { if (m_pMessageListener->getMessageListenerType() == messageListenerOrderly) { LOG_INFO("start orderly consume service:%s", getGroupName().c_str()); - m_consumerServeice = new ConsumeMessageOrderlyService( + m_consumerService = new ConsumeMessageOrderlyService( this, m_consumeThreadCount, m_pMessageListener); } else // for backward compatible, defaultly and concurrently listeners // are allocating ConsumeMessageConcurrentlyService { LOG_INFO("start concurrently consume service:%s", getGroupName().c_str()); - m_consumerServeice = new ConsumeMessageConcurrentlyService( + m_consumerService = new ConsumeMessageConcurrentlyService( this, m_consumeThreadCount, m_pMessageListener); } } @@ -354,7 +354,7 @@ void DefaultMQPushConsumer::start() { bStartFailed = true; errorMsg = std::string(e.what()); } - m_consumerServeice->start(); + m_consumerService->start(); getFactory()->start(); @@ -389,7 +389,7 @@ void DefaultMQPushConsumer::shutdown() { m_pullmsgQueue->close(); m_pullmsgThread->interrupt(); m_pullmsgThread->join(); - m_consumerServeice->shutdown(); + m_consumerService->shutdown(); persistConsumerOffset(); shutdownAsyncPullCallBack(); // delete aync pullMsg resources getFactory()->unregisterConsumer(this); @@ -420,7 +420,7 @@ MessageListenerType DefaultMQPushConsumer::getMessageListenerType() { } ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const { - return m_consumerServeice; + return m_consumerService; } OffsetStore* DefaultMQPushConsumer::getOffsetStore() const { @@ -572,7 +572,7 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) { } MQMessageQueue& messageQueue = request->m_messageQueue; - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { @@ -646,8 +646,8 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) { request->setNextOffset(pullResult.nextBeginOffset); request->putMessage(pullResult.msgFoundList); - m_consumerServeice->submitConsumeRequest(request, - pullResult.msgFoundList); + m_consumerService->submitConsumeRequest(request, + pullResult.msgFoundList); producePullMsgTask(request); LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", @@ -760,7 +760,7 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) { } MQMessageQueue& messageQueue = request->m_messageQueue; - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { @@ -888,7 +888,7 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const { ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { ConsumerRunningInfo* info = new ConsumerRunningInfo(); if (info) { - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true"); else From 7825e0b693a2692c73645c6b15fea074b29ba61c Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 25 Dec 2018 16:36:09 +0800 Subject: [PATCH 2/4] fixed message level in CMakeLists.txt --- CMakeLists.txt | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 547132d64..3a21ab4fc 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - + cmake_minimum_required(VERSION 2.8) if (APPLE) @@ -52,11 +52,11 @@ if(WIN32) find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams zlib) if(Boost_FOUND) - message(status "** Boost Include dir: ${Boost_INCLUDE_DIR}") - message(status "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}") - message(status "** Boost Libraries: ${Boost_LIBRARIES}") + message(STATUS "** Boost Include dir: ${Boost_INCLUDE_DIR}") + message(STATUS "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "** Boost Libraries: ${Boost_LIBRARIES}") include_directories(${Boost_INCLUDE_DIRS}) - endif() + endif() else() #find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams) set(Boost_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/bin/include) @@ -68,16 +68,16 @@ else() include_directories(${Boost_INCLUDE_DIRS}) endif() -message(status "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}") -message(status "** Boost_LIBRARIES: ${Boost_LIBRARIES}") +message(STATUS "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}") +message(STATUS "** Boost_LIBRARIES: ${Boost_LIBRARIES}") option(Libevent_USE_STATIC_LIBS "only find libevent static libs" ON) # only find static libs if(WIN32) find_package(Libevent 2.0.22 REQUIRED COMPONENTS) if(LIBEVENT_FOUND) include_directories(${LIBEVENT_INCLUDE_DIRS}) - message(status "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}") - message(status "** libevent Libraries: ${LIBEVENT_LIBRARIES}") + message(STATUS "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}") + message(STATUS "** libevent Libraries: ${LIBEVENT_LIBRARIES}") endif() else() #find_package(Libevent 2.0.22 REQUIRED COMPONENTS) @@ -87,8 +87,8 @@ else() include_directories(${LIBEVENT_INCLUDE_DIRS}) endif() -message(status "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}") -message(status "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}") +message(STATUS "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}") +message(STATUS "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}") option(JSONCPP_USE_STATIC_LIBS "only find jsoncpp static libs" ON) # only find static libs if(WIN32) @@ -103,8 +103,8 @@ else() include_directories(${JSONCPP_INCLUDE_DIRS}) endif() -message(status "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}") -message(status "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}") +message(STATUS "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}") +message(STATUS "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}") # put binaries in a different dir to make them easier to find. set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) @@ -170,7 +170,7 @@ endif() string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}") string(REPLACE ";" " " CMAKE_C_FLAGS "${C_FLAGS}") - + set(CMAKE_CXX_FLAGS_DEBUG "-O0 -DDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") @@ -223,6 +223,7 @@ add_subdirectory(example) option(RUN_UNIT_TEST "RUN_UNIT_TEST" OFF) if(RUN_UNIT_TEST) - message(status "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing") + message(STATUS "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing") add_subdirectory(test) -endif() \ No newline at end of file +endif() + From 5e2e1cc7c2a8237a38a8d181cc783821e28268dc Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 25 Dec 2018 16:36:19 +0800 Subject: [PATCH 3/4] fixed SetProducerInstanceName --- src/extern/CProducer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 2c3c9c914..0715942ac 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -175,7 +175,7 @@ int SetProducerInstanceName(CProducer *producer, const char *instanceName) { if (producer == NULL) { return NULL_POINTER; } - ((DefaultMQProducer *) producer)->setGroupName(instanceName); + ((DefaultMQProducer *) producer)->setInstanceName(instanceName); return OK; } int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey, From 4779893de7c17945f099127610cc86717083b3c5 Mon Sep 17 00:00:00 2001 From: James Yin Date: Tue, 25 Dec 2018 16:36:30 +0800 Subject: [PATCH 4/4] fixed repeat compress when retry send --- include/MQMessage.h | 11 ++++++++ include/MQMessageExt.h | 4 --- src/message/MQDecoder.cpp | 2 +- src/message/MQMessage.cpp | 43 ++++++++++++++++++++++++++---- src/message/MQMessageExt.cpp | 6 ----- src/producer/DefaultMQProducer.cpp | 20 ++++++-------- 6 files changed, 58 insertions(+), 28 deletions(-) diff --git a/include/MQMessage.h b/include/MQMessage.h index 9c14e541b..99ab97fc0 100755 --- a/include/MQMessage.h +++ b/include/MQMessage.h @@ -63,6 +63,9 @@ class ROCKETMQCLIENT_API MQMessage { int getFlag() const; void setFlag(int flag); + int getSysFlag() const; + void setSysFlag(int sysFlag); + std::string getBody() const; void setBody(const char* body, int len); void setBody(const std::string& body); @@ -78,6 +81,10 @@ class ROCKETMQCLIENT_API MQMessage { } protected: + friend class MQDecoder; + void setPropertyInternal(const std::string& name, const std::string& value); + void setPropertiesInternal(std::map& properties); + void Init(const std::string& topic, const std::string& tags, const std::string& keys, const int flag, const std::string& body, bool waitStoreMsgOK); @@ -110,6 +117,10 @@ class ROCKETMQCLIENT_API MQMessage { static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS; static const std::string KEY_SEPARATOR; + + protected: + int m_sysFlag; + private: std::string m_topic; int m_flag; diff --git a/include/MQMessageExt.h b/include/MQMessageExt.h index d2119c710..fb48daeff 100755 --- a/include/MQMessageExt.h +++ b/include/MQMessageExt.h @@ -64,9 +64,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage { const std::string& getOffsetMsgId() const; void setOffsetMsgId(const std::string& offsetMsgId); - int getSysFlag() const; - void setSysFlag(int sysFlag); - int getBodyCRC() const; void setBodyCRC(int bodyCRC); @@ -108,7 +105,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage { int64 m_preparedTransactionOffset; int m_queueId; int m_storeSize; - int m_sysFlag; int m_bodyCRC; int m_reconsumeTimes; sockaddr m_bornHost; diff --git a/src/message/MQDecoder.cpp b/src/message/MQDecoder.cpp index 489cd805c..d4f0dd4dd 100755 --- a/src/message/MQDecoder.cpp +++ b/src/message/MQDecoder.cpp @@ -191,7 +191,7 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { map propertiesMap; string2messageProperties(propertiesString, propertiesMap); - msgExt->setProperties(propertiesMap); + msgExt->setPropertiesInternal(propertiesMap); propertiesMap.clear(); } diff --git a/src/message/MQMessage.cpp b/src/message/MQMessage.cpp index 5f3812e35..75fa5821b 100755 --- a/src/message/MQMessage.cpp +++ b/src/message/MQMessage.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "MQMessage.h" +#include "MessageSysFlag.h" #include "UtilAll.h" namespace rocketmq { @@ -76,6 +77,7 @@ MQMessage::MQMessage(const MQMessage& other) { m_body = other.m_body; m_topic = other.m_topic; m_flag = other.m_flag; + m_sysFlag = other.m_sysFlag; m_properties = other.m_properties; } @@ -84,12 +86,24 @@ MQMessage& MQMessage::operator=(const MQMessage& other) { m_body = other.m_body; m_topic = other.m_topic; m_flag = other.m_flag; + m_sysFlag = other.m_sysFlag; m_properties = other.m_properties; } return *this; } void MQMessage::setProperty(const string& name, const string& value) { + if (PROPERTY_TRANSACTION_PREPARED == name) { + if (!value.empty() && value == "true") { + m_sysFlag |= MessageSysFlag::TransactionPreparedType; + } else { + m_sysFlag &= ~MessageSysFlag::TransactionPreparedType; + } + } + m_properties[name] = value; +} + +void MQMessage::setPropertyInternal(const string& name, const string& value) { m_properties[name] = value; } @@ -111,13 +125,13 @@ void MQMessage::setTopic(const char* body, int len) { string MQMessage::getTags() const { return getProperty(PROPERTY_TAGS); } void MQMessage::setTags(const string& tags) { - setProperty(PROPERTY_TAGS, tags); + setPropertyInternal(PROPERTY_TAGS, tags); } string MQMessage::getKeys() const { return getProperty(PROPERTY_KEYS); } void MQMessage::setKeys(const string& keys) { - setProperty(PROPERTY_KEYS, keys); + setPropertyInternal(PROPERTY_KEYS, keys); } void MQMessage::setKeys(const vector& keys) { @@ -150,7 +164,7 @@ void MQMessage::setDelayTimeLevel(int level) { char tmp[16]; sprintf(tmp, "%d", level); - setProperty(PROPERTY_DELAY_TIME_LEVEL, tmp); + setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp); } bool MQMessage::isWaitStoreMsgOK() { @@ -164,9 +178,9 @@ bool MQMessage::isWaitStoreMsgOK() { void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) { if (waitStoreMsgOK) { - setProperty(PROPERTY_WAIT_STORE_MSG_OK, "true"); + setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "true"); } else { - setProperty(PROPERTY_WAIT_STORE_MSG_OK, "false"); + setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "false"); } } @@ -174,6 +188,10 @@ int MQMessage::getFlag() const { return m_flag; } void MQMessage::setFlag(int flag) { m_flag = flag; } +int MQMessage::getSysFlag() const { return m_sysFlag; } + +void MQMessage::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; } + string MQMessage::getBody() const { return m_body; } void MQMessage::setBody(const char* body, int len) { @@ -190,6 +208,20 @@ map MQMessage::getProperties() const { return m_properties; } void MQMessage::setProperties(map& properties) { m_properties = properties; + + map::const_iterator it = m_properties.find(PROPERTY_TRANSACTION_PREPARED); + if (it != m_properties.end()) { + string tranMsg = it->second; + if (!tranMsg.empty() && tranMsg == "true") { + m_sysFlag |= MessageSysFlag::TransactionPreparedType; + } else { + m_sysFlag &= ~MessageSysFlag::TransactionPreparedType; + } + } +} + +void MQMessage::setPropertiesInternal(map& properties) { + m_properties = properties; } void MQMessage::Init(const string& topic, const string& tags, @@ -197,6 +229,7 @@ void MQMessage::Init(const string& topic, const string& tags, bool waitStoreMsgOK) { m_topic = topic; m_flag = flag; + m_sysFlag = 0; m_body = body; if (tags.length() > 0) { diff --git a/src/message/MQMessageExt.cpp b/src/message/MQMessageExt.cpp index 3f2eae83a..43f4bf05f 100755 --- a/src/message/MQMessageExt.cpp +++ b/src/message/MQMessageExt.cpp @@ -29,7 +29,6 @@ MQMessageExt::MQMessageExt() m_preparedTransactionOffset(0), m_queueId(0), m_storeSize(0), - m_sysFlag(0), m_bodyCRC(0), m_reconsumeTimes(3), m_msgId("") {} @@ -44,7 +43,6 @@ MQMessageExt::MQMessageExt(int queueId, int64 bornTimestamp, sockaddr bornHost, m_preparedTransactionOffset(0), m_queueId(queueId), m_storeSize(0), - m_sysFlag(0), m_bodyCRC(0), m_reconsumeTimes(3), m_bornHost(bornHost), @@ -101,10 +99,6 @@ const string& MQMessageExt::getOffsetMsgId() const { return m_offsetMsgId; } void MQMessageExt::setOffsetMsgId(const string& offsetMsgId) { m_offsetMsgId = offsetMsgId; } -int MQMessageExt::getSysFlag() const { return m_sysFlag; } - -void MQMessageExt::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; } - int MQMessageExt::getBodyCRC() const { return m_bodyCRC; } void MQMessageExt::setBodyCRC(int bodyCRC) { m_bodyCRC = bodyCRC; } diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp index 6811590f2..4c39bd24d 100755 --- a/src/producer/DefaultMQProducer.cpp +++ b/src/producer/DefaultMQProducer.cpp @@ -26,7 +26,6 @@ #include "MQClientManager.h" #include "MQDecoder.h" #include "MQProtos.h" -#include "MessageSysFlag.h" #include "TopicPublishInfo.h" #include "Validators.h" #include "StringIdMaker.h" @@ -339,17 +338,8 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg, msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id); LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str()); - - int sysFlag = 0; - if (tryToCompressMessage(msg)) { - sysFlag |= MessageSysFlag::CompressedFlag; - } - string tranMsg = - msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED); - if (!tranMsg.empty() && tranMsg == "true") { - sysFlag |= MessageSysFlag::TransactionPreparedType; - } + tryToCompressMessage(msg); SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader(); requestHeader->producerGroup = getGroupName(); @@ -357,7 +347,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg, requestHeader->defaultTopic = DEFAULT_TOPIC; requestHeader->defaultTopicQueueNums = 4; requestHeader->queueId = (mq.getQueueId()); - requestHeader->sysFlag = (sysFlag); + requestHeader->sysFlag = (msg.getSysFlag()); requestHeader->bornTimestamp = UtilAll::currentTimeMillis(); requestHeader->flag = (msg.getFlag()); requestHeader->properties = @@ -471,11 +461,17 @@ SendResult DefaultMQProducer::sendAutoRetrySelectImpl( } bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) { + int sysFlag = msg.getSysFlag(); + if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) { + return true; + } + string body = msg.getBody(); if ((int)body.length() >= getCompressMsgBodyOverHowmuch()) { string outBody; if (UtilAll::deflate(body, outBody, getCompressLevel())) { msg.setBody(outBody); + msg.setSysFlag(sysFlag | MessageSysFlag::CompressedFlag); return true; } }