diff --git a/CMakeLists.txt b/CMakeLists.txt index 547132d64..3a21ab4fc 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - + cmake_minimum_required(VERSION 2.8) if (APPLE) @@ -52,11 +52,11 @@ if(WIN32) find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams zlib) if(Boost_FOUND) - message(status "** Boost Include dir: ${Boost_INCLUDE_DIR}") - message(status "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}") - message(status "** Boost Libraries: ${Boost_LIBRARIES}") + message(STATUS "** Boost Include dir: ${Boost_INCLUDE_DIR}") + message(STATUS "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "** Boost Libraries: ${Boost_LIBRARIES}") include_directories(${Boost_INCLUDE_DIRS}) - endif() + endif() else() #find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams) set(Boost_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/bin/include) @@ -68,16 +68,16 @@ else() include_directories(${Boost_INCLUDE_DIRS}) endif() -message(status "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}") -message(status "** Boost_LIBRARIES: ${Boost_LIBRARIES}") +message(STATUS "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}") +message(STATUS "** Boost_LIBRARIES: ${Boost_LIBRARIES}") option(Libevent_USE_STATIC_LIBS "only find libevent static libs" ON) # only find static libs if(WIN32) find_package(Libevent 2.0.22 REQUIRED COMPONENTS) if(LIBEVENT_FOUND) include_directories(${LIBEVENT_INCLUDE_DIRS}) - message(status "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}") - message(status "** libevent Libraries: ${LIBEVENT_LIBRARIES}") + message(STATUS "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}") + message(STATUS "** libevent Libraries: ${LIBEVENT_LIBRARIES}") endif() else() #find_package(Libevent 2.0.22 REQUIRED COMPONENTS) @@ -87,8 +87,8 @@ else() include_directories(${LIBEVENT_INCLUDE_DIRS}) endif() -message(status "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}") -message(status "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}") +message(STATUS "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}") +message(STATUS "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}") option(JSONCPP_USE_STATIC_LIBS "only find jsoncpp static libs" ON) # only find static libs if(WIN32) @@ -103,8 +103,8 @@ else() include_directories(${JSONCPP_INCLUDE_DIRS}) endif() -message(status "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}") -message(status "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}") +message(STATUS "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}") +message(STATUS "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}") # put binaries in a different dir to make them easier to find. set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) @@ -170,7 +170,7 @@ endif() string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}") string(REPLACE ";" " " CMAKE_C_FLAGS "${C_FLAGS}") - + set(CMAKE_CXX_FLAGS_DEBUG "-O0 -DDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") @@ -223,6 +223,7 @@ add_subdirectory(example) option(RUN_UNIT_TEST "RUN_UNIT_TEST" OFF) if(RUN_UNIT_TEST) - message(status "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing") + message(STATUS "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing") add_subdirectory(test) -endif() \ No newline at end of file +endif() + diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 9a3948457..4051c772b 100755 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -138,7 +138,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { OffsetStore* m_pOffsetStore; Rebalance* m_pRebalance; PullAPIWrapper* m_pPullAPIWrapper; - ConsumeMsgService* m_consumerServeice; + ConsumeMsgService* m_consumerService; MQMessageListener* m_pMessageListener; int m_consumeMessageBatchMaxSize; int m_maxMsgCacheSize; diff --git a/include/MQMessage.h b/include/MQMessage.h index cc65fd121..e03c6645b 100755 --- a/include/MQMessage.h +++ b/include/MQMessage.h @@ -1,121 +1,133 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __MESSAGE_H__ -#define __MESSAGE_H__ - -#include -#include -#include -#include -#include "RocketMQClient.h" - - -namespace rocketmq { -//& keys); - - int getDelayTimeLevel() const; - void setDelayTimeLevel(int level); - - bool isWaitStoreMsgOK(); - void setWaitStoreMsgOK(bool waitStoreMsgOK); - - int getFlag() const; - void setFlag(int flag); - - const std::string &getBody() const; - void setBody(const char* body, int len); - void setBody(const std::string& body); - - std::map getProperties() const; - void setProperties(std::map& properties); - - const std::string toString() const { - std::stringstream ss; - ss << "Message [topic=" << m_topic << ", flag=" << m_flag - << ", tag=" << getTags() << "]"; - return ss.str(); - } - - protected: - void Init(const std::string& topic, const std::string& tags, const std::string& keys, - const int flag, const std::string& body, bool waitStoreMsgOK); - - public: - static const std::string PROPERTY_KEYS; - static const std::string PROPERTY_TAGS; - static const std::string PROPERTY_WAIT_STORE_MSG_OK; - static const std::string PROPERTY_DELAY_TIME_LEVEL; - static const std::string PROPERTY_RETRY_TOPIC; - static const std::string PROPERTY_REAL_TOPIC; - static const std::string PROPERTY_REAL_QUEUE_ID; - static const std::string PROPERTY_TRANSACTION_PREPARED; - static const std::string PROPERTY_PRODUCER_GROUP; - static const std::string PROPERTY_MIN_OFFSET; - static const std::string PROPERTY_MAX_OFFSET; - - static const std::string PROPERTY_BUYER_ID; - static const std::string PROPERTY_ORIGIN_MESSAGE_ID; - static const std::string PROPERTY_TRANSFER_FLAG; - static const std::string PROPERTY_CORRECTION_FLAG; - static const std::string PROPERTY_MQ2_FLAG; - static const std::string PROPERTY_RECONSUME_TIME; - static const std::string PROPERTY_MSG_REGION; - static const std::string PROPERTY_TRACE_SWITCH; - static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX; - static const std::string PROPERTY_MAX_RECONSUME_TIMES; - static const std::string PROPERTY_CONSUME_START_TIMESTAMP; - static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET; - static const std::string PROPERTY_TRANSACTION_CHECK_TIMES; - static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS; - - static const std::string KEY_SEPARATOR; - private: - std::string m_topic; - int m_flag; - std::string m_body; - std::map m_properties; -}; -// +#include +#include +#include +#include "RocketMQClient.h" + + +namespace rocketmq { +//& keys); + + int getDelayTimeLevel() const; + void setDelayTimeLevel(int level); + + bool isWaitStoreMsgOK(); + void setWaitStoreMsgOK(bool waitStoreMsgOK); + + int getFlag() const; + void setFlag(int flag); + + int getSysFlag() const; + void setSysFlag(int sysFlag); + + const std::string &getBody() const; + + void setBody(const char* body, int len); + void setBody(const std::string& body); + + std::map getProperties() const; + void setProperties(std::map& properties); + + const std::string toString() const { + std::stringstream ss; + ss << "Message [topic=" << m_topic << ", flag=" << m_flag + << ", tag=" << getTags() << "]"; + return ss.str(); + } + + protected: + friend class MQDecoder; + void setPropertyInternal(const std::string& name, const std::string& value); + void setPropertiesInternal(std::map& properties); + + void Init(const std::string& topic, const std::string& tags, const std::string& keys, + const int flag, const std::string& body, bool waitStoreMsgOK); + + public: + static const std::string PROPERTY_KEYS; + static const std::string PROPERTY_TAGS; + static const std::string PROPERTY_WAIT_STORE_MSG_OK; + static const std::string PROPERTY_DELAY_TIME_LEVEL; + static const std::string PROPERTY_RETRY_TOPIC; + static const std::string PROPERTY_REAL_TOPIC; + static const std::string PROPERTY_REAL_QUEUE_ID; + static const std::string PROPERTY_TRANSACTION_PREPARED; + static const std::string PROPERTY_PRODUCER_GROUP; + static const std::string PROPERTY_MIN_OFFSET; + static const std::string PROPERTY_MAX_OFFSET; + + static const std::string PROPERTY_BUYER_ID; + static const std::string PROPERTY_ORIGIN_MESSAGE_ID; + static const std::string PROPERTY_TRANSFER_FLAG; + static const std::string PROPERTY_CORRECTION_FLAG; + static const std::string PROPERTY_MQ2_FLAG; + static const std::string PROPERTY_RECONSUME_TIME; + static const std::string PROPERTY_MSG_REGION; + static const std::string PROPERTY_TRACE_SWITCH; + static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX; + static const std::string PROPERTY_MAX_RECONSUME_TIMES; + static const std::string PROPERTY_CONSUME_START_TIMESTAMP; + static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET; + static const std::string PROPERTY_TRANSACTION_CHECK_TIMES; + static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS; + + static const std::string KEY_SEPARATOR; + + protected: + int m_sysFlag; + + private: + std::string m_topic; + int m_flag; + std::string m_body; + std::map m_properties; +}; +//second); @@ -308,14 +308,14 @@ void DefaultMQPushConsumer::start() { if (m_pMessageListener->getMessageListenerType() == messageListenerOrderly) { LOG_INFO("start orderly consume service:%s", getGroupName().c_str()); - m_consumerServeice = new ConsumeMessageOrderlyService( + m_consumerService = new ConsumeMessageOrderlyService( this, m_consumeThreadCount, m_pMessageListener); } else // for backward compatible, defaultly and concurrently listeners // are allocating ConsumeMessageConcurrentlyService { LOG_INFO("start concurrently consume service:%s", getGroupName().c_str()); - m_consumerServeice = new ConsumeMessageConcurrentlyService( + m_consumerService = new ConsumeMessageConcurrentlyService( this, m_consumeThreadCount, m_pMessageListener); } } @@ -354,7 +354,7 @@ void DefaultMQPushConsumer::start() { bStartFailed = true; errorMsg = std::string(e.what()); } - m_consumerServeice->start(); + m_consumerService->start(); getFactory()->start(); @@ -389,7 +389,7 @@ void DefaultMQPushConsumer::shutdown() { m_pullmsgQueue->close(); m_pullmsgThread->interrupt(); m_pullmsgThread->join(); - m_consumerServeice->shutdown(); + m_consumerService->shutdown(); persistConsumerOffset(); shutdownAsyncPullCallBack(); // delete aync pullMsg resources getFactory()->unregisterConsumer(this); @@ -420,7 +420,7 @@ MessageListenerType DefaultMQPushConsumer::getMessageListenerType() { } ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const { - return m_consumerServeice; + return m_consumerService; } OffsetStore* DefaultMQPushConsumer::getOffsetStore() const { @@ -572,7 +572,7 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) { } MQMessageQueue& messageQueue = request->m_messageQueue; - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { @@ -646,8 +646,8 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) { request->setNextOffset(pullResult.nextBeginOffset); request->putMessage(pullResult.msgFoundList); - m_consumerServeice->submitConsumeRequest(request, - pullResult.msgFoundList); + m_consumerService->submitConsumeRequest(request, + pullResult.msgFoundList); producePullMsgTask(request); LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", @@ -760,7 +760,7 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) { } MQMessageQueue& messageQueue = request->m_messageQueue; - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) { if (!request->isLocked() || request->isLockExpired()) { if (!m_pRebalance->lock(messageQueue)) { @@ -888,7 +888,7 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const { ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() { ConsumerRunningInfo* info = new ConsumerRunningInfo(); if (info) { - if (m_consumerServeice->getConsumeMsgSerivceListenerType() == + if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true"); else diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 2c3c9c914..0715942ac 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -175,7 +175,7 @@ int SetProducerInstanceName(CProducer *producer, const char *instanceName) { if (producer == NULL) { return NULL_POINTER; } - ((DefaultMQProducer *) producer)->setGroupName(instanceName); + ((DefaultMQProducer *) producer)->setInstanceName(instanceName); return OK; } int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey, diff --git a/src/message/MQDecoder.cpp b/src/message/MQDecoder.cpp index 489cd805c..d4f0dd4dd 100755 --- a/src/message/MQDecoder.cpp +++ b/src/message/MQDecoder.cpp @@ -191,7 +191,7 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { map propertiesMap; string2messageProperties(propertiesString, propertiesMap); - msgExt->setProperties(propertiesMap); + msgExt->setPropertiesInternal(propertiesMap); propertiesMap.clear(); } diff --git a/src/message/MQMessage.cpp b/src/message/MQMessage.cpp index c8bae4b53..7fb14a7b8 100755 --- a/src/message/MQMessage.cpp +++ b/src/message/MQMessage.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "MQMessage.h" +#include "MessageSysFlag.h" #include "UtilAll.h" namespace rocketmq { @@ -76,6 +77,7 @@ MQMessage::MQMessage(const MQMessage& other) { m_body = other.m_body; m_topic = other.m_topic; m_flag = other.m_flag; + m_sysFlag = other.m_sysFlag; m_properties = other.m_properties; } @@ -84,12 +86,24 @@ MQMessage& MQMessage::operator=(const MQMessage& other) { m_body = other.m_body; m_topic = other.m_topic; m_flag = other.m_flag; + m_sysFlag = other.m_sysFlag; m_properties = other.m_properties; } return *this; } void MQMessage::setProperty(const string& name, const string& value) { + if (PROPERTY_TRANSACTION_PREPARED == name) { + if (!value.empty() && value == "true") { + m_sysFlag |= MessageSysFlag::TransactionPreparedType; + } else { + m_sysFlag &= ~MessageSysFlag::TransactionPreparedType; + } + } + m_properties[name] = value; +} + +void MQMessage::setPropertyInternal(const string& name, const string& value) { m_properties[name] = value; } @@ -114,13 +128,13 @@ void MQMessage::setTopic(const char* body, int len) { const string& MQMessage::getTags() const { return getProperty(PROPERTY_TAGS); } void MQMessage::setTags(const string& tags) { - setProperty(PROPERTY_TAGS, tags); + setPropertyInternal(PROPERTY_TAGS, tags); } const string& MQMessage::getKeys() const { return getProperty(PROPERTY_KEYS); } void MQMessage::setKeys(const string& keys) { - setProperty(PROPERTY_KEYS, keys); + setPropertyInternal(PROPERTY_KEYS, keys); } void MQMessage::setKeys(const vector& keys) { @@ -153,7 +167,7 @@ void MQMessage::setDelayTimeLevel(int level) { char tmp[16]; sprintf(tmp, "%d", level); - setProperty(PROPERTY_DELAY_TIME_LEVEL, tmp); + setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp); } bool MQMessage::isWaitStoreMsgOK() { @@ -167,9 +181,9 @@ bool MQMessage::isWaitStoreMsgOK() { void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) { if (waitStoreMsgOK) { - setProperty(PROPERTY_WAIT_STORE_MSG_OK, "true"); + setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "true"); } else { - setProperty(PROPERTY_WAIT_STORE_MSG_OK, "false"); + setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "false"); } } @@ -177,6 +191,10 @@ int MQMessage::getFlag() const { return m_flag; } void MQMessage::setFlag(int flag) { m_flag = flag; } +int MQMessage::getSysFlag() const { return m_sysFlag; } + +void MQMessage::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; } + const string& MQMessage::getBody() const { return m_body; } void MQMessage::setBody(const char* body, int len) { @@ -193,6 +211,20 @@ map MQMessage::getProperties() const { return m_properties; } void MQMessage::setProperties(map& properties) { m_properties = properties; + + map::const_iterator it = m_properties.find(PROPERTY_TRANSACTION_PREPARED); + if (it != m_properties.end()) { + string tranMsg = it->second; + if (!tranMsg.empty() && tranMsg == "true") { + m_sysFlag |= MessageSysFlag::TransactionPreparedType; + } else { + m_sysFlag &= ~MessageSysFlag::TransactionPreparedType; + } + } +} + +void MQMessage::setPropertiesInternal(map& properties) { + m_properties = properties; } void MQMessage::Init(const string& topic, const string& tags, @@ -200,6 +232,7 @@ void MQMessage::Init(const string& topic, const string& tags, bool waitStoreMsgOK) { m_topic = topic; m_flag = flag; + m_sysFlag = 0; m_body = body; if (tags.length() > 0) { diff --git a/src/message/MQMessageExt.cpp b/src/message/MQMessageExt.cpp index 3f2eae83a..43f4bf05f 100755 --- a/src/message/MQMessageExt.cpp +++ b/src/message/MQMessageExt.cpp @@ -29,7 +29,6 @@ MQMessageExt::MQMessageExt() m_preparedTransactionOffset(0), m_queueId(0), m_storeSize(0), - m_sysFlag(0), m_bodyCRC(0), m_reconsumeTimes(3), m_msgId("") {} @@ -44,7 +43,6 @@ MQMessageExt::MQMessageExt(int queueId, int64 bornTimestamp, sockaddr bornHost, m_preparedTransactionOffset(0), m_queueId(queueId), m_storeSize(0), - m_sysFlag(0), m_bodyCRC(0), m_reconsumeTimes(3), m_bornHost(bornHost), @@ -101,10 +99,6 @@ const string& MQMessageExt::getOffsetMsgId() const { return m_offsetMsgId; } void MQMessageExt::setOffsetMsgId(const string& offsetMsgId) { m_offsetMsgId = offsetMsgId; } -int MQMessageExt::getSysFlag() const { return m_sysFlag; } - -void MQMessageExt::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; } - int MQMessageExt::getBodyCRC() const { return m_bodyCRC; } void MQMessageExt::setBodyCRC(int bodyCRC) { m_bodyCRC = bodyCRC; } diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp index 6811590f2..4c39bd24d 100755 --- a/src/producer/DefaultMQProducer.cpp +++ b/src/producer/DefaultMQProducer.cpp @@ -26,7 +26,6 @@ #include "MQClientManager.h" #include "MQDecoder.h" #include "MQProtos.h" -#include "MessageSysFlag.h" #include "TopicPublishInfo.h" #include "Validators.h" #include "StringIdMaker.h" @@ -339,17 +338,8 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg, msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id); LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str()); - - int sysFlag = 0; - if (tryToCompressMessage(msg)) { - sysFlag |= MessageSysFlag::CompressedFlag; - } - string tranMsg = - msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED); - if (!tranMsg.empty() && tranMsg == "true") { - sysFlag |= MessageSysFlag::TransactionPreparedType; - } + tryToCompressMessage(msg); SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader(); requestHeader->producerGroup = getGroupName(); @@ -357,7 +347,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg, requestHeader->defaultTopic = DEFAULT_TOPIC; requestHeader->defaultTopicQueueNums = 4; requestHeader->queueId = (mq.getQueueId()); - requestHeader->sysFlag = (sysFlag); + requestHeader->sysFlag = (msg.getSysFlag()); requestHeader->bornTimestamp = UtilAll::currentTimeMillis(); requestHeader->flag = (msg.getFlag()); requestHeader->properties = @@ -471,11 +461,17 @@ SendResult DefaultMQProducer::sendAutoRetrySelectImpl( } bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) { + int sysFlag = msg.getSysFlag(); + if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) { + return true; + } + string body = msg.getBody(); if ((int)body.length() >= getCompressMsgBodyOverHowmuch()) { string outBody; if (UtilAll::deflate(body, outBody, getCompressLevel())) { msg.setBody(outBody); + msg.setSysFlag(sysFlag | MessageSysFlag::CompressedFlag); return true; } }