Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
Modified memory pools to be queue-local
Browse files Browse the repository at this point in the history
This will should reduce the amount of contention per-lock when
dispatching messages to internal queues.
  • Loading branch information
fquinner committed Jun 5, 2016
1 parent 8eb1485 commit f683cfd
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 47 deletions.
3 changes: 2 additions & 1 deletion src/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ elif env['PLATFORM'] == "posix":
'.',
],
CFLAGS = [
'-g'
'-g',
'-O3'
],
)
if not env.GetOption('clean') and not env.GetOption('help'):
Expand Down
6 changes: 3 additions & 3 deletions src/inbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* THE SOFTWARE.
*/

#ifndef MAMA_BRIDGE_QPID_INBOX_H__
#define MAMA_BRIDGE_QPID_INBOX_H__
#ifndef MAMA_BRIDGE_ZMQ_INBOX_H__
#define MAMA_BRIDGE_ZMQ_INBOX_H__


/*=========================================================================
Expand Down Expand Up @@ -57,4 +57,4 @@ zmqBridgeMamaInboxImpl_getReplySubject (inboxBridge inbox);
}
#endif

#endif /* MAMA_BRIDGE_QPID_INBOX_H__ */
#endif /* MAMA_BRIDGE_ZMQ_INBOX_H__ */
30 changes: 29 additions & 1 deletion src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ zmqBridgeMamaQueue_destroy (queueBridge queue)
status = wombatQueue_destroy (impl->mQueue);
wthread_mutex_unlock (&impl->mDispatchLock);

if (NULL != impl->mClosureCleanupCb && NULL != impl->mClosure)
{
impl->mClosureCleanupCb (impl->mClosure);
}

/* Free the zmqQueueImpl container struct */
free (impl);

Expand Down Expand Up @@ -269,7 +274,7 @@ zmqBridgeMamaQueue_dispatch (queueBridge queue)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"zmqBridgeMamaQueue_dispatch (): "
"Failed to dispatch Qpid Middleware queue (%d). ",
"Failed to dispatch Zmq Middleware queue (%d). ",
status);
return MAMA_STATUS_PLATFORM;
}
Expand Down Expand Up @@ -485,6 +490,29 @@ zmqBridgeMamaQueue_setLowWatermark (queueBridge queue,
}


/*=========================================================================
= Public implementation functions =
=========================================================================*/

void
zmqBridgeMamaQueueImpl_setClosure (queueBridge queue,
void* closure,
zmqQueueClosureCleanup callback)
{
zmqQueueBridge* impl = (zmqQueueBridge*) queue;
impl->mClosure = closure;
impl->mClosureCleanupCb = callback;
}

void*
zmqBridgeMamaQueueImpl_getClosure (queueBridge queue)
{
zmqQueueBridge* impl = (zmqQueueBridge*) queue;

return impl->mClosure;
}


/*=========================================================================
= Private implementation functions =
=========================================================================*/
Expand Down
2 changes: 1 addition & 1 deletion src/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ zmqBridgeMamaSubscription_create (subscriptionBridge* subscriber,
return MAMA_STATUS_NOMEM;
}

