Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cmake_minimum_required(VERSION 2.8)

if (APPLE)
Expand Down Expand Up @@ -52,11 +52,11 @@ if(WIN32)
find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time
log log_setup regex serialization filesystem locale iostreams zlib)
if(Boost_FOUND)
message(status "** Boost Include dir: ${Boost_INCLUDE_DIR}")
message(status "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}")
message(status "** Boost Libraries: ${Boost_LIBRARIES}")
message(STATUS "** Boost Include dir: ${Boost_INCLUDE_DIR}")
message(STATUS "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}")
message(STATUS "** Boost Libraries: ${Boost_LIBRARIES}")
include_directories(${Boost_INCLUDE_DIRS})
endif()
endif()
else()
#find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams)
set(Boost_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/bin/include)
Expand All @@ -68,16 +68,16 @@ else()
include_directories(${Boost_INCLUDE_DIRS})
endif()

message(status "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}")
message(status "** Boost_LIBRARIES: ${Boost_LIBRARIES}")
message(STATUS "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}")
message(STATUS "** Boost_LIBRARIES: ${Boost_LIBRARIES}")

option(Libevent_USE_STATIC_LIBS "only find libevent static libs" ON) # only find static libs
if(WIN32)
find_package(Libevent 2.0.22 REQUIRED COMPONENTS)
if(LIBEVENT_FOUND)
include_directories(${LIBEVENT_INCLUDE_DIRS})
message(status "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}")
message(status "** libevent Libraries: ${LIBEVENT_LIBRARIES}")
message(STATUS "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}")
message(STATUS "** libevent Libraries: ${LIBEVENT_LIBRARIES}")
endif()
else()
#find_package(Libevent 2.0.22 REQUIRED COMPONENTS)
Expand All @@ -87,8 +87,8 @@ else()
include_directories(${LIBEVENT_INCLUDE_DIRS})
endif()

message(status "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}")
message(status "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}")
message(STATUS "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}")
message(STATUS "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}")

option(JSONCPP_USE_STATIC_LIBS "only find jsoncpp static libs" ON) # only find static libs
if(WIN32)
Expand All @@ -103,8 +103,8 @@ else()
include_directories(${JSONCPP_INCLUDE_DIRS})
endif()

message(status "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}")
message(status "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}")
message(STATUS "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}")
message(STATUS "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}")

# put binaries in a different dir to make them easier to find.
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
Expand Down Expand Up @@ -170,7 +170,7 @@ endif()

string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}")
string(REPLACE ";" " " CMAKE_C_FLAGS "${C_FLAGS}")

set(CMAKE_CXX_FLAGS_DEBUG "-O0 -DDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")

Expand Down Expand Up @@ -223,6 +223,7 @@ add_subdirectory(example)
option(RUN_UNIT_TEST "RUN_UNIT_TEST" OFF)

if(RUN_UNIT_TEST)
message(status "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing")
message(STATUS "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing")
add_subdirectory(test)
endif()
endif()

2 changes: 1 addition & 1 deletion include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
OffsetStore* m_pOffsetStore;
Rebalance* m_pRebalance;
PullAPIWrapper* m_pPullAPIWrapper;
ConsumeMsgService* m_consumerServeice;
ConsumeMsgService* m_consumerService;
MQMessageListener* m_pMessageListener;
int m_consumeMessageBatchMaxSize;
int m_maxMsgCacheSize;
Expand Down
254 changes: 133 additions & 121 deletions include/MQMessage.h
Original file line number Diff line number Diff line change
@@ -1,121 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __MESSAGE_H__
#define __MESSAGE_H__

#include <map>
#include <sstream>
#include <string>
#include <vector>
#include "RocketMQClient.h"


namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessage {
public:
MQMessage();
MQMessage(const std::string& topic, const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
const int flag, const std::string& body, bool waitStoreMsgOK);

virtual ~MQMessage();
MQMessage(const MQMessage& other);
MQMessage& operator=(const MQMessage& other);

void setProperty(const std::string& name, const std::string& value) ;
const std::string & getProperty(const std::string& name) const;

const std::string &getTopic() const;
void setTopic(const std::string& topic);
void setTopic(const char* body, int len);

const std::string &getTags() const;
void setTags(const std::string& tags);

const std::string &getKeys() const;
void setKeys(const std::string& keys);
void setKeys(const std::vector<std::string>& keys);

int getDelayTimeLevel() const;
void setDelayTimeLevel(int level);

bool isWaitStoreMsgOK();
void setWaitStoreMsgOK(bool waitStoreMsgOK);

int getFlag() const;
void setFlag(int flag);

const std::string &getBody() const;
void setBody(const char* body, int len);
void setBody(const std::string& body);

std::map<std::string, std::string> getProperties() const;
void setProperties(std::map<std::string, std::string>& properties);

const std::string toString() const {
std::stringstream ss;
ss << "Message [topic=" << m_topic << ", flag=" << m_flag
<< ", tag=" << getTags() << "]";
return ss.str();
}

protected:
void Init(const std::string& topic, const std::string& tags, const std::string& keys,
const int flag, const std::string& body, bool waitStoreMsgOK);

public:
static const std::string PROPERTY_KEYS;
static const std::string PROPERTY_TAGS;
static const std::string PROPERTY_WAIT_STORE_MSG_OK;
static const std::string PROPERTY_DELAY_TIME_LEVEL;
static const std::string PROPERTY_RETRY_TOPIC;
static const std::string PROPERTY_REAL_TOPIC;
static const std::string PROPERTY_REAL_QUEUE_ID;
static const std::string PROPERTY_TRANSACTION_PREPARED;
static const std::string PROPERTY_PRODUCER_GROUP;
static const std::string PROPERTY_MIN_OFFSET;
static const std::string PROPERTY_MAX_OFFSET;

static const std::string PROPERTY_BUYER_ID;
static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
static const std::string PROPERTY_TRANSFER_FLAG;
static const std::string PROPERTY_CORRECTION_FLAG;
static const std::string PROPERTY_MQ2_FLAG;
static const std::string PROPERTY_RECONSUME_TIME;
static const std::string PROPERTY_MSG_REGION;
static const std::string PROPERTY_TRACE_SWITCH;
static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
static const std::string PROPERTY_MAX_RECONSUME_TIMES;
static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;

static const std::string KEY_SEPARATOR;
private:
std::string m_topic;
int m_flag;
std::string m_body;
std::map<std::string, std::string> m_properties;
};
//<!***************************************************************************
} //<!end namespace;
#endif
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __MESSAGE_H__
#define __MESSAGE_H__

