From 30d6b53bf5451224ffa6eb32495fc7f0511e99a1 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 27 Sep 2019 00:51:05 +0800 Subject: [PATCH 01/13] feat(producer) add send message orderly by sharding key --- sample/testProducer.py | 73 +++++++++++++++++++++++++++--------------- src/PythonWrapper.cpp | 14 ++++++++ src/PythonWrapper.h | 2 ++ 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/sample/testProducer.py b/sample/testProducer.py index 6938fd2..478e198 100644 --- a/sample/testProducer.py +++ b/sample/testProducer.py @@ -1,37 +1,39 @@ -#/* -#* Licensed to the Apache Software Foundation (ASF) under one or more -#* contributor license agreements. See the NOTICE file distributed with -#* this work for additional information regarding copyright ownership. -#* The ASF licenses this file to You under the Apache License, Version 2.0 -#* (the "License"); you may not use this file except in compliance with -#* the License. You may obtain a copy of the License at -#* -#* http://www.apache.org/licenses/LICENSE-2.0 -#* -#* Unless required by applicable law or agreed to in writing, software -#* distributed under the License is distributed on an "AS IS" BASIS, -#* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -#* See the License for the specific language governing permissions and -#* limitations under the License. -#*/ +# /* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ from base import * import time + def initProducer(name): print("---------Create Producer---------------") - producer =CreateProducer(name) - SetProducerNameServerAddress(producer,"172.17.0.2:9876") + producer = CreateProducer(name) + SetProducerNameServerAddress(producer, "172.17.0.2:9876") StartProducer(producer) return producer -def testSendMssage(producer,topic,key,body): + +def testSendMssage(producer, topic, key, body): print("Starting Sending.....") msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, "ThisMessageTag.") - result = SendMessageSync(producer,msg) + result = SendMessageSync(producer, msg) print(result) print("Msgid:") print(result.GetMsgId()) @@ -42,21 +44,42 @@ def testSendMssage(producer,topic,key,body): DestroyMessage(msg) print("Done...............") + def testSendMessageOneway(producer, topic, key, body): print("Starting Sending(Oneway).....") msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, "Send Message Oneway Test.") - SendMessageOneway(producer,msg) + SendMessageOneway(producer, msg) + DestroyMessage(msg) + print("Done...............") + + +def testSendMssageOrderly(producer, topic, key, body): + print("Starting Sending.....") + msg = CreateMessage(topic) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, "ThisMessageTag.") + result = SendMessageOrderlyByShardingKey(producer, msg, "orderId", None) + print(result) + print("Msgid:") + print(result.GetMsgId()) + print("Offset:") + print(result.offset) + print("sendStatus:") + print(result.sendStatus) DestroyMessage(msg) print("Done...............") + def releaseProducer(producer): ShutdownProducer(producer) DestroyProducer(producer) print("--------Release producer-----------") + showClientVersion() producer = initProducer("TestPythonProducer") topic = "T_TestTopic" @@ -65,13 +88,13 @@ def releaseProducer(producer): i = 0 while i < 10000: i += 1 - testSendMssage(producer,topic,key,body) - - print("Now Send Message:",i) + testSendMssage(producer, topic, key, body) + + print("Now Send Message:", i) while i < 10: i += 1 testSendMessageOneway(producer, topic, key, body) - print("Now Send Message One way:",i) + print("Now Send Message One way:", i) releaseProducer(producer) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index f924e56..e08138e 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -165,6 +165,19 @@ PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, return ret; } +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey, void *args) { + PySendResult ret; + CSendResult result; + PyUserData userData = {queueSelector,args}; + SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result); + ret.sendStatus = result.sendStatus; + ret.offset = result.offset; + strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + return ret; +} + + int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { PyUserData *userData = (PyUserData *)args; int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); @@ -313,6 +326,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SendMessageSync", PySendMessageSync); def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); + def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey); //For Consumer def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 987d839..737485b 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -84,6 +84,8 @@ PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); // PySendResult PySendMessageOrderly(void *producer, void *msg , int autoRetryTimes, PyObject *args, PyObject *callback); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey, void *args); + int PyOrderlyCallbackInner(int size, CMessage *msg, void *args); //sendResult From a505be2e6c432a2dc8de8c351db24d2ae37cfbec Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 27 Sep 2019 01:05:04 +0800 Subject: [PATCH 02/13] fix(producer) remove useless parameter in send message orderly --- sample/testProducer.py | 6 +++--- src/PythonWrapper.cpp | 3 +-- src/PythonWrapper.h | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sample/testProducer.py b/sample/testProducer.py index 478e198..34023c6 100644 --- a/sample/testProducer.py +++ b/sample/testProducer.py @@ -62,7 +62,7 @@ def testSendMssageOrderly(producer, topic, key, body): SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, "ThisMessageTag.") - result = SendMessageOrderlyByShardingKey(producer, msg, "orderId", None) + result = SendMessageOrderlyByShardingKey(producer, msg, "orderId") print(result) print("Msgid:") print(result.GetMsgId()) @@ -86,9 +86,9 @@ def releaseProducer(producer): key = "TestKeys" body = "ThisIsTestBody" i = 0 -while i < 10000: +while i < 100: i += 1 - testSendMssage(producer, topic, key, body) + testSendMssageOrderly(producer, topic, key, body) print("Now Send Message:", i) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index e08138e..2c5e8c2 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -165,10 +165,9 @@ PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, return ret; } -PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey, void *args) { +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) { PySendResult ret; CSendResult result; - PyUserData userData = {queueSelector,args}; SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result); ret.sendStatus = result.sendStatus; ret.offset = result.offset; diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 737485b..28bfc1e 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -84,7 +84,7 @@ PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); // PySendResult PySendMessageOrderly(void *producer, void *msg , int autoRetryTimes, PyObject *args, PyObject *callback); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); -PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey, void *args); +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); int PyOrderlyCallbackInner(int size, CMessage *msg, void *args); From d216bdf4b20b255973b9eb471395a688c162f552 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sun, 29 Sep 2019 08:38:30 +0800 Subject: [PATCH 03/13] feature(producer) add async send message for producer --- src/PythonWrapper.cpp | 32 ++++++++++++++++++++++++++++++++ src/PythonWrapper.h | 8 +++++++- test/TestSendMessages.py | 15 +++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 2c5e8c2..b45fc46 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -21,6 +21,7 @@ #include "CProducer.h" #include "CPushConsumer.h" #include "PythonWrapper.h" +#include "CMQException.h" #include #include @@ -31,6 +32,9 @@ const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; map> g_CallBackMap; +map> g_CallBackMap; +map g_SendSuccessCallbackMap; +map g_SendExceptionCallbackMap; class PyThreadStateLock { public: @@ -153,6 +157,29 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } +void PySendSuccessCallback(CSendResult result){ + map::iterator iter = g_SendSuccessCallbackMap.begin(); + while(iter != pCallbackMap.end()) { + boost::python::call(iter->second, iter->first); + } +} + +void PySendExceptionCallback(CMQException e){ + map::iterator iter = g_SendExceptionCallbackMap.begin(); + while(iter != pCallbackMap.end()) { + boost::python::call(iter->second, iter->first); + } +} + +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback){ + g_SendSuccessCallbackMap[msg] = sendSuccessCallback; + g_SendExceptionCallbackMap[msg] = sendExceptionCallback; + + return SendMessageAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback); +} + + + PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { PySendResult ret; CSendResult result; @@ -323,6 +350,11 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetProducerInstanceName", PySetProducerInstanceName); def("SetProducerSessionCredentials", PySetProducerSessionCredentials); def("SendMessageSync", PySendMessageSync); + + def("SendSuccessCallback", PySendSuccessCallback); + def("SendExceptionCallback", PySendExceptionCallback); + def("SendMessageAsync", PySendMessageAsync); + def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 28bfc1e..5fd46af 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -22,6 +22,7 @@ #include "CProducer.h" #include "CPushConsumer.h" #include "CPullConsumer.h" +#include "CMQException.h" #include using namespace boost::python; @@ -82,7 +83,12 @@ int PySetProducerInstanceName(void *producer, const char *instanceName); int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel); PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); -// PySendResult PySendMessageOrderly(void *producer, void *msg , int autoRetryTimes, PyObject *args, PyObject *callback); + +void PySendSuccessCallback(CSendResult result); +void PySendExceptionCallback(CMQException ex); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback); + + PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 7258d70..dc80784 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -214,6 +214,21 @@ def send_message_orderly(count): DestroyMessage(msg) print 'msg id =' + result.GetMsgId() +def send_message_orderly_with_shardingkey(count): + key = 'rmq-key' + print 'start sending sharding key order-ly message' + tag = 'test' + for n in range(count): + body = 'hi rmq sharding orderly-message, now is' + str(n) + msg = CreateMessage(topic_orderly) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, tag) + + result = SendMessageOrderlyByShardingKey(producer, msg, 'orderId') + DestroyMessage(msg) + print 'msg id =' + result.GetMsgId() + def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0.... return 0 From 07c42cd6322a05a5a6ed386538b5af22590aa99f Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sun, 29 Sep 2019 09:23:20 +0800 Subject: [PATCH 04/13] fix(producer) fix the call back map redefinition error --- src/PythonWrapper.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index b45fc46..17bd911 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -31,7 +31,6 @@ using namespace std; const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; -map> g_CallBackMap; map> g_CallBackMap; map g_SendSuccessCallbackMap; map g_SendExceptionCallbackMap; @@ -159,14 +158,14 @@ int PySendMessageOneway(void *producer, void *msg) { void PySendSuccessCallback(CSendResult result){ map::iterator iter = g_SendSuccessCallbackMap.begin(); - while(iter != pCallbackMap.end()) { + while(iter != g_SendSuccessCallbackMap.end()) { boost::python::call(iter->second, iter->first); } } void PySendExceptionCallback(CMQException e){ map::iterator iter = g_SendExceptionCallbackMap.begin(); - while(iter != pCallbackMap.end()) { + while(iter != g_SendExceptionCallbackMap.end()) { boost::python::call(iter->second, iter->first); } } From cb4347f538df6fe52765ea0db72db30a82f4c563 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Wed, 9 Oct 2019 15:27:50 +0800 Subject: [PATCH 05/13] feat(producer) add compress and max message size set support --- src/PythonWrapper.cpp | 12 +++++++++++- src/PythonWrapper.h | 3 +++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 17bd911..cf80639 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -141,6 +141,14 @@ int PySetProducerInstanceName(void *producer, const char *instanceName) { int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel) { return SetProducerSessionCredentials((CProducer *)producer, accessKey, secretKey, channel); } +int PySetProducerCompressLevel(void *producer, int level) { + return SetProducerCompressLevel((CProducer *)producer, level); +} +int PySetProducerMaxMessageSize(void *producer, int size) { + return SetProducerMaxMessageSize((CProducer *)producer, size); +} + + PySendResult PySendMessageSync(void *producer, void *msg) { PySendResult ret; CSendResult result; @@ -348,8 +356,10 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetProducerNameServerDomain", PySetProducerNameServerDomain); def("SetProducerInstanceName", PySetProducerInstanceName); def("SetProducerSessionCredentials", PySetProducerSessionCredentials); - def("SendMessageSync", PySendMessageSync); + def("SetProducerCompressLevel", PySetProducerCompressLevel); + def("SetProducerMaxMessageSize", PySetProducerMaxMessageSize); + def("SendMessageSync", PySendMessageSync); def("SendSuccessCallback", PySendSuccessCallback); def("SendExceptionCallback", PySendExceptionCallback); def("SendMessageAsync", PySendMessageAsync); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 5fd46af..3dcbf15 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -81,6 +81,9 @@ int PySetProducerNameServerAddress(void *producer, const char *namesrv); int PySetProducerNameServerDomain(void *producer, const char *domain); int PySetProducerInstanceName(void *producer, const char *instanceName); int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel); +int PySetProducerCompressLevel(void *producer, int level); +int PySetProducerMaxMessageSize(void *producer, int size); + PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); From 5c5517386389e8d220f626bcb18c1081f372cdc3 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 00:17:11 +0800 Subject: [PATCH 06/13] feat(producer) polish async send message --- src/PythonWrapper.cpp | 35 ++++++++++++++++++----------------- src/PythonWrapper.h | 11 ++++++++++- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index cf80639..32f1566 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -32,8 +32,6 @@ const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; map> g_CallBackMap; -map g_SendSuccessCallbackMap; -map g_SendExceptionCallbackMap; class PyThreadStateLock { public: @@ -164,25 +162,27 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } -void PySendSuccessCallback(CSendResult result){ - map::iterator iter = g_SendSuccessCallbackMap.begin(); - while(iter != g_SendSuccessCallbackMap.end()) { - boost::python::call(iter->second, iter->first); - } +void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){ + PySendResult ret; + PyCallback *callback = (PyCallback *)pyCallback; + PyMessage message = { .pMessage = msg }; + ret.sendStatus = result.sendStatus; + ret.offset = result.offset; + strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + boost::python::call(callback->successCallback, sendResult, message); } -void PySendExceptionCallback(CMQException e){ - map::iterator iter = g_SendExceptionCallbackMap.begin(); - while(iter != g_SendExceptionCallbackMap.end()) { - boost::python::call(iter->second, iter->first); - } -} -int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback){ - g_SendSuccessCallbackMap[msg] = sendSuccessCallback; - g_SendExceptionCallbackMap[msg] = sendExceptionCallback; +void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){ + PyCallback *callback = (PyCallback *)pyCallback; + PyMessageExt message = { .pMessage = msg }; + boost::python::call(callback->execptionCallback, message, e); +} - return SendMessageAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ + PyCallback pyCallback = {sendSuccessCallback, sendExceptionCallback}; + return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, &pyCallback); } @@ -327,6 +327,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .def_readonly("sendStatus", &PySendResult::sendStatus, "sendStatus") .def("GetMsgId", &PySendResult::GetMsgId); class_("CMessageExt"); + class_("CMessage"); //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 3dcbf15..6ade5cb 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -41,11 +41,20 @@ typedef struct _PyMessageExt_ { CMessageExt *pMessageExt; } PyMessageExt; +typedef struct _PyMessage_ { + CMessage *pMessage; +} PyMessage; + typedef struct _PyUserData_ { PyObject *pyObject; void *pData; } PyUserData; +typedef struct _PyCallback_ { + PyObject *successCallback; + PyObject *execptionCallback; +} PyCallback; + #define PYTHON_CLIENT_VERSION "1.2.0" #define PYCLI_BUILD_DATE "04-12-2018" @@ -89,7 +98,7 @@ int PySendMessageOneway(void *producer, void *msg); void PySendSuccessCallback(CSendResult result); void PySendExceptionCallback(CMQException ex); -int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback, void* userData); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); From 88a432a48a013389f2369dc4b35ccf2aac23850c Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 00:25:41 +0800 Subject: [PATCH 07/13] fix(producer) fix send message call back delcare error --- src/PythonWrapper.cpp | 12 ++++++------ src/PythonWrapper.h | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 32f1566..da42c20 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -163,20 +163,20 @@ int PySendMessageOneway(void *producer, void *msg) { } void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){ - PySendResult ret; + PySendResult sendResult; PyCallback *callback = (PyCallback *)pyCallback; PyMessage message = { .pMessage = msg }; - ret.sendStatus = result.sendStatus; - ret.offset = result.offset; - strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); - ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + sendResult.sendStatus = result.sendStatus; + sendResult.offset = result.offset; + strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; boost::python::call(callback->successCallback, sendResult, message); } void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){ PyCallback *callback = (PyCallback *)pyCallback; - PyMessageExt message = { .pMessage = msg }; + PyMessage message = { .pMessage = msg }; boost::python::call(callback->execptionCallback, message, e); } diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 6ade5cb..bf6c7ce 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -96,9 +96,9 @@ int PySetProducerMaxMessageSize(void *producer, int size); PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); -void PySendSuccessCallback(CSendResult result); -void PySendExceptionCallback(CMQException ex); -int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback, void* userData); +void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback); +void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); From 35498a5f3363d074790fae4e713433454437e804 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 00:37:01 +0800 Subject: [PATCH 08/13] fix(producer) remove Message convert --- src/PythonWrapper.cpp | 7 ++----- src/PythonWrapper.h | 4 ---- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index da42c20..0c3657d 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -165,19 +165,17 @@ int PySendMessageOneway(void *producer, void *msg) { void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){ PySendResult sendResult; PyCallback *callback = (PyCallback *)pyCallback; - PyMessage message = { .pMessage = msg }; sendResult.sendStatus = result.sendStatus; sendResult.offset = result.offset; strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; - boost::python::call(callback->successCallback, sendResult, message); + boost::python::call(callback->successCallback, sendResult, (void *) msg); } void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){ PyCallback *callback = (PyCallback *)pyCallback; - PyMessage message = { .pMessage = msg }; - boost::python::call(callback->execptionCallback, message, e); + boost::python::call(callback->execptionCallback, (void *) msg, e); } int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ @@ -327,7 +325,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .def_readonly("sendStatus", &PySendResult::sendStatus, "sendStatus") .def("GetMsgId", &PySendResult::GetMsgId); class_("CMessageExt"); - class_("CMessage"); //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index bf6c7ce..f9beea9 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -41,10 +41,6 @@ typedef struct _PyMessageExt_ { CMessageExt *pMessageExt; } PyMessageExt; -typedef struct _PyMessage_ { - CMessage *pMessage; -} PyMessage; - typedef struct _PyUserData_ { PyObject *pyObject; void *pData; From 44406b2116f8d32ea30b10191cfdeee22a6604bc Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 14:38:30 +0800 Subject: [PATCH 09/13] feat(producer) add thread lock for send message async --- src/PythonWrapper.cpp | 27 ++++++++++++++------------- src/PythonWrapper.h | 6 +++--- test/TestSendMessages.py | 25 ++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 0c3657d..f515c9e 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -116,6 +116,7 @@ const char *PyGetMessageId(PyMessageExt msgExt) { //producer void *PyCreateProducer(const char *groupId) { + PyEval_InitThreads(); // ensure create GIL, for call Python callback from C. return (void *) CreateProducer(groupId); } int PyDestroyProducer(void *producer) { @@ -125,6 +126,7 @@ int PyStartProducer(void *producer) { return StartProducer((CProducer *) producer); } int PyShutdownProducer(void *producer) { + PyThreadStateUnlock PyThreadUnlock; // Shutdown Producer is a block call, ensure thread don't hold GIL. return ShutdownProducer((CProducer *) producer); } int PySetProducerNameServerAddress(void *producer, const char *namesrv) { @@ -162,20 +164,22 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } -void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){ +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ + PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PySendResult sendResult; - PyCallback *callback = (PyCallback *)pyCallback; sendResult.sendStatus = result.sendStatus; sendResult.offset = result.offset; strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + PyCallback *callback = (PyCallback *)pyCallback; boost::python::call(callback->successCallback, sendResult, (void *) msg); } -void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){ +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ + PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PyCallback *callback = (PyCallback *)pyCallback; - boost::python::call(callback->execptionCallback, (void *) msg, e); + boost::python::call(callback->exceptionCallback, (void *) msg, e); } int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ @@ -197,6 +201,12 @@ PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, return ret; } +int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { + PyUserData *userData = (PyUserData *)args; + int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); + return index; +} + PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) { PySendResult ret; CSendResult result; @@ -208,13 +218,6 @@ PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const return ret; } - -int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { - PyUserData *userData = (PyUserData *)args; - int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); - return index; -} - //SendResult const char *PyGetSendResultMsgID(CSendResult &sendResult) { return (const char *) (sendResult.msgId); @@ -358,8 +361,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetProducerMaxMessageSize", PySetProducerMaxMessageSize); def("SendMessageSync", PySendMessageSync); - def("SendSuccessCallback", PySendSuccessCallback); - def("SendExceptionCallback", PySendExceptionCallback); def("SendMessageAsync", PySendMessageAsync); def("SendMessageOneway", PySendMessageOneway); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index f9beea9..d978984 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -48,7 +48,7 @@ typedef struct _PyUserData_ { typedef struct _PyCallback_ { PyObject *successCallback; - PyObject *execptionCallback; + PyObject *exceptionCallback; } PyCallback; #define PYTHON_CLIENT_VERSION "1.2.0" @@ -92,8 +92,8 @@ int PySetProducerMaxMessageSize(void *producer, int size); PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); -void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback); -void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback); +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback); +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback); int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback); diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index dc80784..7da9157 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -231,6 +231,29 @@ def send_message_orderly_with_shardingkey(count): def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0.... return 0 - + +def send_message_async(count): + key = 'rmq-key' + print 'start sending message' + tag = 'test' + for n in range(count): + body = 'hi rmq message, now is' + str(n) + msg = CreateMessage(topic) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, tag) + + SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail) + DestroyMessage(msg) + print 'send done' + +def send_message_async_success(result, msg): + print 'send success' + pass + +def send_message_async_fail(msg, exception): + print 'send failed' + pass + if __name__ == '__main__': send_message_orderly(10) From 72ffeeb5830f0303292e5e62e7a7241b110e4e3d Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 15:34:57 +0800 Subject: [PATCH 10/13] fix(producer) polish async send callback implementation --- src/PythonWrapper.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index f515c9e..f54a46d 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -173,6 +173,7 @@ void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; PyCallback *callback = (PyCallback *)pyCallback; boost::python::call(callback->successCallback, sendResult, (void *) msg); + delete pyCallback; } @@ -180,11 +181,14 @@ void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PyCallback *callback = (PyCallback *)pyCallback; boost::python::call(callback->exceptionCallback, (void *) msg, e); + delete pyCallback; } int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ - PyCallback pyCallback = {sendSuccessCallback, sendExceptionCallback}; - return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, &pyCallback); + PyCallback* pyCallback = new PyCallback(); + pyCallback->successCallback = sendSuccessCallback; + pyCallback->exceptionCallback = sendExceptionCallback; + return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback); } From e48b3692d9a474607edba9df28a75e6f9dc54340 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 20:55:51 +0800 Subject: [PATCH 11/13] test(producer) add async send message test --- test/TestSendMessages.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 7da9157..a5cde58 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -245,7 +245,8 @@ def send_message_async(count): SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail) DestroyMessage(msg) - print 'send done' + print 'send async message done' + time.sleep(10000) def send_message_async_success(result, msg): print 'send success' From 37bc0bca0cbcbfd3868630f5bb194c95ee96aa4d Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 23:25:31 +0800 Subject: [PATCH 12/13] feat(producer) add exception mechanism for send message --- src/PythonWrapper.cpp | 18 +++++++++++++++++- src/PythonWrapper.h | 19 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index f54a46d..dec63ce 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -179,8 +179,17 @@ void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback + PyMQException exception; PyCallback *callback = (PyCallback *)pyCallback; - boost::python::call(callback->exceptionCallback, (void *) msg, e); + exception.error = e.error; + exception.line = e.line; + strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1); + exception.file[MAX_EXEPTION_FILE_LENGTH - 1] = 0; + strncpy(exception.msg, e.msg, MAX_EXEPTION_MSG_LENGTH - 1); + exception.msg[MAX_EXEPTION_MSG_LENGTH - 1] = 0; + strncpy(exception.type, e.type, MAX_EXEPTION_TYPE_LENGTH - 1); + exception.type[MAX_EXEPTION_TYPE_LENGTH - 1] = 0; + boost::python::call(callback->exceptionCallback, (void *) msg, exception); delete pyCallback; } @@ -333,6 +342,13 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .def("GetMsgId", &PySendResult::GetMsgId); class_("CMessageExt"); + class_("MQException") + .def_readonly("error", &PyMQException::error, "error") + .def_readonly("line", &PyMQException::line, "line") + .def("GetFile", &PyMQException::GetFile) + .def("GetMsg", &PyMQException::GetMsg) + .def("GetType", &PyMQException::GetType); + //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); def("DestroyMessage", PyDestroyMessage); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index d978984..c5bc5b9 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -37,6 +37,25 @@ typedef struct _PySendResult_ { } } PySendResult; +typedef struct _PyMQException_ { + int error; + int line; + char file[MAX_EXEPTION_FILE_LENGTH]; + char msg[MAX_EXEPTION_MSG_LENGTH]; + char type[MAX_EXEPTION_TYPE_LENGTH]; + + const char *GetFile() { + return (const char *) file; + } + const char *GetMsg() { + return (const char *) msg; + } + const char *GetType() { + return (const char *) type; + } +} PyMQException; + + typedef struct _PyMessageExt_ { CMessageExt *pMessageExt; } PyMessageExt; From d905905916dbccdf563f75c7720573f6b15307e4 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 10 Oct 2019 23:47:05 +0800 Subject: [PATCH 13/13] test(producer) add exception test in async send message --- test/TestSendMessages.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index a5cde58..142b6d8 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -250,11 +250,11 @@ def send_message_async(count): def send_message_async_success(result, msg): print 'send success' - pass + print 'msg id =' + result.GetMsgId() def send_message_async_fail(msg, exception): - print 'send failed' - pass + print 'send message failed' + print 'error msg: ' + exception.GetMsg() if __name__ == '__main__': send_message_orderly(10)