Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
Added use of integration headers to zmq bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
fquinner committed Dec 29, 2017
1 parent 533c5be commit 4fc1c46
Show file tree
Hide file tree
Showing 18 changed files with 157 additions and 145 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ before_install:
- cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DSYSINSTALL_BINDINGS=ON -DBUILD_JAVA=OFF -DBUILD_PERL=OFF -DBUILD_PHP=OFF -DBUILD_PYTHON=OFF -DBUILD_RUBY=OFF
- sudo make install
- cd
- git clone https://github.com/OpenMAMA/OpenMAMA.git
- git clone https://github.com/fquinner/OpenMAMA.git
- cd OpenMAMA
- git checkout feature-bridge-public-headers
- "cd /usr/src/gtest && sudo cmake . && sudo cmake --build . && sudo mv libg* /usr/local/lib/ ; cd -"
- scons product=mama middleware=qpid with_unittest=y gtest_home=/usr/local
- export MAMA_SOURCE=$(pwd)
Expand All @@ -33,7 +34,7 @@ before_install:

script:
- cd $SOURCE_DIR
- cmake -DMAMA_SRC=$MAMA_SOURCE -DMAMA_ROOT=$MAMA_INSTALL -DCMAKE_INSTALL_PREFIX=$MAMA_INSTALL .
- cmake -DMAMA_ROOT=$MAMA_INSTALL -DCMAKE_INSTALL_PREFIX=$MAMA_INSTALL .
- make install
- UnitTestCommonC -m zmq -p qpidmsg -i Q
- UnitTestMamaC -m zmq -p qpidmsg -i Q
Expand Down
5 changes: 0 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,11 @@ else()
endif()

# Note path is relative to where it would be used
set(DEFAULT_MAMA_SRC ../../OpenMAMA)
set(DEFAULT_MAMA_ROOT "${DEFAULT_INSTALL_PREFIX}/OpenMAMA")
set(DEFAULT_ZMQ_ROOT "${DEFAULT_INSTALL_PREFIX}/ZeroMQ 4.0.4")
set(DEFAULT_EVENT_ROOT "${DEFAULT_INSTALL_PREFIX}/libevent")
endif()

if(NOT MAMA_SRC)
set(MAMA_SRC ${DEFAULT_MAMA_SRC})
endif()

if(NOT MAMA_ROOT)
set(MAMA_ROOT ${DEFAULT_MAMA_ROOT})
endif()
Expand Down
9 changes: 3 additions & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
include_directories(.)
include_directories(${MAMA_SRC}/common/c_cpp/src/c)
include_directories(${MAMA_SRC}/mama/c_cpp/src/c)
include_directories(${MAMA_SRC}/mama/c_cpp/src/c/bridge/qpid)
include_directories(${MAMA_ROOT}/include)
include_directories(${EVENT_ROOT}/include)
include_directories(${ZMQ_ROOT}/include)
Expand All @@ -12,7 +9,7 @@ link_directories(${MAMA_ROOT}/lib/dynamic-debug)
link_directories(${ZMQ_ROOT}/lib)
link_directories(${EVENT_ROOT}/lib)

add_definitions(-DBRIDGE -DMAMA_DLL)
add_definitions(-DBRIDGE -DMAMA_DLL -DOPENMAMA_INTEGRATION)

if(WIN32)
if (CMAKE_BUILD_TYPE MATCHES "Debug")
Expand Down Expand Up @@ -41,8 +38,7 @@ add_library(mamazmqimpl${MAMA_LIB_SUFFIX}
transport.c
transport.h
zmqbridgefunctions.h
zmqdefs.h
${MAMA_SRC}/mama/c_cpp/src/c/bridge/qpid/endpointpool.c)
zmqdefs.h)

if(WIN32)
target_link_libraries(mamazmqimpl${MAMA_LIB_SUFFIX}
Expand All @@ -53,6 +49,7 @@ if(WIN32)
uuid
Ws2_32)

