diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c index 44e2abdb0..8be2207c5 100644 --- a/example/PullConsumeMessage.c +++ b/example/PullConsumeMessage.c @@ -40,19 +40,57 @@ void thread_sleep(unsigned milliseconds) { #endif } -int main(int argc,char * argv []) -{ - int i = 0; +int main(int argc, char *argv[]) { + int i = 0, j = 0; + int ret = 0; + int size = 0; + CMessageQueue *mqs = NULL; printf("PullConsumer Initializing....\n"); - CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test"); - SetPullConsumerNameServerAddress(consumer,"172.17.0.2:9876"); + CPullConsumer *consumer = CreatePullConsumer("Group_Consumer_Test"); + SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876"); StartPullConsumer(consumer); printf("Pull Consumer Start...\n"); - for( i=0; i<10; i++) - { - printf("Now Running : %d S\n",i*10); - thread_sleep(10000); + for (i = 1; i <= 5; i++) { + printf("FetchSubscriptionMessageQueues : %d times\n", i); + ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size); + if(ret != OK) { + printf("Get MQ Queue Failed,ErrorCode:%d\n", ret); + } + printf("Get MQ Size:%d\n", size); + for (j = 0; j < size; j++) { + int noNewMsg = 0; + long long tmpoffset = 0; + printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId); + do { + int k = 0; + CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32); + if (pullResult.pullStatus != E_BROKER_TIMEOUT) { + tmpoffset = pullResult.nextBeginOffset; + } + printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus, + pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset); + switch (pullResult.pullStatus) { + case E_FOUND: + printf("Get Message Size:%d\n", pullResult.size); + for (k = 0; k < pullResult.size; ++k) { + printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),GetMessageBody(pullResult.msgFoundList[k])); + } + break; + case E_NO_MATCHED_MSG: + noNewMsg = 1; + break; + default: + noNewMsg = 0; + } + ReleasePullResult(pullResult); + thread_sleep(100); + } while (noNewMsg == 0); + thread_sleep(1000); + } + thread_sleep(2000); + ReleaseSubscriptionMessageQueue(mqs); } + thread_sleep(5000); ShutdownPullConsumer(consumer); DestroyPullConsumer(consumer); printf("PullConsumer Shutdown!\n"); diff --git a/include/CProducer.h b/include/CProducer.h index 74bdb9d8b..6edd99e53 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -36,6 +36,7 @@ ROCKETMQCLIENT_API int StartProducer(CProducer *producer); ROCKETMQCLIENT_API int ShutdownProducer(CProducer *producer); ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer *producer, const char *namesrv); +ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer *producer, const char *domain); ROCKETMQCLIENT_API int SetProducerGroupName(CProducer *producer, const char *groupName); ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer *producer, const char *instanceName); ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey, diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h index c84f8d4a3..a4c4f5aef 100644 --- a/include/CPullConsumer.h +++ b/include/CPullConsumer.h @@ -31,25 +31,26 @@ extern "C" { typedef struct CPullConsumer CPullConsumer; -CPullConsumer *CreatePullConsumer(const char *groupId); -int DestroyPullConsumer(CPullConsumer *consumer); -int StartPullConsumer(CPullConsumer *consumer); -int ShutdownPullConsumer(CPullConsumer *consumer); -int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId); -const char *GetPullConsumerGroupID(CPullConsumer *consumer); -int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv); -int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey, +ROCKETMQCLIENT_API CPullConsumer *CreatePullConsumer(const char *groupId); +ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer *consumer); +ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer *consumer); +ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer *consumer); +ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId); +ROCKETMQCLIENT_API const char *GetPullConsumerGroupID(CPullConsumer *consumer); +ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv); +ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain); +ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey, const char *channel); -int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath); -int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize); -int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level); +ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath); +ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize); +ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level); -int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size); -int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs); +ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size); +ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs); -CPullResult +ROCKETMQCLIENT_API CPullResult Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums); -int ReleasePullResult(CPullResult pullResult); +ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult); #ifdef __cplusplus }; diff --git a/include/CPullResult.h b/include/CPullResult.h index eb44fbd75..e22fd9ed3 100644 --- a/include/CPullResult.h +++ b/include/CPullResult.h @@ -24,8 +24,7 @@ #ifdef __cplusplus extern "C" { #endif -typedef enum E_CPullStatus -{ +typedef enum E_CPullStatus { E_FOUND, E_NO_NEW_MSG, E_NO_MATCHED_MSG, @@ -35,11 +34,12 @@ typedef enum E_CPullStatus typedef struct _CPullResult_ { CPullStatus pullStatus; - long long nextBeginOffset; - long long minOffset; - long long maxOffset; - CMessageExt** msgFoundList; + long long nextBeginOffset; + long long minOffset; + long long maxOffset; + CMessageExt **msgFoundList; int size; + void *pData; } CPullResult; #ifdef __cplusplus diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h index aee344032..49cbf9645 100644 --- a/include/CPushConsumer.h +++ b/include/CPushConsumer.h @@ -42,6 +42,7 @@ ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer *consumer); ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId); ROCKETMQCLIENT_API const char *GetPushConsumerGroupID(CPushConsumer *consumer); ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv); +ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain); ROCKETMQCLIENT_API int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression); ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback); ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback); diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 67a99bd02..2c3c9c914 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -85,7 +85,13 @@ int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) { ((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv); return OK; } - +int SetProducerNameServerDomain(CProducer *producer, const char *domain) { + if (producer == NULL) { + return NULL_POINTER; + } + ((DefaultMQProducer *) producer)->setNamesrvDomain(domain); + return OK; +} int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) { //CSendResult sendResult; if (producer == NULL || msg == NULL || result == NULL) { diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp index bd71ffb92..415bb5b41 100644 --- a/src/extern/CPullConsumer.cpp +++ b/src/extern/CPullConsumer.cpp @@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr ((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv); return OK; } +int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain) { + if (consumer == NULL) { + return NULL_POINTER; + } + ((DefaultMQPullConsumer *) consumer)->setNamesrvDomain(domain); + return OK; +} int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey, const char *channel) { if (consumer == NULL) { @@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C return NULL_POINTER; } unsigned int index = 0; + CMessageQueue *temMQ = NULL; std::vector fullMQ; try { ((DefaultMQPullConsumer *) consumer)->fetchSubscribeMessageQueues(topic, fullMQ); *size = fullMQ.size(); //Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed. //Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time. - *mqs = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue)); - if (*mqs == NULL) { + temMQ = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue)); + if (temMQ == NULL) { *size = 0; *mqs = NULL; return MALLOC_FAILED; } auto iter = fullMQ.begin(); for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) { - strncpy(mqs[index]->topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1); - strncpy(mqs[index]->brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1); - mqs[index]->queueId = iter->getQueueId(); + strncpy(temMQ[index].topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1); + strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1); + temMQ[index].queueId = iter->getQueueId(); } + *mqs = temMQ; } catch (MQException &e) { *size = 0; *mqs = NULL; @@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression PullResult cppPullResult; try { cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, subExpression, offset, maxNums); - }catch (exception &e){ + } catch (exception &e) { cppPullResult.pullStatus = BROKER_TIMEOUT; } @@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression pullResult.minOffset = cppPullResult.minOffset; pullResult.nextBeginOffset = cppPullResult.nextBeginOffset; pullResult.size = cppPullResult.msgFoundList.size(); + PullResult *tmpPullResult = new PullResult(cppPullResult); + pullResult.pData = tmpPullResult; //Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core. //Thus, this memory should be released by users using @ReleasePullResult pullResult.msgFoundList = (CMessageExt **) malloc(pullResult.size * sizeof(CMessageExt *)); for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) { - MQMessageExt *msg = const_cast(&cppPullResult.msgFoundList[i]); + MQMessageExt *msg = const_cast(&tmpPullResult->msgFoundList[i]); pullResult.msgFoundList[i] = (CMessageExt *) (msg); } break; @@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression return pullResult; } int ReleasePullResult(CPullResult pullResult) { - if (pullResult.size == 0 || pullResult.msgFoundList == NULL) { + if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) { return NULL_POINTER; } + if (pullResult.pData != NULL) { + try { + delete ((PullResult *) pullResult.pData); + } catch (exception &e) { + return NULL_POINTER; + } + } free((void *) pullResult.msgFoundList); pullResult.msgFoundList = NULL; return OK; diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp index 9d3ab6be3..2c35c7491 100644 --- a/src/extern/CPushConsumer.cpp +++ b/src/extern/CPushConsumer.cpp @@ -139,6 +139,13 @@ int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesr ((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv); return OK; } +int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) { + if (consumer == NULL) { + return NULL_POINTER; + } + ((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain); + return OK; +} int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) { if (consumer == NULL) { return NULL_POINTER;