Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions example/PullConsumeMessage.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,57 @@ void thread_sleep(unsigned milliseconds) {
#endif
}

int main(int argc,char * argv [])
{
int i = 0;
int main(int argc, char *argv[]) {
int i = 0, j = 0;
int ret = 0;
int size = 0;
CMessageQueue *mqs = NULL;
printf("PullConsumer Initializing....\n");
CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
SetPullConsumerNameServerAddress(consumer,"172.17.0.2:9876");
CPullConsumer *consumer = CreatePullConsumer("Group_Consumer_Test");
SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
StartPullConsumer(consumer);
printf("Pull Consumer Start...\n");
for( i=0; i<10; i++)
{
printf("Now Running : %d S\n",i*10);
thread_sleep(10000);
for (i = 1; i <= 5; i++) {
printf("FetchSubscriptionMessageQueues : %d times\n", i);
ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
if(ret != OK) {
printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
}
printf("Get MQ Size:%d\n", size);
for (j = 0; j < size; j++) {
int noNewMsg = 0;
long long tmpoffset = 0;
printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
do {
int k = 0;
CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
tmpoffset = pullResult.nextBeginOffset;
}
printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus,
pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
switch (pullResult.pullStatus) {
case E_FOUND:
printf("Get Message Size:%d\n", pullResult.size);
for (k = 0; k < pullResult.size; ++k) {
printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),GetMessageBody(pullResult.msgFoundList[k]));
}
break;
case E_NO_MATCHED_MSG:
noNewMsg = 1;
break;
default:
noNewMsg = 0;
}
ReleasePullResult(pullResult);
thread_sleep(100);
} while (noNewMsg == 0);
thread_sleep(1000);
}
thread_sleep(2000);
ReleaseSubscriptionMessageQueue(mqs);
}
thread_sleep(5000);
ShutdownPullConsumer(consumer);
DestroyPullConsumer(consumer);
printf("PullConsumer Shutdown!\n");
Expand Down
1 change: 1 addition & 0 deletions include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ROCKETMQCLIENT_API int StartProducer(CProducer *producer);
ROCKETMQCLIENT_API int ShutdownProducer(CProducer *producer);

ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer *producer, const char *namesrv);
ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer *producer, const char *domain);
ROCKETMQCLIENT_API int SetProducerGroupName(CProducer *producer, const char *groupName);
ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer *producer, const char *instanceName);
ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
Expand Down
31 changes: 16 additions & 15 deletions include/CPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ extern "C" {
typedef struct CPullConsumer CPullConsumer;


CPullConsumer *CreatePullConsumer(const char *groupId);
int DestroyPullConsumer(CPullConsumer *consumer);
int StartPullConsumer(CPullConsumer *consumer);
int ShutdownPullConsumer(CPullConsumer *consumer);
int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
const char *GetPullConsumerGroupID(CPullConsumer *consumer);
int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
ROCKETMQCLIENT_API CPullConsumer *CreatePullConsumer(const char *groupId);
ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer *consumer);
ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer *consumer);
ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer *consumer);
ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
ROCKETMQCLIENT_API const char *GetPullConsumerGroupID(CPullConsumer *consumer);
ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain);
ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel);
int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);

int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size);
int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size);
ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);

CPullResult
ROCKETMQCLIENT_API CPullResult
Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
int ReleasePullResult(CPullResult pullResult);
ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);

#ifdef __cplusplus
};
Expand Down
12 changes: 6 additions & 6 deletions include/CPullResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
#ifdef __cplusplus
extern "C" {
#endif
typedef enum E_CPullStatus
{
typedef enum E_CPullStatus {
E_FOUND,
E_NO_NEW_MSG,
E_NO_MATCHED_MSG,
Expand All @@ -35,11 +34,12 @@ typedef enum E_CPullStatus

typedef struct _CPullResult_ {
CPullStatus pullStatus;
long long nextBeginOffset;
long long minOffset;
long long maxOffset;
CMessageExt** msgFoundList;
long long nextBeginOffset;
long long minOffset;
long long maxOffset;
CMessageExt **msgFoundList;
int size;
void *pData;
} CPullResult;

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions include/CPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer *consumer);
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId);
ROCKETMQCLIENT_API const char *GetPushConsumerGroupID(CPushConsumer *consumer);
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv);
ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain);
ROCKETMQCLIENT_API int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression);
ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback);
ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback);
Expand Down
8 changes: 7 additions & 1 deletion src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv);
return OK;
}

int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setNamesrvDomain(domain);
return OK;
}
int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
//CSendResult sendResult;
if (producer == NULL || msg == NULL || result == NULL) {
Expand Down
34 changes: 26 additions & 8 deletions src/extern/CPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr
((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer *) consumer)->setNamesrvDomain(domain);
return OK;
}
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel) {
if (consumer == NULL) {
Expand Down Expand Up @@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C
return NULL_POINTER;
}
unsigned int index = 0;
CMessageQueue *temMQ = NULL;
std::vector<MQMessageQueue> fullMQ;
try {
((DefaultMQPullConsumer *) consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
*size = fullMQ.size();
//Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
//Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
*mqs = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
if (*mqs == NULL) {
temMQ = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
if (temMQ == NULL) {
*size = 0;
*mqs = NULL;
return MALLOC_FAILED;
}
auto iter = fullMQ.begin();
for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
strncpy(mqs[index]->topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1);
strncpy(mqs[index]->brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
mqs[index]->queueId = iter->getQueueId();
strncpy(temMQ[index].topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1);
strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
temMQ[index].queueId = iter->getQueueId();
}
*mqs = temMQ;
} catch (MQException &e) {
*size = 0;
*mqs = NULL;
Expand All @@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
PullResult cppPullResult;
try {
cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, subExpression, offset, maxNums);
}catch (exception &e){
} catch (exception &e) {
cppPullResult.pullStatus = BROKER_TIMEOUT;
}

Expand All @@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
pullResult.minOffset = cppPullResult.minOffset;
pullResult.nextBeginOffset = cppPullResult.nextBeginOffset;
pullResult.size = cppPullResult.msgFoundList.size();
PullResult *tmpPullResult = new PullResult(cppPullResult);
pullResult.pData = tmpPullResult;
//Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
//Thus, this memory should be released by users using @ReleasePullResult
pullResult.msgFoundList = (CMessageExt **) malloc(pullResult.size * sizeof(CMessageExt *));
for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) {
MQMessageExt *msg = const_cast<MQMessageExt *>(&cppPullResult.msgFoundList[i]);
MQMessageExt *msg = const_cast<MQMessageExt *>(&tmpPullResult->msgFoundList[i]);
pullResult.msgFoundList[i] = (CMessageExt *) (msg);
}
break;
Expand Down Expand Up @@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
return pullResult;
}
int ReleasePullResult(CPullResult pullResult) {
if (pullResult.size == 0 || pullResult.msgFoundList == NULL) {
if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
return NULL_POINTER;
}
if (pullResult.pData != NULL) {
try {
delete ((PullResult *) pullResult.pData);
} catch (exception &e) {
return NULL_POINTER;
}
}
free((void *) pullResult.msgFoundList);
pullResult.msgFoundList = NULL;
return OK;
Expand Down
7 changes: 7 additions & 0 deletions src/extern/CPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesr
((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain);
return OK;
}
int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) {
if (consumer == NULL) {
return NULL_POINTER;
Expand Down