Skip to content
Permalink
Browse files
Refactoring to support multiple producers and consumers. Fixed wire p…
…rotocol format. Added many new unit tests to validate the refactoring, and to give example usage.
  • Loading branch information
Jim Gomes committed Mar 8, 2014
1 parent 727eeaf commit bb8984883146635e9928ae2e5165b70ee6c8c822
Showing 20 changed files with 798 additions and 319 deletions.
@@ -13,7 +13,7 @@
//------------------------------------------------------------------------------

[assembly: ComVisibleAttribute(false)]
[assembly: CLSCompliantAttribute(true)]
[assembly: CLSCompliantAttribute(false)]
[assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]
[assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +
"lementation of the NMS API for ZMQ")]
@@ -17,6 +17,8 @@

using System;
using ZeroMQ;
using System.Collections.Generic;
using System.Text;

namespace Apache.NMS.ZMQ
{
@@ -26,19 +28,43 @@ namespace Apache.NMS.ZMQ
///
public class Connection : IConnection
{
private class ProducerRef
{
public ZmqSocket producer = null;
public int refCount = 1;
}

private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private IRedeliveryPolicy redeliveryPolicy;
private ConnectionMetaData metaData = null;
private bool closed = true;
private string clientId;
private Uri brokerUri;
private string producerContextBinding;
private string consumerContextBinding;

/// <summary>
/// ZMQ context
/// </summary>
private ZmqContext _context = ZmqContext.Create();
private static ZmqContext _context;
private static Dictionary<string, ProducerRef> producerCache;
private static object producerCacheLock;

/// <summary>
static Connection()
{
Connection._context = ZmqContext.Create();
Connection.producerCache = new Dictionary<string, ProducerRef>();
Connection.producerCacheLock = new object();
}

public Connection(Uri connectionUri)
{
this.brokerUri = connectionUri;
this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);
}

/// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
@@ -79,15 +105,101 @@ public ISession CreateSession(AcknowledgementMode mode)
return new Session(this, mode);
}

public void Dispose()
internal ZmqSocket GetProducer()
{
ProducerRef producerRef;
string contextBinding = GetProducerContextBinding();

lock(producerCacheLock)
{
if(!producerCache.TryGetValue(contextBinding, out producerRef))
{
producerRef = new ProducerRef();
producerRef.producer = this.Context.CreateSocket(SocketType.PUB);
if(null == producerRef.producer)
{
throw new ResourceAllocationException();
}
producerRef.producer.Bind(contextBinding);
producerCache.Add(contextBinding, producerRef);
}
else
{
producerRef.refCount++;
}
}

return producerRef.producer;
}

internal void ReleaseProducer(ZmqSocket endpoint)
{
// UNREFERENCED_PARAM(endpoint);
ProducerRef producerRef;
string contextBinding = GetProducerContextBinding();

lock(producerCacheLock)
{
if(producerCache.TryGetValue(contextBinding, out producerRef))
{
producerRef.refCount--;
if(producerRef.refCount < 1)
{
producerCache.Remove(contextBinding);
producerRef.producer.Unbind(contextBinding);
}
}
}
}

internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)
{
ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);

if(null == endpoint)
{
throw new ResourceAllocationException();
}
endpoint.Subscribe(encoding.GetBytes(destinationName));
endpoint.Connect(GetConsumerBindingPath());

return endpoint;
}

internal void ReleaseConsumer(ZmqSocket endpoint)
{
endpoint.Disconnect(GetConsumerBindingPath());
}

internal string GetProducerContextBinding()
{
return this.producerContextBinding;
}

private string GetConsumerBindingPath()
{
return this.consumerContextBinding;
}

public void Dispose()
{
Close();
}

public void Close()
{
Stop();
}

lock(producerCacheLock)
{
foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)
{
cacheItem.Value.producer.Unbind(cacheItem.Key);
}

producerCache.Clear();
}
}

public void PurgeTempDestinations()
{
@@ -114,7 +226,6 @@ public AcknowledgementMode AcknowledgementMode
public Uri BrokerUri
{
get { return brokerUri; }
set { brokerUri = value; }
}

/// <summary>
@@ -154,7 +265,7 @@ public ProducerTransformerDelegate ProducerTransformer
/// </summary>
internal ZmqContext Context
{
get { return _context; }
get { return Connection._context; }
}

/// <summary>
@@ -16,6 +16,7 @@
*/
using System;
using Apache.NMS.Policies;
using Apache.NMS.Util;

namespace Apache.NMS.ZMQ
{
@@ -25,7 +26,7 @@ namespace Apache.NMS.ZMQ
public class ConnectionFactory : IConnectionFactory
{
private Uri brokerUri;
private string clientID;
private string clientId;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";
@@ -36,20 +37,35 @@ public ConnectionFactory()
{
}

public ConnectionFactory(string brokerUri)
: this(brokerUri, null)
public ConnectionFactory(string rawBrokerUri)
: this(rawBrokerUri, null)
{
}

public ConnectionFactory(string brokerUri, string clientID)
: this(new Uri(brokerUri), clientID)
public ConnectionFactory(string rawBrokerUri, string clientID)
: this(URISupport.CreateCompatibleUri(rawBrokerUri), clientID)
{
}

public ConnectionFactory(Uri brokerUri, string clientID)
public ConnectionFactory(Uri rawBrokerUri)
: this(rawBrokerUri, null)
{
this.brokerUri = brokerUri;
this.clientID = clientID;
}

public ConnectionFactory(Uri rawBrokerUri, string clientID)
{
this.BrokerUri = rawBrokerUri;
if(this.BrokerUri.Port < 1)
{
throw new NMSConnectionException("Missing connection port number.");
}

if(null == clientID)
{
clientID = Guid.NewGuid().ToString();
}

this.ClientId = clientID;
}

/// <summary>
@@ -93,26 +109,32 @@ public IConnection CreateConnection(string userName, string password)
/// </summary>
public IConnection CreateConnection(string userName, string password, bool useLogging)
{
IConnection ReturnValue = null;
Connection connection = new Connection();
Connection connection = new Connection(this.BrokerUri);

connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
connection.BrokerUri = this.BrokerUri;
connection.ClientId = this.clientID;
ReturnValue = connection;

return ReturnValue;
connection.ClientId = this.ClientId;
return connection;
}

/// <summary>
/// Get/or set the broker Uri.
/// </summary>
public Uri BrokerUri
{
get { return brokerUri; }
set { brokerUri = value; }
get { return this.brokerUri; }
set
{
Tracer.InfoFormat("BrokerUri set {0}", value.OriginalString);
this.brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "zmq:"));
}
}

public string ClientId
{
get { return this.clientId; }
set { this.clientId = value; }
}

/// <summary>

0 comments on commit bb89848

Please sign in to comment.