add_definitions(-D_CRT_SECURE_NO_WARNINGS)
set_target_properties(mamazmqimpl${MAMA_LIB_SUFFIX} PROPERTIES PREFIX "lib")

# Windows Targets
Expand Down
21 changes: 12 additions & 9 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
= Includes =
=========================================================================*/

#include <stdint.h>
#include <mama/mama.h>
#include <timers.h>
#include "io.h"
#include "zmqbridgefunctions.h"
#include <mama/integration/mama.h>


/*=========================================================================
Expand Down Expand Up @@ -72,8 +74,8 @@ mama_status zmqBridge_init (mamaBridge bridgeImpl)
mama_status
zmqBridge_open (mamaBridge bridgeImpl)
{
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* bridge = (mamaBridgeImpl*) bridgeImpl;
mama_status status = MAMA_STATUS_OK;
mamaQueue defaultEventQueue = NULL;

wsocketstartup();

Expand All @@ -83,7 +85,7 @@ zmqBridge_open (mamaBridge bridgeImpl)
}

/* Create the default event queue */
status = mamaQueue_create (&bridge->mDefaultEventQueue, bridgeImpl);
status = mamaQueue_create (&defaultEventQueue, bridgeImpl);
if (MAMA_STATUS_OK != status)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
Expand All @@ -92,9 +94,10 @@ zmqBridge_open (mamaBridge bridgeImpl)
return status;
}

mamaImpl_setDefaultEventQueue(bridgeImpl, defaultEventQueue);

/* Set the queue name (used to identify this queue in MAMA stats) */
mamaQueue_setQueueName (bridge->mDefaultEventQueue,
ZMQ_DEFAULT_QUEUE_NAME);
mamaQueue_setQueueName (defaultEventQueue, ZMQ_DEFAULT_QUEUE_NAME);