mamaQueue_getNativeHandle(queue, &impl->mQpidQueue);
mamaQueue_getNativeHandle(queue, &impl->mZmqQueue);
impl->mMamaCallback = callback;
impl->mMamaSubscription = subscription;
impl->mMamaQueue = queue;
Expand Down
101 changes: 64 additions & 37 deletions src/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ zmqBridgeMamaTransportImpl_getParameter (const char* defaultVal,
static void*
zmqBridgeMamaTransportImpl_dispatchThread (void* closure);

static void
zmqBridgeMamaTransportImpl_queueClosureCleanupCb (void* closure);

/*=========================================================================
= Public interface implementation functions =
Expand Down Expand Up @@ -285,8 +287,6 @@ zmqBridgeMamaTransport_destroy (transportBridge transport)
endpointPool_destroy (impl->mSubEndpoints);
endpointPool_destroy (impl->mPubEndpoints);

memoryPool_destroy (impl->mMemoryNodePool, NULL);

free (impl);

return status;
Expand All @@ -301,8 +301,6 @@ zmqBridgeMamaTransport_create (transportBridge* result,
mama_status status = MAMA_STATUS_OK;
char* mDefIncoming = NULL;
char* mDefOutgoing = NULL;
long int poolSize = 0;
long int nodeSize = 0;

if (NULL == result || NULL == name || NULL == parent)
{
Expand All @@ -323,28 +321,26 @@ zmqBridgeMamaTransport_create (transportBridge* result,
"zmqBridgeMamaTransport_create(): Initializing Transport %s",
name);

poolSize = atol(zmqBridgeMamaTransportImpl_getParameter (
DEFAULT_MEMPOOL_SIZE,
"%s.%s.%s",
TPORT_PARAM_PREFIX,
name,
TPORT_PARAM_MSG_POOL_SIZE));
impl->mMemoryPoolSize = atol(zmqBridgeMamaTransportImpl_getParameter (
DEFAULT_MEMPOOL_SIZE,
"%s.%s.%s",
TPORT_PARAM_PREFIX,
name,
TPORT_PARAM_MSG_POOL_SIZE));

nodeSize = atol(zmqBridgeMamaTransportImpl_getParameter (
DEFAULT_MEMNODE_SIZE,
"%s.%s.%s",
TPORT_PARAM_PREFIX,
name,
TPORT_PARAM_MSG_NODE_SIZE));
impl->mMemoryNodeSize = atol(zmqBridgeMamaTransportImpl_getParameter (
DEFAULT_MEMNODE_SIZE,
"%s.%s.%s",
TPORT_PARAM_PREFIX,
name,
TPORT_PARAM_MSG_NODE_SIZE));

mama_log (MAMA_LOG_LEVEL_FINE,
"zmqBridgeMamaTransport_create(): Creating a transport mempool "
"for %s with %lu nodes of %lu bytes.",
"zmqBridgeMamaTransport_create(): Any message pools created will "
"contain %lu nodes of %lu bytes.",
name,
poolSize,
nodeSize);

impl->mMemoryNodePool = memoryPool_create (poolSize, nodeSize);
impl->mMemoryPoolSize,
impl->mMemoryNodeSize);

if (0 == strcmp(impl->mName, "pub"))
{
Expand Down Expand Up @@ -847,13 +843,15 @@ zmqBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure)
mama_status status = MAMA_STATUS_OK;
mamaMsg tmpMsg = NULL;
msgBridge bridgeMsg = NULL;
memoryPool* pool = NULL;
memoryNode* node = (memoryNode*) closure;
zmqTransportMsg* tmsg = (zmqTransportMsg*) node->mNodeBuffer;
uint32_t bufferSize = tmsg->mNodeSize;
const void* buffer = tmsg->mNodeBuffer;
const char* subject = (char*)buffer;
zmqSubscription* subscription = (zmqSubscription*) tmsg->mSubscription;
zmqTransportBridge* impl = subscription->mTransport;
zmqQueueBridge* queueImpl = NULL;

/* Can't do anything without a subscriber */
if (NULL == subscription)
Expand Down Expand Up @@ -926,8 +924,11 @@ zmqBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure)
}
}

mamaQueue_getNativeHandle (queue, (void**)&queueImpl);

// Return the memory node to the pool
memoryPool_returnNode (node->mPool, node);
pool = (memoryPool*) zmqBridgeMamaQueueImpl_getClosure ((queueBridge) queueImpl);
memoryPool_returnNode (pool, node);