#include <map>
#include <sstream>
#include <string>
#include <vector>
#include "RocketMQClient.h"


namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessage {
public:
MQMessage();
MQMessage(const std::string& topic, const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
const std::string& body);
MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
const int flag, const std::string& body, bool waitStoreMsgOK);

virtual ~MQMessage();
MQMessage(const MQMessage& other);
MQMessage& operator=(const MQMessage& other);

void setProperty(const std::string& name, const std::string& value) ;
const std::string & getProperty(const std::string& name) const;

const std::string &getTopic() const;
void setTopic(const std::string& topic);
void setTopic(const char* body, int len);

const std::string &getTags() const;
void setTags(const std::string& tags);

const std::string &getKeys() const;
void setKeys(const std::string& keys);
void setKeys(const std::vector<std::string>& keys);

int getDelayTimeLevel() const;
void setDelayTimeLevel(int level);

bool isWaitStoreMsgOK();
void setWaitStoreMsgOK(bool waitStoreMsgOK);

int getFlag() const;
void setFlag(int flag);

int getSysFlag() const;
void setSysFlag(int sysFlag);

const std::string &getBody() const;

void setBody(const char* body, int len);
void setBody(const std::string& body);

std::map<std::string, std::string> getProperties() const;
void setProperties(std::map<std::string, std::string>& properties);

const std::string toString() const {
std::stringstream ss;
ss << "Message [topic=" << m_topic << ", flag=" << m_flag
<< ", tag=" << getTags() << "]";
return ss.str();
}

protected:
friend class MQDecoder;
void setPropertyInternal(const std::string& name, const std::string& value);
void setPropertiesInternal(std::map<std::string, std::string>& properties);

void Init(const std::string& topic, const std::string& tags, const std::string& keys,
const int flag, const std::string& body, bool waitStoreMsgOK);

public:
static const std::string PROPERTY_KEYS;
static const std::string PROPERTY_TAGS;
static const std::string PROPERTY_WAIT_STORE_MSG_OK;
static const std::string PROPERTY_DELAY_TIME_LEVEL;
static const std::string PROPERTY_RETRY_TOPIC;
static const std::string PROPERTY_REAL_TOPIC;
static const std::string PROPERTY_REAL_QUEUE_ID;
static const std::string PROPERTY_TRANSACTION_PREPARED;
static const std::string PROPERTY_PRODUCER_GROUP;
static const std::string PROPERTY_MIN_OFFSET;
static const std::string PROPERTY_MAX_OFFSET;

static const std::string PROPERTY_BUYER_ID;
static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
static const std::string PROPERTY_TRANSFER_FLAG;
static const std::string PROPERTY_CORRECTION_FLAG;
static const std::string PROPERTY_MQ2_FLAG;
static const std::string PROPERTY_RECONSUME_TIME;
static const std::string PROPERTY_MSG_REGION;
static const std::string PROPERTY_TRACE_SWITCH;
static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
static const std::string PROPERTY_MAX_RECONSUME_TIMES;
static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;

static const std::string KEY_SEPARATOR;

protected:
int m_sysFlag;

private:
std::string m_topic;
int m_flag;
std::string m_body;
std::map<std::string, std::string> m_properties;
};
//<!***************************************************************************
} //<!end namespace;
#endif
4 changes: 0 additions & 4 deletions include/MQMessageExt.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
const std::string& getOffsetMsgId() const;
void setOffsetMsgId(const std::string& offsetMsgId);

int getSysFlag() const;
void setSysFlag(int sysFlag);

int getBodyCRC() const;
void setBodyCRC(int bodyCRC);

Expand Down Expand Up @@ -108,7 +105,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
int64 m_preparedTransactionOffset;
int m_queueId;
int m_storeSize;
int m_sysFlag;
int m_bodyCRC;
int m_reconsumeTimes;
sockaddr m_bornHost;
Expand Down
Loading