/* Create the timer heap */
if (0 != createTimerHeap (&gOmzmqTimerHeap))
Expand All @@ -121,8 +124,8 @@ zmqBridge_open (mamaBridge bridgeImpl)
mama_status
zmqBridge_close (mamaBridge bridgeImpl)
{
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* bridge = (mamaBridgeImpl*) bridgeImpl;
mama_status status = MAMA_STATUS_OK;
mamaQueue defaultEventQueue = NULL;
wthread_t timerThread;

if (NULL == bridgeImpl)
Expand All @@ -147,8 +150,8 @@ zmqBridge_close (mamaBridge bridgeImpl)
gOmzmqTimerHeap = NULL;

/* Destroy once queue has been emptied */
mamaQueue_destroyTimedWait (bridge->mDefaultEventQueue,
ZMQ_SHUTDOWN_TIMEOUT);
mama_getDefaultEventQueue(bridgeImpl, &defaultEventQueue);
mamaQueue_destroyTimedWait (defaultEventQueue, ZMQ_SHUTDOWN_TIMEOUT);

/* Stop and destroy the io thread */
zmqBridgeMamaIoImpl_stop ();
Expand Down
2 changes: 1 addition & 1 deletion src/inbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include <wombat/wUuid.h>
#include <wombat/port.h>
#include "zmqdefs.h"
#include "inbox.h"
#include <mama/integration/inbox.h>
#include "zmqbridgefunctions.h"


Expand Down
3 changes: 0 additions & 3 deletions src/inbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
= Includes =
=========================================================================*/

#include <bridge.h>


#if defined(__cplusplus)
extern "C" {
#endif
Expand Down
1 change: 1 addition & 0 deletions src/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
= Includes =
=========================================================================*/

#include <stdint.h>
#include <mama/mama.h>
#include <mama/io.h>
#include <wombat/port.h>
Expand Down
4 changes: 2 additions & 2 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <stdlib.h>
#include <string.h>
#include <mama/mama.h>
#include <msgimpl.h>
#include <mama/integration/msg.h>
#include "transport.h"
#include "msg.h"

Expand Down Expand Up @@ -741,7 +741,7 @@ zmqBridgeMamaMsgImpl_deserialize (msgBridge msg,

mama_status status = mamaMsgImpl_setMsgBuffer (target,
(void*) bufferPos,
payloadSize,
(uint32_t)payloadSize,
*bufferPos);

return status;
Expand Down
2 changes: 1 addition & 1 deletion src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
= Includes =
=========================================================================*/

#include <bridge.h>
#include "zmqdefs.h"
#include <mama/integration/msg.h>


#if defined(__cplusplus)
Expand Down
8 changes: 4 additions & 4 deletions src/publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@
=========================================================================*/

#include <string.h>
#include <stdint.h>

#include <mama/mama.h>
#include <mama/inbox.h>
#include <mama/publisher.h>
#include <bridge.h>
#include <inboximpl.h>
#include <msgimpl.h>
#include <mama/integration/inbox.h>
#include <mama/integration/msg.h>
#include "transport.h"
#include "zmqdefs.h"
#include "msg.h"
#include "inbox.h"
#include "subscription.h"
#include "endpointpool.h"
#include "mama/integration/endpointpool.h"
#include "zmqbridgefunctions.h"
#include <errno.h>
#include <zmq.h>
Expand Down
3 changes: 1 addition & 2 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@

#include <mama/mama.h>
#include <wombat/queue.h>
#include <bridge.h>
#include "queueimpl.h"
#include <mama/integration/queue.h>
#include "zmqbridgefunctions.h"
#include "zmqdefs.h"

Expand Down
1 change: 1 addition & 0 deletions src/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
=========================================================================*/

#include "zmqdefs.h"
#include <mama/integration/queue.h>


#if defined(__cplusplus)
Expand Down
11 changes: 6 additions & 5 deletions src/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@
= Includes =
=========================================================================*/

#include <stdint.h>
#include <string.h>
#include <mama/mama.h>
#include <subscriptionimpl.h>
#include <transportimpl.h>
#include <msgimpl.h>
#include <queueimpl.h>
#include <mama/integration/subscription.h>
#include <mama/integration/transport.h>
#include <mama/integration/msg.h>
#include <mama/integration/queue.h>
#include <wombat/queue.h>
#include "transport.h"
#include "zmqdefs.h"
#include "subscription.h"
#include "endpointpool.h"
#include <mama/integration/endpointpool.h>
#include "zmqbridgefunctions.h"
#include "msg.h"
#include <zmq.h>
Expand Down
4 changes: 2 additions & 2 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ zmqBridgeMamaTimer_create (timerBridge* result,

/* Determine when the next timer should fire */
timeout.tv_sec = (time_t) interval;
timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
timeout.tv_usec = (long)((interval-timeout.tv_sec) * 1000000.0);

/* Create the first single fire timer */
timerResult = createTimer (&impl->mTimerElement,
Expand Down Expand Up @@ -217,7 +217,7 @@ zmqBridgeMamaTimer_reset (timerBridge timer)

/* Calculate next time interval */
timeout.tv_sec = (time_t) impl->mInterval;
timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
timeout.tv_usec = (long)((impl->mInterval- timeout.tv_sec) * 1000000.0);

/* Create the timer for the next firing */
timerResult = createTimer (&impl->mTimerElement,
Expand Down
39 changes: 25 additions & 14 deletions src/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@
= Includes =
=========================================================================*/

#include <stdint.h>
#include <mama/mama.h>
#include <queueimpl.h>
#include <msgimpl.h>
#include <queueimpl.h>
#include <subscriptionimpl.h>
#include <transportimpl.h>
#include <mama/integration/mama.h>
#include <mama/integration/queue.h>
#include <mama/integration/msg.h>
#include <mama/integration/subscription.h>
#include <mama/integration/transport.h>
#include <timers.h>
#include <stdio.h>
#include <errno.h>
#include <wombat/queue.h>
#include "transport.h"
#include "zmqdefs.h"
#include "msg.h"
#include "endpointpool.h"
#include "mama/integration/endpointpool.h"
#include "zmqbridgefunctions.h"
#include <zmq.h>
#include <errno.h>
Expand Down Expand Up @@ -317,7 +318,7 @@ zmqBridgeMamaTransport_create (transportBridge* result,
impl->mName = name;

mama_log (MAMA_LOG_LEVEL_FINE,
"zmqBridgeMamaTransport_create(): Initializing Transport %s",
"zmqBridgeMamaTransport_create(): Initializing Transportttt %s",
name);

impl->mMemoryPoolSize = atol(zmqBridgeMamaTransportImpl_getParameter (
Expand All @@ -326,7 +327,9 @@ zmqBridgeMamaTransport_create (transportBridge* result,
TPORT_PARAM_PREFIX,
name,
TPORT_PARAM_MSG_POOL_SIZE));

mama_log(MAMA_LOG_LEVEL_FINE,
"zmqBridgeMamaTransport_create():creating endpoint A %s",
name);
impl->mMemoryNodeSize = atol(zmqBridgeMamaTransportImpl_getParameter (
DEFAULT_MEMNODE_SIZE,
"%s.%s.%s",
Expand Down Expand Up @@ -395,7 +398,9 @@ zmqBridgeMamaTransport_create (transportBridge* result,
impl->mOutgoingAddress[uri_index] = uri;
uri_index++;
}

mama_log(MAMA_LOG_LEVEL_FINE,
"zmqBridgeMamaTransport_create():creating endpoint 1 %s",
name);
status = endpointPool_create (&impl->mSubEndpoints, "mSubEndpoints");
if (MAMA_STATUS_OK != status)
{
Expand All @@ -405,7 +410,9 @@ zmqBridgeMamaTransport_create (transportBridge* result,
free (impl);
return MAMA_STATUS_PLATFORM;
}

mama_log(MAMA_LOG_LEVEL_FINE,
"zmqBridgeMamaTransport_create(): Creating endpoint 2 %s",
name);
status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints");
if (MAMA_STATUS_OK != status)
{
Expand Down Expand Up @@ -745,7 +752,6 @@ zmqBridgeMamaTransportImpl_setupSocket (void* socket, const char* uri, zmqTransp
uri,
strerror(errno));
}
return rc;
}
else
{
Expand All @@ -757,9 +763,9 @@ zmqBridgeMamaTransportImpl_setupSocket (void* socket, const char* uri, zmqTransp
rc,
uri,
strerror(errno));
return rc;
}
}
return rc;
}

mama_status
Expand Down Expand Up @@ -895,7 +901,7 @@ zmqBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure)
memoryPool* pool = NULL;
memoryNode* node = (memoryNode*) closure;
zmqTransportMsg* tmsg = (zmqTransportMsg*) node->mNodeBuffer;
uint32_t bufferSize = tmsg->mNodeSize;
uint32_t bufferSize = (uint32_t)tmsg->mNodeSize;
const void* buffer = tmsg->mNodeBuffer;
const char* subject = (char*)buffer;
zmqSubscription* subscription = (zmqSubscription*) tmsg->mSubscription;
Expand Down Expand Up @@ -994,16 +1000,21 @@ const char* zmqBridgeMamaTransportImpl_getParameterWithVaList (
vsnprintf (paramName, PARAM_NAME_MAX_LENGTH,
format, arguments);

mama_log(MAMA_LOG_LEVEL_FINER, "GETTING PARAMETER %s", paramName);
mamaInternal_getProperties();
mama_log(MAMA_LOG_LEVEL_FINER, "CAN GET PROPERTIES");
/* Get the property out for analysis */
property = properties_Get (mamaInternal_getProperties (),
paramName);

mama_log(MAMA_LOG_LEVEL_FINER, "GOT PROPERTY");
/* Properties will return NULL if parameter is not specified in configs */
if (property == NULL)
{
property = defaultVal;
}

mama_log(MAMA_LOG_LEVEL_FINER, "RETURNING", property);

return property;
}

Expand Down
Loading

0 comments on commit 4fc1c46

Please sign in to comment.