Skip to content
Permalink
Browse files
Fixed the loading of 32-bit vs. 64-bit implementations.
Refactoring the publisher/subscriber objects.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)
  • Loading branch information
Jim Gomes committed Jul 11, 2011
1 parent 65de9e9 commit a56c3bafa6301ecda0efe31a33edd917740b817f
Showing 8 changed files with 82 additions and 110 deletions.
@@ -90,7 +90,7 @@ private void LoadConnectionFactory()

try
{
factoryAssembly = Assembly.Load(fullFileName);
factoryAssembly = Assembly.LoadFile(fullFileName);
if(null != factoryAssembly)
{
Tracer.DebugFormat("Succesfully loaded provider: {0}", fullFileName);
@@ -126,18 +126,21 @@ private static string[] GetAssemblySearchPaths()
ArrayList pathList = new ArrayList();

// Check the current folder first.
pathList.Add("");
pathList.Add(Environment.CurrentDirectory);

// Check the folder the assembly is located in.
AppDomain currentDomain = AppDomain.CurrentDomain;

// Check the folder the assembly is located in.
pathList.Add(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location));
if(null != currentDomain.BaseDirectory)

// Check the domain's base directory
if(!string.IsNullOrEmpty(currentDomain.BaseDirectory))
{
pathList.Add(currentDomain.BaseDirectory);
}

if(null != currentDomain.RelativeSearchPath)
// Search the domain's relative paths.
if(!string.IsNullOrEmpty(currentDomain.RelativeSearchPath))
{
pathList.Add(currentDomain.RelativeSearchPath);
}
@@ -16,7 +16,7 @@
*/

using System;
using CLRZMQ = ZMQ;
using ZContext = ZMQ.Context;

namespace Apache.NMS.ZMQ
{
@@ -38,7 +38,7 @@ public class Connection : IConnection
/// <summary>
/// ZMQ context
/// </summary>
static private CLRZMQ.Context _context = new CLRZMQ.Context(1);
static private ZContext _context = new ZContext(1);

/// <summary>
/// Starts message delivery for this connection.
@@ -155,7 +155,7 @@ public ProducerTransformerDelegate ProducerTransformer
/// <summary>
/// Gets ZMQ connection context
/// </summary>
static internal CLRZMQ.Context Context
static internal ZContext Context
{
get
{
@@ -16,9 +16,12 @@
*/

using System;
using System.Text;
using System.Threading;
using Apache.NMS.Util;
using CLRZMQ = ZMQ;
using ZSocket = ZMQ.Socket;
using ZSocketType = ZMQ.SocketType;
using ZSendRecvOpt = ZMQ.SendRecvOpt;

namespace Apache.NMS.ZMQ
{
@@ -31,7 +34,14 @@ public class MessageConsumer : IMessageConsumer

private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
private ZmqSubscriber messageSubscriber;
/// <summary>
/// Socket object
/// </summary>
private ZSocket messageSubscriber = null;
/// <summary>
/// Context binding string
/// </summary>
private string contextBinding;
private event MessageListener listener;
private int listenerCount = 0;
private Thread asyncDeliveryThread = null;
@@ -45,11 +55,31 @@ public ConsumerTransformerDelegate ConsumerTransformer
set { this.consumerTransformer = value; }
}

public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, ZmqSubscriber messageSubscriber)
public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
{
if(null == Connection.Context)
{
throw new NMSConnectionException();
}

this.session = session;
this.acknowledgementMode = acknowledgementMode;
this.messageSubscriber = messageSubscriber;
this.messageSubscriber = Connection.Context.Socket(ZSocketType.SUB);
if(null == this.messageSubscriber)
{
throw new ResourceAllocationException();
}

string clientId = session.Connection.ClientId;

this.contextBinding = session.Connection.BrokerUri.LocalPath;
if(!string.IsNullOrEmpty(clientId))
{
this.messageSubscriber.StringToIdentity(clientId, Encoding.Unicode);
}

this.messageSubscriber.Connect(contextBinding);
this.messageSubscriber.Subscribe(selector ?? string.Empty, Encoding.ASCII);
}

public event MessageListener Listener
@@ -87,7 +117,7 @@ public IMessage Receive()
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, CLRZMQ.SendRecvOpt.NOBLOCK);
string messageText = messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NOBLOCK);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
@@ -107,7 +137,7 @@ public IMessage Receive(TimeSpan timeout)
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, timeout.Milliseconds);
string messageText = messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
@@ -173,7 +203,7 @@ protected virtual void StartAsyncDelivery()
if(asyncDelivery.CompareAndSet(false, true))
{
asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageSubscriber.Binding;
asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;
asyncDeliveryThread.IsBackground = true;
asyncDeliveryThread.Start();
}
@@ -277,7 +307,7 @@ protected virtual IMessage ToNmsMessage(string messageText)
private ZmqMessage ToZmqMessage(string messageText)
{
ZmqMessage message = new ZmqMessage();
message.Destination = new Queue(session.Connection.BrokerUri.LocalPath);
message.Destination = new Queue(this.contextBinding);
message.ClientId = session.Connection.ClientId;
message.Text = messageText;
return message;
@@ -16,6 +16,9 @@
*/

using System;
using ZSocket = ZMQ.Socket;
using ZSocketType = ZMQ.SocketType;
using System.Text;

namespace Apache.NMS.ZMQ
{
@@ -24,11 +27,13 @@ namespace Apache.NMS.ZMQ
/// </summary>
public class MessageProducer : IMessageProducer
{

private readonly Session session;
private Destination destination;
private IDestination destination;

//private long messageCounter;
/// <summary>
/// Socket object
/// </summary>
private ZSocket messageProducer = null;
private MsgDeliveryMode deliveryMode;
private TimeSpan timeToLive;
private MsgPriority priority;
@@ -42,13 +47,24 @@ public ProducerTransformerDelegate ProducerTransformer
set { this.producerTransformer = value; }
}

public MessageProducer(Session session, Destination destination)
public MessageProducer(Connection connection, Session session, IDestination destination)
{
if(null == Connection.Context)
{
throw new NMSConnectionException();
}

this.session = session;
this.destination = destination;
if(destination != null)
this.messageProducer = Connection.Context.Socket(ZSocketType.SUB);

string clientId = connection.ClientId;
if(!string.IsNullOrEmpty(clientId))
{
this.messageProducer.StringToIdentity(clientId, Encoding.Unicode);
}

this.messageProducer.Connect(connection.BrokerUri.LocalPath);
}

public void Send(IMessage message)
@@ -68,16 +84,21 @@ public void Send(IDestination destination, IMessage message)

public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
// TODO: Implement sending a message.
messageProducer.Send();
}

public void Close()
public void Dispose()
{
Close();
}

public void Dispose()
public void Close()
{
Close();
if(null != messageProducer)
{
messageProducer.Dispose();
messageProducer = null;
}
}

public IMessage CreateMessage()
@@ -144,7 +165,7 @@ public TimeSpan RequestTimeout
public IDestination Destination
{
get { return destination; }
set { destination = (Destination) value; }
set { destination = value; }
}

public MsgPriority Priority
@@ -57,7 +57,7 @@ public IMessageProducer CreateProducer()

public IMessageProducer CreateProducer(IDestination destination)
{
throw new NotSupportedException("Producer is not supported/implemented");
return new MessageProducer(connection, this, destination);
}
#endregion

@@ -76,7 +76,7 @@ public IMessageConsumer CreateConsumer(IDestination destination, string selector
{
// Subscriber client reads messages from a publisher and forwards messages
// through the message consumer
return new MessageConsumer(this, acknowledgementMode, new ZmqSubscriber(connection, destination, selector));
return new MessageConsumer(this, acknowledgementMode, destination, selector);
}

public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)

This file was deleted.

@@ -70,7 +70,6 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
<Compile Include="src\main\csharp\ZmqSubscriber.cs" />
<Compile Include="src\main\csharp\ZmqMessage.cs" />
</ItemGroup>
<ItemGroup>
@@ -70,7 +70,6 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
<Compile Include="src\main\csharp\ZmqSubscriber.cs" />
<Compile Include="src\main\csharp\ZmqMessage.cs" />
</ItemGroup>
<ItemGroup>

0 comments on commit a56c3ba

Please sign in to comment.