return;
}
Expand Down Expand Up @@ -1157,28 +1158,43 @@ void* zmqBridgeMamaTransportImpl_dispatchThread (void* closure)
*/
else
{
void* queueClosure = NULL;
size_t memLength = sizeof(zmqTransportMsg) +
zmq_msg_size(&zmsg);
memoryNode* node = memoryPool_getNode (impl->mMemoryNodePool,
memLength);
zmqTransportMsg* tmsg =
(zmqTransportMsg*) node->mNodeBuffer;

tmsg->mNodeBuffer = (uint8_t*)(tmsg + 1);
tmsg->mNodeSize = zmq_msg_size(&zmsg);
queueBridge queueImpl = NULL;
memoryPool* pool = NULL;
memoryNode* node = NULL;
zmqTransportMsg* tmsg = NULL;
size_t memLength = sizeof(zmqTransportMsg) +
zmq_msg_size(&zmsg);

queueImpl = (queueBridge) subscription->mZmqQueue;

/* Get the memory pool from the queue, creating if necessary */
pool = (memoryPool*) zmqBridgeMamaQueueImpl_getClosure (queueImpl);
if (NULL == zmqBridgeMamaQueueImpl_getClosure (queueImpl))
{
pool = memoryPool_create (impl->mMemoryPoolSize,
impl->mMemoryNodeSize);
zmqBridgeMamaQueueImpl_setClosure (
queueImpl,
pool,
zmqBridgeMamaTransportImpl_queueClosureCleanupCb);
}

node = memoryPool_getNode (pool, memLength);

tmsg = (zmqTransportMsg*) node->mNodeBuffer;

tmsg->mNodeBuffer = (uint8_t*)(tmsg + 1);
tmsg->mNodeSize = zmq_msg_size(&zmsg);
tmsg->mSubscription = subscription;

memcpy (tmsg->mNodeBuffer,
zmq_msg_data(&zmsg),
tmsg->mNodeSize);

queueClosure = (void*) node;

zmqBridgeMamaQueue_enqueueEvent (
(queueBridge) subscription->mQpidQueue,
(queueBridge) queueImpl,
zmqBridgeMamaTransportImpl_queueCallback,
queueClosure);
(void*) node);
}
}
}
Expand All @@ -1187,3 +1203,14 @@ void* zmqBridgeMamaTransportImpl_dispatchThread (void* closure)
return NULL;
}

void
zmqBridgeMamaTransportImpl_queueClosureCleanupCb (void* closure)
{
memoryPool* pool = (memoryPool*) closure;
if (NULL != pool)
{
mama_log (MAMA_LOG_LEVEL_FINE,
"Destroying memory pool for queue %p.", closure);
memoryPool_destroy (pool, NULL);
}
}
15 changes: 11 additions & 4 deletions src/zmqdefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <wombat/queue.h>
#include <wombat/mempool.h>
#include "endpointpool.h"
#include "queue.h"

#if defined(__cplusplus)
extern "C" {
Expand Down Expand Up @@ -95,16 +96,17 @@ typedef struct zmqTransportBridge_
mama_status mOmzmqDispatchStatus;
endpointPool_t mSubEndpoints;
endpointPool_t mPubEndpoints;
memoryPool* mMemoryNodePool;
long int mMemoryPoolSize;
long int mMemoryNodeSize;
} zmqTransportBridge;

typedef struct zmqSubscription_
{
mamaMsgCallbacks mMamaCallback;
mamaSubscription mMamaSubscription;
mamaQueue mMamaQueue;
void* mQpidQueue;
zmqTransportBridge* mTransport;
void* mZmqQueue;
zmqTransportBridge* mTransport;
const char* mSymbol;
char* mSubjectKey;
void* mClosure;
Expand All @@ -119,7 +121,7 @@ typedef struct zmqTransportMsg_
{
size_t mNodeSize;
size_t mNodeCapacity;
zmqSubscription* mSubscription;
zmqSubscription* mSubscription;
uint8_t* mNodeBuffer;
} zmqTransportMsg;

Expand All @@ -134,8 +136,13 @@ typedef struct zmqQueueBridge {
mamaQueueEnqueueCB mEnqueueCallback;
void* mClosure;
wthread_mutex_t mDispatchLock;
zmqQueueClosureCleanup mClosureCleanupCb;
void* mZmqContext;
void* mZmqSocketWorker;
void* mZmqSocketDealer;
} zmqQueueBridge;


#if defined(__cplusplus)
}
#endif
Expand Down

0 comments on commit f683cfd

Please sign in to comment.