diff --git a/mama/c_cpp/src/c/mama/publisher.h b/mama/c_cpp/src/c/mama/publisher.h index 19595bc41..09feb34b4 100644 --- a/mama/c_cpp/src/c/mama/publisher.h +++ b/mama/c_cpp/src/c/mama/publisher.h @@ -97,6 +97,12 @@ typedef void (MAMACALLTYPE *mama_publisherOnErrorCb) ( const char* info, void* closure); +typedef void (MAMACALLTYPE *mama_publisherOnSuccessCb) ( + mamaPublisher publisher, + mama_status status, + const char* info, + void* closure); + /** * Callbacks for publisher events. * If any cb is NULL then the callback will not be made. @@ -106,6 +112,7 @@ typedef struct mamaPublisherCallbacks mama_publisherOnCreateCb onCreate; mama_publisherOnErrorCb onError; mama_publisherOnDestroyCb onDestroy; + mama_publisherOnSuccessCb onSuccess; } mamaPublisherCallbacks; diff --git a/mama/c_cpp/src/c/publisher.c b/mama/c_cpp/src/c/publisher.c index 4bcd4ceee..a885875e9 100644 --- a/mama/c_cpp/src/c/publisher.c +++ b/mama/c_cpp/src/c/publisher.c @@ -160,6 +160,7 @@ _createByIndex (mamaPublisher* result, { impl->mUserCallbacks.onCreate = publisherCallbacks->onCreate; impl->mUserCallbacks.onError = publisherCallbacks->onError; + impl->mUserCallbacks.onSuccess = publisherCallbacks->onSuccess; impl->mUserCallbacks.onDestroy = publisherCallbacks->onDestroy; } impl->mQueue = queue; @@ -193,6 +194,7 @@ _createByIndex (mamaPublisher* result, mamaPublisherCallbacks_allocate(&cb); cb->onCreate = publisherCallbacks ? impl->mUserCallbacks.onCreate : NULL; cb->onError = publisherCallbacks ? impl->mUserCallbacks.onError : NULL; + cb->onSuccess = publisherCallbacks ? impl->mUserCallbacks.onSuccess : NULL; cb->onDestroy = mamaPublisher_onPublisherDestroyed; /* intercept onDestroy always to track state */ if (NULL != bridgeImpl->bridgeMamaPublisherSetUserCallbacks) @@ -751,6 +753,7 @@ mamaPublisher_getUserCallbacks (mamaPublisher publisher, cb->onCreate = impl->mUserCallbacks.onCreate; cb->onError = impl->mUserCallbacks.onError; + cb->onSuccess = impl->mUserCallbacks.onSuccess; cb->onDestroy = impl->mUserCallbacks.onDestroy; return MAMA_STATUS_OK; diff --git a/mama/c_cpp/src/cpp/MamaPublisher.cpp b/mama/c_cpp/src/cpp/MamaPublisher.cpp index fcd703136..5c6cfc1da 100644 --- a/mama/c_cpp/src/cpp/MamaPublisher.cpp +++ b/mama/c_cpp/src/cpp/MamaPublisher.cpp @@ -295,7 +295,8 @@ namespace Wombat { onPublisherCreate, onPublisherError, - onPublisherDestroy + onPublisherDestroy, + onPublisherSuccess }; mamaTry (mamaPublisher_createWithCallbacks (&mPublisher, @@ -480,9 +481,9 @@ namespace Wombat } void MAMACALLTYPE MamaPublisherImpl::onPublisherError (mamaPublisher publisher, - mama_status status, - const char* info, - void* closure) + mama_status status, + const char* info, + void* closure) { MamaPublisherImpl* i = (MamaPublisherImpl*) closure; if (NULL != i && NULL != i->mCallback) @@ -492,5 +493,18 @@ namespace Wombat } } + void MAMACALLTYPE MamaPublisherImpl::onPublisherSuccess (mamaPublisher publisher, + mama_status status, + const char* info, + void* closure) + { + MamaPublisherImpl* i = (MamaPublisherImpl*)closure; + if (NULL != i && NULL != i->mCallback) + { + MamaStatus cppstatus(status); + i->mCallback->onSuccess(i->mParent, cppstatus, info, i->mClosure); + } + } + } // namespace Wombat diff --git a/mama/c_cpp/src/cpp/MamaPublisherImpl.h b/mama/c_cpp/src/cpp/MamaPublisherImpl.h index 4c0d52180..e46caa58f 100644 --- a/mama/c_cpp/src/cpp/MamaPublisherImpl.h +++ b/mama/c_cpp/src/cpp/MamaPublisherImpl.h @@ -102,9 +102,14 @@ namespace Wombat void* closure); static void MAMACALLTYPE onPublisherError (mamaPublisher publisher, - mama_status status, - const char* info, - void* closure); + mama_status status, + const char* info, + void* closure); + + static void MAMACALLTYPE onPublisherSuccess (mamaPublisher publisher, + mama_status status, + const char* info, + void* closure); mamaPublisher mPublisher; MamaPublisherCallback* mCallback; diff --git a/mama/c_cpp/src/cpp/mama/MamaPublisherCallback.h b/mama/c_cpp/src/cpp/mama/MamaPublisherCallback.h index ed2da9bf2..8062ae5fb 100644 --- a/mama/c_cpp/src/cpp/mama/MamaPublisherCallback.h +++ b/mama/c_cpp/src/cpp/mama/MamaPublisherCallback.h @@ -45,6 +45,12 @@ namespace Wombat const MamaStatus& status, const char* info, void* closure) = 0; + + virtual void onSuccess( + MamaPublisher* publisher, + const MamaStatus& status, + const char* info, + void* closure) = 0; }; } // namespace Wombat diff --git a/mama/c_cpp/src/examples/c/mamapublisherc.c b/mama/c_cpp/src/examples/c/mamapublisherc.c index 9731566b6..6c80a3894 100644 --- a/mama/c_cpp/src/examples/c/mamapublisherc.c +++ b/mama/c_cpp/src/examples/c/mamapublisherc.c @@ -223,6 +223,21 @@ static void MAMACALLTYPE publisherOnErrorCb ( } } +static void MAMACALLTYPE publisherOnSuccessCb ( + mamaPublisher publisher, + mama_status status, + const char* info, + void* closure) +{ + if (gQuietLevel < 1) + { + const char* symbol = ""; + mamaPublisher_getSymbol(publisher, &symbol); + mama_log(MAMA_LOG_LEVEL_FINEST, "publisherOnSuccessCb: %s status=%d/%s info=%s", + symbol, status, mamaStatus_stringForStatus(status), info); + } +} + static void createPublisher (void) { mama_status status; @@ -233,6 +248,7 @@ static void createPublisher (void) mamaPublisherCallbacks_allocate (&cb); cb->onCreate = publisherOnCreateCb; cb->onError = publisherOnErrorCb; + cb->onSuccess = publisherOnSuccessCb; cb->onDestroy = publisherOnDestroyCb; status = mamaPublisher_createWithCallbacks (&gPublisher, gTransport, diff --git a/mama/c_cpp/src/examples/cpp/mamapublishercpp.cpp b/mama/c_cpp/src/examples/cpp/mamapublishercpp.cpp index f305049d5..91843c7bb 100644 --- a/mama/c_cpp/src/examples/cpp/mamapublishercpp.cpp +++ b/mama/c_cpp/src/examples/cpp/mamapublishercpp.cpp @@ -123,6 +123,12 @@ class MamaPublisherSample : public MamaBasicSubscriptionCallback const char* info, void* closure); + void onSuccess ( + MamaPublisher* publisher, + const MamaStatus& status, + const char* info, + void* closure); + private: int msgNumber; @@ -339,6 +345,16 @@ void MamaPublisherSample::onError ( publisher->getSource(), publisher->getSymbol(), status.toString(), info); } +void MamaPublisherSample::onSuccess( + MamaPublisher* publisher, + const MamaStatus& status, + const char* info, + void* closure) +{ + mama_log(MAMA_LOG_LEVEL_FINEST, "onPublishSuccess: %s.%s %s %s", + publisher->getSource(), publisher->getSymbol(), status.toString(), info); +} + void parseCommandLine (int argc, const char **argv) { int i = 0; diff --git a/mama/c_cpp/src/gunittest/c/publishertest.cpp b/mama/c_cpp/src/gunittest/c/publishertest.cpp index c5e883864..53425cd37 100644 --- a/mama/c_cpp/src/gunittest/c/publishertest.cpp +++ b/mama/c_cpp/src/gunittest/c/publishertest.cpp @@ -80,6 +80,7 @@ void MamaPublisherTestC::TearDown(void) */ int pubOnCreateCount = 0; int pubOnErrorCount = 0; +int pubOnSuccessCount = 0; int pubOnDestroyCount = 0; /** @@ -103,6 +104,15 @@ void pubOnError (mamaPublisher publisher, pubOnErrorCount++; } + +void pubOnSuccess (mamaPublisher publisher, + mama_status status, + const char* info, + void* closure) +{ + ++pubOnSuccessCount; +} + /** * Publisher event callbacks via transport topic callbacks */ @@ -136,6 +146,7 @@ TEST_F (MamaPublisherTestC, CreateDestroy) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; ASSERT_EQ (MAMA_STATUS_OK, @@ -186,6 +197,7 @@ TEST_F (MamaPublisherTestC, GetTransportImpl) ASSERT_EQ (0, pubOnCreateCount); ASSERT_EQ (0, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); ASSERT_EQ (0, pubOnDestroyCount); } @@ -204,6 +216,7 @@ TEST_F (MamaPublisherTestC, Send) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; ASSERT_EQ (MAMA_STATUS_OK, mama_open()); @@ -238,6 +251,7 @@ TEST_F (MamaPublisherTestC, Send) ASSERT_EQ (0, pubOnCreateCount); ASSERT_EQ (0, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); ASSERT_EQ (0, pubOnDestroyCount); } @@ -260,10 +274,12 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacks) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; mamaPublisherCallbacks_allocate(&cb); cb->onError = (mama_publisherOnErrorCb) pubOnError; + cb->onSuccess = (mama_publisherOnSuccessCb) pubOnSuccess; cb->onCreate = (mama_publisherOnCreateCb) pubOnCreate; cb->onDestroy = (mama_publisherOnDestroyCb) pubOnDestroy; @@ -308,6 +324,7 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacks) ASSERT_EQ (1, pubOnCreateCount); ASSERT_EQ (0, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); // this should be numPublishers but no bridge calls onSuccess yet ASSERT_EQ (1, pubOnDestroyCount); mamaPublisherCallbacks_deallocate(cb); @@ -333,10 +350,12 @@ TEST_F (MamaPublisherTestC, DISABLED_EventSendWithCallbacksBadSource) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; mamaPublisherCallbacks_allocate(&cb); cb->onError = (mama_publisherOnErrorCb) pubOnError; + cb->onSuccess = (mama_publisherOnSuccessCb) pubOnSuccess; cb->onCreate = (mama_publisherOnCreateCb) pubOnCreate; cb->onDestroy = (mama_publisherOnDestroyCb) pubOnDestroy; @@ -379,6 +398,7 @@ TEST_F (MamaPublisherTestC, DISABLED_EventSendWithCallbacksBadSource) ASSERT_EQ (1, pubOnCreateCount); ASSERT_EQ (numErrors, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); ASSERT_EQ (1, pubOnDestroyCount); mamaPublisherCallbacks_deallocate(cb); @@ -405,12 +425,14 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacksNoErrorCallback) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; mamaPublisherCallbacks_allocate(&cb); cb->onError = NULL; cb->onCreate = (mama_publisherOnCreateCb) pubOnCreate; /* No error callback */ cb->onDestroy = (mama_publisherOnDestroyCb) pubOnDestroy; + cb->onSuccess = (mama_publisherOnSuccessCb) pubOnSuccess; ASSERT_EQ (MAMA_STATUS_OK, mama_open()); @@ -453,6 +475,7 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacksNoErrorCallback) ASSERT_EQ (1, pubOnCreateCount); ASSERT_EQ (0, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); ASSERT_EQ (1, pubOnDestroyCount); mamaPublisherCallbacks_deallocate(cb); @@ -479,10 +502,12 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacksNoCallbacks) pubOnCreateCount = 0; pubOnErrorCount = 0; + pubOnSuccessCount = 0; pubOnDestroyCount = 0; mamaPublisherCallbacks_allocate(&cb); cb->onError = NULL; + cb->onSuccess = NULL; cb->onCreate = NULL; cb->onDestroy = NULL; @@ -527,6 +552,7 @@ TEST_F (MamaPublisherTestC, EventSendWithCallbacksNoCallbacks) ASSERT_EQ (0, pubOnCreateCount); ASSERT_EQ (0, pubOnErrorCount); + ASSERT_EQ (0, pubOnSuccessCount); ASSERT_EQ (0, pubOnDestroyCount); mamaPublisherCallbacks_deallocate(cb); diff --git a/mama/c_cpp/src/gunittest/cpp/MamaPublisherTest.cpp b/mama/c_cpp/src/gunittest/cpp/MamaPublisherTest.cpp index d35d49d57..74d20ef99 100644 --- a/mama/c_cpp/src/gunittest/cpp/MamaPublisherTest.cpp +++ b/mama/c_cpp/src/gunittest/cpp/MamaPublisherTest.cpp @@ -74,18 +74,21 @@ class TestCallback : public MamaPublisherCallback private: int onCreateCount; int onErrorCount; + int onSuccessCount; int onDestroyCount; public: int getOnCreateCount() { return onCreateCount; } int getOnErrorCount() { return onErrorCount; } + int getOnSuccessCount() { return onSuccessCount; } int getOnDestroyCount() { return onDestroyCount; } TestCallback() { onCreateCount = 0; onErrorCount = 0; + onSuccessCount = 0; onDestroyCount = 0; } @@ -105,6 +108,15 @@ class TestCallback : public MamaPublisherCallback onErrorCount++; } + virtual void onSuccess ( + MamaPublisher* publisher, + const MamaStatus& status, + const char* info, + void* closure) + { + ++onSuccessCount; + } + virtual void onDestroy ( MamaPublisher* publisher, void* closure) @@ -205,6 +217,7 @@ TEST_F(MamaPublisherTest, PublishWithCallbacks) ASSERT_EQ(1, testCallback->getOnCreateCount()); ASSERT_EQ(0, testCallback->getOnErrorCount()); + ASSERT_EQ(0, testCallback->getOnSuccessCount()); // this should be 1 but no bridge calls onSuccess yet delete testCallback; } @@ -262,6 +275,7 @@ TEST_F(MamaPublisherTest, DISABLED_PublishWithCallbacksBadSource) ASSERT_EQ(1, testCallback->getOnCreateCount()); ASSERT_EQ(numPublishes, testCallback->getOnErrorCount()); + ASSERT_EQ(0, testCallback->getOnSuccessCount()); } /** diff --git a/mama/dotnet/src/cs/MamaPublisher.cs b/mama/dotnet/src/cs/MamaPublisher.cs index 44ce7042b..a2e8a1904 100644 --- a/mama/dotnet/src/cs/MamaPublisher.cs +++ b/mama/dotnet/src/cs/MamaPublisher.cs @@ -582,6 +582,22 @@ private static void onError(IntPtr nativeHandle, short status, string topic, Int } } + private static void onSuccess(IntPtr nativeHandle, short status, string topic, IntPtr closure) + { + // Obtain the handle from the closure + GCHandle handle = (GCHandle)closure; + + // Extract the impl from the handle + MamaPublisher pub = (MamaPublisher)handle.Target; + + // Use the impl to invoke the success callback + if (null != pub) + { + // Invoke the callback + pub.mCallback.onSuccess(pub, (MamaStatus.mamaStatus)status, topic); + } + } + private static void onDestroy(IntPtr nativeHandle, IntPtr closure) { // Obtain the handle from the closure @@ -613,12 +629,15 @@ static MamaPublisher() mCallbackDelegates.mCreate = new OnPublisherCreateDelegate(MamaPublisher.onCreate); mCallbackDelegates.mDestroy = new OnPublisherDestroyDelegate(MamaPublisher.onDestroy); mCallbackDelegates.mError = new OnPublisherErrorDelegate(MamaPublisher.onError); + mCallbackDelegates.mSuccess = new OnPublisherSuccessDelegate(MamaPublisher.onSuccess); } private delegate void OnPublisherCreateDelegate(IntPtr nativeHandle, IntPtr closure); private delegate void OnPublisherErrorDelegate(IntPtr nativeHandle, short status, string topic, IntPtr closure); + private delegate void OnPublisherSuccessDelegate(IntPtr nativeHandle, short status, string topic, IntPtr closure); + private delegate void OnPublisherDestroyDelegate(IntPtr nativeHandle, IntPtr closure); // ===================================================================================== @@ -630,6 +649,7 @@ public struct PublisherCallbacks { public OnPublisherCreateDelegate mCreate; public OnPublisherErrorDelegate mError; + public OnPublisherSuccessDelegate mSuccess; public OnPublisherDestroyDelegate mDestroy; public IntPtr mReserved; } diff --git a/mama/dotnet/src/cs/MamaPublisherCallback.cs b/mama/dotnet/src/cs/MamaPublisherCallback.cs index 6b3a16b4d..019a41301 100644 --- a/mama/dotnet/src/cs/MamaPublisherCallback.cs +++ b/mama/dotnet/src/cs/MamaPublisherCallback.cs @@ -44,6 +44,13 @@ public interface MamaPublisherCallback MamaStatus.mamaStatus status, string topic); + /// + /// See interface remarks for details + /// + void onSuccess(MamaPublisher publisher, + MamaStatus.mamaStatus status, + string topic); + /// /// See interface remarks for details /// diff --git a/mama/dotnet/src/examples/MamaPublisher/MamaPublisherCS.cs b/mama/dotnet/src/examples/MamaPublisher/MamaPublisherCS.cs index b247c5b05..03e52f153 100644 --- a/mama/dotnet/src/examples/MamaPublisher/MamaPublisherCS.cs +++ b/mama/dotnet/src/examples/MamaPublisher/MamaPublisherCS.cs @@ -197,12 +197,19 @@ public void onCreate(MamaPublisher publisher) } public void onError(MamaPublisher publisher, - MamaStatus.mamaStatus status, - string topic) + MamaStatus.mamaStatus status, + string topic) { Console.WriteLine("onPublishError: " + topic + " " + status.ToString()); } + public void onSuccess(MamaPublisher publisher, + MamaStatus.mamaStatus status, + string topic) + { + Console.WriteLine("onPublishSuccess: " + topic + " " + status.ToString()); + } + public void onDestroy(MamaPublisher publisher) { if (!quiet) diff --git a/mama/jni/src/c/mamapublisherjni.c b/mama/jni/src/c/mamapublisherjni.c index 0ce87b8e1..5a959280a 100644 --- a/mama/jni/src/c/mamapublisherjni.c +++ b/mama/jni/src/c/mamapublisherjni.c @@ -75,6 +75,7 @@ static jmethodID sendCallbackMethod_g = NULL; static jmethodID publisherCallbackMethoOnCreate_g = NULL; static jmethodID publisherCallbackMethoOnDestroy_g = NULL; static jmethodID publisherCallbackMethoOnError_g = NULL; +static jmethodID publisherCallbackMethoOnSuccess_g = NULL; extern JavaVM* javaVM_g; @@ -122,9 +123,9 @@ static void MAMACALLTYPE publisherOnDestroyCb (mamaPublisher publisher, void* cl } static void MAMACALLTYPE publisherOnErrorCb (mamaPublisher publisher, - mama_status status, - const char* info, - void* closure) + mama_status status, + const char* info, + void* closure) { JNIEnv* env = utils_getENV(javaVM_g); pubCallbackClosure* closureImpl = (pubCallbackClosure*) closure; @@ -139,6 +140,24 @@ static void MAMACALLTYPE publisherOnErrorCb (mamaPublisher publisher, (*env)->DeleteLocalRef(env, jmsg); /* delete this since this thread is not from the JVM */ } +static void MAMACALLTYPE publisherOnSuccessCb (mamaPublisher publisher, + mama_status status, + const char* info, + void* closure) +{ + JNIEnv* env = utils_getENV(javaVM_g); + pubCallbackClosure* closureImpl = (pubCallbackClosure*)closure; + + /* Invoke the onSuccess() callback method */ + jstring jmsg = (*env)->NewStringUTF(env, info); + (*env)->CallVoidMethod(env, closureImpl->mClientCb, + publisherCallbackMethoOnSuccess_g, + closureImpl->mPublisher, + status, + jmsg); + (*env)->DeleteLocalRef(env, jmsg); /* delete this since this thread is not from the JVM */ +} + /****************************************************************************** * Public function implementation *******************************************************************************/ @@ -244,6 +263,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create cb->onCreate = publisherOnCreateCb; cb->onDestroy = publisherOnDestroyCb; cb->onError = publisherOnErrorCb; + cb->onSuccess = publisherOnSuccessCb; status = mamaPublisher_createWithCallbacks( &cPublisher, @@ -836,6 +856,8 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher_initIDs "onDestroy", "(Lcom/wombat/mama/MamaPublisher;)V"); publisherCallbackMethoOnError_g = (*env)->GetMethodID(env, publisherCallbackClass, "onError", "(Lcom/wombat/mama/MamaPublisher;SLjava/lang/String;)V"); + publisherCallbackMethoOnSuccess_g = (*env)->GetMethodID(env, publisherCallbackClass, + "onSuccess", "(Lcom/wombat/mama/MamaPublisher;SLjava/lang/String;)V"); /* ----------------------------*/ /* get our send callback class */ diff --git a/mamda/c_cpp/src/examples/mamdapublisher.cpp b/mamda/c_cpp/src/examples/mamdapublisher.cpp index 1bad32a8d..8e0a67c97 100644 --- a/mamda/c_cpp/src/examples/mamdapublisher.cpp +++ b/mamda/c_cpp/src/examples/mamdapublisher.cpp @@ -44,6 +44,7 @@ #include using std::list; +using namespace std; using namespace Wombat; typedef vector SymbolList; diff --git a/mamda/c_cpp/src/examples/orderbooks/listenerBookPublisher.cpp b/mamda/c_cpp/src/examples/orderbooks/listenerBookPublisher.cpp index 5020f73ad..d210bb5e1 100644 --- a/mamda/c_cpp/src/examples/orderbooks/listenerBookPublisher.cpp +++ b/mamda/c_cpp/src/examples/orderbooks/listenerBookPublisher.cpp @@ -48,6 +48,7 @@ #include "../dictrequester.h" #include +using namespace std; using namespace Wombat; typedef list SubscriptionList; diff --git a/mamda/dotnet/src/examples/MamdaTradeTicker/MamdaTradeTicker.cs b/mamda/dotnet/src/examples/MamdaTradeTicker/MamdaTradeTicker.cs index 5a5e24568..3295a9bee 100644 --- a/mamda/dotnet/src/examples/MamdaTradeTicker/MamdaTradeTicker.cs +++ b/mamda/dotnet/src/examples/MamdaTradeTicker/MamdaTradeTicker.cs @@ -20,9 +20,7 @@ */ using System; -using System.Collections; using System.Threading; -using System.Collections; namespace Wombat.Mamda.Examples {